Building microservices with Akka HTTP and MongoDB

It's been a while since my last post, but, here I am again.

You probably know that both Akka HTTP and MongoDB Scala Driver are reactive, so, they're a good way to go when we talk about highly scalable microservices, and, since I've playing with both lately, I guess it's fair to show a simple REST application using them.

Also, this example uses Fongo, a fake in memory MongoDB written in Java, to help us create our tests and not worry with our database access.

Well, let's start then.

build.sbt

name := "akka-http-mongodb"
organization := "com.thedevpiece"
version := "0.0.1"
scalaVersion := "2.12.3"

resolvers += Resolver.jcenterRepo

val akkaHttp = "10.0.10"
val akka = "2.4.19"
val jackson = "2.8.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % akkaHttp,
  "com.typesafe.akka" %% "akka-slf4j" % akka,
  "com.fasterxml.jackson.core" % "jackson-core" % jackson,
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % jackson,
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.1.0",

  //test libraries
  "org.scalatest" %% "scalatest" % "3.0.1" % "test",
  "org.pegdown" % "pegdown" % "1.6.0" % "test",
  "com.typesafe.akka" %% "akka-http-testkit" % akkaHttp % "test",
  "org.mongodb" % "mongo-java-driver" % "3.4.0" % "test",
  "com.github.fakemongo" % "fongo" % "2.1.0" % "test"
)

testOptions in Test ++= Seq(
  Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports"),
  Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/test-reports")
)

And, the code...

application.conf

Before we start, let's add a file named application.conf with our MongoDB connection data:

mongo {
  database = "user-api"
  uri = "mongodb://<username>:<password>@<hostname>:<port>/"${mongo.database}
}

And it is up to you to configure it properly, of course.

Our model

... which is very simple, by the way.

package com.thedevpiece.akka.http.mongodb

import org.bson.types.ObjectId
import org.mongodb.scala.bson.ObjectId

case class FindByIdRequest(id: String) {
  require(ObjectId.isValid(id), "the informed id is not a representation of a valid hex string")
}

case class UserResource(id: String, username: String, age: Int) {
  require(username != null, "username not informed")
  require(username.nonEmpty, "username cannot be empty")
  require(age > 0, "age cannot be lower than 1")

  def asDomain = User(if (id == null) ObjectId.get() else new ObjectId(id), username, age)
}

case class User(_id: ObjectId, username: String, age: Int) {
  def asResource = UserResource(_id.toHexString, username, age)
}

There are a few validations. Based on them, Akka HTTP will know when to return a bad request or not.

Connecting to MongoDB

This object loads our application.conf data and connects to the MongoDB.
It also create a CodecRegistry to map our User class into a MongoDB Document so we won't need to worry to convert it ourselves.

package com.thedevpiece.akka.http.mongodb

import com.typesafe.config.ConfigFactory
import org.bson.codecs.configuration.CodecRegistries._
import org.mongodb.scala._
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._

object Mongo {
  lazy val config = ConfigFactory.load()
  lazy val mongoClient: MongoClient = MongoClient(config.getString("mongo.uri"))
  lazy val codecRegistry = fromRegistries(fromProviders(classOf[User]), DEFAULT_CODEC_REGISTRY)
  lazy val database: MongoDatabase = mongoClient.getDatabase(config.getString("mongo.database")).withCodecRegistry(codecRegistry)

  lazy val userCollection: MongoCollection[User] = database.getCollection[User]("users")
}

The Jackson marshaller/unmarshaller

I really like to use Jackson instead of other json libraries to avoid boilerplate code, but, you can use whatever json library you want.

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.marshalling.Marshaller.withFixedContentType
import akka.http.scaladsl.marshalling.ToEntityMarshaller
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpRequest}
import akka.http.scaladsl.unmarshalling.{FromRequestUnmarshaller, Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, PropertyNamingStrategy}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.concurrent.{Await, ExecutionContext, Future}

object JsonSupport {
  val objectMapper = new ObjectMapper()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    .setSerializationInclusion(Include.NON_EMPTY)
    .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE)
    .registerModule(DefaultScalaModule)

  implicit class ObjAsJsonUsingJackson(obj: AnyRef) {
    def asJson: String = objectMapper.writeValueAsString(obj)
  }

  implicit class StringJsonAsCaseClass(json: String) {
    def asObject[T](implicit m: Manifest[T]): T = objectMapper.readValue(json, m.runtimeClass).asInstanceOf[T]
  }

  implicit def marshallSync[T <: HttpEntity](entity: Future[T]): T = {
    import scala.concurrent.duration._
    Await.result(entity, 1 second)
  }

  implicit def jsonMarshaller[T <: AnyRef]: ToEntityMarshaller[T] =
    withFixedContentType(ContentTypes.`application/json`) { any =>
      HttpEntity(ContentTypes.`application/json`, any.asJson)
    }

  implicit def jsonUnmarshaller[T](implicit m: Manifest[T], mat: Materializer): FromRequestUnmarshaller[T] =
    Unmarshaller[HttpRequest, T] {
      implicit ec: ExecutionContext => r => Unmarshal(r.entity).to[String].map(_.asObject[T])
    }
}

This class just create a few implicits that Akka HTTP will make use to parse json into case classes and vice versa.

The repository

No news over here, just our Repository class.

package com.thedevpiece.akka.http.mongodb

import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId

import scala.concurrent.{ExecutionContext, Future}

class UserRepository(collection: MongoCollection[User])(implicit ec: ExecutionContext) {
  def findById(id: String): Future[Option[User]] =
    collection
      .find(Document("_id" -> new ObjectId(id)))
      .first
      .head
      .map(Option(_))

  def save(user: User): Future[String] =
    collection
      .insertOne(user)
      .head
      .map { _ => user._id.toHexString }
}

Remember: MongoDB Scala Driver is non-blocking, so, we always work with Futures.

The endpoint

We are using the high level Akka HTTP dsl (based on Spray.io). It may be terrifying at first, but, once you understand the basic concepts of routes and directives, it is nice to develop and also create your custom features for your endpoints.

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.Materializer
import com.thedevpiece.akka.http.mongodb.JsonSupport._

import scala.concurrent.ExecutionContext

trait UserEndpoint {
  implicit val mat: Materializer
  implicit val ec: ExecutionContext

  val repository: UserRepository

  val userRoute = {
    pathPrefix("api" / "users") {
      get {
        path(Segment).as(FindByIdRequest) { request =>
          complete {
            repository
              .findById(request.id)
              .map { optionalUser => optionalUser.map { _.asResource } }
              .map {
                case None => HttpResponse(status = StatusCodes.NotFound)
                case Some(user) => HttpResponse(entity = Marshal(user).to[ResponseEntity])
              }
          }
        }
      } ~ post {
        entity(as[UserResource]) { user =>
          complete {
            repository
              .save(user.asDomain)
              .map { id =>
                HttpResponse(
                  status = StatusCodes.Created,
                  headers = List(Location(s"/api/users/$id"))
                )
              }
          }
        }
      }
    }
  }
}

The main class

And our main class that links everything together...

package com.thedevpiece.akka.http.mongodb

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Application extends App with UserEndpoint {
   implicit val sys: ActorSystem = ActorSystem("akka-http-mongodb-microservice")
   implicit val mat: ActorMaterializer = ActorMaterializer()
   implicit val ec: ExecutionContext = sys.dispatcher

  val log = sys.log

  override val repository: UserRepository = new UserRepository(Mongo.userCollection)

  Http().bindAndHandle(userRoute, "0.0.0.0", 8080).onComplete {
    case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
    case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
  }
}

But, how about the tests?

Glad that you asked! I almost forgot about it.

Akka HTTP gives a nice library to help us test our routes, and, I couldn't help myself to also show how you can test this app with it:

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.stream.Materializer
import com.github.fakemongo.async.FongoAsync
import com.mongodb.async.client.MongoDatabase
import com.typesafe.config.ConfigFactory
import org.mongodb.scala.MongoCollection
import org.scalatest.{BeforeAndAfterAll, FeatureSpec, Matchers}

import scala.concurrent.ExecutionContext

class UserEndpointFeature
  extends FeatureSpec
    with Matchers
    with ScalatestRouteTest
    with BeforeAndAfterAll
    with UserEndpoint {

  override val mat: Materializer = materializer
  override val ec: ExecutionContext = executor

  val db: MongoDatabase = {
    val fongo = new FongoAsync("akka-http-mongodb-microservice")
    val db = fongo.getDatabase(ConfigFactory.load().getString("mongo.database"))
    db.withCodecRegistry(Mongo.codecRegistry)
  }

  override val repository: UserRepository = new UserRepository(MongoCollection(db.getCollection("col", classOf[User])))

  feature("user api") {
    scenario("success creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", validUser) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.Created
      }
    }

    scenario("success get after sucess creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", validUser) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.Created

        Get(header("Location").orNull.value()) ~> Route.seal(userRoute) ~> check {
          status shouldBe StatusCodes.OK
        }
      }
    }


    scenario("invalid id on get") {
      Get(s"/api/users/1") ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("no body") {
      Post(s"/api/users", "{}") ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without age") {
      val invalidUser =
        """
        {
          "username": "gabfssilva"
        }
        """

      Post(s"/api/users", invalidUser) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without username") {
      val invalidUser =
        """
        {
          "age": 24
        }
        """

      Post(s"/api/users", invalidUser) ~> Route.seal(userRoute) ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }
  }
}

I bet you also noticied Fongo and its role to help us "mock" our MongoDB Collection.

If you did everything right, when you run your tests you probably will see something like this:

$ sbt
[IJ]> test
[info] Run completed in 3 seconds, 184 milliseconds.
[info] Total number of tests run: 6
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 4 s, completed Oct 9, 2017 12:57:31 AM

And, I guess that's all. Any questions, please, feel free to ask in the comments.

[]'s

Gabriel Francisco

Software Engineer at UOL, 24 years, under graduated in Computer Science and graduated in Service-oriented Software Engineering. Like playing guitar once in a while. I'm pretty sure I'm really boring.

São Paulo

comments powered by Disqus