Akka Java Tutorials Part 4 - JDBC Query Processing using Akka Streams

gaurav ranjan
Nov 22, 2019

As part of the Akka Java tutorials series, in Part 3 we saw how to leverage Akka’s distributed Pub/Sub to build a simple distributed queuing system.

In this tutorial I am going to cover JDBC query processing using Akka Streams. Before I delve deep, let’s discuss a few things about streaming systems.

What is Reactive Streams?

Reactive Streams is a standard for asynchronous data processing in a streaming fashion. With its inclusion in Java 9, as the java.util.concurrent.Flow interfaces, Reactive Streams is becoming the go-to tool for building the streaming components of applications for years to come. It’s worth pointing out that Reactive Streams is “just” a standard; by itself it can’t do anything, so we need to use an implementation of it. In this tutorial we’ll use Akka Streams, one of the leading implementations of Reactive Streams since its inception.
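For reference, the standard itself boils down to four small interfaces. In JDK 9+ they live as static nested interfaces of java.util.concurrent.Flow, shown here in abbreviated form:

public final class Flow {
    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }
    public interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }
    public interface Subscription {
        void request(long n); // demand signalling: the heart of backpressure
        void cancel();
    }
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}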

Basics of Akka Streams

In Akka Streams, the processing pipeline (the graph) consists of three types of elements: a Source (the producer), a Sink (the consumer), and Flows (the processing stages).

Using those components, you define your graph, which is nothing more than a recipe for processing your data — it doesn’t do any computations so far. To actually execute the pipeline, you need to materialize the graph, i.e. convert it to a runnable form. In order to do it, you need a so-called materializer, which optimizes the graph definition and actually runs it. Therefore, the definition of the graph is completely decoupled from the way of running it, which, in theory, lets you use any materializer to run the pipeline. However, the built-in ActorMaterializer is actually the status quo, so chances are you won’t be using any other implementation.
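To make this concrete, here is a minimal, self-contained sketch (names and values are illustrative) of a graph being defined and then materialized:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

public class GraphExample {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        Materializer materializer = ActorMaterializer.create(system);

        Source<Integer, NotUsed> source = Source.range(1, 10);                            // the producer
        Flow<Integer, Integer, NotUsed> doubler = Flow.of(Integer.class).map(n -> n * 2); // a processing stage
        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);    // the consumer

        // The graph is only a recipe so far; runWith materializes and runs it.
        source.via(doubler).runWith(sink, materializer);
    }
}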

I would be covering Akka Streams in more depth in future tutorials.

JDBC Query Processing-The Problem

Classical Big Data solutions have a couple of components: ingesting events through a message broker, processing them in some kind of engine, and storing them in a database or distributed filesystem. Generally we have cases where data is produced at a faster rate than it is consumed or persisted to the database, and just increasing buffer sizes until some hardware limit (e.g. memory) is reached isn’t a solution. This is where backpressure comes into play: it helps to avoid unbounded buffering across asynchronous boundaries. Also, looking at databases and the JDBC specification, it quickly becomes clear that most databases don’t support non-blocking, asynchronous calls, and introducing blocking code in a stream-based solution won’t work.
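As a tiny illustration of backpressure (a hypothetical sketch, not the project code), the throttled stage below only signals demand for ten elements per second, and the source, however fast it could produce, emits no more than that:

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;

public class BackpressureDemo {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("backpressure-demo");
        Materializer materializer = ActorMaterializer.create(system);

        Source.range(1, 1_000_000)
            .throttle(10, Duration.ofSeconds(1)) // stand-in for a slow consumer
            .runWith(Sink.foreach(System.out::println), materializer);
    }
}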

Slick for JDBC-The Solution

Slick is easy to use in asynchronous, non-blocking application designs, and supports building applications according to the Reactive Manifesto. Unlike simple wrappers around traditional, blocking database APIs, Slick gives you:

  • Clean separation of I/O and CPU-intensive code: Isolating I/O allows you to keep your main thread pool busy with CPU-intensive parts of the application while waiting for I/O in the background.
  • Resilience under load: When a database cannot keep up with the load of your application, Slick will not create more and more threads (thus making the situation worse) or lock out all kinds of I/O. Back-pressure is controlled efficiently through a queue (of configurable size) for database I/O actions, allowing a certain number of requests to build up with very little resource usage and failing immediately once this limit has been reached.
  • Reactive Streams for asynchronous streaming.
  • Efficient utilization of database resources: Slick can be tuned easily and precisely for the parallelism (number of concurrent active jobs) and resource usage (number of currently suspended database sessions) of your database server (see the configuration sketch after this list).
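For instance, the parallelism and queue size mentioned above are ordinary Slick settings in the database config; a sketch with illustrative values (not recommendations):

slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    numThreads = 10  # parallelism: number of concurrent active jobs
    queueSize = 1000 # back-pressure queue for pending database actions
  }
}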

Alpakka-Jdbc

The Alpakka project is an open source initiative to implement stream-aware and reactive integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure. Akka Streams is a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant implementation and therefore fully interoperable with other implementations.

In this tutorial I am using the Alpakka Slick (JDBC) connector for interacting with the DB layer. The connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.

With this introduction, let’s start with the implementation.
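Assuming a Maven build, the extra dependencies are the Alpakka Slick connector and the H2 driver (version numbers are illustrative; pick the latest ones):

<dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-slick_2.12</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.200</version>
</dependency>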

Step 1: The DB configuration in application.conf. We are using an embedded H2 database.

slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:file:./target/db/testdb"
      user = "sa"
      password = ""
    }
  }
}

Step 2: Creating a SlickSession with the config.

final SlickSession slickSession = SlickSession.forConfig(config.getConfig("slick-h2"));
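Put together, a minimal bootstrap might look like this (class and field names here are my own; the project’s actual wiring may differ):

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.slick.javadsl.SlickSession;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class DbSetup {
    static final ActorSystem system = ActorSystem.create("jdbc-streams");
    static final Materializer materializer = ActorMaterializer.create(system);

    static final Config config = ConfigFactory.load();
    static final SlickSession slickSession =
            SlickSession.forConfig(config.getConfig("slick-h2"));

    static {
        // Close the session (and its connections) when the actor system terminates.
        system.registerOnTermination(slickSession::close);
    }
}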

Step 3: I have an Employee table with attributes id, name, age, and city. I will be showcasing the CRUD operations.

public class DBQueries {

    public static final String selectEmployees = "select id, age, name, city from employees";
    public static final String updateEmployee = "update employees set city='<city>' where id=<id>";
    public static final String createEmployee = "insert into employees values(<id>,'<name>',<age>,'<city>')";
    public static final String deleteEmployee = "delete from employees where id=<id>";
}
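The queries assume an Employee value class along these lines (a hypothetical sketch; the fields match the accesses made in the stream code below):

public class Employee {

    public final Integer id;
    public final String name;
    public final Integer age;
    public final String city;

    public Employee(Integer id, String name, Integer age, String city) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.city = city;
    }
}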

Step 4: Writing DB queries using the Slick APIs.

Select Query

public static CompletionStage<ArrayList<Employee>> selectEmployeeDetails() {

    // Stream the result set row by row, mapping each SlickRow to our domain type.
    return Slick.source(slickSession,
            DBQueries.selectEmployees,
            (SlickRow res) -> mapEmployee(res))
        .async()
        // Fold the streamed rows into a single list.
        .runWith(Sink.fold(new ArrayList<Employee>(), (total, next) -> {
            log.info("employee id: {}, employee name: {}, employee city: {}",
                    next.get(0).id, next.get(0).name, next.get(0).city);
            total.addAll(next);
            return total;
        }), materializer)
        .exceptionally(ex -> {
            log.error(ex, "error in selecting employees");
            throw new CompletionException(ex);
        });
}
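mapEmployee is not shown above; a possible implementation, consistent with the column order of the select statement and with the fold’s use of next.get(0) and total.addAll(next), wraps each row in a singleton list (java.util.Collections assumed imported):

// Hypothetical helper: reads columns in select order (id, age, name, city).
private static List<Employee> mapEmployee(SlickRow row) {
    int id = row.nextInt();
    int age = row.nextInt();
    String name = row.nextString();
    String city = row.nextString();
    return Collections.singletonList(new Employee(id, name, age, city));
}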

Update Query

public static CompletionStage<Done> updateEmployee(Employee employee) {
    List<Employee> employees = new ArrayList<>();
    employees.add(employee);
    // Slick.sink turns each stream element into a SQL statement and executes it.
    return Source.from(employees)
        .runWith(
            Slick.sink(
                slickSession,
                (msg) -> DBMapper.getUpdateQuery(msg.city, msg.id)),
            materializer);
}

Insert Query

public static CompletionStage<Done> insertEmployee(Employee employee) {
    List<Employee> employees = new ArrayList<>();
    employees.add(employee);
    return Source.from(employees)
        .runWith(
            Slick.sink(
                slickSession,
                (msg) -> DBMapper.getinsertQuery(msg.city, msg.name, msg.age, msg.id)),
            materializer);
}

Delete Query

public static CompletionStage<Done> deleteEmployee(Employee employee) {
    List<Employee> employees = new ArrayList<>();
    employees.add(employee);
    return Source.from(employees)
        .runWith(
            Slick.sink(
                slickSession,
                (msg) -> DBMapper.getDeleteQuery(msg.id)),
            materializer);
}

The complete code is available here.

Conclusion

In this part of the series we saw how Akka Streams can be used to fire DB queries via JDBC. It is highly useful for cases where we face a fast producer and a slow consumer.

In the next part I will be covering the Akka HTTP client library in depth.
