Package org.eclipse.microprofile.reactive.streams.operators
This package provides operators for building stream graphs that consume or produce elements using the
Publisher, Subscriber and
Processor interfaces.
There are four primary classes used for building these graphs:
- PublisherBuilder, which when built produces a Publisher
- SubscriberBuilder, which when built produces a CompletionSubscriber
- ProcessorBuilder, which when built produces a Processor
- CompletionRunner, which when built produces a CompletionStage
Operations on these builders may change the shape of the builder. For example,
ProcessorBuilder.toList() changes the builder to a
SubscriberBuilder, since the processor now has a
terminal stage to direct its elements to.
SubscriberBuilders are a special case, in that they don't just build a
Subscriber, they build a
CompletionSubscriber, which encapsulates both a
Subscriber and a CompletionStage of the result of the
subscriber processing. This CompletionStage will be redeemed with a result when the
stream terminates normally, or redeemed with an error if the stream terminates with an error. The result is
specific to whatever the Subscriber does. For example, in the case of
ProcessorBuilder.toList(), the result will be a
List of all the elements produced by the Processor, while in the case
of ProcessorBuilder.findFirst(), it's an
Optional of the first element of the stream. In some cases there is no result; the
result type is then Void, and the CompletionStage is only useful for
signalling normal or error termination of the stream.
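The pairing of a subscriber with a CompletionStage can be sketched with the JDK alone. The following is an illustration under assumptions, not the MicroProfile implementation: it uses java.util.concurrent.Flow and SubmissionPublisher in place of the Reactive Streams interfaces, and the names CollectingSubscriberSketch, ToListSubscriber and run are hypothetical. It only shows how a list-collecting subscriber might redeem its stage with the collected elements on normal termination, or with the error otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class CollectingSubscriberSketch {

    /** Hypothetical stand-in for a subscriber that exposes its result as a CompletionStage. */
    static final class ToListSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> elements = new ArrayList<>();
        private final CompletableFuture<List<T>> result = new CompletableFuture<>();

        CompletionStage<List<T>> getCompletion() {
            return result;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Request everything; a list collector has no reason to limit demand.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            elements.add(item);
        }

        @Override
        public void onError(Throwable error) {
            // Error termination: the stage is redeemed with the error.
            result.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
            // Normal termination: the stage is redeemed with the collected elements.
            result.complete(elements);
        }
    }

    /** Publishes the items, then either completes normally or fails with the given error. */
    static <T> CompletionStage<List<T>> run(List<T> items, Throwable failure) {
        ToListSubscriber<T> subscriber = new ToListSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
            if (failure != null) {
                publisher.closeExceptionally(failure);
            }
        } // close() signals onComplete unless the publisher was already closed
        return subscriber.getCompletion();
    }

    public static void main(String[] args) {
        // prints [a, b]
        System.out.println(run(List.of("a", "b"), null).toCompletableFuture().join());

        // prints failed: boom
        String outcome = run(List.<String>of(), new IllegalStateException("boom"))
            .handle((list, error) -> error == null ? "ok" : "failed: " + error.getMessage())
            .toCompletableFuture()
            .join();
        System.out.println(outcome);
    }
}
```

A subscriber with no meaningful result could follow the same pattern with a CompletableFuture of Void, completing it with null in onComplete.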
The CompletionRunner builds a closed graph, in that
both a Publisher and a Subscriber have been provided, and
building the graph will run it and return the result of the Subscriber as a
CompletionStage.
As an example use of this API, suppose you have a Publisher of rows from a database, and
you want to output them as comma-separated lines in an HTTP client request body, which expects a
Publisher of ByteBuffer. Here's how this might be implemented:
Publisher<Row> rowsPublisher = ...;
Publisher<ByteBuffer> bodyPublisher =
    // Create a publisher builder from the rows publisher
    ReactiveStreams.fromPublisher(rowsPublisher)
        // Map the rows to CSV strings
        .map(row ->
            String.format("%s, %s, %d\n", row.getString("firstName"),
                row.getString("lastName"), row.getInt("age"))
        )
        // Convert to ByteBuffer (StandardCharsets avoids a checked exception in the lambda)
        .map(line -> ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)))
        // Build the publisher
        .build();
// Make the request
HttpClient client = HttpClient.newHttpClient();
client.send(
    HttpRequest
        .newBuilder(new URI("http://www.foo.com/"))
        .POST(BodyProcessor.fromPublisher(bodyPublisher))
        .build()
);
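The two map stages above are plain functions, so they can be tried in isolation before being wired into a stream. The sketch below assumes a hypothetical Person class standing in for the Row type, which is not shown in this documentation, and the names CsvStagesSketch, toCsvLine and encode are likewise illustrative only. It also passes Locale.ROOT so the formatted age does not vary with the default locale, which is a choice made for this sketch rather than something the example above does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class CsvStagesSketch {

    /** Hypothetical stand-in for a database row. */
    static final class Person {
        final String firstName;
        final String lastName;
        final int age;

        Person(String firstName, String lastName, int age) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
        }
    }

    /** First map stage: format one row as a CSV line. */
    static String toCsvLine(Person person) {
        return String.format(Locale.ROOT, "%s, %s, %d\n",
            person.firstName, person.lastName, person.age);
    }

    /** Second map stage: encode the line as UTF-8 bytes wrapped in a ByteBuffer. */
    static ByteBuffer encode(String line) {
        return ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buffer = encode(toCsvLine(new Person("Ada", "Lovelace", 36)));
        // Decode again to show what would be sent in the request body.
        System.out.print(StandardCharsets.UTF_8.decode(buffer));
    }
}
```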
The documentation for each operator uses marble diagrams to visualize how the operator functions. Each element flowing in and out of the stream is represented as a coloured marble that has a value, and the operator is shown applying some transformation or side effect. Termination and error signals may also be passed, and for operators that subscribe to the stream, an output value is redeemed at the end.
Below is an example diagram labelling all the parts of the stream.

[Marble diagram: example labelling all the parts of the stream]
Class                      Description
CompletionRunner           A builder for a closed reactive streams graph.
CompletionSubscriber<T,R>  A subscriber that redeems a completion stage when it completes.
ConnectingOperators        Operators for connecting different graphs together.
ConsumingOperators         Operators for completing a stream.
ErrorHandlingOperators     Operators for handling errors in streams.
FilteringOperators         Operations for transforming a stream.
PeekingOperators           Operations for peeking at elements and signals in the stream, without impacting the stream itself.
ProcessorBuilder<T,R>      A builder for a Processor.
ProducesResult             A stream that completes with a single result.
PublisherBuilder           A builder for a Publisher.
ReactiveStreams            Primary entry point into the Reactive Streams utility API.
ReactiveStreamsFactory     Factory interface for providing the implementation of the static factory methods in ReactiveStreams.
SubscriberBuilder<T,R>     A builder for a Subscriber and its result.
TransformingOperators      Operations for transforming a stream.