CompletableFuture, introduced in Java 8, provides an easy way to write asynchronous, non-blocking, multithreaded code. Since Spring 4.2 it’s now possible to have Controllers, Services, and Repositories return CompletableFuture
from non-private methods annotated with @Async
. In this blog post we will see how we can take advantage of this to write non-blocking, asynchronous code across multiple layers within our application.
You can find the source code for the sample project for this post here
Interface Driven Multilayered Architecture
Our application will consist of 3 layers of responsibility with each layer performing a specific role in the application:
- The Controller layer for presentation (JSON)
- The Service layer for business logic
- The Repository layer for persistence
Using a multilayered architecture pattern allows us to write modular code. With the Interface Driven Design architecture pattern we can establish a well known contract at the boundary of each module. This allows us to encapsulate the implementation details of each module and prevent those implementation details from leaking and causing coupling between modules.
With this approach we can substitute the implementation of a module for an alternative implementation using the same well known contract as defined by that module’s interface e.g. an RDBMS implementation for a MongoDB implementation, or a mock implementation.
This approach makes it much easier unit test our code by being able to mock each module by virtue of it’s interface.
AsyncConfiguration
To achieve non-blocking withCompletableFuture
in our application we will need to configure ThreadPoolTaskExecutor
instances for each layer within our application. The reason for this is to ensure that one layer cannot starve another of threads and cause deadlock.
A bounded thread pool is better for performance as spawning a new thread for each request can be costly. It is also useful to bound a thread pool as it makes you to consider the nature of your application and the resource and tuning requirements it will need in production.
We make use of custom configuration properties, ApplicationProperties
, to configure each ThreadPoolExecutor
.
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
...
@Override
@Bean(name = TASK_EXECUTOR_DEFAULT)
public Executor getAsyncExecutor() {
return newTaskExecutor(TASK_EXECUTOR_NAME_PREFIX_DEFAULT);
}
@Bean(name = TASK_EXECUTOR_REPOSITORY)
public Executor getRepositoryAsyncExecutor() {
return newTaskExecutor(TASK_EXECUTOR_NAME_PREFIX_REPOSITORY);
}
@Bean(name = TASK_EXECUTOR_SERVICE)
public Executor getServiceAsyncExecutor() {
return newTaskExecutor(TASK_EXECUTOR_NAME_PREFIX_SERVICE);
}
@Bean(name = TASK_EXECUTOR_CONTROLLER)
public Executor getControllerAsyncExecutor() {
return newTaskExecutor(TASK_EXECUTOR_NAME_PREFIX_CONTROLLER);
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
private Executor newTaskExecutor(final String taskExecutorNamePrefix) {
final ApplicationProperties.Async asyncProperties = applicationProperties.getAsync();
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(asyncProperties.getCorePoolSize());
executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
executor.setQueueCapacity(asyncProperties.getQueueCapacity());
executor.setThreadNamePrefix(taskExecutorNamePrefix);
return executor;
}
}
Repository Layer
Here is our repository interface. As we are using MongoDB for this example we extend MongoRepository
:
public interface UserRepository extends MongoRepository<User, String> {
@Async(AsyncConfiguration.TASK_EXECUTOR_REPOSITORY)
CompletableFuture<Page<User>> findAllBy(final Pageable pageable);
@Async(AsyncConfiguration.TASK_EXECUTOR_REPOSITORY)
CompletableFuture<User> findOneById(final String id);
}
We specify our User domain model as the type this Repository
will manage. The id type for the User entity is of type String.
Spring Data allows us to create query methods, methods whose very signature defines a database query. When Spring Data creates a proxy bean for a Repository
it will use the query method signatures to implement the query you wish to execute. The findByconvention
allows us to define query methods and return CompletableFuture
instances from our query methods thus making them asynchronous and non-blocking.
Check out my blog post on creating asynchronous query methods with Spring Data and CompletableFuture
.
Find All Users
With the following query method we can run a query to find all users in our database:
CompletableFuture<Page<User>> findAllBy(final Pageable pageable);
Spring Data supports pagination out of the box and provides the Page
type that represents a page of entity objects. The CompletableFuture<Page<User>>
being returned by our method is yielding a page of User entity objects. To specify the pagination criteria to use in the query the Pageable
parameter is used. Pageable
is another type provided by Spring for this very purpose.
Find One User
The query method to find one user should return only one result so we use the prefix findOne
. We are also searching by the id of the user so we specify that by naming the method findOneById
. The parameter id allows us to pass the id of the user we are searching for. To make the method asynchronous we give it a return type of CompletableFuture<User>
.
CompletableFuture<User> findOneById(final String id);
If no user is found theCompletableFuture
will yield a null result. Spring Data supports returning Optional
from standard query methods, but unfortunately there is no support for returning CompletableFuture<Optional>
.
Service Layer
Now to create our service layer. First we create an interface to define the contract for our service. Other than providing a contract on how you expose the functionality of your service, interface driven development is great for testing.
public interface UserService {
CompletableFuture<Page<User>> findAll(final Pageable pageable);
CompletableFuture<Optional<User>> findOneById(final String id);
}
The implementation for our service is very simple:
@Service
public class UserServiceImpl implements UserService {
private final UserRepository userRepository;
public UserServiceImpl(final UserRepository userRepository) {
this.userRepository = userRepository;
}
@Override
@Async(AsyncConfiguration.TASK_EXECUTOR_SERVICE)
public CompletableFuture<Page<User>> findAll(final Pageable pageable) {
return userRepository.findAllBy(pageable);
}
@Override
@Async(AsyncConfiguration.TASK_EXECUTOR_SERVICE)
public CompletableFuture<Optional<User>> findOneById(final String id) {
return userRepository
.findOneById(id)
.thenApply(Optional::ofNullable);
}
}
To let Spring and maintainers of your code (that includes you) know that we are creating a bean that will be used as a service we annotate the class with @Service
. @Service
doesn’t provide any additional behaviour over @Component
but it may do so at some point in the future and it helps to be explicit in your intentions for the bean.
We are also using the recommended injection method of constructor based injection. If there is only one constructor in the bean then Spring doesn’t require the constructor be annotated with @Inject
.
To enable asynchronous execution using Spring we annotate our method implementations with @Async
and also provide the name of the executor we want the work to be dispatched to. Here we are using the TASK_EXECUTOR_SERVICE
executor.
In the method findOneById
we are transforming our response object of User to Optional<User>
as the requested user may not exist. By wrapping the potentially null returned value in Optional
we are explicitly stating that the returned element may or may not be present.
RestController
Here’s what our RESTful controller looks like:
@RestController
@RequestMapping(value = UserController.REQUEST_PATH_API_USERS)
class UserController {
static final String REQUEST_PATH_API_USERS = "/api/users";
private static final Logger log = LoggerFactory.getLogger(UserController.class);
private static final String REQUEST_PATH_API_USERS_INDIVIDUAL_USER = "/{userId}";
private final UserService userService;
public UserController(final UserService userService) {
this.userService = userService;
}
@Async(AsyncConfiguration.TASK_EXECUTOR_CONTROLLER)
@GetMapping(produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CompletableFuture<ResponseEntity> getUsers(final Pageable paging) {
return userService
.findAll(paging)
.<ResponseEntity>thenApply(ResponseEntity::ok)
.exceptionally(handleGetUsersFailure);
}
@Async(AsyncConfiguration.TASK_EXECUTOR_CONTROLLER)
@GetMapping(value = REQUEST_PATH_API_USERS_INDIVIDUAL_USER,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CompletableFuture<ResponseEntity> getUser(@PathVariable final String userId) {
return userService
.findOneById(userId)
.thenApply(mapMaybeUserToResponse)
.exceptionally(handleGetUserFailure.apply(userId));
}
private static Function<Throwable, ResponseEntity> handleGetUsersFailure = throwable -> {
log.error("Unable to retrieve users", throwable);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
};
private static Function<Optional<User>, ResponseEntity> mapMaybeUserToResponse = maybeUser -> maybeUser
.<ResponseEntity>map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
private static Function<String, Function<Throwable, ResponseEntity>> handleGetUserFailure = userId -> throwable -> {
log.error(String.format("Unable to retrieve user for id: %s", userId), throwable);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
};
}
To make our request methods asynchronous and non-blocking we annotate them with the @Async
annotation and specify the executor we want to dispatch the work on.
Get All Users
Here’s the method we use to return all users when an HTTP GET request is received:
@Async(AsyncConfiguration.TASK_EXECUTOR_CONTROLLER)
@GetMapping(produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CompletableFuture<ResponseEntity> getUsers(final Pageable paging) {
return userService
.findAll(paging)
.<ResponseEntity>thenApply(ResponseEntity::ok)
.exceptionally(handleGetUsersFailure);
}
private static Function<Throwable, ResponseEntity> handleGetUsersFailure = throwable -> {
log.error("Unable to retrieve users", throwable);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
};
The return type from our method is CompletableFuture<ResponseEntity>
and we annotate the method with @Async
and specify the task executor we want to dispatch the work on. In this case we are using the TASK_EXECUTOR_CONTROLLER
task executor.
It is possible to specify the type returned in your ResponseEntity
however in order to return error codes and error information of our own leaving this specificity out is necessary.
The method takes a Pageable
parameter. This is a type provided by Spring and allows the caller to specify paging parameters e.g.
http://localhost:8080/api/users?page=2&size=10
With this URL we are requesting the users resource and specifying the page offset and the number of results to return.
Our service and repository methods for finding all users take this Pageable parameter so all we need to do is pass it on.
When we invoke our service method findAll
we get a CompleteableFuture<Page<User>>
back. Our job is to map this type to the expected type CompletableFuture<ResponseEntity>
. By using the method thenApply onCompletableFuture
we can do something with the return value in this case Page<User>
. All we need to do is wrap the Page<User>
object in a ResponseEntity
and give it a HTTP response code of 200
(OK). Spring provides a handy way of instantiating just such a ResponseEntity
with the ResponseEntity.ok()
method.
.<ResponseEntity>thenApply(ResponseEntity::ok)
We have to explicitly define the return type when invoking the thenApply
method to tell Java that we are creating a ResponseEntity<Page<User>>
object but we want it to be treated as a plain ResponseEntity
object. This helps us when we define our error recovery:
.exceptionally(handleGetUsersFailure);
By specifying error handling code to be run in the event of an exception we can create an appropriate response to the client making the HTTP request. In this case we want to recover from any error and return a status code of 500
:
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
Get One User
This is the method we use to retrieve one user:
@Async(AsyncConfiguration.TASK_EXECUTOR_CONTROLLER)
@GetMapping(value = REQUEST_PATH_API_USERS_INDIVIDUAL_USER,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CompletableFuture<ResponseEntity> getUser(@PathVariable final String userId) {
return userService
.findOneById(userId)
.thenApply(mapMaybeUserToResponse)
.exceptionally(handleGetUserFailure.apply(userId));
}
private static Function<Optional<User>, ResponseEntity> mapMaybeUserToResponse = maybeUser -> maybeUser
.<ResponseEntity>map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
private static Function<String, Function<Throwable, ResponseEntity>> handleGetUserFailure = userId -> throwable -> {
log.error(String.format("Unable to retrieve user for id: %s", userId), throwable);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
};
Again, to make this method asynchronous and non-blocking we annotate it with @Async
and return CompletableFuture<ResponseEntity>
.
We need to bind the {userId}
path variable to the userId
parameter. This is done with @PathVariable
. Issuing a GET
request to the following URL will invoke this method:
http://localhost:8080/api/users/1234
As with the getAllUsers
method we need to transform the CompletableFuture<Optional<User>>
response we get from the UserService
to ResponseEntity
. To do this we again use the CompletableFuture.thenApplymethod
. This time we create a pure function to transform the response:
private static Function<Optional<User>, ResponseEntity> mapMaybeUserToResponse = maybeUser -> maybeUser
.<ResponseEntity>map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
In the case where the user exists we are mapping the Optional<User>
parameter to a ResponseEntity
with the User object as the body and a status code of 200
(OK). If the Optional<User>
is empty we instead return a ResponseEntity
with no body and a status code of 404
(Not Found).
If there an exception is thrown we recover by sending a response with a status code of 500
(Internal Server Error):
private static Function<String, Function<Throwable, ResponseEntity>> handleGetUserFailure = userId -> throwable -> {
log.error(String.format("Unable to retrieve user for id: %s", userId), throwable);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
};
Wrapping Up
By using this technique you can split up your application into separate layers and have each layer perform tasks asynchronously. This separation of concerns makes it easier for you to conceptualise your logic, combine and recombine your code and logic, and make it easier to tune, and test your code from unit to integration to system testing.
Take a look at this code in action by cloning the source code.