Contents
- What you'll learn
- Background concepts
- Example: bank scenario
- Adding concurrency with Future and ManagedExecutor
- Enabling MicroProfile Fault Tolerance in Open Liberty
- Adding the @Bulkhead annotation
- Adding the @Asynchronous annotation with @Bulkhead
- Adding the @Fallback annotation
- Interactive bulkhead and asynchronous playground
- Nice work! Where to next?
Limiting the number of concurrent requests to microservices
Use a MicroProfile Bulkhead policy to limit requests and prevent faults from cascading to the entire system.
What you'll learn
Explore how the MicroProfile Bulkhead policy from the Fault Tolerance feature limits requests and prevents faults from stopping an entire system.
You will learn about the limitations of single-threaded programs by looking at a simple online banking microservice. You'll then implement concurrency to scale your microservice and see how it fails when no fault tolerance is implemented. Next, you'll enable the MicroProfile Fault Tolerance feature and use the Bulkhead policy to prevent the failing method from taking down the whole application. You'll explore the two approaches to Bulkhead, semaphore isolation and thread pool isolation, and the parameters used for the annotation. Finally, you'll add a fallback class, which is invoked if a BulkheadException is thrown.
When you arrive at the section about the Interactive bulkhead and asynchronous playground, you can modify the parameters of a MicroProfile Bulkhead policy in any combination. You can then run a sample scenario to see how the parameter values affect the results.
Background concepts
Within this guide, we use the MicroProfile Bulkhead, Asynchronous, and Fallback policies to limit the number of concurrent requests to a microservice. These policies prevent system overload and isolate any failing microservices to a limited resource pool.
Bulkhead
The term bulkhead comes from the structure that is used in ships to create partitions in the hull. In the event of a hull breach, these isolated partitions prevent water from filling up the entire hull and sinking the ship.
The bulkhead pattern in software systems works similarly by isolating services to prevent them from starving the system of resources. Two approaches to bulkhead are semaphore isolation and thread pool isolation.
The semaphore isolation approach limits the number of concurrent requests to the service. It rejects requests immediately once the limit is hit.
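As a concrete illustration, semaphore isolation can be sketched with plain JDK primitives. This is a hypothetical stand-in, not the MicroProfile implementation; the SemaphoreBulkhead class and invoke method are invented for this example:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class SemaphoreBulkhead {
    private final Semaphore permits;

    public SemaphoreBulkhead(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task on the caller's thread; rejects immediately when full.
    public String invoke(Supplier<String> task) {
        if (!permits.tryAcquire()) {        // non-blocking: there is no waiting queue
            throw new IllegalStateException("Bulkhead full");
        }
        try {
            return task.get();
        } finally {
            permits.release();              // free the permit for the next caller
        }
    }

    public static void main(String[] args) {
        SemaphoreBulkhead bulkhead = new SemaphoreBulkhead(2);
        System.out.println(bulkhead.invoke(() -> "ok")); // prints "ok"
    }
}
```

Because tryAcquire never blocks, a caller that arrives while all permits are taken fails fast instead of queueing, which is the defining behavior of the semaphore approach.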
The thread pool isolation approach uses a thread pool to separate the service from the caller and contain it to a subset of system resources. This approach also provides a waiting queue, rejecting requests only when both the pool and queue are full. Thread pool management adds some overhead, which slightly reduces performance compared to using a semaphore, but allows hanging threads to time out.
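Thread pool isolation can be sketched the same way with a bounded ThreadPoolExecutor. Again, this is a hypothetical plain-JDK stand-in, not the Fault Tolerance implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolBulkhead {
    // Two worker threads plus a waiting queue of two; extra submissions are rejected.
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 2, 1L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            new ThreadPoolExecutor.AbortPolicy()); // throws when pool and queue are full

    public ThreadPoolBulkhead() {
        pool.allowCoreThreadTimeOut(true); // let idle workers exit so the JVM can terminate
    }

    public String invoke(Callable<String> task) {
        try {
            // The task runs on a pool thread, and the caller bounds how long it
            // waits, so a hanging task cannot block the caller forever.
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (RejectedExecutionException rejected) {
            throw rejected; // analogous to a BulkheadException
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolBulkhead bulkhead = new ThreadPoolBulkhead();
        System.out.println(bulkhead.invoke(() -> "advice")); // prints "advice"
    }
}
```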
Asynchronous
Asynchronous programming allows multiple tasks to run simultaneously rather than waiting for one task to finish before the next one starts. To run multiple tasks, this policy runs tasks on separate threads.
Fallback
A fallback service runs when the main service fails. It can provide graceful failure or continued or partial operation of the original service.
Example: bank scenario
Imagine that you're developing a VirtualFinancialAdvisor (VFA) microservice for a bank to allow clients to chat online with a virtual financial advisor, a resource-intensive AI. The initial naive synchronous implementation of the microservice allows only one customer to chat with a financial advisor at a time. As you progress through the guide, you will see how the number of available chat sessions impacts the service and the system.
Begin by requesting an online chat with a virtual financial advisor.
Adding concurrency with Future and ManagedExecutor
The initial synchronous implementation of the VirtualFinancialAdvisor microservice allows only a single virtual financial advisor chat session at a time. Implement concurrency with Future and ManagedExecutor to remove the single-chat limitation and allow multiple chat sessions to be available at the same time.
Future is a special return type that is used in Java concurrency to fetch the result of a task when it is available. ManagedExecutor is a container-managed executor service that creates instances of CompletableFuture, an implementation of Future.
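To see Future and CompletableFuture in action outside of a server, here is a minimal plain-JDK sketch; the fetchWhenReady method name is invented for this example:

```java
import java.util.concurrent.CompletableFuture;

public class FutureDemo {
    // Start a task on another thread and fetch its result when it is available.
    static String fetchWhenReady() {
        CompletableFuture<String> session =
                CompletableFuture.supplyAsync(() -> "chat-session-1");
        // The caller is free to do other work here while the task runs.
        return session.join(); // blocks only until the result is available
    }

    public static void main(String[] args) {
        System.out.println(fetchWhenReady()); // prints "chat-session-1"
    }
}
```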
MicroProfile Context Propagation allows us to build a ManagedExecutor instance that can propagate context to completion stage actions. Annotating the executor with @Produces and @ApplicationScoped allows the same ManagedExecutor to be injected at other points in the application.
Replace the requestForVFA method on line 10 with the following block of code.
Enabling MicroProfile Fault Tolerance in Open Liberty
MicroProfile Fault Tolerance allows microservices to handle unavailable services. It uses different policies, such as Bulkhead and Fallback, to guide the execution and result of some logic. The MicroProfile Fault Tolerance feature provides an environment to support resilient microservices through patterns that include bulkheads. Enable the MicroProfile Fault Tolerance feature in the server.xml file of the Open Liberty server where the VirtualFinancialAdvisor microservice runs.
Add the mpFaultTolerance-2.1 feature to the featureManager element in the server.xml file.
Adding the @Bulkhead annotation
Now that you've seen the application easily fail when it becomes overloaded, let's apply a Bulkhead policy to limit the number of concurrent chat requests to the service. Limiting the number of concurrent requests limits the amount of system resources that service invocations can use and prevents the rest of the system from failing.
With a Bulkhead policy, the microservice runs on the current thread. The number of concurrent threads that invoke the service is limited and managed by a semaphore. When this maximum number of concurrent chat requests is reached, any subsequent requests immediately fail with a BulkheadException.
The @Bulkhead annotation has one parameter that specifies the maximum number of concurrent requests to the service. If the parameter is not specified, the default is 10 requests.
After you modify your server.xml file to include the Fault Tolerance feature, add a Bulkhead policy to the VirtualFinancialAdvisor microservice.
Add the @Bulkhead annotation with a value of 50 on line 25, before the serviceForVFA method.
Adding the @Asynchronous annotation with @Bulkhead
The @Bulkhead annotation limits the number of concurrent financial advisor chat requests. After the maximum number of concurrent chat requests to the VirtualFinancialAdvisor microservice is reached, the next chat request fails. To ease this problem, add the @Asynchronous annotation to allow additional chat requests to be placed in a waiting queue.
When you use the @Bulkhead annotation with the @Asynchronous annotation, a thread pool manages access to the microservice. You configure the maximum number of threads that are used along with a waiting queue size. When a request for a financial advisor cannot be added to the waiting queue, a BulkheadException is thrown.
To use the @Bulkhead and @Asynchronous annotations together, modify your server.xml file to include the Fault Tolerance feature. Then, configure the @Bulkhead annotation with the following parameters:
- value: The maximum number of concurrent requests to the service. If the parameter is not specified, the default is 10 requests.
- waitingTaskQueue: The size of the waiting queue that holds requests to run at a different time. This parameter must be greater than 0. If the parameter is not specified, the default is 10 requests. This parameter for the @Bulkhead annotation takes effect only when you use the @Asynchronous annotation.
It is a best practice to set the waitingTaskQueue parameter to a value equal to or larger than the value parameter.
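The interaction between value and waitingTaskQueue can be simulated with a plain JDK thread pool. In this hypothetical model (not the Fault Tolerance implementation), value maps to the pool size and waitingTaskQueue to the queue capacity, so with value = 2 and waitingTaskQueue = 2 at most four requests are in flight and the fifth is rejected:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BulkheadCapacityDemo {
    // Models @Bulkhead(value = 2, waitingTaskQueue = 2) with plain JDK types.
    static String fifthRequestOutcome() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch hold = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> { hold.await(); return null; }); // two run, two wait
        }
        String outcome;
        try {
            pool.submit(() -> null); // request 5 of 5: pool and queue are both full
            outcome = "accepted";
        } catch (RejectedExecutionException e) {
            outcome = "rejected"; // analogous to a BulkheadException
        }
        hold.countDown(); // release the held tasks and let the pool drain
        pool.shutdown();
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(fifthRequestOutcome()); // prints "rejected"
    }
}
```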
A method that is annotated with the @Asynchronous annotation must return a Future. To add the @Asynchronous annotation, update the @Bulkhead annotation's parameters and change the return type of serviceForVFA: replace the code on lines 28 - 32 with the following block of code.
Because the @Asynchronous annotation manages thread pools, you no longer need to use ManagedExecutor in the requestForVFA method. Replace the requestForVFA method on line 15 with the following block of code.
Adding the @Fallback annotation
The @Asynchronous and @Bulkhead annotations together place requests for a financial advisor in a waiting queue after the number of requests exceeds the specified maximum number of concurrent requests. If the waiting queue is full, a BulkheadException is thrown. Let's add a fallback service to handle the exception. A fallback service runs when the main service fails. It can provide graceful failure or continued or partial operation of the original service.
The @Fallback annotation identifies a class or a method that automatically runs when a BulkheadException occurs. To use the @Fallback annotation, modify your server.xml file to include the Fault Tolerance feature. In our scenario, add the ServiceFallbackHandler class, which implements the FallbackHandler interface, to allow a customer to schedule an appointment. When a customer requests a chat session and the request cannot be handled because the maximum limit of concurrent requests has been reached and the wait queue is full, the ServiceFallbackHandler.handle method is called. The return type of both methods, ServiceFallbackHandler.handle and BankService.serviceForVFA, must be Future<Service>. Otherwise, a FaultToleranceDefinitionException is thrown.
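The fallback pattern itself can be sketched without MicroProfile. This hypothetical helper (the FallbackDemo class and withFallback method are invented for the example) runs a primary supplier and routes any runtime failure, such as a rejected request, to a fallback supplier:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

public class FallbackDemo {
    // A hand-rolled stand-in for @Fallback: run the primary service and,
    // if it fails, let a fallback supplier produce the result instead.
    public static <T> T withFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get(); // e.g. schedule an appointment instead of a chat
        }
    }

    public static void main(String[] args) {
        String result = withFallback(
                () -> { throw new RejectedExecutionException("bulkhead full"); },
                () -> "appointment-scheduled");
        System.out.println(result); // prints "appointment-scheduled"
    }
}
```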
For more information about using the @Fallback annotation to identify the fallback as a method instead of a class, or about further restricting when the fallback runs by using the applyOn and skipOn parameters of the @Fallback annotation, see the MicroProfile Circuit Breaker guide.
In the BankService.java file, add the @Fallback(ServiceFallbackHandler.class) annotation on line 20.
Interactive bulkhead and asynchronous playground
Now that you have learned about bulkheads and asynchronous threads, you can explore the parameters of the @Bulkhead annotation and see the asynchronous bulkhead in action.
You learned about the following parameters:
- value: The maximum number of concurrent requests to the service. The default is 10 requests.
- waitingTaskQueue: The size of the waiting queue that holds requests to run at a different time. The default is 10 requests.
It is a best practice to set the waitingTaskQueue parameter to a value equal to or larger than the value parameter.
Modify the parameters of the @Bulkhead annotation on lines 22 and 23 in the editor. In this simulation, the maximum value for value and waitingTaskQueue is 10. Then click Run to use the new values. Repeat the process as many times as you like.
Click Chat request to initiate a request with a virtual financial advisor or End chat to end an arbitrary chat and observe the simulation.
BankService.java

package io.openliberty.guides.bulkhead.global.eBank.microservices;

import javax.inject.Inject;

public class BankService {

    @Inject private BankService bankService;

    private int counterForVFA = 0;

    public Service requestForVFA() {
        int counter = ++counterForVFA;
        return bankService.serviceForVFA(counter);
    }

    public Service serviceForVFA(int counterForVFA) {
        Service chatService = new ChatSession(counterForVFA);
        return chatService;
    }
}

server.xml

<?xml version="1.0"?>
<server description="Sample Liberty server">
    <featureManager>
        <feature>cdi-2.0</feature>
        <feature>mpContextPropagation-1.0</feature>
    </featureManager>
    <httpEndpoint host="*" id="defaultHttpEndpoint" httpsPort="${default.https.port}" httpPort="${default.http.port}"/>
</server>
BankService.java

package global.eBank.microservices;

import java.util.concurrent.Future;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;

public class BankService {

    @Inject private BankService bankService;

    private int counterForVFA = 0;

    @Produces @ApplicationScoped
    ManagedExecutor executor =
        ManagedExecutor.builder().propagated(ThreadContext.APPLICATION).build();

    public Future<Service> requestForVFA() {
        int counter = ++counterForVFA;
        // supplyAsync, rather than runAsync, is needed because the task returns a value.
        Future<Service> serviceRequest = executor.supplyAsync(() -> {
            try {
                return bankService.serviceForVFA(counter);
            } catch (Exception ex) {
                handleException();
            }
            return null;
        });
        return serviceRequest;
    }

    public Service serviceForVFA(int counterForVFA) {
        Service chatService = new ChatSession(counterForVFA);
        return chatService;
    }
}
server.xml

<?xml version="1.0"?>
<server description="Sample Liberty server">
    <featureManager>
        <feature>cdi-2.0</feature>
        <feature>mpContextPropagation-1.0</feature>
        <feature>mpFaultTolerance-2.1</feature>
    </featureManager>
    <httpEndpoint host="*" id="defaultHttpEndpoint" httpsPort="${default.https.port}" httpPort="${default.http.port}"/>
</server>
BankService.java

package io.openliberty.guides.bulkhead.global.eBank.microservices;

import java.util.concurrent.Future;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;

public class BankService {

    @Inject private BankService bankService;

    private int counterForVFA = 0;

    @Produces @ApplicationScoped
    ManagedExecutor executor =
        ManagedExecutor.builder().propagated(ThreadContext.APPLICATION).build();

    public Future<Service> requestForVFA() {
        int counter = ++counterForVFA;
        Future<Service> serviceRequest = executor.supplyAsync(() -> {
            try {
                return bankService.serviceForVFA(counter);
            } catch (Exception ex) {
                handleException();
            }
            return null;
        });
        return serviceRequest;
    }

    // At most 50 concurrent calls; further calls fail immediately with a BulkheadException.
    @Bulkhead(50)
    public Service serviceForVFA(int counterForVFA) {
        Service chatService = new ChatSession(counterForVFA);
        return chatService;
    }
}
BankService.java

package io.openliberty.guides.bulkhead;

import java.util.concurrent.Future;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;

public class BankService {

    @Inject private BankService bankService;

    private int counterForVFA = 0;

    @Produces @ApplicationScoped
    ManagedExecutor executor =
        ManagedExecutor.builder().propagated(ThreadContext.APPLICATION).build();

    public Future<Service> requestForVFA() {
        int counter = ++counterForVFA;
        return bankService.serviceForVFA(counter);
    }

    // Up to 50 requests run on the thread pool; 50 more can wait in the queue.
    @Asynchronous
    @Bulkhead(value = 50, waitingTaskQueue = 50)
    public Future<Service> serviceForVFA(int counterForVFA) {
        Service chatService = new ChatSession(counterForVFA);
        return executor.completedFuture(chatService);
    }
}
ServiceFallbackHandler.java

package io.openliberty.guides.bulkhead.global.eBank.microservices;

import java.util.concurrent.Future;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.ExecutionContext;
import org.eclipse.microprofile.faulttolerance.FallbackHandler;
import org.eclipse.microprofile.context.ManagedExecutor;

@ApplicationScoped
public class ServiceFallbackHandler implements FallbackHandler<Future<Service>> {

    @Inject
    ManagedExecutor executor;

    @Override
    public Future<Service> handle(ExecutionContext context) {
        return handleFallback(context);
    }

    // Offer an appointment instead of a live chat when no session is available.
    private Future<Service> handleFallback(ExecutionContext context) {
        Service service = new ScheduleService();
        return executor.completedFuture(service);
    }
}
BankService.java

package io.openliberty.guides.bulkhead.global.eBank.microservices;

import java.util.concurrent.Future;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;

public class BankService {

    @Inject private BankService bankService;

    private int counterForVFA = 0;

    @Produces @ApplicationScoped
    ManagedExecutor executor =
        ManagedExecutor.builder().propagated(ThreadContext.APPLICATION).build();

    public Future<Service> requestForVFA() {
        int counter = ++counterForVFA;
        return bankService.serviceForVFA(counter);
    }

    @Asynchronous
    @Bulkhead(value = 5, waitingTaskQueue = 5)
    public Future<Service> serviceForVFA(int counterForVFA) {
        Service chatService = new ChatSession(counterForVFA);
        return executor.completedFuture(chatService);
    }
}
Nice work! Where to next?
Nice work! You learned about the benefits of asynchronous processing and how a faulty process can build up many requests that, if not limited, consume all of the host's resources, such as CPU, threads, and memory, and lead to a cascading failure. You learned that a bulkhead isolation policy limits the number of parallel executions so that a failing process cannot affect the whole system. You also learned about the two approaches to bulkhead, semaphore isolation and thread pool isolation, and how to identify a fallback service to run when the limit defined by the Bulkhead policy is reached.
Download the sample application for this guide bundled with Open Liberty on GitHub.
Keep exploring MicroProfile with these guides.
Learn more about MicroProfile.