Reactive Programming (Photo Credit: Reactive Programming Book by Sergi Mansilla)
In the previous parts, we tried to understand what reactive programming actually is and how its mechanism operates. We also introduced Project Reactor and some of its most important operators, looked at the marble diagrams of those operators, and discussed how to start thinking in a reactive way. In this part, we will explore some other useful operators. So, let’s start…
Marble diagram of defer (Photo Credit: Project Reactor)
When we need to create a sequence lazily, we should use defer(). It lazily supplies a Publisher every time a subscription is made on the resulting Flux/Mono, so the actual source instantiation is deferred until each subscription, and the supplier can create a subscriber-specific instance.
var deferred = Mono.defer(() -> Mono.just(UUID.randomUUID()));
deferred.subscribe(x -> System.out.println(x));
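To make the laziness more visible, here is a minimal sketch that contrasts Mono.just() with Mono.defer(). The class name DeferExample, the run() helper, and the counter are made up for illustration; only just(), defer(), and block() come from Reactor.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DeferExample {

    // Hypothetical helper: returns the values seen by two subscriptions to an
    // eager Mono, followed by two subscriptions to a deferred Mono.
    static int[] run() {
        AtomicInteger counter = new AtomicInteger();

        // Mono.just() evaluates its argument right here, before any subscription.
        Mono<Integer> eager = Mono.just(counter.incrementAndGet());

        // Mono.defer() runs the supplier again for every subscription.
        Mono<Integer> lazy = Mono.defer(() -> Mono.just(counter.incrementAndGet()));

        return new int[] { eager.block(), eager.block(), lazy.block(), lazy.block() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 2, 3]
    }
}
```

The eager Mono replays the value captured when it was created, while the deferred one produces a fresh value per subscriber.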
blockFirst:
Marble diagram of blockFirst (Photo Credit: Project Reactor)
Subscribe to Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case of Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
String first = Flux.just("blue", "green", "red")
.blockFirst();
System.out.println(first);
blockLast:
Marble diagram of blockLast (Photo Credit: Project Reactor)
Subscribe to Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case of Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
String last = Flux.just("blue", "green", "red")
.blockLast();
System.out.println(last);
fromCallable:
Marble diagram of fromCallable (Photo Credit: Project Reactor)
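fromCallable() is also used to create a Mono lazily. But unlike defer(), which expects a supplier that returns a Mono, fromCallable() takes a Callable that returns a raw value and wraps that value in a Mono. And unlike just(), the value is computed on each subscription rather than up front. If the Callable resolves to null, the resulting Mono completes empty.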
var fromCallable = Mono.fromCallable(() -> UUID.randomUUID());
fromCallable.subscribe(x -> System.out.println(x));
fromSupplier:
Marble diagram of fromSupplier (Photo Credit: Project Reactor)
fromSupplier() is similar to defer() and is also used to create a Mono lazily. The difference is that the Supplier returns a raw value rather than a Mono, and fromSupplier() wraps that value in a Mono automatically. If the Supplier resolves to null, the resulting Mono completes empty.
var fromSupplier = Mono.fromSupplier(() -> UUID.randomUUID());
fromSupplier.subscribe(x -> System.out.println(x));
defaultIfEmpty:
Marble diagram of defaultIfEmpty (Photo Credit: Project Reactor)
It’s closely related to switchIfEmpty(): both react to a sequence that completes without emitting any data. defaultIfEmpty() provides a single default value in that case. The difference is that switchIfEmpty() takes a Flux/Mono to switch to, whereas defaultIfEmpty() takes a raw value.
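As a rough sketch of the difference, the example below applies both operators to a Mono that may be empty. The class DefaultIfEmptyExample, its two helper methods, and the fallback strings are hypothetical names chosen for this illustration.

```java
import reactor.core.publisher.Mono;

public class DefaultIfEmptyExample {

    // Hypothetical helper: falls back to a raw value when the source is empty.
    static String colorOrDefault(String maybeColor) {
        return Mono.justOrEmpty(maybeColor)   // empty Mono when maybeColor is null
                .defaultIfEmpty("unknown")
                .block();
    }

    // Hypothetical helper: falls back to another publisher when the source is empty.
    static String colorOrSwitch(String maybeColor) {
        return Mono.justOrEmpty(maybeColor)
                .switchIfEmpty(Mono.just("fallback"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(colorOrDefault(null));   // prints unknown
        System.out.println(colorOrDefault("blue")); // prints blue
        System.out.println(colorOrSwitch(null));    // prints fallback
    }
}
```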
Marble diagram of distinct (Photo Credit: Project Reactor)
This operator is also self-describing. The distinct() operator tracks the elements seen by each subscriber and filters out the duplicates, so only the first occurrence of each value propagates downstream.
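A small sketch of distinct() on a Flux with repeated values could look like this. The class name DistinctExample and the uniqueColors() helper are assumptions made for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class DistinctExample {

    // Hypothetical helper: collects the values that survive distinct().
    static List<String> uniqueColors() {
        return Flux.just("blue", "green", "blue", "red", "green")
                .distinct()      // drops values that were already emitted
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(uniqueColors()); // prints [blue, green, red]
    }
}
```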
Marble diagram of fromIterable (Photo Credit: Project Reactor)
fromIterable() is used for converting an Iterable, such as a List or a Set, to a Flux stream. On the other hand, fromArray() is array-specific, meaning it works only with arrays.
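The following sketch shows both factory methods side by side. The class FromIterableExample and its helper methods are hypothetical; the data is only illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FromIterableExample {

    // Hypothetical helper: builds a Flux from a List (any Iterable works).
    static List<String> fromList() {
        return Flux.fromIterable(List.of("blue", "green", "red"))
                .map(String::toUpperCase)
                .collectList()
                .block();
    }

    // Hypothetical helper: builds a Flux from a plain array.
    static List<String> fromArray() {
        String[] colors = { "blue", "green", "red" };
        return Flux.fromArray(colors)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fromList());  // prints [BLUE, GREEN, RED]
        System.out.println(fromArray()); // prints [blue, green, red]
    }
}
```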
Marble diagram of concat (Photo Credit: Project Reactor)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source, then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream. It returns a new Flux concatenating all source sequences.
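As an illustration of the sequential subscription described above, here is a minimal sketch in which the first source is deliberately slower than the second. The class ConcatExample, the helper name, and the 50 ms delay are assumptions for the example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatExample {

    // Hypothetical helper: concatenates a slow source with a fast one.
    static List<String> concatenated() {
        // The first source emits slowly; the second could emit immediately.
        Flux<String> first = Flux.just("blue", "green").delayElements(Duration.ofMillis(50));
        Flux<String> second = Flux.just("red");

        // concat() subscribes to 'second' only after 'first' has completed,
        // so the order of the sources is preserved.
        return Flux.concat(List.of(first, second))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // prints [blue, green, red]
    }
}
```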
Marble diagram of concatMap (Photo Credit: Project Reactor)
concatMap() is similar to concat() in that it concatenates multiple sources and returns a new Flux, but it also maps the items: each element is transformed into an inner publisher, and those inner publishers are subscribed to one at a time, in source order. So we can consider it a combination of the concat() and flatMap() operators.
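A possible sketch of concatMap() is shown below. The inner publishers for later elements are given shorter delays on purpose, to show that the output still follows the source order. The class ConcatMapExample, the helper name, and the delay values are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapExample {

    // Hypothetical helper: maps each number to an inner Flux of two labels.
    static List<String> expanded() {
        return Flux.just(1, 2, 3)
                // Later elements get shorter delays, yet concatMap() waits for
                // each inner Flux to complete before subscribing to the next.
                .concatMap(i -> Flux.just(i + "a", i + "b")
                        .delayElements(Duration.ofMillis(40 - 10L * i)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(expanded()); // prints [1a, 1b, 2a, 2b, 3a, 3b]
    }
}
```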
Marble diagram of zip (Photo Credit: Project Reactor)
As its name says, the zip() operator combines, or zips, multiple sources together; that is to say, it waits for all the sources to emit one element and combines these elements into a single output value (constructed by the provided combinator). The operator continues doing so until any of the sources completes. Errors are forwarded immediately. This “Step-Merge” processing is especially useful in Scatter-Gather scenarios.
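Here is a short sketch of zip() with a combinator, using two sources of different lengths to show that zipping stops when the shorter one completes. The class ZipExample, the helper name, and the sample data are assumptions.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipExample {

    // Hypothetical helper: pairs colors with numbers using a combinator.
    static List<String> zipped() {
        Flux<String> colors = Flux.just("blue", "green", "red");
        Flux<Integer> numbers = Flux.just(1, 2); // shorter source

        // One element is taken from each source per output value;
        // "red" is never paired because 'numbers' completes first.
        return Flux.zip(colors, numbers, (color, number) -> color + "-" + number)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(zipped()); // prints [blue-1, green-2]
    }
}
```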
Marble diagram of zipWith (Photo Credit: Project Reactor)
zipWith() zips this Flux with another Publisher source; that is to say, it waits for both to emit one element and combines these elements into a Tuple2. The operator continues doing so until either of the sources completes. Errors are forwarded immediately. This “Step-Merge” processing is especially useful in Scatter-Gather scenarios.
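A minimal sketch of zipWith() and the resulting Tuple2 might look like the following. The class ZipWithExample and the helper name are hypothetical; getT1() and getT2() are the Tuple2 accessors.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipWithExample {

    // Hypothetical helper: zips colors with numbers and formats each Tuple2.
    static List<String> pairs() {
        return Flux.just("blue", "green", "red")
                .zipWith(Flux.just(1, 2, 3))                    // Flux<Tuple2<String, Integer>>
                .map(tuple -> tuple.getT1() + "=" + tuple.getT2())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // prints [blue=1, green=2, red=3]
    }
}
```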
Marble diagram of then (Photo Credit: Project Reactor)
then() is used when you don’t care about what elements a publisher has output; you only care about when it finishes. So it takes an existing publisher, throws all of its elements away, and then propagates the completion signal or the error signal. If we pass a Mono as a parameter to then(), the returned Mono emits that Mono’s result once the source has completed.
Marble diagram of thenEmpty (Photo Credit: Project Reactor)
It is also like then(), but the difference is that thenEmpty() takes a Publisher<Void> as a parameter and returns a Mono<Void>. It represents a concatenation of the source completion signal and then the second, empty publisher’s completion signal. In other words, it completes when A and then B have both completed sequentially, and it doesn’t emit data.
Mono<Void> then = Flux.just("blue", "green", "red")
.then(Mono.just("Orange"))
.thenEmpty(Mono.empty());
then.subscribe();
merge:
Marble diagram of merge (Photo Credit: Project Reactor)
merge() is similar to concat() and zip() in that it combines several sequences into one. The difference is in how it combines them: merge() merges data from the Publisher sequences contained in an Iterable into a single interleaved sequence, forwarding elements as they arrive instead of pairing them up like zip() or waiting for the previous source to finish like concat(). Unlike concat(), inner sources are subscribed to eagerly. A new Iterator will be created for each subscriber.
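To illustrate the eager subscription, here is a rough sketch that merges a slow source with a fast one. The class MergeExample, the helper name, and the delay values are assumptions; since the order of the merged output depends on timing, the order shown in the comment is what we would typically expect rather than a guarantee.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeExample {

    // Hypothetical helper: merges a slow source with a fast one.
    static List<String> merged() {
        Flux<String> slow = Flux.just("a1", "a2").delayElements(Duration.ofMillis(200));
        Flux<String> fast = Flux.just("b1", "b2").delayElements(Duration.ofMillis(20));

        // Both sources are subscribed to immediately, so elements are
        // forwarded in arrival order rather than in source order.
        return Flux.merge(List.of(slow, fast))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Typically b1 and b2 arrive before a1 and a2 with these delays.
        System.out.println(merged());
    }
}
```

With concat() in place of merge(), the same two sources would always yield a1, a2, b1, b2, because the fast source would not be subscribed to until the slow one had completed.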
Insha Allah, in the upcoming and last part of this series, I will try to explore error handling operators. Until then, may Allah keep you healthy and happy.