A dive into Akka Streams

On May 14th, we presented the implementation of a real-world application based on Akka Streams at the Scala Italy 2016 conference. The experience was amazing and, most importantly, it gave us the chance to share our knowledge with the community. Now, we want to share our work with you.

Why did we decide to talk about Akka Streams? No rush! We will answer this question gradually. To understand the path that led us to present our work, we first need to look at the situation in which everything started.

Collecting Data

Measurence provides retailers with meaningful insights thanks to its Analytics Platform Service. Delivering insights is just the last step of a complex processing pipeline that starts with data collection. Data is collected by installing a set of IoT sensors within stores, both prospective and existing. The sensors capture the Wi-Fi signals in the surrounding environment and send this data to multiple instances of the same Scala microservice, the Collector Service. The Collector Service processes the incoming information and finally sends it to Amazon Kinesis.

An overview of our data ingestion and processing pipeline

At the very beginning, the Collector Service was implemented using Spray.io over the Actor Model.

The first architecture of the Collector Service

For each incoming TCP connection, Spray.io sent a message to the Root Actor of the system. Upon receipt of this message, the Root Actor spawned a new Worker Actor and notified Spray that a new Actor was ready to take care of the incoming TCP connection. At this point, Spray started sending message chunks to the Worker Actor, and messages flowed continuously from Spray to the Actor’s inbox. One more step was then needed to reach the end of the ingestion pipeline: once the Actor had processed a message, it sent an event to another Actor, the Kinesis Client, which performed the actual upload of the information to Amazon Kinesis.

The Problems

Unfortunately, the first version of the Collector Service was subject to two main points of failure.

Critical points of the first version of the Collector Service

Message overload

The first critical point was message overload: as previously stated, messages flowed from Spray to the Actors’ inboxes without any control, which could lead to memory overflows on the Actors’ side. But there is more. Think about a situation in which a set of prolific sensors sends data at a much higher rate than the others. When there is no control over the ingestion speed of data, prolific sensors may steal all the resources available in the system, penalizing the less prolific ones.

Connection issues

The second critical point was connection outages. When a connection disruption took place, all the information acquired and processed by the Collector Service was simply lost before reaching Amazon Kinesis, because of the repeated failures during the upload of the events.

Building a Reactive System

Although we had built the Collector Service on top of the Actor Model, we hadn’t achieved a Reactive System. Indeed, our system wasn’t responsive: unbounded inputs made it impossible to treat clients fairly and, even worse, some clients could be left behind due to the lack of available resources. Furthermore, our system wasn’t elastic, since memory wasn’t partitioned between connected clients in any way: although unlikely, a single client could have used enough memory to starve the others.

To get a Reactive System we had to implement recovery mechanisms, to avoid huge data losses in case of connection outages, and backpressure on the network, to gain control over the ingestion speed of data.

Recovery Mechanisms

We implemented a simple buffering system in which messages that failed to upload are stored and retained for later use. The buffer also periodically writes the enqueued information to disk when it fills up, so that it can accommodate new incoming data. Thanks to this persistent buffer, the Collector Service is able to keep all the messages whose upload to Amazon Kinesis fails due to connection disruptions.

Backpressure on the network

On the other hand, Akka Streams is the tool that enabled us to achieve backpressure on the network: controlling the ingestion speed of data allowed us to avoid situations in which a prolific sensor monopolizes the system resources.

Akka Streams

Akka Streams is a streaming library built on top of Akka Actors that recently came out of the experimental phase. With this library you can create linear and non-linear streams for all kinds of processing. Here we’re going to focus on linear streams, since our implementation is based on them.

The building blocks of a linear stream: Source, Flow and Sink

When creating a linear stream you have three main pieces in your toolbelt. The Source represents the origin of the data; from here the various events flow downstream. The Flow gets data from upstream, possibly applies transformations and emits the results to the next stage. Finally, the Sink is the last part of the stream, the final destination of your data. These stages are characterized by the kind of ports they expose: a Source has a single output port, a Flow has both an input and an output port, and a Sink has just an input port. For this reason, when you append a Flow to a Source, what you get in return is a new Source, and so on. You can compose as many stages as needed, as long as you respect the kind of ports they expose (e.g. you can’t connect two Sources together).

Here we can see a simple example, where we put together a few stages; when there are no more ports exposed we obtain a RunnableGraph instance.

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import scala.concurrent.Future

// to run a stream you need a materializer,
// to create a materializer you need an ActorSystem
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val source: Source[Int, NotUsed] = Source(1 to 42)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val graph: RunnableGraph[NotUsed] =
  source
    .via(flow)
    .to(sink)

// materializer is implicitly used here
graph.run()

// Nice to know: every class used in this example is a subclass of Graph.

Note that the instances in the above example are immutable; they just represent the blueprint of what your stream will do. As such, each element can be reused in different streams. When the RunnableGraph is run, the actual stream is materialized and the data begins to flow. Even the RunnableGraph itself is immutable: you can run the same RunnableGraph several times, and each time a different, completely separate stream will be materialized.
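Running the same blueprint twice, for instance, gives two fully independent streams (a trivial sketch reusing the graph value from the example above):

graph.run() // first materialization: prints the numbers from 2 to 43
graph.run() // second, completely separate materialization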

The Source/Flow/Sink classes offer a plethora of ready-made implementations you can decorate and use. In particular, the Flow class exposes many of the functions anyone familiar with Scala collections already knows, like map, filter and fold. A notable missing piece is the flatMap function: it’s actually there under the mapConcat alias (which means you can’t run a for-comprehension on a Flow). Finally, there are many more functions that are strictly related to the nature of the stream, like mapAsync, throttle or buffer.
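As a quick illustration of these operators (a sketch, not from our codebase; the throttle parameters are arbitrary):

import scala.concurrent.duration._
import akka.stream.ThrottleMode

val evens: Source[Int, NotUsed] =
  Source(1 to 10)
    .filter(_ % 2 == 0)         // collection-like: keep even numbers
    .mapConcat(n => List(n, n)) // flatMap in disguise: emit each element twice

// stream-specific: let at most 5 elements per second through
val throttled: Source[Int, NotUsed] =
  evens.throttle(5, 1.second, 5, ThrottleMode.shaping)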

Backpressure in action

One of the main reasons we chose Akka Streams is its built-in backpressure capability. Generally speaking, when you run a stream the data doesn’t start flowing from the source to the sink; what actually happens is the opposite: requests for data (pull) are issued from the sink and flow upstream until they reach the source; at that point the source is authorized to send the actual data downstream (push). In particular, there is a one-to-one relationship between push and pull calls: no stage can push something downstream unless it has received a pull beforehand. If a stage is too busy with its job, it will stop pulling, effectively applying backpressure on the upstream stages.

Note that each stage can choose to start pulling upstream without waiting for a pull from its downstream stage; obviously, it won’t be able to push until pulled. Also, a stage is not forced to emit as many events as it receives: it might emit fewer or more events based on its logic.
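A toy example of backpressure at work (assuming the implicit materializer and the imports from the previous sketches): the source below could emit its elements almost instantly, but the throttle stage pulls only one element per second, so the upstream stages are slowed down to that pace with no extra effort on our part.

Source(1 to 1000000)
  .throttle(1, 1.second, 1, ThrottleMode.shaping) // pulls (and thus emits) one element per second
  .runWith(Sink.foreach(println))                 // prints one number per second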

The Collector Service

Now that we know the basics, we can start taking a look at our implementation. In our case, backpressure on the stream isn’t enough: the backpressure must also be applied on the network.

Akka HTTP

Akka HTTP is a library built on top of Akka Streams that can expose an incoming connection in the form of a Source instance. Applying backpressure on this Source makes Akka HTTP stop consuming data from the network; in due time this leads to a zero TCP window, effectively applying the backpressure to the sending party itself (in our case, a sensor).

To start listening on the network with Akka HTTP you need to create a Route and bind it to a port. If you are familiar with Spray.io you’ll notice that the syntax to create a Route is practically the same in Akka HTTP.

val route =
  path("data") {
    post {
      extractRequest { request =>

        val source = request.entity.dataBytes
        val flow = processing()
        val sink = Sink.ignore

        source.via(flow).runWith(sink)

        // more stuff here

      }
    }
  }

Http().bindAndHandle(route, hostname, port)

There are three things to note in the above code. First, the source is extracted from the request. Second, the flow contains our application logic; we’ll see it in detail soon. Third, the sink just ignores everything, because we’ve chosen to keep all the logic in the form of a flow (it’s easier to test), so the only task left to the sink is to start the pulling.

The Collector Processing

Let’s take a look at our processing pipeline by composing it piece by piece.

Flow[ByteString]
.via(chunkBuffer)

The first step is converting the chunks coming in from the network into something that makes sense for our protocol, namely \n-separated lines. Since each chunk might contain more than one line, or even partial lines, we need to reorganize the flow. Furthermore, we want to deal with strings, not with bytes.

val chunkBuffer = Framing.delimiter(
  ByteString("\n"),       // the separator between frames
  maxPresenceEventBytes,  // maximum allowed frame length
  false                   // allowTruncation = false: fail on an unterminated final frame
)
  .map(_.dropRight(1)) // drop a leftover trailing character
  .map(_.utf8String)   // convert each ByteString frame into a String

Framing.delimiter is one of the Akka Streams utilities, and it does exactly what we need by reorganizing the chunks based on the specified separator. The library is very rich, with all kinds of tools that make writing your logic a breeze. Once the events have been reorganized, we append a couple of stages to drop useless characters and convert each ByteString into a String.

Next in line is a conversion from String into an event object; we won’t dig into its details here.

Flow[ByteString]
.via(chunkBuffer)
.via(presenceEventFactory.asStream)
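For illustration only, the asStream stage above can be imagined as a plain Flow from String to our event type. This is a hypothetical sketch, not the real Measurence implementation (PresenceEvent and parse are made-up names):

// hypothetical sketch: the real parsing logic is not shown in this post
case class PresenceEvent(raw: String)

def parse(line: String): PresenceEvent = PresenceEvent(line)

val asStream: Flow[String, PresenceEvent, NotUsed] =
  Flow[String].map(parse)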

Once we have our events, we want to send them to Kinesis. Before that, though, we need a buffer to deal with a possible difference between the ingestion rate and the publishing rate. What’s more, we want this buffer to withstand a crash of the service, so it has to implement some logic to persist its contents. This buffer won’t be unlimited: when full, it will start applying backpressure. If this happens, backpressure will reach the sensor which, after a timeout, will disconnect from the current Collector Service and try to connect to a different one. In the meantime, the filled buffer will keep trying to emit data to the publisher downstream.

Flow[ByteString]
.via(chunkBuffer)
.via(presenceEventFactory.asStream)
.via(persistentBuffer)

Unfortunately there is no ready-made buffer with these characteristics, so we had to implement a custom stage. While the details of that implementation are beyond the scope of this post, it’s worth taking a look at how a custom stage is implemented.

Implementing a Custom Stage

To implement a custom stage, you have to create a class extending GraphStage and specify the shape of your stage (i.e. the number and kind of ports your stage will have). There are actually other classes you can extend that offer different features, but let’s stick to this one for now.

class PersistentBuffer[A](...)
  extends GraphStage[FlowShape[A, A]] {

  val in =
    Inlet[A]("PersistentBuffer.in")

  val out =
    Outlet[A]("PersistentBuffer.out")

  override val shape = FlowShape.of(in, out)

Then you have to implement the logic of the stage. As we’ve seen, a stage has to be immutable; for this reason, the logic has to be created inside a createLogic call. You might imagine the GraphStage as a factory of GraphStageLogics: every time the GraphStage is materialized inside a stream, a new call to createLogic is performed. For this reason, any state pertaining to a single stage has to live in the GraphStageLogic instance, otherwise such state would be shared among all the materializations of your stage.

override def createLogic(attr: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) {

    var state: StageState[A] = initialState[A]

    override def postStop(): Unit = {
      ...
    }

    override def preStart(): Unit = {
      ...
    }

A stage is single-threaded: no two parts of the same GraphStageLogic will ever run concurrently. For this reason, it is always possible to access the state of the stage without any synchronization. Beware of Future callbacks though, as those run on separate threads. If you need to access the state from a Future callback, you can wrap the callback function with getAsyncCallback: this wrapper takes care of moving the execution of the callback onto the stage’s thread, so that state access is safe again.

val cb = getAsyncCallback[Try[A]] {
  case Success(elements) =>
    state = state.copy(...)
  ...
}

aFuture.onComplete(cb.invoke)

To implement the actual logic of the application, we have to create handlers for the ports we have declared.

setHandler(in, new InHandler {
  override def onPush() = {
    val element = grab(in)
    pull(in)
    ...
  }
...
})
setHandler(out, new OutHandler {
  override def onPull(): Unit = {
    push(out, something)
    ...
  }
  ...
})

As you can see in the code above, InHandler is extended to implement the handler for an input port. This handler contains the onPush callback, which fires every time there is data ready to be read. The grab call reads that data, and the pull call asks for more. Note that each call must be issued only when the port is in the correct state: for example, you can’t call pull before grabbing the available value, and you can’t grab a value unless notified by onPush. The OutHandler is similar: you get the onPull call when downstream is ready to ingest new data, and at that point you can push to it. Again, pay attention not to push when downstream is not ready, or you’ll get an error. Check the official documentation for the full details on the state machines governing the ports and the operations admitted in each state.
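To see how these pieces fit together, here is a minimal, self-contained pass-through stage (a didactic sketch, not our PersistentBuffer): it simply relays demand upstream and data downstream.

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class PassThrough[A] extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("PassThrough.in")
  val out = Outlet[A]("PassThrough.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        // data is available: read it and forward it downstream
        override def onPush(): Unit = push(out, grab(in))
      })
      setHandler(out, new OutHandler {
        // downstream asked for data: relay the demand upstream
        override def onPull(): Unit = pull(in)
      })
    }
}

// usage: Source(1 to 3).via(new PassThrough[Int]).runWith(Sink.foreach(println))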

Publishing Through an Actor

Now that our data is safe, we’ll want to start thinking about sending it to Kinesis.

Flow[ByteString]
.via(chunkBuffer)
.via(presenceEventFactory.asStream)
.via(persistentBuffer)
.via(kinesisPublisher)

For this task we implemented an Actor that receives the events to publish as messages and tries to send them to Kinesis until it succeeds; it then notifies the sender of the event about the success. To embed the actor call into our processing flow, we used the mapAsync utility together with the ask pattern.

// the ask pattern (?) requires an implicit akka.util.Timeout in scope
Flow[KinesisEvent]
  .mapAsync(1) { event =>
    (publisher ? Publish(event))
      .mapTo[Future[PublishCompleted]]
      .flatten
  }

With mapAsync, you specify a function that converts from the input type (the event, in our case) to a Future[Something]. mapAsync takes care of waiting for each Future to complete and then emits the Something downstream. It is possible to specify the number of pending Futures that can be alive at any given time (see the first parameter of the mapAsync call; we used 1 in this case). Keep in mind that with mapAsync the results of the Futures are emitted in the same order as the inputs are received (i.e. not in the order the Futures are resolved). If you don’t care about the order, you can simply switch to mapAsyncUnordered.

Side-Effects

While your implementation deals with the processing of your data, you might want to attach some side-effects at different points of your flow. For example, we use them to keep some metrics updated.

Flow[ByteString]
.via(chunkBuffer)
.via(presenceEventFactory.asStream)
.via(persistentBuffer)
.via(kinesisPublisher)
.alsoTo(countEvents)

The alsoTo call can be used to attach extra Sinks to a stream. In the above example it’s attached at the end of the processing, but in the real implementation we actually have a couple of those in the middle of the stream. Basically, these Sinks receive all the events coming from the upstream stage and can act on them, but the result of that processing does not impact the stream in any way.

val countEvents = Sink.foreach[Event] { _ =>
  metricEventsPerTimeUnitOfSensor.mark()
  metricEventsPerTimeUnit.mark()
}

Threads and Fusion

We’ve already seen that within a single stage there is never concurrency, but by default this is also true for the whole stream: when the stream is materialized, a single Actor handling all the stages is created. You get the benefit of avoiding the overhead of cross-thread communication in exchange for the loss of parallel execution. This might or might not be a problem depending on your scenario: in our case we have many streams running at once, so we already achieve parallelism this way. If parallel execution is required within the same stream, the async call will split a single stream across different threads.

source
  .via(doSomething).async
  .via(doSomething).async
  .runWith(Sink.foreach(println))

Materialized Values and TestKit

Every time a stream is materialized, it’s possible to extract a value from any stage. As an example, the materialized value of Sink.ignore is a Future[Done] that is completed when the stream itself completes.

val sink: Sink[Any, Future[Done]] = Sink.ignore
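For example, runWith keeps the materialized value of the stage you pass to it, so the completion of the stream can be observed like this (a small sketch, assuming the ActorSystem and the imports from the first example):

import system.dispatcher // ExecutionContext for the Future callback

val done: Future[Done] = Source(1 to 10).runWith(Sink.ignore)
done.onComplete(_ => println("stream completed"))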

A great example of useful materialized values is given by the Akka Streams TestKit.

val myFlow = Flow[String].map { v => v.take(1) }

val (pub, sub) = TestSource.probe[String]
  .via(myFlow)
  .toMat(TestSink.probe[Any])(Keep.both)
  .run()

sub.request(1)
pub.sendNext("Gathering")
sub.expectNext("G")

The TestKit contains the TestSource class, which materializes a publisher. The publisher can in turn be used to inject events at will into a stream, while the TestSink materializes a subscriber that can be used to verify that the received values are the expected ones. As you can see above, using both these classes we can test any Flow as if it were a simple function, by publishing the input and verifying the output.

Keep in mind that while it’s easy to extract the materialized value from Sources and Sinks, it’s also possible to extract a materialized value from a Flow.
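A small sketch of this (assuming the imports from the first example plus akka.stream.scaladsl.Keep): watchTermination gives the Flow a Future[Done] as its materialized value, and the Keep combinators select which materialized values survive composition.

// the Flow itself materializes a Future[Done] completed on termination
val monitored: Flow[Int, Int, Future[Done]] =
  Flow[Int].watchTermination()(Keep.right)

val terminated: Future[Done] =
  Source(1 to 10)
    .viaMat(monitored)(Keep.right) // keep the Flow's materialized value
    .toMat(Sink.ignore)(Keep.left) // discard the Sink's
    .run()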

Supervision

Whenever an error occurs in any stage of a stream, by default the stage is terminated and the failure is then propagated to all the other stages. It is possible to change this behavior by specifying a custom supervision strategy. The strategy can be assigned to the whole stream or to a piece of it, and it chooses, based on the exception received, what to do with the life of the stream.

val flow = otherFlow
  .withAttributes(
    ActorAttributes.supervisionStrategy {
      case _ => Supervision.Resume
    }
  )

The supervision strategy has three options:

  • Resume drops the element that caused the error and keeps the stage running;
  • Restart behaves like Resume but also resets the internal state of the stage that had the error;
  • Stop fails the stage.
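For instance, a decider can discriminate on the exception type (a small sketch; the division is just a way to provoke an error):

import akka.stream.{ActorAttributes, Supervision}

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume // drop the offending element
  case _                      => Supervision.Stop   // fail the stage otherwise
}

val safeFlow = Flow[Int]
  .map(100 / _) // throws ArithmeticException on 0
  .withAttributes(ActorAttributes.supervisionStrategy(decider))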

Documentation

While we’ve just given you an idea of how to write a linear stream (but don’t forget that you can also write non-linear ones!), you’ll still need to keep the Documentation and the API at hand to write your own code with Akka Streams: enjoy!

The Key Learning

As discussed at the beginning, our data ingestion and processing pipeline is composed of sets of sensors sending Wi-Fi data to a Collector Service, which processes and uploads the computed information to Amazon Kinesis. We’ve seen that the very first implementation of the Collector Service was subject to memory overflows and possible data losses. These problems made it impossible to define our system as a Reactive System. However, Akka Streams enabled us to implement backpressure on the network thanks to the push-pull mechanism, and the development of the persistent buffer helped us avoid data losses when Kinesis was unavailable.

What is the key learning of our experience?

The Actor Model is one of the most important messaging patterns in Reactive Programming. However, although we built the Collector Service on top of the Actor Model, we didn’t get a Reactive System. So, Actors alone aren’t sufficient to be Reactive: don’t be blind, and don’t take the Actor Model as a guarantee of achieving “reactiveness”.

Conference Resources

In this section, you can find the presentation and the video of the talk we held at the Scala Italy 2016 conference.

Presentation

Video
