Building microservices with Akka HTTP and MongoDB

It's been a while since my last post, but, here I am again.

You probably know that both Akka HTTP and MongoDB Scala Driver are reactive, so, they're a good way to go when we talk about highly scalable microservices, and, since I've playing with both lately, I guess it's fair to show a simple REST application using them.

Also, this example uses Fongo, a fake in memory MongoDB written in Java, to help us create our tests and not worry with our database access.

Well, let's start then.

build.sbt

name := "akka-http-mongodb"
organization := "com.thedevpiece"
version := "0.0.1"
scalaVersion := "2.12.5"

resolvers += Resolver.jcenterRepo

val akkaHttp = "10.1.1"
val akka = "2.5.11"
val circe = "0.9.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % akkaHttp,
  "com.typesafe.akka" %% "akka-stream" % akka,
  "com.typesafe.akka" %% "akka-slf4j" % akka,

  "de.heikoseeberger" %% "akka-http-circe" % "1.20.1",

  "io.circe" %% "circe-generic" % circe,

  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.1.0",

  //test libraries
  "org.scalatest" %% "scalatest" % "3.0.1" % "test",
  "org.pegdown" % "pegdown" % "1.6.0" % "test",
  "com.typesafe.akka" %% "akka-http-testkit" % akkaHttp % "test",
  "org.mongodb" % "mongo-java-driver" % "3.4.2",
  "com.github.fakemongo" % "fongo" % "2.1.0" % "test"
)

testOptions in Test ++= Seq(
  Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports"),
  Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/test-reports")
)

And, the code...

application.conf

Before we start, let's add a file named application.conf with our MongoDB connection data:

mongo {
  database = "user-api"
  uri = "mongodb://<username>:<password>@<hostname>:<port>/"${mongo.database}
}

And it is up to you to configure it properly, of course.

Our model

... which is very simple, by the way.

package com.thedevpiece.akka.http.mongodb

import io.circe.syntax._
import io.circe.{Decoder, Encoder, HCursor, Json}
import org.bson.types.ObjectId

import io.circe.generic.semiauto._

case class FindByIdRequest(id: String) {
  require(ObjectId.isValid(id), "the informed id is not a representation of a valid hex string")
}

case class User(_id: ObjectId, username: String, age: Int) {
  require(username != null, "username not informed")
  require(username.nonEmpty, "username cannot be empty")
  require(age > 0, "age cannot be lower than 1")
}

object User {
  implicit val encoder: Encoder[User] = (a: User) => {
    Json.obj(
      "id" -> a._id.toHexString.asJson,
      "username" -> a.username.asJson,
      "age" -> a.age.asJson
    )
  }

  implicit val decoder: Decoder[User] = (c: HCursor) => {
    for {
      username <- c.downField("username").as[String]
      age <- c.downField("age").as[Int]
    } yield User(ObjectId.get(), username, age)
  }
}

case class Message(message: String)

object Message {
  implicit val encoder = deriveEncoder[Message]
}

There are a few validations. Based on them, Akka HTTP will know when to return a bad request or not.
Also note that there are two encoders: user and message, and one decoder: user. Encoders and Decoders are Circe types. An Encoder transforms T => Json and Decoders transform Json => T.
We are encoding and decoding the User object manually. For the Message object, we are using the semi-automatic derivation by using deriveEncoder[Message].

Connecting to MongoDB

This object loads our application.conf data and connects to the MongoDB.
It also create a CodecRegistry to map our User class into a MongoDB Document so we won't need to worry to convert it ourselves.

package com.thedevpiece.akka.http.mongodb

import com.typesafe.config.ConfigFactory
import org.bson.codecs.configuration.CodecRegistries._
import org.mongodb.scala._
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._

object Mongo {
  lazy val config = ConfigFactory.load()
  lazy val mongoClient: MongoClient = MongoClient(config.getString("mongo.uri"))
  lazy val codecRegistry = fromRegistries(fromProviders(classOf[User]), DEFAULT_CODEC_REGISTRY)
  lazy val database: MongoDatabase = mongoClient.getDatabase(config.getString("mongo.database")).withCodecRegistry(codecRegistry)

  lazy val userCollection: MongoCollection[User] = database.getCollection[User]("users")
}

The repository

No news over here, just our Repository class.

package com.thedevpiece.akka.http.mongodb

import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId

import scala.concurrent.{ExecutionContext, Future}

class UserRepository(collection: MongoCollection[User])(implicit ec: ExecutionContext) {
  def findById(id: String): Future[Option[User]] =
    collection
      .find(Document("_id" -> new ObjectId(id)))
      .first
      .head
      .map(Option(_))

  def save(user: User): Future[String] =
    collection
      .insertOne(user)
      .head
      .map { _ => user._id.toHexString }
}

Remember: MongoDB Scala Driver is non-blocking, so, we always work with Futures.

The endpoint

We are using the high level Akka HTTP dsl (based on Spray.io). It may be terrifying at first, but, once you understand the basic concepts of routes and directives, it is nice to develop and also create your custom features for your endpoints.

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.Directives._
import akka.stream.Materializer
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class UserEndpoint(repository: UserRepository)(implicit ec: ExecutionContext, mat: Materializer) {
  val routes =
    pathPrefix("api" / "users") {
      (get & path(Segment).as(FindByIdRequest)) { request =>
        onComplete(repository.findById(request.id)) {
          case Success(Some(user)) =>
            complete(Marshal(user).to[ResponseEntity].map { e => HttpResponse(entity = e) })
          case Success(None)       =>
            complete(HttpResponse(status = StatusCodes.NotFound))
          case Failure(e)          =>
            complete(Marshal(Message(e.getMessage)).to[ResponseEntity].map { e => HttpResponse(entity = e, status = StatusCodes.InternalServerError) })
        }
      } ~ (post & pathEndOrSingleSlash & entity(as[User])) { user =>
        onComplete(repository.save(user)) {
          case Success(id) =>
            complete(HttpResponse(status = StatusCodes.Created, headers = List(Location(s"/api/users/$id"))))
          case Failure(e)  =>
            complete(Marshal(Message(e.getMessage)).to[ResponseEntity].map { e => HttpResponse(entity = e, status = StatusCodes.InternalServerError) })
        }
      }
    }
}

By importing the object ErrorAccumulatingCirceSupport, we say to Akka HTTP to marshal/unmarshal the http entities using Circe. It will use the implicit encoders or decoders to parse the entities.

The main class

And our main class that links everything together...

package com.thedevpiece.akka.http.mongodb
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Application extends App {
   implicit val sys: ActorSystem = ActorSystem("akka-http-mongodb-microservice")
   implicit val mat: ActorMaterializer = ActorMaterializer()
   implicit val ec: ExecutionContext = sys.dispatcher

  val log = sys.log

  val routes =
    new UserEndpoint(new UserRepository(Mongo.userCollection)).routes //you can add more routes using the '~' to concatenate: val routes = route1 ~ route2 ~ route3

  Http().bindAndHandle(routes, "0.0.0.0", 8080).onComplete {
    case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
    case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
  }
}

But, how about the tests?

Glad that you asked! I almost forgot about it.

Akka HTTP gives a nice library to help us test our routes, and, I couldn't help myself to also show how you can test this app with it:

package com.thedevpiece.akka.http.mongodb

import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.github.fakemongo.async.FongoAsync
import com.mongodb.async.client.MongoDatabase
import com.typesafe.config.ConfigFactory
import org.mongodb.scala.MongoCollection
import org.scalatest.{BeforeAndAfterAll, FeatureSpec, Matchers}

class UserEndpointFeature
  extends FeatureSpec
    with Matchers
    with ScalatestRouteTest
    with BeforeAndAfterAll  {

  val db: MongoDatabase = {
    val fongo = new FongoAsync("akka-http-mongodb-microservice")
    val db = fongo.getDatabase(ConfigFactory.load().getString("mongo.database"))
    db.withCodecRegistry(Mongo.codecRegistry)
  }

  val repository = new UserRepository(MongoCollection(db.getCollection("col", classOf[User])))

  val routes = Route.seal(new UserEndpoint(repository).routes)

  val httpEntity: (String) => HttpEntity.Strict = (str: String) => HttpEntity(ContentTypes.`application/json`, str)

  feature("user api") {
    scenario("success creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", httpEntity(validUser)) ~> routes ~> check {
        status shouldBe StatusCodes.Created
      }
    }

    scenario("success get after success creation") {
      val validUser =
        """
          {
            "username": "gabfssilva",
            "age": 24
          }
        """

      Post(s"/api/users", httpEntity(validUser)) ~> routes ~> check {
        status shouldBe StatusCodes.Created

        Get(header("Location").orNull.value()) ~> routes ~> check {
          status shouldBe StatusCodes.OK
        }
      }
    }


    scenario("invalid id on get") {
      Get(s"/api/users/1") ~> routes ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("no body") {
      Post(s"/api/users", httpEntity("{}")) ~> routes ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without age") {
      val invalidUser =
        """
        {
          "username": "gabfssilva"
        }
        """

      Post(s"/api/users", httpEntity(invalidUser)) ~> routes ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }

    scenario("body without username") {
      val invalidUser =
        """
        {
          "age": 24
        }
        """

      Post(s"/api/users", httpEntity(invalidUser)) ~> routes ~> check {
        status shouldBe StatusCodes.BadRequest
      }
    }
  }
}

I bet you also noticied Fongo and its role to help us "mock" our MongoDB Collection.

If you did everything right, when you run your tests you probably will see something like this:

$ sbt
[IJ]> test
[info] Run completed in 3 seconds, 184 milliseconds.
[info] Total number of tests run: 6
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 4 s, completed Oct 9, 2017 12:57:31 AM

Other examples

There are a few Akka HTTP templates using libs such as Quill and ScalikeJDBC over here, in this github repository. You can check it out if you want to. =)

Other posts about microservices

Building microservices with Finatra and Slick
Building microservices using Undertow, CDI and JAX-RS
Building microservices with Kotlin and Spring Boot

And, I guess that's all. Any questions, please, feel free to ask in the comments.

[]'s

Gabriel Francisco

Software Engineer at 99, 25 years, under graduated in Computer Science and graduated in Service-oriented Software Engineering. Like playing guitar once in a while. Oh, and I'm kind of boring.

São Paulo

comments powered by Disqus