Building microservices with Akka HTTP and MongoDB

It's been a while since my last post, but, here I am again.

You probably know that both Akka HTTP and MongoDB Scala Driver are reactive, so, they're a good way to go when we talk about highly scalable microservices, and, since I've playing with both lately, I guess it's fair to show a simple REST application using them.

Also, this example uses Fongo, a fake in memory MongoDB written in Java, to help us create our tests and not worry with our database access.

Well, let's start then.

build.sbt

name := "akka-http-mongodb"
organization := "com.thedevpiece"
version := "0.0.1"
scalaVersion := "2.12.3"

resolvers += Resolver.jcenterRepo

val akkaHttp = "10.0.10"
val akka = "2.4.19"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % akkaHttp,
  "com.typesafe.akka" %% "akka-slf4j" % akka,
  "de.heikoseeberger" %% "akka-http-jackson" % "1.18.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.1.0",

  //test libraries
  "org.scalatest" %% "scalatest" % "3.0.1" % "test",
  "org.pegdown" % "pegdown" % "1.6.0" % "test",
  "com.typesafe.akka" %% "akka-http-testkit" % akkaHttp % "test",
  "org.mongodb" % "mongo-java-driver" % "3.4.0" % "test",
  "com.github.fakemongo" % "fongo" % "2.1.0" % "test"
)

testOptions in Test ++= Seq(
  Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports"),
  Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/test-reports")
)

And, the code...

application.conf

Before we start, let's add a file named application.conf with our MongoDB connection data:

mongo {
  database = "user-api"
  uri = "mongodb://<username>:<password>@<hostname>:<port>/"${mongo.database}
}

And it is up to you to configure it properly, of course.

Our model

... which is very simple, by the way.

package com.thedevpiece.akka.http.mongodb

import org.bson.types.ObjectId
import org.mongodb.scala.bson.ObjectId

case class FindByIdRequest(id: String) {
  require(ObjectId.isValid(id), "the informed id is not a representation of a valid hex string")
}

case class UserResource(id: String, username: String, age: Int) {
  require(username != null, "username not informed")
  require(username.nonEmpty, "username cannot be empty")
  require(age > 0, "age cannot be lower than 1")

  def asDomain = User(if (id == null) ObjectId.get() else new ObjectId(id), username, age)
}

case class User(_id: ObjectId, username: String, age: Int) {
  def asResource = UserResource(_id.toHexString, username, age)
}

There are a few validations. Based on them, Akka HTTP will know when to return a bad request or not.

Connecting to MongoDB

This object loads our application.conf data and connects to the MongoDB.
It also create a CodecRegistry to map our User class into a MongoDB Document so we won't need to worry to convert it ourselves.

package com.thedevpiece.akka.http.mongodb

import com.typesafe.config.ConfigFactory
import org.bson.codecs.configuration.CodecRegistries._
import org.mongodb.scala._
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._

object Mongo {
  lazy val config = ConfigFactory.load()
  lazy val mongoClient: MongoClient = MongoClient(config.getString("mongo.uri"))
  lazy val codecRegistry = fromRegistries(fromProviders(classOf[User]), DEFAULT_CODEC_REGISTRY)
  lazy val database: MongoDatabase = mongoClient.getDatabase(config.getString("mongo.database")).withCodecRegistry(codecRegistry)

  lazy val userCollection: MongoCollection[User] = database.getCollection[User]("users")
}

The repository

No news over here, just our Repository class.

package com.thedevpiece.akka.http.mongodb

import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId

import scala.concurrent.{ExecutionContext, Future}

class UserRepository(collection: MongoCollection[User])(implicit ec: ExecutionContext) {
  def findById(id: String): Future[Option[User]] =
    collection
      .find(Document("_id" -> new ObjectId(id)))
      .first
      .head
      .map(Option(_))

  def save(user: User): Future[String] =
    collection
      .insertOne(user)
      .head
      .map { _ => user._id.toHexString }
}

Remember: MongoDB Scala Driver is non-blocking, so, we always work with Futures.

The endpoint

We are using the high level Akka HTTP dsl (based on Spray.io). It may be terrifying at first, but, once you understand the basic concepts of routes and directives, it is nice to develop and also create your custom features for your endpoints.

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.Directives._
import akka.stream.Materializer

import scala.concurrent.Future
import de.heikoseeberger.akkahttpjackson.JacksonSupport._

import scala.concurrent.ExecutionContext

trait UserEndpoint {
  implicit val mat: Materializer
  implicit val ec: ExecutionContext

  val repository: UserRepository

  val userRoute = {
    pathPrefix("api" / "users") {
      get {
        path(Segment).as(FindByIdRequest) { request =>
          complete {
            repository
              .findById(request.id)
              .map { optionalUser => optionalUser.map { _.asResource } }
              .flatMap {
                case None => Future.successful(HttpResponse(status = StatusCodes.NotFound))
                case Some(user) => Marshal(user).to[ResponseEntity].map { e => HttpResponse(entity = e) }
              }
          }
        }
      } ~ post {
        entity(as[UserResource]) { user =>
          complete {
            repository
              .save(user.asDomain)
              .map { id => 
                HttpResponse(status = StatusCodes.Created, headers = List(Location(s"/api/users/$id"))) 
              }
          }
        }
      }
    }
  }
}

By importing the object JacksonSupport, we say to Akka HTTP to use Jackson to marshal/unmarshal the http entities using Jackson.

The main class

And our main class that links everything together...

package com.thedevpiece.akka.http.mongodb

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Application extends App with UserEndpoint {
   implicit val sys: ActorSystem = ActorSystem("akka-http-mongodb-microservice")
   implicit val mat: ActorMaterializer = ActorMaterializer()
   implicit val ec: ExecutionContext = sys.dispatcher

  val log = sys.log

  override val repository: UserRepository = new UserRepository(Mongo.userCollection)

  Http().bindAndHandle(userRoute, "0.0.0.0", 8080).onComplete {
    case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
    case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
  }
}

But, how about the tests?

Glad that you asked! I almost forgot about it.

Akka HTTP gives a nice library to help us test our routes, and, I couldn't help myself to also show how you can test this app with it:

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.stream.Materializer
import com.github.fakemongo.async.FongoAsync
import com.mongodb.async.client.MongoDatabase
import com.typesafe.config.ConfigFactory
import org.mongodb.scala.MongoCollection
import org.scalatest.{BeforeAndAfterAll, FeatureSpec, Matchers}

import scala.concurrent.ExecutionContext

class UserEndpointFeature
  extends FeatureSpec
    with Matchers
    with ScalatestRouteTest
    with BeforeAndAfterAll
    with UserEndpoint {

  override val mat: Materializer = materializer
  override val ec: ExecutionContext = executor

  val db: MongoDatabase = {
    val fongo = new FongoAsync("akka-http-mongodb-microservice")
    val db = fongo.getDatabase(ConfigFactory.load().getString("mongo.database"))
    db.withCodecRegistry(Mongo.codecRegistry)
  }

  override val repository: UserRepository = new UserRepository(MongoCollection(db.getCollection("col", classOf[User])))

  val httpEntity: (String) => HttpEntity.Strict = (str: String) => HttpEntity(ContentTypes.`application/json`, str)

  feature("user api") {
    scenario("success creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", httpEntity(validUser)) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.Created
      }
    }

    scenario("success get after success creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", httpEntity(validUser)) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.Created

        Get(header("Location").orNull.value()) ~> Route.seal(userRoute) ~> check {
          status shouldBe StatusCodes.OK
        }
      }
    }


    scenario("invalid id on get") {
      Get(s"/api/users/1") ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("no body") {
      Post(s"/api/users", httpEntity("{}")) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without age") {
      val invalidUser =
        """
        {
          "username": "gabfssilva"
        }
        """

      Post(s"/api/users", httpEntity(invalidUser)) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without username") {
      val invalidUser =
        """
        {
          "age": 24
        }
        """

      Post(s"/api/users", httpEntity(invalidUser)) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }
  }
}

I bet you also noticied Fongo and its role to help us "mock" our MongoDB Collection.

If you did everything right, when you run your tests you probably will see something like this:

$ sbt
[IJ]> test
[info] Run completed in 3 seconds, 184 milliseconds.
[info] Total number of tests run: 6
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 4 s, completed Oct 9, 2017 12:57:31 AM

Other examples

There are a few Akka HTTP templates using libs such as Quill and ScalikeJDBC over here, in this github repository. You can check it out if you want to. =)

Other posts about microservices

Building microservices with Finatra and Slick
Building microservices using Undertow, CDI and JAX-RS
Building microservices with Kotlin and Spring Boot

And, I guess that's all. Any questions, please, feel free to ask in the comments.

[]'s

Gabriel Francisco

Software Engineer at 99, 25 years, under graduated in Computer Science and graduated in Service-oriented Software Engineering. Like playing guitar once in a while. Oh, and I'm kind of boring.

São Paulo

comments powered by Disqus