Building streams of unknown size in Scala

Maybe you've already run into a scenario where you needed to create a finite stream of something whose size you don't know in advance: a queue, a paged HTTP service, a database table, etc.

Luckily for us, both Play! and Akka Streams provide an easy way to create and handle this kind of stream.

Suppose you have the following function:

def fetchTransactions(page: Int): Future[Seq[Transaction]] = ??? // implementation here

You need to build a report of these transactions somehow. There are A LOT of them, and your machine cannot hold them all in memory.

Using Akka Streams

In this case, you can easily build a stream using Source.unfoldAsync:

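import akka.NotUsed
import akka.stream.scaladsl.Source

// Needs an implicit ExecutionContext in scope for Future.map.
def stream: Source[Seq[Transaction], NotUsed] =
  Source
    .unfoldAsync(1) { page =>
      fetchTransactions(page).map {
        case Nil => None // an empty page ends the stream
        case transactions => Some((page + 1, transactions)) // (next page, element to emit)
      }
    }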

Once the function returns None, the stream completes.
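
If you want to see the whole thing run end to end, here is a minimal sketch. It assumes Akka 2.6+ (where an implicit ActorSystem supplies the materializer) and the akka-stream dependency on the classpath. The Transaction case class, the in-memory fetchTransactions and the collectPages helper are made up for illustration; they are not part of the original example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object UnfoldAsyncSketch {
  // Hypothetical stand-in for the real Transaction type.
  final case class Transaction(id: Int)

  // Hypothetical paged backend: pages 1 to 3 hold two transactions each,
  // every later page is empty.
  def fetchTransactions(page: Int): Future[Seq[Transaction]] =
    Future.successful(
      if (page <= 3) Seq(Transaction(page * 2 - 1), Transaction(page * 2))
      else Seq.empty
    )

  // Runs the stream to completion and returns every page it emitted.
  def collectPages(): Seq[Seq[Transaction]] = {
    // Assumption: Akka 2.6+, so the ActorSystem doubles as the materializer.
    implicit val system: ActorSystem = ActorSystem("unfold-sketch")
    import system.dispatcher // ExecutionContext for Future.map

    val pages = Source
      .unfoldAsync(1) { page =>
        fetchTransactions(page).map { transactions =>
          if (transactions.isEmpty) None // ends the stream
          else Some((page + 1, transactions)) // (next state, element to emit)
        }
      }
      .runWith(Sink.seq)

    try Await.result(pages, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    collectPages().foreach(println)
}
```

The stream emits the three non-empty pages and completes as soon as the fourth page comes back empty, so you never need to know the number of pages up front.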

For example purposes only, let's consume this stream very slowly. To simulate the "slowness", let's call Thread.sleep for one second before processing each page.

stream
  .runForeach { transactions =>
    Thread.sleep(1000)
    println(transactions)
  }
  .onComplete(println)

You will notice that the stream fetches the next page of transactions only after the previous one has been consumed successfully.
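
To get a feel for this demand-driven behaviour without Akka, here is a rough synchronous analogy using the standard library's Iterator.unfold (Scala 2.13+). It is only an analogy: Akka Streams is asynchronous and may buffer internally, so don't read it as a description of Akka's internals. CountingBackend, fetchPage and pages are hypothetical names.

```scala
object PullDrivenSketch {
  // Hypothetical paged backend that counts how many pages were requested.
  final class CountingBackend {
    var fetchCount = 0

    // Pages 1 to 5 hold two items each; later pages are empty.
    def fetchPage(page: Int): Seq[Int] = {
      fetchCount += 1
      if (page <= 5) Seq(page * 10, page * 10 + 1) else Seq.empty
    }
  }

  // Note the tuple order: Iterator.unfold expects (element, next state),
  // while Akka's unfoldAsync expects (next state, element).
  def pages(backend: CountingBackend): Iterator[Seq[Int]] =
    Iterator.unfold(1) { page =>
      val items = backend.fetchPage(page)
      if (items.isEmpty) None else Some((items, page + 1))
    }

  def main(args: Array[String]): Unit = {
    val backend = new CountingBackend
    val firstTwo = pages(backend).take(2).toList
    println(s"consumed ${firstTwo.size} pages, fetched ${backend.fetchCount} pages")
  }
}
```

The backend is asked only for the pages the consumer actually pulls, which is the same idea that keeps memory usage flat in the streaming version.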

You can also use a Flow to map the transactions into something that will be used for your report. Imagine that you have a Flow that transforms transactions into CSV lines:

// Each row ends with "\n" so rows from consecutive pages don't run together.
def transactionsAsCsv = Flow[Seq[Transaction]].map { seq => seq.map(_.asCsvRow + "\n").mkString.getBytes() }

stream
  .via(transactionsAsCsv)
  .runForeach { byteArray =>
    println(new String(byteArray))
  }
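
One detail worth checking in a flow like this is the row separator. Pages arrive as separate chunks, so each row needs to end with a line break; otherwise the last row of one page and the first row of the next end up on the same line. Here is a small sketch of the rendering step on its own, using only the standard library. The Transaction fields, asCsvRow implementation and pageToCsv helper are assumptions for illustration, since the post doesn't show the real ones.

```scala
object CsvSketch {
  // Hypothetical transaction shape; the real Transaction type is not shown in this post.
  final case class Transaction(id: Long, amountCents: Long) {
    def asCsvRow: String = s"$id,$amountCents"
  }

  // Renders one page as CSV text, ending every row with a newline
  // so that consecutive pages can simply be concatenated.
  def pageToCsv(page: Seq[Transaction]): String =
    page.map(_.asCsvRow + "\n").mkString

  def main(args: Array[String]): Unit = {
    val page1 = Seq(Transaction(1, 1050), Transaction(2, 99))
    val page2 = Seq(Transaction(3, 5000))
    // Concatenating the chunks mimics what the consumer of the stream sees.
    print(pageToCsv(page1) + pageToCsv(page2))
  }
}
```

Keeping the rendering in a plain function like this also makes it easy to unit test without starting a stream.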

Or maybe you want to return the stream from an HTTP service (using Akka HTTP):

// Akka HTTP entities stream akka.util.ByteString, so wrap each byte array.
def csvStream = stream.via(transactionsAsCsv).map(ByteString(_))

def route = get {
  complete {
    HttpResponse(
      entity = HttpEntity(ContentTypes.`application/octet-stream`, csvStream),
      headers = List(RawHeader("Content-Disposition", "attachment; filename=transactions.csv"))
    )
  }
}

Using Play!

Play! also provides an easy way to solve this problem. It works much like the Akka Streams solution, but uses Enumerators:

def stream =
  Enumerator.unfoldM(1) { page =>
    fetchTransactions(page).map {
      case Nil => None // an empty page ends the stream
      case transactions => Some((page + 1, transactions))
    }
  }

As with Akka Streams, the stream completes once the function returns None.

stream
  .run(Iteratee.foreach { transactions =>
    Thread.sleep(1000)
    println(transactions)
  })
  .onComplete(println)

Again, Thread.sleep is there for example purposes only.

Where you'd use Flows in Akka Streams, in Play! you will end up using Enumeratees.

// Each row ends with "\n" so rows from consecutive pages don't run together.
def transactionCsvEnumeratee: Enumeratee[Seq[Transaction], Array[Byte]] = Enumeratee.map[Seq[Transaction]] { seq: Seq[Transaction] => seq.map(_.asCsvRow + "\n").mkString.getBytes() }

Then use it to transform the stream into another stream (or Enumerator, in Play!):

def csvStream = stream through transactionCsvEnumeratee

And if you want to return it from an Action, you can do this:

def downloadAction = Action {
  Ok.chunked(csvStream)
    .withHeaders("Content-Disposition" -> "attachment; filename=transactions.csv")
}

Wrapping up

I prefer Akka, so I'd rather go with Akka Streams. But if you're working on a Play! application, it's very likely that sooner or later you will need to know how to use its streams too.

I hope this post is useful. ;)

Cheers,

Gabriel Francisco

Software Engineer at GFG, 25 years old, with an undergraduate degree in Computer Science and a graduate degree in Service-oriented Software Engineering. I like playing guitar once in a while. Oh, and I'm kind of boring.

São Paulo
