Akka Java Tutorials Part 1-AKKA Distributed Data

Part 1- Akka Distributed Data

gaurav ranjan
5 min readNov 1, 2019

In this tutorial I am going to discuss about Akka Distributed Data , how it leverages Conflict Free Replicated Data Types (CRDTs) & build a simple distributed cache using it.

Introduction to Akka Distributed Data

Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).

All data entries are spread to all nodes, in the cluster via direct replication and gossip based dissemination. You have fine grained control of the consistency level for reads and writes.

What is Conflict Free Replicated Data Types (CRDTs) & how Akka leverages it?

In distributed computing, a conflict-free replicated data type (CRDT) is a data structure which can be replicated across multiple computers in a network, where the replicas can be updated independently and concurrently without coordination between the replicas, and where it is always mathematically possible to resolve inconsistencies which might result.

Akka uses CRDTs for maintaining eventual consistency across cluster nodes. It makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes will automatically be resolved by the monotonic merge function, which all data types must provide. The state changes always converge.

Several useful data types for counters, sets, maps and registers are provided and you can also implement your own custom data types.The data objects could be in various forms like counters, sets maps, etc. As per the document we have following standard data types available.

  • Counters: GCounter, PNCounter
  • Sets: GSet, ORSet
  • Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
  • Registers: LWWRegister, Flag

Apart from these we can very well use custom data types also. In my example I am using ORMap which is basically a Key-Value pair data type. The objective is to build simple distributed cache.

Some Concepts before we start to get into code.

Replicator

The akka.cluster.ddata.Replicator actor provides the API for interacting with the data. The Replicator actor must be started on each node in the cluster, or group of nodes tagged with a specific role.

Consistency

The consistency level that is supplied in the Update and Get specifies per request how many replicas that must respond successfully to a write and read request.

There are various levels of consistencies like readAll, writeAll ,WriteMajority. or ReadMajority. For All Consistency all the nodes must respond. For Majority (N / 2 + 1) nodes must respond.

With these concepts lets dig into code. For more info we can check out the links mentioned above.

Akka Cluster

For this to work a cluster needs to be setup. Setting a akka cluster is really easy. We just need to add bunch of configuration in the akka configuration file. We need to have cluster mode on. We can check the configs here.

akka {
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
stdout-loglevel
= "INFO"

actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
}
serialization-bindings {
"com.akka.tutorials.core.ClusterMessages" = java
"com.akka.tutorials.dto.CrdtMessage" = java
}

default-dispatcher.throughput = 1

deployment {
/ping {
router = round-robin-pool
cluster {
enabled = on
use-role = "akka-java-tutorials-node"
}
}

}
}

remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
#hostname = "127.0.0.1"
port = 0
}
log-remote-lifecycle-events = off
}

cluster {
roles = ["akka-java-tutorials-node"]
retry-unsuccessful-join-after = 4s
auto-down-unreachable-after = 3s # this doesn't work
use-dispatcher = cluster-dispatcher
failure-detector {
heartbeat-interval = 2s
min-std-deviation = 500ms
threshold = 8.0
acceptable-heartbeat-pause = 4s

}
}
}
cluster-dispatcher {
type = "Dispatcher"
executor
= "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
}
}

Apart from these we need to set up the seed nodes

akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp.port = 2500
}
cluster {
seed-nodes = ["akka.tcp://akka-java-tutorials@127.0.0.1:2500"]
auto-down-unreachable-after = 1s
}
}

And finally we create the actor system

public static void init() {
config = setConfig();
actorSystem = ActorSystem.create("akka-java-tutorials", config);
mat = ActorMaterializer.create(actorSystem);
CrdtQueries.init(actorSystem);
crdtActor = actorSystem.actorOf(CrdtActor.props(actorSystem));
}
private static Config setConfig() {
Config configAll = ConfigFactory.load();
Config envq = configAll.getConfig("development");
String address = "127.0.0.1";

Config common = configAll.getConfig("common");
return ConfigFactory.parseString(
"akka.remote.netty.tcp.hostname=" + address).withFallback(envq).withFallback(common);
}

Once the akka cluster set up is done we start with our distributed data set.

Replicator

We define the replicator actor.

ActorRef replicator = DistributedData.get(actorSystem).replicator();

Consistency

Define the consistency levels as desired. I am using All Consistency level.

final static Replicator.WriteConsistency writeAll = new Replicator.WriteAll(Duration.ofSeconds(5));final static Replicator.ReadConsistency readAll = new Replicator.ReadAll(Duration.ofSeconds(5));

KEY

Key represents the data store which we are going to use in our cluster. We are using a String and an custom object as value

private static final Key<ORMap<String, CrdtMessage>> KEY = ORMapKey.create("crdt-tutorial");

Replicated Data(CRDT)

The custom object represents the CRDT in this case. As we see we have a merge function which defines how we want to collect the updates on the data object. Concurrent updates from different nodes will automatically be resolved by the monotonic merge function, which all data types must provide. The state changes always converge.. The current implementation is a very simple one. But it could be more complex

@Data
@Builder
public class CrdtMessage implements ReplicatedData {

String key;
String message;

@Override
public ReplicatedData merge(ReplicatedData that) {
return that;
}
}

Subscribe

Subscribe to changes to CRDT data objects change events.

@Override
public void preStart() {
Replicator.Subscribe<ORMap<String, CrdtMessage>> subscribe = new Replicator.Subscribe(KEY, getSelf());
replicator.tell(subscribe, ActorRef.noSender());
}

Listen to changes within the data objects in the OnRecieve method of the actor in which is subscribed to the event.

@Override
public void onReceive(Object message) {
if (message instanceof Replicator.Changed) {
Replicator.Changed changed = (Replicator.Changed<ORMap<String, CrdtMessage>>) message;
Key key = changed.key();
if (key.equals(KEY)) {
ORMap<String, CrdtMessage> orMap = (ORMap<String, CrdtMessage>) changed.dataValue();
orMap.getEntries().forEach((k, v) -> {
log.info("crdt-key:" + k + " crdt-message:" + v.getMessage());
});
}

Crdt Queries

After all these we finally need to query our data object. Here we have a simple key-value object. I will showcase updates/addition ,deletion and reads. Let us take a look.

Reads

Using Replicator.Get for this operation.


public static CompletionStage<ORMap<String, CrdtMessage>> readAllFileMessages() {
CompletableFuture<Object> messageMap = ask(replicator,
new Replicator.Get<ORMap<String, CrdtMessage>>(KEY, readAll),
timeoutCache).toCompletableFuture();
return messageMap.<ORMap<String, CrdtMessage>>thenApply(respobjeçt -> {
if (respobjeçt instanceof Replicator.GetSuccess) {
Replicator.GetSuccess getSuccess = (Replicator.GetSuccess<ORMap<String, CrdtMessage>>) respobjeçt;
return ((ORMap<String, CrdtMessage>) getSuccess.dataValue());
} else {
return ORMap.create();
}
});
}

Updates/Writes

Using Replicator.Update for this operation

public static CompletableFuture<Object> updateFileMessage(CrdtMessage crdtMessage, SelfUniqueAddress node) {    Replicator.Update<ORMap<String, CrdtMessage>> update =
new Replicator.Update<>(KEY, ORMap.create(), writeAll,
Optional.of(crdtMessage),
map -> map.put(node, crdtMessage.getKey(), crdtMessage));
return ask(replicator,
update,
timeoutCache).toCompletableFuture();
}

Deletion

public static CompletableFuture<Object> removeMessage(CrdtMessage crdtMessage, SelfUniqueAddress node) {    Replicator.Update<ORMap<String, CrdtMessage>> update =
new Replicator.Update<>(KEY, ORMap.create(), writeAll,
Optional.of(crdtMessage),
map -> map.remove(node, crdtMessage.getKey()));
return ask(replicator,
update,
timeoutCache).toCompletableFuture();
}

The complete code is available here.

Conclusion

We covered akka distributed data here. More exhaustive literature can be found on the links provided. This could be a great start for integration it in your application. In the next story I will be covering a design to build a singleton, fault-tolerant cron job within akka cluster using Cluster Singleton and Akka Scheduler.

Stay Tuned!!

--

--