Optimizing asynchronous communication with MicroProfile Reactive Messaging
Integrating MicroProfile Reactive Messaging and Apache Kafka with the liberty-kafka
connector provides an efficient asynchronous communication method for Open Liberty applications. This setup helps you handle large volumes of data efficiently, which is essential for event-driven systems.
The following sections describe how to intergrate MicroProfile Reactive Messaging with Apache Kafka to send messages within and between applications:
Configure the Liberty-Kafka connector
The liberty-kafka
connector enables applications to send and receive messages from an Apache Kafka broker. It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures.
Integrate Kafka with Open Liberty by following these steps:
1. Configure the Kafka broker connection
In the microprofile-config.properties
file, specify the Kafka broker addresses to establish a connection, which indicates where your Kafka broker is hosted.
mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092
By configuring a MicroProfile application as shown in the example, to connect to a Kafka broker at myKafkaBroker:9092
for messaging. The application is able to send and receive messages through Kafka, facilitating event-driven communication.
2. Define channels for messaging
To specify the Kafka topic from which messages are received, create a channel for incoming messages.
mp.messaging.incoming.myChannel.connector=liberty-kafka mp.messaging.incoming.myChannel.topic=myTopicName
To specify the Kafka topic from which messages are sent, create a channel for outgoing messages.
mp.messaging.outgoing.myChannel.connector=liberty-kafka mp.messaging.outgoing.myChannel.topic=myTopicName.
By setting up a channel as shown in the examples, you connect a message channel directly to Kafka, which gives you precise control over the messaging channels. You can specify attributes such as bootstrap.servers
for connection and topic
for message direction, enhancing your application’s ability to scale and remain robust by efficiently managing messages.
For more information on Liberty-Kafka connector options and channel properties, see Liberty-Kafka connector options and channel properties.
3. Include Kafka client libraries
To integrate Kafka into your application environment by using Open Liberty, choose one of the following methods based on your requirement:
Include Kafka libraries as an application dependency
To use the Kafka connector provided by Open Liberty, you must include the Kafka client API jar in your application.
If you are building your application with Maven, add the Kafka client dependency in your Maven pom.xml
file.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
This approach integrates Kafka client libraries directly into your application. It does not require any additional server configuration for permissions, simplifying deployment and configuration management.
Include Kafka libraries as a shared library
You can integrate Kafka client libraries as a shared resource within the Open Liberty server. This approach is useful for situations where several applications on the same server instance require the Kafka client libraries. It effectively minimizes duplication.
However, if Kafka client libraries are used as a shared library, you must explicitly grant the necessary Java permissions for the libraries to function correctly. These permissions allow the Kafka client to connect to Kafka brokers, read system properties, and access or modify security properties.
To configure these permissions, you can use the server.xml
configuration file. The following example demonstrates how to grant the necessary permissions to a Kafka client library that is specified as a shared library:
<variable name="kafkaCodebase" value="${server.config.dir}/kafkaLib/kafka-clients-<client.version>.jar"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanServerPermission" name="createMBeanServer"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanPermission" name="*" actions="*"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanTrustPermission"name="register"/>
<!-- Kafka client reads system properties -->
<javaPermission codebase="${kafkaCodebase}" className="java.util.PropertyPermission"name="*"actions="read"/>
<!-- Kafka client connects to the kafka broker server -->
<javaPermission codebase="${kafkaCodebase}" className="java.net.SocketPermission"name="*"actions="connect"/>
<!-- Kafka client loads serializers and deserializers by name -->
<javaPermission codebase="${kafkaCodebase}" className="java.lang.RuntimePermission"name="getcodebase="${kafkaCodebase}" classLoader"actions="*"/>
<!-- Kafka reads truststores -->
<javaPermission codebase="${kafkaCodebase}" className="java.io.FilePermission" name="*" <!-- all files in the current directory (i.e. the server directory) --> actions="read"/>
<!-- Kafka client allowed to invoke the Subject.doAs methods -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="doAs"/>
<!-- Kafka client allowed to call getSubject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="getSubject"/>
<!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
<javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="putProviderProperty.Simple SASL/PLAIN Server Provider"/>
<!-- Kafka client allowed to set a Provider -->
<javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="insertProvider"/>
<!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.PrivateCredentialPermission" name="* * "*"" actions="read"/>
<!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPublicCredentials"/>
<!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPrivateCredentials"/>
4. Configure security for the liberty-kafka connector
For more information on security and authentication methods, see Kafka connector security configuration.
Sending and receiving messages among applications by using connectors
To send and receive messages from other systems, reactive messaging uses connectors. Connectors can be attached to one end of a channel and are configured by using MicroProfile Config. Open Liberty includes the liberty-kafka
connector for sending and receiving messages from an Apache Kafka broker.
The following example shows you how to configure a microservice for retrieving messages from a Kafka topic by using MicroProfile Reactive Messaging and the liberty-kafka connector.
mp.messaging.incoming.foo.connector=liberty-kafka mp.messaging.incoming.foo.bootstrap.servers=kafkabrokerhost:9092 mp.messaging.incoming.foo.group.id=foo-reader mp.messaging.incoming.foo.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer mp.messaging.incoming.foo.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
The kafkabrokerhost:9092
Kafka broker address, the foo-reader
consumer group ID, and the deserializers for both key and value are org.apache.kafka.common.serialization.StringDeserializer
, indicating that both keys and values are expected to be strings.
Similarly, the following example shows you how to set up a microservice to send messages to a Kafka broker by using MicroProfile Reactive Messaging and the liberty-kafka connector.
mp.messaging.outgoing.bar.connector=liberty-kafka mp.messaging.outgoing.bar.bootstrap.servers=kafkabrokerhost:9092 mp.messaging.outgoing.bar.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.bar.value.serializer=org.apache.kafka.common.serialization.StringSerializer
The example uses the liberty-kafka
connector to manage the connection between the application and Kafka. The bootstrap.servers
setting points to kafkabrokerhost:9092
, the Kafka broker’s network address, allowing the application to locate and send messages to the Kafka cluster. The key
and value
of messages are configured to use StringSerializer
. The application serializes both parts of the message as strings for Kafka transmission.
The application gains the ability to offload messages to the Kafka topic bar
. This approach to distributed messaging enhances scalability and flexibility in handling data flows.
For more information on liberty-kafka
connector options and channel properties, see Liberty-kafka connector options and channel properties.
For more information, see Creating the consumer in the inventory microservice in the Creating reactive Java microservices guide.
Troubleshooting
To troubleshoot the liberty-kafka
connector, address key issues like Kafka connectivity, managing multiple server instances, and giving distinct identifiers to producers and consumers. Make sure that the bootstrap.servers
are configured correctly for connection. Each consumer has a distinct group.id
to prevent conflicts, and producers need a distinct client.id
to avoid identifier overlap.
Multiple server instances
If you start multiple instances of Open Liberty with the same application, you must assign a distinct group.id
to each channel for every server instance. This requirement applies to all incoming channels. Without a distinct group.id
on each server instance, the server will block any new connections to a topic after the first connection is established. This policy makes sure that each connection to a topic is distinct and properly managed across all server instances.
Multiple Reactive Messaging applications using the same Kafka server
When multiple applications with a Kafka client are deployed to Open Liberty and connect to the same Kafka server, errors can happen. The errors come from conflicting identifiers that both Kafka producers and consumers use within these applications. Kafka generates the client.id
for both, that lead to these conflicts, especially since consumers determine their identifiers by using either their group.id
or client.id
.
To mitigate these conflicts, it is essential to make sure that each consumer channel has a distinct group.id
and each producer channel has a distinct client.id
. However, specifying these attributes directly on the liberty-kafka
connector is not an effective solution and must be avoided.
These steps aim to identify and address common challenges that arise during the integration of Kafka with Open Liberty. They help in facilitating the smooth functioning of your microservices architecture.
For more information on Apache Kafka, see the Apache Kafka documentation.