Akka Streams
Akka Streams leverages the actor system to consume streams of data. In fact, each element of a stream is a message within the actor system.
Here, data flows through a chain of processing stages:
Sources
: The “source” (e.g., a CSV file).
Sinks
: The “destination” (e.g., a file).
Flows
: Transformations applied to the data within the stream (e.g., counting the total number of lines).
Runnable Graphs
: A stream where all inputs and outputs are connected.
All these stages are composable and, in order to start the flow, they have to be materialized. As soon as the connected stages are materialized, each stage produces a materialized value. See available operators here.
Moreover, there are two types of Streams:
Linear Streams
: Linear flow.
Graphs
: Where there may be branch points in the stream (aka Junctions). Useful for more complex use cases.
In any case, by default the stages in a Linear Stream run synchronously inside a single actor (“fused” together), but they can also be configured to run asynchronously in separate actors.
Last but not least, backpressure is managed by a pull/push mechanism, i.e.:
- The Subscriber signals demand, which is sent upstream via the subscription.
- The Publisher receives the demand and pushes data (if available) downstream.
Source
Stage with a single output: Source[+Out, +Mat]:
- Out: The type of each element that is produced.
- Mat: The type of the materialized value. Usually NotUsed.
A Source only pushes data as long as there is demand. When there is none, the source has to deal with the incoming data until demand resumes (how largely depends on the use case).
See available operators here.
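As a minimal sketch of demand-driven emission (assuming the same Ammonite-style imports and Akka 2.6.3 used elsewhere in this document; the names `naturals` and `firstThree` are made up for illustration), an infinite Source can still be part of a stream that completes, because it only emits what downstream asks for:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("SourceSketch")

// An infinite source: Out = Int, Mat = NotUsed.
val naturals: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(1))

// take(3) cancels upstream after three elements,
// so the stream completes even though the source is infinite.
val firstThree: Seq[Int] =
  Await.result(naturals.take(3).runWith(Sink.seq), 3.seconds)

println(firstThree) // Vector(1, 2, 3)
system.terminate()
```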
Sink
Stage with a single input: Sink[-In, +Mat]:
- In: The type of each element that is consumed.
- Mat: The type of the materialized value. E.g., Future[Int].
A Sink creates backpressure by controlling demand. Note that if the stream is infinite, the sink may never complete.
See available operators here.
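To make the Mat type parameter concrete, here is a small sketch (same assumed Ammonite setup and Akka 2.6.3; `sumSink` and `total` are illustrative names) of a Sink whose materialized value is a Future[Int] that completes once the finite stream ends:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("SinkSketch")

// In = Int, Mat = Future[Int]: the fold result becomes available when the stream completes.
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val total: Int = Await.result(Source(1 to 10).runWith(sumSink), 3.seconds)

println(total) // 55
system.terminate()
```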
Flows
Single input and single output: Flow[-In, +Out, +Mat].
A Flow acts as both a consumer and a producer: it propagates demand upstream to the producer and propagates (and transforms) the elements it receives to downstream stages.
The most notable operator categories (there are way too many operators to list):
- Simple Operators
- Timer Driven
- Asynchronous
- Backpressure Aware
- Nesting and flattening
- Time Aware
- Fan-in
- Fan-out
Note that some of these operations are directly accessible from Source and do not require defining a separate Flow.
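The following sketch (assuming Akka 2.6.3 and the Ammonite imports used in this document; `double`, `viaFlow` and `direct` are illustrative names) compares a reusable Flow attached with via against the same operator called directly on the Source:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("FlowSketch")

// A standalone, reusable transformation: In = Int, Out = Int, Mat = NotUsed.
val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// Attaching the Flow explicitly...
val viaFlow = Await.result(Source(1 to 3).via(double).runWith(Sink.seq), 3.seconds)
// ...or calling the same operator directly on the Source.
val direct = Await.result(Source(1 to 3).map(_ * 2).runWith(Sink.seq), 3.seconds)

println(viaFlow) // Vector(2, 4, 6)
println(direct)  // Vector(2, 4, 6)
system.terminate()
```

A separate Flow is mostly useful when the same transformation is shared between several streams.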
Additional notes:
- buffer smooths flow inconsistencies.
- extrapolate deals with slow producers.
- batch deals with slow consumers.
- conflate creates a summary of the elements when the producer is faster - What is the usefulness?
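One possible use of conflate, sketched under assumptions (Akka 2.6.3, Ammonite imports; the throttle stands in for a hypothetical slow consumer): elements that arrive while downstream is busy are summed instead of being queued or dropped. How many elements come out depends on timing, but their total does not.

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ConflateSketch")

val emitted: Seq[Int] = Await.result(
  Source(1 to 10)
    .conflate(_ + _)          // while downstream is busy, sum up the pending elements
    .throttle(1, 100.millis)  // simulated slow consumer: one element per 100 ms
    .runWith(Sink.seq),
  10.seconds)

println(emitted)     // fewer than 10 elements is likely; the exact split is timing-dependent
println(emitted.sum) // 55, since nothing is lost, only summarized
system.terminate()
```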
Runnable Graphs
Connects sources, flows and sinks so that data can start flowing.
This is done by calling via, followed by to, and finally run on a Source, as follows:
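import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

// The implicit ActorSystem provides the materializer that runs the stream.
implicit val system: ActorSystem = ActorSystem("QuickStart")

Source(1 to 10)                // emits 1, 2, ..., 10
  .via(Flow[Int].map(_ * 2))   // doubles each element
  .to(Sink.foreach(println))   // prints each element
  .run()                       // materializes and starts the stream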
Notes:
- via: connects a Flow to a Source, returning a new Source. Also allows composing two Flows.
- to: connects a Sink to a Source, returning a RunnableGraph. Also connects a Sink to a Flow to build a new Sink. In essence, it keeps the materialized value of the stage it is called on.
- run is a terminal operator. There are others.
When the graph runs, each stage produces a materialized value. The value that is kept is selected using to (which keeps the value of the stage it is called on), source.toMat(Sink)(Transform/CombineFunction), or source.viaMat(flow)(Transform/CombineFunction).
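A short sketch of how the combine function selects the materialized value (assuming Akka 2.6.3 and the Ammonite imports used in this document; the value names are illustrative, and Keep is the helper object from akka.stream.scaladsl):

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("MatSketch")

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// `to` keeps the Source's value (NotUsed); the sink's Future[Int] is discarded.
val keepsLeft: RunnableGraph[NotUsed] = Source(1 to 10).to(sumSink)

// toMat with Keep.right keeps the Sink's value instead.
val keepsRight: RunnableGraph[Future[Int]] = Source(1 to 10).toMat(sumSink)(Keep.right)

// viaMat chooses between the Source's and the Flow's value;
// Keep.both then pairs the result with the Sink's value.
val keepsBoth: RunnableGraph[(NotUsed, Future[Int])] =
  Source(1 to 10)
    .viaMat(Flow[Int].map(_ * 2))(Keep.left)
    .toMat(sumSink)(Keep.both)

val sum = Await.result(keepsRight.run(), 3.seconds)
val doubledSum = Await.result(keepsBoth.run()._2, 3.seconds)

println(sum)        // 55
println(doubledSum) // 110
system.terminate()
```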
Finally, there are some shortcuts:
- Source.runWith(Sink).
- Source.runForeach(Function).
- Source.runFold(0)(_ + _).
- Source.runReduce(_ + _).
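The shortcuts can be tried side by side with a sketch like the following (assuming Akka 2.6.3 and the Ammonite imports used in this document; value names are illustrative). Each call materializes and runs the stream, returning the sink's Future:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ShortcutSketch")

val collected = Await.result(Source(1 to 4).runWith(Sink.seq), 3.seconds)
val folded    = Await.result(Source(1 to 4).runFold(0)(_ + _), 3.seconds)
val reduced   = Await.result(Source(1 to 4).runReduce(_ + _), 3.seconds)

// runForeach returns a Future[Done] that completes when the stream ends.
Await.result(Source(1 to 4).runForeach(println), 3.seconds)

println(collected) // Vector(1, 2, 3, 4)
println(folded)    // 10
println(reduced)   // 10
system.terminate()
```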
Fault Tolerance - TODO: Review Examples
The default strategy is to stop processing the stream. It can be overridden within the ActorMaterializer by passing a decider that, given an exception, decides to either:
- Stop: Terminate the stream with an error.
- Resume: Drop the failing element and continue.
- Restart: Drop the failing element and continue after restarting the stage. Any state accumulated by that stage is cleared.
Via attributes, each stage can be fine-tuned:
- Dispatcher
- Buffer Sizes
- Log Level
- Supervision
With a Supervision Strategy:
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

// Drop elements that cause an ArithmeticException; stop on anything else.
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}

val possibleDivisionByZero =
  Flow[Int]
    .map(i => 100 / i)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

implicit val system: ActorSystem = ActorSystem("QuickStart")

// 0 triggers a division by zero and is dropped, so only -100 and 100 are printed.
Source(-1 to 1)
  .via(possibleDivisionByZero)
  .runWith(Sink.foreach(println))
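To illustrate how Restart differs from Resume, here is a sketch adapted from the example in the Akka error-handling documentation (assuming Akka 2.6.3; `restartDecider` and `runningTotal` are illustrative names). The scan stage keeps a running total, and restarting it clears that state:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("RestartSketch")

val restartDecider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Restart
  case _                           => Supervision.Stop
}

// A stateful stage: scan emits the initial value, then every intermediate total.
val runningTotal =
  Flow[Int]
    .scan(0) { (acc, elem) =>
      if (elem < 0) throw new IllegalArgumentException("negative not allowed")
      else acc + elem
    }
    .withAttributes(ActorAttributes.supervisionStrategy(restartDecider))

val totals = Await.result(
  Source(List(1, 3, -1, 5, 7)).via(runningTotal).runWith(Sink.seq),
  3.seconds)

// -1 is dropped and the stage restarts, so the total starts from 0 again.
println(totals) // Vector(0, 1, 4, 0, 5, 12)
system.terminate()
```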
However, some errors are recoverable. In this case, we provide a PartialFunction[Throwable, T] to recover. It terminates the stream gracefully, emitting the resulting value as the final element.
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

// No Resume supervision here: with it, the failing element would be dropped
// inside map and recover would never see the exception.
val possibleDivisionByZero =
  Flow[Int]
    .map(i => 100 / i)
    .recover {
      case _: ArithmeticException => 0 // emitted as the last element before completing
    }

implicit val system: ActorSystem = ActorSystem("QuickStart")

// Prints -100, then 0 (the recovered value); the stream completes before reaching 1.
Source(-1 to 1)
  .via(possibleDivisionByZero)
  .runWith(Sink.foreach(println))
Graphs
Introduces Junctions, which take multiple inputs and/or multiple outputs. The basic ones are Fan-in and Fan-out.
For example, using Fan-in we can randomly select one of the inputs, give preference to one of them, or merely zip them. Then, using Fan-out, we can broadcast the values or unzip them to create two individual streams.
We can use GraphDSL to easily connect them visually (documentation). E.g.:
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

implicit val system: ActorSystem = ActorSystem("QuickStart")

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))
  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge

  // ClosedShape indicates no open inputs or outputs, which means that the graph is runnable.
  // The opposite means that it is a partial graph.
  ClosedShape
}).run()
It is also possible to build partial graphs, or Shapes. There are already some built-in:
- Linear Shapes (<Source|Flow|Sink>Shape).
- Junction Shapes with the same input/output types (UniformFan<In|Out>Shape).
- Junction Shapes with different input/output types (Fan<In|Out>Shape<arity>).
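As a sketch of a partial graph (assuming Akka 2.6.3 and the Ammonite imports used in this document; `tripler` is a made-up name), the builder below leaves one inlet and one outlet open and returns a FlowShape, so the result can be used like any other Flow:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("PartialGraphSketch")

// Computes x * 2 + x by broadcasting each element and zipping the two branches back together.
val tripler: Flow[Int, Int, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(ZipWith[Int, Int, Int](_ + _))

    bcast.out(0).map(_ * 2) ~> zip.in0
    bcast.out(1) ~> zip.in1

    // One open inlet and one open outlet: a partial graph with the shape of a Flow.
    FlowShape(bcast.in, zip.out)
  })

val tripled = Await.result(Source(1 to 3).via(tripler).runWith(Sink.seq), 3.seconds)

println(tripled) // Vector(3, 6, 9)
system.terminate()
```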
Simpler graphs can be built using, for example, the Sink.combine API.
Fusion
By default, Akka “fuses” all stages into a single synchronous one that runs on a single actor (auto-fusing can be disabled), but this limits the parallelism we are looking for.
In order to add an asynchronous boundary, we just need to add async, which disables fusing for that stage. This adds overhead (actors, mailboxes and buffers), so its benefits largely depend on the use case. A good principle is:
- Insert an async boundary to bisect the stream into two subsections of roughly equal processing time.
In other words, check where in the current pipeline the stages can be split so that they can be performed in parallel and finish at almost the same time. This implies looking at telemetry to verify which stages can be processed in parallel given the graph we have.
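A minimal sketch of an asynchronous boundary (assuming Akka 2.6.3 and the Ammonite imports used in this document; the two map stages are placeholders for real work of roughly equal cost). The elements and their order are unchanged; only the execution model differs:

```scala
import $ivy.`com.typesafe.akka::akka-actor:2.6.3`
import $ivy.`com.typesafe.akka::akka-stream:2.6.3`
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("AsyncSketch")

val result = Await.result(
  Source(1 to 5)
    .map(_ + 1)  // runs, together with the source, in a first actor
    .async       // asynchronous boundary: everything above is no longer fused with what follows
    .map(_ * 2)  // runs, together with the sink, in a second actor
    .runWith(Sink.seq),
  3.seconds)

println(result) // Vector(4, 6, 8, 10, 12)
system.terminate()
```

Whether the boundary pays off depends on the stages being expensive enough to outweigh the cost of passing elements between actors.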