Akka Java Tutorials Part 5 — Asynchronous HTTP calls using AKKA HTTP
As part of akka java tutorials series in Part 4 we saw how AKKA streams can be leveraged to transform Mysql queries as a stream and discussed the advantages of the same.
In this tutorial I would be covering Akka Http client library for consuming rest services
Sometimes we end up calling external services from the application. Akka HTTP provides a mechanism to make asynchronous non blocking HTTP calls to external systems.
There are two important parts to use it firstly, making a request and secondly, processing the response. I’ll be discussing one example each for a GET and a POST Http requests, and then show how to process the response . I will also be covering the retry mechanism available in AKKA HTTP for the failure scenarios. Let’s get started.
Setting Up the Objects
To start with we need to initialise few objects which we will be using to make the Rest Calls
Marshaller : Marshalling is the process of converting a higher-level (object) structure into some kind of lower-level representation, often a “wire format”. Other popular names for marshalling are “serialization” or “pickling”. In Akka HTTP, marshalling means the conversion of an object of type T
into a lower-level target type, e.g. a MessageEntity
(which forms the “entity body” of an HTTP request or response) or a full HttpRequest
or HttpResponse
.
public static Unmarshaller<HttpEntity, EmployeeResponseDO> unmarshaller;ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); unmarshaller = Jackson.unmarshaller(mapper, EmployeeResponseDO.class);
Http Client API: The client APIs provide methods for calling a HTTP server using the same HttpRequest
and HttpResponse
abstractions that Akka HTTP server uses but adds the concept of connection pools to allow multiple requests to the same server to be handled more performantly by re-using TCP connections to the server.
static Http http;
http = Http.get(actorSystem);
Calling The Rest Services
In this example I am making a POST call to a rest service. I have used http://dummy.restapiexample.com/ for building an example.
Here as we can see the first step is to call the service. We are passing the req body ie employeeResponseDO, using the POST method and specifying the uri.
Next step is to chain the response using thenCompose method. Once that is done we are unmarshalling the response object using the unmarshaller defined in the object declaration above. The main point to note here is for all these the return type is CompletionStage type which makes the entire operation inherently asynchronous.
public static CompletionStage<EmployeeResponseDO> postEmployeeDetails(EmployeeResponseDO employeeResponseDO) {
Gson gson = new Gson();
return http.singleRequest(HttpRequest.create().withMethod(HttpMethods.POST)
.withEntity(HttpEntities.create(ContentTypes.APPLICATION_JSON, gson.toJson(employeeResponseDO)))
.withUri("http://dummy.restapiexample.com/api/v1/create")).thenCompose(res -> {
if (res.status().intValue() != 200) {
res.discardEntityBytes(materializer);
log.info("error in http status");
throw new RuntimeException();
} else {
return unmarshaller.unmarshal(res.entity().withContentType(ContentTypes.APPLICATION_JSON), materializer).<EmployeeResponseDO>thenApply(employee -> {
log.info("Akka HTTP POST" + " employee id:" + employee.id + " employee name:" + employee.name);
return employee;
});
}
}).exceptionally(ex -> {
ex.printStackTrace();
throw new RuntimeException(ex);
});
}
}
Retry Mechanism to handle failures
For any http calls we need a retry mechanism. Retry mechanism helps to properly mitigate any sort of error scenarios which might come in while making the Http Calls. AKKA Http calls is wrapped around as a AKKA streams Source to add the retry logic.
In the example below we are making a GET call. Here I have wrapped the HTTP call as stream. Akka streams provides a RestartSource
, RestartSink
and RestartFlow
for implementing the so-called exponential backoff supervision strategy, starting an operator again when it fails or completes, each time with a growing time delay between restarts.
RestartSource.onFailuresWithBackoff makes it easy to state the retry mechanism. Here I am using Source.fromCompletionStage to wrap the http call in a source and then passing the response to a mapAsycnc and finally run it with a Sink which collects the response.
public static CompletionStage<EmployeeResponseDO> getEmployeeDetails(int id) {
Gson gson = new Gson();
//adding restart logic on rest calls in case of failures
return RestartSource.onFailuresWithBackoff(
Duration.ofSeconds(3),
Duration.ofSeconds(15),
0.2,
2,
() -> Source.fromCompletionStage(http.singleRequest(HttpRequest.create().withMethod(HttpMethods.GET)
.withUri("http://dummy.restapiexample.com/api/v1/employee/" + id)).thenCompose(res -> {
if (res.status().intValue() != 200) {
res.discardEntityBytes(materializer);
log.info("error in http status");
throw new RuntimeException();
} else {
return unmarshaller.unmarshal(res.entity().withContentType(ContentTypes.APPLICATION_JSON), materializer).<EmployeeResponseDO>thenApply(employee -> {
return employee;
});
}
}).exceptionally(ex -> {
ex.printStackTrace();
throw new RuntimeException(ex);
})).mapAsync(1, res -> CompletableFuture.completedFuture(res))).
runWith(Sink.<EmployeeResponseDO,EmployeeResponseDO>fold(null, (total, next) -> {
log.info("Akka HTTP GET" + " employee id:" + next.id + " employee name:" + next.employee_name + " employee age:" + next.employee_age);
return next;
}), materializer);
}
The complete code is available here.
Conclusion
In this series we saw how AKKA HTTP can be leveraged to consume external rest calls from the application.
In the next series I would be covering Akka Streams in Depth.