Achieving high throughput with SQS using Akka Streams

SQS (Amazon Simple Queue Service) is a fully managed, highly available queue service provided by Amazon. It is a good choice when you need to deal with queues and do not want to manage and maintain your own message broker. Its price is also very attractive: only USD 0.40 per million requests.

We have already talked about Akka Streams in a previous post, specifically comparing it with Apache Camel, and we already know it is a great tool for building reactive applications using very high-level concepts.

In this post we're going to use Alpakka, the official (and open-source) integration library maintained by Lightbend (and the community, of course).

Setup

Alpakka only needs an AmazonSQSAsync instance to get started:

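import java.util.concurrent.Executors
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))

// sqsEndpoint is the URL of your SQS endpoint, defined elsewhere
implicit val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
  .standard()
  .withCredentials(credentialsProvider)
  .withExecutorFactory(() => Executors.newFixedThreadPool(10)) // 10 threads for the async client
  .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1"))
  .build()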

Simple usage

If you want to send messages to SQS using Alpakka, you can just:

Source
  .single(new SendMessageRequest().withMessageBody("alpakka"))
  .runWith(SqsSink.messageSink(queue))

Or, if you want to reduce the number of requests to SQS, use fewer threads and still achieve higher throughput (almost 10x, since up to 10 messages travel in each request), you can use grouped messages:

Source(List("a", "l", "p", "a", "k", "k", "a"))
  .map(x => new SendMessageRequest().withMessageBody(x))
  .runWith(SqsSink.groupedMessageSink(queue))
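To make the saving concrete, here is a rough back-of-the-envelope sketch in plain Scala (no Akka or AWS dependencies). It assumes a batch holds at most 10 messages, which is the SQS batch limit, and reuses the USD 0.40 per million requests figure quoted earlier. The object and method names are made up for illustration, and real billing also depends on payload size.

```scala
object BatchingCost {
  // Assumption: at most 10 messages per batch request (the SQS batch limit).
  val MaxBatchSize = 10
  // Price per million requests, as quoted in this post.
  val PricePerMillionUsd = BigDecimal("0.40")

  // Number of requests needed to send `messages` in batches of `batchSize`,
  // rounding up so a partially filled last batch still counts as one request.
  def requestsNeeded(messages: Long, batchSize: Int): Long =
    (messages + batchSize - 1) / batchSize

  // Request cost in USD, ignoring free tiers and payload-size surcharges.
  def costUsd(requests: Long): BigDecimal =
    PricePerMillionUsd * BigDecimal(requests) / BigDecimal(1000000)
}
```

Under these assumptions, one million messages sent one by one need one million requests, while the same messages in full batches of 10 need 100,000.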

But what if I need to send messages individually and still want to get high throughput?
Well, by combining Source.queue with SqsSink.groupedMessageSink we can achieve this.

First of all, let's create a flow that expects a tuple of SendMessageRequest and Promise[Done]. This flow will group the messages and send them using SqsSink.groupedMessageSink:

def groupedWithinSqsFlow(queue: String) =
  Flow[(SendMessageRequest, Promise[Done])]
    .groupedWithin(10, 50.millis) // needs import scala.concurrent.duration._
    .mapAsync(10) { batch => // the parallelism should match the SQS client's thread pool size
      // split the batch into the requests to send and the promises to complete later
      val (requests, promises) = batch.unzip

      Source(requests)
        .runWith(SqsSink.groupedMessageSink(queue))
        .map { done => Right(done -> promises) }
        .recover { case e: Exception => Left(e -> promises) }
    }

Once the messages are sent, the promise is completed, so the sender knows when its message was sent. Notice that we are grouping the messages in batches of 10, within 50 millis. If 50 millis have passed and there are still fewer than 10 messages, they are sent to SQS anyway.
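The "10 messages or 50 millis, whichever comes first" rule can be illustrated with a small plain-Scala model. This is only a simplified sketch, not how Akka implements groupedWithin: it assumes the time window is measured from the first message of each batch, and the names (SizeOrTimeBatching, Arrival, batches) are hypothetical.

```scala
object SizeOrTimeBatching {
  // A message body together with the time (in millis) at which it arrived.
  final case class Arrival(atMillis: Long, body: String)

  // Splits arrivals into batches that are closed either when they reach
  // maxSize or when the next message arrives windowMillis or more after
  // the first message of the currently open batch.
  def batches(arrivals: List[Arrival], maxSize: Int, windowMillis: Long): List[List[String]] = {
    val start = (List.empty[List[Arrival]], List.empty[Arrival])
    val (closed, open) = arrivals.sortBy(_.atMillis).foldLeft(start) {
      case ((done, current), next) =>
        val expired = current.nonEmpty && next.atMillis - current.head.atMillis >= windowMillis
        val doneNow = if (expired) done :+ current else done
        val grown = (if (expired) Nil else current) :+ next
        if (grown.size == maxSize) (doneNow :+ grown, Nil) else (doneNow, grown)
    }
    // Whatever is still open at the end is flushed as a final batch.
    (if (open.isEmpty) closed else closed :+ open).map(_.map(_.body))
  }
}
```

In this model, three messages arriving at 0, 10 and 20 millis followed by a fourth at 100 millis end up in two batches, because the window of the first batch has expired by the time the fourth message arrives.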

OK, now this flow produces an Either[(Exception, Seq[Promise[Done]]), (Done, Seq[Promise[Done]])], which will be handled at the queue source:

def queueSource(queue: String): SourceQueueWithComplete[(SendMessageRequest, Promise[Done])] =
  Source
    .queue[(SendMessageRequest, Promise[Done])](Int.MaxValue, OverflowStrategy.backpressure)
    .via(groupedWithinSqsFlow(queue))
    .to(Sink.foreach {
      case Right((d, promises)) => promises.foreach(_.success(d))
      case Left((e, promises)) => promises.foreach(_.failure(e))
    })
    .run()

Once the messages are sent, we complete all the promises depending on the result of the Either inside the sink.
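The promise-completion step can be tried in isolation, without Akka or SQS. The following is a minimal sketch using only the Scala standard library; BatchPromises and completeAll are hypothetical names, and the type parameter A stands in for Akka's Done.

```scala
import scala.concurrent.Promise

object BatchPromises {
  // Completes every promise of a batch with the shared outcome of that batch:
  // Right means the whole batch was sent, Left carries the error that failed it.
  def completeAll[A](outcome: Either[Throwable, A], promises: Seq[Promise[A]]): Unit =
    outcome match {
      case Right(result) => promises.foreach(_.success(result))
      case Left(error)   => promises.foreach(_.failure(error))
    }
}
```

Because a batch succeeds or fails as a whole here, every sender in the same batch observes the same result.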

Using the queueSource, you can create a simple function that takes a single SendMessageRequest and sends it to the queue:

val queue = queueSource("queue-name")

def sendMessage(message: SendMessageRequest): Future[Done] = {
  val promise = Promise[Done]()
  queue
    .offer(message -> promise) // flatMap below needs an implicit ExecutionContext in scope
    .flatMap {
      case QueueOfferResult.Enqueued    => promise.future
      case QueueOfferResult.Failure(e)  => Future.failed(e)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("queue closed"))
      case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("message dropped"))
    }
}

Be aware: do not create more than one queueSource (and therefore more than one groupedWithinSqsFlow) per queue.

Then you can just:

val result1: Future[Done] = sendMessage(new SendMessageRequest().withMessageBody("alpakka 1"))
  
val result2: Future[Done] = sendMessage(new SendMessageRequest().withMessageBody("alpakka 2"))

val result3: Future[Done] = sendMessage(new SendMessageRequest().withMessageBody("alpakka 3"))  

val result4: Future[Done] = sendMessage(new SendMessageRequest().withMessageBody("alpakka 4"))

val result5: Future[Done] = sendMessage(new SendMessageRequest().withMessageBody("alpakka 5"))

If all of these messages are sent within 50 millis, Alpakka will use only one thread and one request to send them to SQS.
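If the caller needs to know when every message has been acknowledged, the individual futures can be combined with Future.sequence. Here is a small sketch using only the standard library; sendMessageStub is a hypothetical stand-in for sendMessage that returns a String instead of Done so the example runs without Akka or SQS.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitAll {
  // Hypothetical stub: pretends the message was sent and acknowledges it.
  def sendMessageStub(body: String): Future[String] =
    Future(s"sent: $body")

  // Sends all bodies and returns one future that completes when every
  // send has completed, keeping the results in the original order.
  def sendAll(bodies: List[String]): Future[List[String]] =
    Future.sequence(bodies.map(sendMessageStub))
}
```

The combined future fails as soon as any of the individual futures fails, so this suits callers that treat the group as all-or-nothing.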

Akka Streams is an incredibly powerful tool for increasing throughput while using fewer resources. Go make the most of it. ;)

Gabriel Francisco

Software Engineer at GFG, 25 years old, undergraduate in Computer Science and graduate in Service-oriented Software Engineering. I like playing guitar once in a while. Oh, and I'm kind of boring.

São Paulo
