Akka Sharding

Akka Cluster Sharding #

Distribute actors across a cluster:

  • Entities: The main unit, identified by an id (e.g., UserId).
  • Shards: Hold entities (e.g., each shard holds 10 UserIds).
  • Shard Region: Holds shards.
  • Shard Coordinator: Manages shards.

Entity #

The main unit is the Entity, identified by its EntityId, which in essence represents the aggregate root’s identifier of a concept of our domain (e.g., UserId) and is unique within the cluster. Because only one instance of each entity exists in the cluster and Akka provides the single-thread illusion, this leads to Strong Consistency.

The EntityId is extracted through the ExtractEntityId partial function. This is often modeled using an Envelope (which is not mandatory when the message itself contains the identifier):

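import akka.cluster.sharding.ShardRegion.ExtractEntityId

// Wraps any message together with the id of the entity it is addressed to.
case class Envelope(entityId: String, message: Any)

// Returns the entity id and the message to deliver to that entity.
val idExtractor: ExtractEntityId = {
   case Envelope(id, msg) => (id, msg)
}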

The name of the actor essentially becomes the EntityId.
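
When the message already carries the identifier, the Envelope can be dropped. The following is a minimal sketch of that variant; `GetUser` and its `userId` field are hypothetical names chosen for illustration and are not part of the original example.

```scala
import akka.cluster.sharding.ShardRegion

// Hypothetical domain message that already contains the entity identifier.
final case class GetUser(userId: String)

object UserIdExtraction {
  // The whole message is forwarded to the entity; only the id is picked out.
  val idExtractor: ShardRegion.ExtractEntityId = {
    case msg @ GetUser(userId) => (userId, msg)
  }
}
```

Messages that the partial function does not match are not delivered to any entity, so every sharded message type needs its own `case`.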

Shards #

In turn, these entities are grouped into shards. How they are distributed depends on how the shard identifiers (ShardId) are generated, which is usually based on the EntityId. An improper distribution leads to an unbalanced cluster with hotspots (e.g., when the ShardId is derived from names, dates, or auto-incrementing ids). As a rule of thumb, use ~10 shards per node: too many makes it costly to locate them, and too few reduces the ability to distribute them.

In general, the shard identifier is modelled as follows:

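import akka.cluster.sharding.ShardRegion.ExtractShardId
val totalShards = 30 // example value: ~10 shards per node on a 3-node cluster

val shardIdExtractor: ExtractShardId = {
    case Envelope(id, _) => (Math.abs(id.hashCode % totalShards)).toString
}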
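
To check whether a given id scheme spreads evenly, the same formula can be tried outside Akka. The sketch below is plain Scala; `totalShards = 30` and the `user-N` ids are assumptions made up for the illustration.

```scala
object ShardIdSketch {
  // Assumed value: roughly 10 shards per node for a 3-node cluster.
  val totalShards: Int = 30

  // Same formula as the extractor above, without the Akka types.
  def shardIdFor(entityId: String): String =
    Math.abs(entityId.hashCode % totalShards).toString

  // Counts how many of the given entity ids land on each shard.
  def distribution(entityIds: Seq[String]): Map[String, Int] =
    entityIds.groupBy(shardIdFor).map { case (shard, ids) => shard -> ids.size }
}
```

Printing `ShardIdSketch.distribution((1 to 1000).map(i => s"user-$i"))` gives a quick view of how many entities each shard would hold; a few shards with far more entities than the rest would indicate a hotspot.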

Shard Region #

For a given type of entity, there is usually one Shard Region per JVM. In other words, we instantiate a shard region (for a given actor type) on every node that we want to host shards. Alternatively, we may merely create a proxy to a shard region on nodes that should not host entities. Last but not least, a shard region is an ActorRef to which we can send messages as usual.
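
A minimal sketch of starting a region and a proxy with the classic Cluster Sharding API follows. It assumes an `ActorSystem` already configured with the cluster provider; `UserActor`, the type name `"User"`, and `totalShards = 30` are hypothetical choices for the illustration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

final case class Envelope(entityId: String, message: Any)

// Hypothetical entity actor; a real one would hold state for its EntityId.
class UserActor extends Actor {
  def receive: Receive = { case msg => sender() ! msg }
}

object UserSharding {
  val totalShards = 30 // assumed value

  val idExtractor: ShardRegion.ExtractEntityId = {
    case Envelope(id, msg) => (id, msg)
  }

  val shardIdExtractor: ShardRegion.ExtractShardId = {
    case Envelope(id, _) => Math.abs(id.hashCode % totalShards).toString
  }

  // Call on every node that should host "User" shards.
  def startRegion(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "User",
      entityProps = Props(new UserActor),
      settings = ClusterShardingSettings(system),
      extractEntityId = idExtractor,
      extractShardId = shardIdExtractor)

  // Call on nodes that only send messages and host no entities.
  def startProxy(system: ActorSystem): ActorRef =
    ClusterSharding(system).startProxy(
      typeName = "User",
      role = None,
      extractEntityId = idExtractor,
      extractShardId = shardIdExtractor)
}
```

Either returned `ActorRef` is used the same way, for example `region ! Envelope("user-42", "hello")`; the region resolves the shard and forwards the message to the entity.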

Shard Coordinator #

Manages shards. It is responsible for locating the shard that hosts a specific entity so that messages addressed to that entity can be routed. It provides the location of the shard, which can then be used to access the entity.

It runs as an Akka Cluster Singleton and does little work, so it rarely becomes a bottleneck.

From Stateless systems to Stateful systems #

In a stateless system, Strong Consistency is achieved by leveraging a DB as the source of truth. However, as the system grows, the DB may become a bottleneck. We can leverage Sharding instead:

  • Load is distributed across multiple machines in the cluster.
  • State can be cached for fast access with no need to read from DB.
  • Strong Consistency is guaranteed by sharded actors backed by the single thread illusion.

The actors may now become a source of contention; however, that contention is distributed across multiple machines. This means that the system is Elastic and we can scale as needed.

When there is a failure, it is isolated to a single actor (Bulkheading).

Stateful Actors #

Reads may be served directly from the in-memory state, but writes may have to go through the DB, followed by an update of our internal state. This is important because, when the actor fails, we need to rebuild its internal state from the DB.

To avoid blocking the system while reading from the DB, we can use the Stash to hold incoming messages until the internal state has been rebuilt. See the following example:

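import akka.actor.{Actor, Stash, Status}
import akka.pattern.pipe

case class StateLoaded(state: State)    // assumed internal message; State is the domain type

class MyActor(repository: Repository) extends Actor with Stash {
    import context.dispatcher           // ExecutionContext for the Future callbacks

    val id: String = self.path.name     // under sharding, the actor name is the EntityId
    var state: Option[State] = None

    repository.read(id)                 // Non-blocking DB. Asynchronous read from DB.
      .map(state => StateLoaded(state)) // Notify so that we can change the current context
      .pipeTo(self)

    def receive: Receive = loading

    def loading: Receive = {
      case StateLoaded(s) =>
        state = Some(s)
        context.become(running)
        unstashAll() // replay all accumulated messages once this message has been processed
      case Status.Failure(ex) => throw ex // Best practice: trigger the restart of the actor by default. The stashed messages are not lost.
      case _ => stash()
    }

    def running: Receive = {
      case msg => // regular handler
    }
}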

When we introduce operations that update the DB, we need to fine-tune this so that messages are also stashed while a write is in flight:

import akka.actor.{Actor, ActorLogging, Stash, Status}
import akka.pattern.pipe

case class UpdateState(foo: Int)
case class StateLoaded(state: State)    // assumed internal message; State is the domain type
case class StateUpdated(state: Option[State])

class MyActor(repository: Repository) extends Actor with Stash with ActorLogging {
    import context.dispatcher           // ExecutionContext for the Future callbacks

    val id: String = self.path.name     // under sharding, the actor name is the EntityId
    var state: Option[State] = None

    repository.read(id)                 // Non-blocking DB. Asynchronous read from DB.
      .map(state => StateLoaded(state)) // Notify so that we can change the current context
      .pipeTo(self)

    def receive: Receive = loading

    def loading: Receive = {
      case StateLoaded(s) =>
        state = Some(s)
        context.become(running)
        unstashAll() // replay all accumulated messages once this message has been processed
      case Status.Failure(ex) => throw ex // Best practice: trigger the restart of the actor by default. The stashed messages are not lost.
      case _ => stash()
    }

    def running: Receive = {
      case UpdateState(foo) =>
        state.foreach { current =>      // the state is always defined once we are running
          context.become(waiting)
          repository.update(current.copy(foo = foo)) // assumed to return Future[Option[State]]
            .map(StateUpdated.apply)
            .pipeTo(self)(sender()) // send message to self as soon as the operation is done, keeping the original sender() to reply to
        }
    }

    def waiting: Receive = {
      case evt @ StateUpdated(updated) =>
        state = updated           // refresh the in-memory state with what was written
        context.become(running)   // can process messages as usual
        unstashAll()              // re-enqueue messages not processed while the DB was being written
        sender() ! evt            // reply back to the original sender
      case failure @ Status.Failure(ex) =>
        log.error(ex, s"[$id] FAILURE: ${ex.getMessage}")
        sender() ! failure        // make sure we reply back
        throw ex                  // trigger actor restart
      case _ =>
        stash()
    }
}
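
The actors above rely on a `Repository` and a `State` type that the text does not define. One possible shape, given here only as an assumption for experimenting with the examples, is an in-memory stand-in with a non-blocking `read` and `update`. The `id` field on `State` and the `InMemoryRepository` name are hypothetical.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Assumed domain type: `foo` matches the field used by UpdateState.
final case class State(id: String, foo: Int)

// Assumed interface: both operations return immediately with a Future.
trait Repository {
  def read(id: String): Future[State]
  def update(state: State): Future[Option[State]]
}

// In-memory stand-in for a real non-blocking DB client.
final class InMemoryRepository(initial: Map[String, State]) extends Repository {
  private val store = TrieMap(initial.toSeq: _*)

  // Fails the Future when nothing is stored under the id.
  def read(id: String): Future[State] =
    store.get(id) match {
      case Some(s) => Future.successful(s)
      case None    => Future.failed(new NoSuchElementException(s"no state for $id"))
    }

  // Stores the new state and returns it, mirroring a DB write acknowledgement.
  def update(state: State): Future[Option[State]] = {
    store.put(state.id, state)
    Future.successful(Some(state))
  }
}
```

A failed `read` reaches the actor as a `Status.Failure` through `pipeTo`, which is the case the loading behaviour turns into a restart.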

Passivation #

This phenomenon can be observed as small dips in throughput. It happens because Akka limits the number of actors kept in memory, as keeping all of them (e.g., idle actors) is unreasonable.

Each actor tracks the last time it processed a message. If it has not processed a message within a configured time period, it will Passivate, which removes the actor from memory.

The period must be tuned: too long may lead to OOM, and too short may lead to constant reads from the DB. Best practice is to pick an initial value and then tune it by watching the memory usage.

It can also be done manually by sending a Passivate message to the parent.
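
Manual passivation can be sketched as follows, using the classic API's `ShardRegion.Passivate`. The `Stop` message, the `IdleAwareEntity` name, and the 2-minute timeout are assumptions for the illustration. Automatic passivation is instead driven by configuration (in several Akka versions, the `akka.cluster.sharding.passivate-idle-entity-after` setting).

```scala
import akka.actor.{Actor, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import scala.concurrent.duration._

// Hypothetical stop message sent back to the entity by its shard.
case object Stop

class IdleAwareEntity extends Actor {
  // Assumed idle period; ReceiveTimeout is delivered after this much inactivity.
  context.setReceiveTimeout(2.minutes)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Ask the parent (the shard) to passivate this entity.
      context.parent ! ShardRegion.Passivate(stopMessage = Stop)
    case Stop =>
      // The shard sends the stop message back once buffering is in place.
      context.stop(self)
    case _ =>
      // regular message handling
  }
}
```

Going through the parent rather than stopping directly lets the shard buffer messages that arrive for the entity while it is shutting down.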

Rebalancing #

Occurs whenever the size of the cluster changes. The Shard Coordinator initiates the rebalancing process by redistributing the shards across all available nodes in order to keep an even distribution of entities.

This can only occur in a healthy cluster. Therefore, any unreachable nodes must be terminated and then removed, either manually through Akka Management or using the Lightbend Split Brain Resolver.

Steps:

  1. Coordinator informs Regions that a rebalance has started.
  2. Messages to an entity on a moving shard are buffered.
  3. Once the shard has been rebalanced, the buffered messages are sent.

During rebalancing, the messages delivered follow the at-most-once semantics.

There are several Shard Allocation Strategies; the default one is LeastShardAllocationStrategy.
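
The idea behind a "least shards" allocation can be illustrated with a simplified model in plain Scala. This is not the library's implementation, only a sketch of the principle that a new shard goes to the region currently hosting the fewest shards; the region names and the tie-breaking by name are assumptions.

```scala
object LeastShardsSketch {
  // Current allocation: region name -> ids of the shards it hosts.
  type Allocation = Map[String, Set[String]]

  // Places the shard on the region with the fewest shards (ties broken by name).
  def allocate(current: Allocation, shardId: String): Allocation = {
    require(current.nonEmpty, "at least one region is needed")
    val (region, shards) = current.minBy { case (name, hosted) => (hosted.size, name) }
    current.updated(region, shards + shardId)
  }
}
```

For example, with `node-a` hosting two shards, `node-b` one, and `node-c` none, the next shard is placed on `node-c`.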

However, after a rebalance the entities of a moved shard are not automatically restarted. For that to happen, one needs to enable “Remember Entities”, which comes with some costs.

Remember Entities #

By enabling remember-entities, the entities of a shard are restored when a node restarts or a shard is rebalanced. This works by informing every member each time an entity starts or stops (using Akka Distributed Data); this information is stored in durable storage on disk, so it can be recovered even after a full cluster restart. However, the durable storage should be disabled in environments without persistent disk storage (e.g., Kubernetes); in those cases, use the eventsourced data mode (see the documentation).

Warning!

  • Enabling this disables automatic passivation.
  • It is not cheap as every node will have to be informed of all running entities, which leads to an overhead starting/stopping them.

Best practice is to limit its use to cases where we have a limited number of active entities. Most of the time it is not really needed, as entities are removed automatically through Passivation and brought back when needed. However, there are some use-cases:

  • When the entity has a scheduled process that may not have completed.
  • When the time to restart an entity on demand could cause the system to backup (long startup times).
  • When the resource savings of passivating the Entities are insignificant.

With this feature, the node’s ExtractShardId function must handle ShardRegion.StartEntity(entityId).
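
A minimal sketch of such an extractor follows, reusing the Envelope-based formula from the Shards section. `totalShards = 30` and the `shardOf` helper are assumptions for the illustration.

```scala
import akka.cluster.sharding.ShardRegion

final case class Envelope(entityId: String, message: Any)

object RememberEntitiesExtractor {
  val totalShards = 30 // assumed value

  // Shared formula so both cases map an id to the same shard.
  private def shardOf(entityId: String): String =
    Math.abs(entityId.hashCode % totalShards).toString

  val shardIdExtractor: ShardRegion.ExtractShardId = {
    case Envelope(id, _)             => shardOf(id)
    // Sent by sharding itself when it restarts a remembered entity.
    case ShardRegion.StartEntity(id) => shardOf(id)
  }
}
```

Both cases must resolve an id to the same shard; otherwise a remembered entity could be restarted on a different shard than the one its regular messages are routed to.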

Note: During startup, some nodes may become overwhelmed. To avoid concentrating the shards on a single member of the cluster, we set the minimum number of members in the cluster settings (akka.cluster.min-nr-of-members). This avoids unnecessary rebalances during startup. While the cluster does not have enough members, existing members will remain in the Joining state.