Interface OutgoingConnectorFactory
- All Superinterfaces:
ConnectorFactory
bean implementing this
interface. This bean is called for every stream that needs to be created for this specific transport
(so Kafka in this example). These streams are connected to methods annotated with
Outgoing.
The factory is called to create a subscriber for each configured transport. The configuration is done
using MicroProfile Config. The following snippet gives an example for a hypothetical Kafka connector:
mp.messaging.outgoing.my-channel.connector=acme.kafka mp.messaging.outgoing.my-channel.topic=my-topic mp.messaging.connector.acme.kafka.bootstrap.servers=localhost:9092 ...
The configuration keys are structured as follows: mp.messaging.[incoming|outgoing].channel-name.attribute or
mp.messaging.[connector].connector-name.attribute.
Channel names are not expected to contain . so the first occurrence of a . in the channel-name portion
of a property terminates the channel name and precedes the attribute name.
For connector attributes, the longest string, inclusive of .s, that matches a loadable
connector is used as a connector-name. The remainder, after a . separator, is the attribute name.
Configuration keys that begin
mp.messaging.incoming are not used for OutgoingConnectorFactory configuration.
The channel-name segment in the configuration key corresponds to the name of the channel used in the
Outgoing annotation:
@Outgoing("my-channel")
public CompletionStage<String> produce(String s) {
// ...
}
The set of attributes depend on the connector and transport layer (For example, bootstrap.servers is Kafka specific).
The connector attribute indicates the name of the connector.
It will be matched to the value returned by the Connector qualifier
used on the relevant OutgoingConnectorFactory bean implementation.
This is how a reactive messaging implementation looks for the specific OutgoingConnectorFactory required for
a channel.
Any mp.messaging.connector attributes for the channel's connector are also included in the set
of relevant attributes. Where an attribute is present for both a channel and its connector the value of the channel
specific attribute will take precedence.
In the previous configuration, the reactive messaging implementation would need to find the
OutgoingConnectorFactory implementation qualified with the Connector qualifier with the value
acme.kafka to create the my-channel subscriber. Note that if
the connector cannot be found, the deployment must be failed with a DeploymentException.
The getSubscriberBuilder(Config) is called for every channel that needs to be created. The
Config object passed to the method contains a subset of the global configuration, and with the prefixes removed.
So for the previous configuration, it would be:
bootstrap.servers = localhost:9092 topic = my-topic
So the connector implementation can retrieve the value with Config.getValue(String, Class) and
Config.getOptionalValue(String, Class).
If the configuration is invalid, the getSubscriberBuilder(Config) method must throw an
IllegalArgumentException, caught by the reactive messaging implementation and triggering a failure in the
deployment.
Note that a Reactive Messaging implementation must support the configuration format described here. Implementations are free to provide additional support for other approaches.
-
Field Summary
Fields inherited from interface org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory
CHANNEL_NAME_ATTRIBUTE, CONNECTOR_ATTRIBUTE, CONNECTOR_PREFIX, INCOMING_PREFIX, OUTGOING_PREFIX -
Method Summary
Modifier and TypeMethodDescriptionSubscriberBuilder<? extends Message<?>,Void> getSubscriberBuilder(Config config) Creates a channel for the given configuration.
-
Method Details
-
getSubscriberBuilder
Creates a channel for the given configuration. The channel's configuration is associated with a specificconnector, using theConnectorqualifier's parameter indicating a key to whichOutgoingto use.Note that the connection to the transport or broker is generally postponed until the subscription.
- Parameters:
config- the configuration, nevernull, must contain theConnectorFactory.CHANNEL_NAME_ATTRIBUTEattribute.- Returns:
- the created
SubscriberBuilder, must not benull. - Throws:
IllegalArgumentException- if the configuration is invalid.NoSuchElementException- if the configuration does not contain an expected attribute.
-