Akka Streams

Akka Streams

Actor Streams leverage the actor system to consume streams of data. In fact, each element of a stream is a Message within the Actor System

Here, data flows through a chain of processing stages:

  • Sources: The “source” (e.g., CSV file).
  • Sinks: The “destination” (e.g., a file).
  • Flows: Transformations made to the data within the Stream (e.g., total number of lines).
  • Runnable Graphs: A stream where all inputs and outputs are connected.

All these stages are composable and, in order to start the flow they have to be materialized. As soon they are connected, each staga materializes a value. See available operators here .

Moreover, there are two types of Streams:

  • Linear Streams: Linear flow.
  • Graphs: Where there may be branches points in the Stream (aka Junctions). Useful for more complex use-cases.

In any case, by default the stages in a Linear Stream run syncronously inside a single together (“Fused” together) but can also be configured to run asyncronously in separate actors.

Last but not he least: backpressure is managed by a pull/push mechanism. I.e.:

  1. Subscriber signals demand which is sent upstream via subscription.
  2. Publishers receives demand and pushes data (if available) downstream.

Source #

Stage with single output: Source[+Out, +Mat]:

  • Out: The type of each element that is produced.
  • Mat: Type of the materialized value. Usually NotUsed.

Source only push data as long as there is demand. The source will have to deal with incoming data until demand resumes (how how largely demands on the use-case).

See available operators here .

Sink #

Stage with a single input: Sink[-In, +Mat]:

  • In: The type of each element that is consumed.
  • Mat The type of each element that is produced. E.g., Future[Int].

It creates backpressure by controlling Demand. Note that if the stream is infinite, these sinks may never complete.

See available operators here .

Flows #

Single input and single output: Flow[-In, +Out, +Mat].

Acts both as producer and consumer therefore it propagates demand to the producer as well propagating (and transforming) messages produced to downstream stages.

The most notable operators (way too many):

Note that some of these operations are directly accessible from Source and does not require additional typing.

Additional notes:

  • Buffer smooths flow inconsistencies.
  • extrapolate to deal with slow producers.
  • batch to deal with slow consumers.
  • conflate which creates a summary of the elements when the producer is faster - What is the usefulness?

Runnable Graphs #

Connects source, flows and sinks so that data can start flowing.

This is done using via followed to and finally run on a Source as follows:

import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

implicit val system = ActorSystem("QuickStart")
Source(1 to 10)
   .via(Flow[Int].map(_ * 2))
   .to(Sink.foreach(println))
   .run

Notes:

  • via: connects Flow to a Source returning a new Source. Also allows composing two Flows.
  • to: connects a Sink to a Source returning a RunnableGraph. Also connects to a Flow to build new Sink. In essence, it materializes the value from the stage is called on.
  • The run is a terminal operator. There are others.

While the flow is running, values are materialized. These values are then acessed using to, source.toMat(Sink)(Transform/CombineFunction), ~source.viaMat(flow)(Transform/CombineFunction).

Finally, there are some shortcuts:

  • Source.runWith(Sink).
  • Source.runForeach(Function).
  • Source.runFold(0)(_ + _).
  • Source.runReduce(_ + _).

Fault Tolerancy - TODO: Review Examples #

Default strategy is to stop processing the stream and can be overriden within the ActorMaterializer by passing a decider that given an exception it either decides:

  • Stop: terminate with an error.
  • Resume: Drop the failing element.
  • Restart: The element is dropped and the stream continues after restarting the stage. Any state acumulated by that stage will be cleared.

Via attributes, each stage can be fine-tuned:

  • Dispatcher
  • Buffer Sizes
  • Log Level
  • Supervision

With a Supervision Strategy:

import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.lang.ArithmeticException

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}

val possibleDivisionByZero =
  Flow[Int].map(i => 100 / i)
    .withAttributes(
      ActorAttributes.supervisionStrategy(decider)
    )

implicit val system = ActorSystem("QuickStart")
Source(-1 to 1)
   .via(possibleDivisionByZero)
   .runWith(Sink.foreach(println))

However some errors are recoverable, in this case we provide a PartialFunction[Throwable, T]. It will terminate the stream graciously passing the resulting value as the final value.

import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.lang.ArithmeticException

val possibleDivisionByZero =
  Flow[Int].map(i => 100/i)
    .withAttributes(
      ActorAttributes.supervisionStrategy(decider)
    )
    .recover {
      case _: ArithmeticException => 0
    }

implicit val system = ActorSystem("QuickStart")
Source(-1 to 1)
   .via(possibleDivisionByZero)
   .runWith(Sink.foreach(println))

Graphs #

Introduces Junctions which take multiple inputs and multiple outputs. Basic ones are:

  • Fan-in: N inputs + 1 output. See operators .
  • Fan-out: 1 input + N outputs. See operators .

For example, using Fan-in we can randombly select one of the inputs, give preference or merely zip them. Then, using Fan-out, we can broadcast the values or unzip to create two individual streams.

We can use GraphDSL to easily connect them visually (documentation ). E.g.:

import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ClosedShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, Merge, RunnableGraph, Sink, Source, GraphDSL}

implicit val system = ActorSystem("QuickStart")
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape                 // Indicates no open inputs or outputs, this means that the graph is runnable. The opposite means that it is a partial graph.
}).run()

It is also possible to build partial graphs, or Shapes. There are already some built-in:

  • Linear Shapes (<Source|Flow|Sink>Shape)
  • Junction Shapes with the same input/output types (UniformFan<In|Out>Shape).
  • Junction Shapes with different inputs/outputs types (Fan<In|Out>Shape<arity>).

Simpler graphs can be done using, for example, the simpler Sink.combine API.

Fusion #

By default, Akka “fuses” all stages onto a single syncronous one to run on a single actor (auto-fusing can be disabled) but this limits the benefits we are looking for.

In order to add a asyncronous boundary, we just need to add async which disables fusing for that stage, which means that we are adding an additional overhead (Actors, mailboxes and buffers). Its benefits largely depends on the use-case. A good principle is

  1. Insert an async boundary to bisect the stream into two subsections of roughly equal processing time.
  2. We insert an async boundary to bisect the stream into two subsections of roughly equal processing time.

In other words, check at the current pipeline where the stages can be split so that they can be performed in paralell and joined almost at the same time. This implies looking at Telemetry and verify which stages can be processed in paralell given the graph we have.