It's hard to wrap your head around reactive concepts when you're coming from a traditional programming model. Some of the subsequent sections are aimed at introducing you to reactive concepts and how they evolved into their present state.
The official site for Reactive Streams (http://www.reactive-streams.org/) says: "Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols."
It started as an initiative between a group of companies in 2013. In April 2015, version 1.0 of the specification was released, and a number of implementations (such as Akka Streams and Vert.x) were available at the same time. The specification was initiated with the aim of getting it included in the official Java standard library, and in 2017, with the release of JDK 9, it officially made its way in. As with any specification, the ultimate aim is to have a number of implementations conforming to the specification, and over time, the specification evolves. The specification consists of some core interfaces, some rules around these, and a Technology Compatibility Kit (TCK).
TCK is a suite of tests that will be executed to check the correctness/compliance of a Java Specification Request (JSR) implementation. In Java Community Process (JCP), TCK is one of the three required components for ratifying a JSR. The other two are JSR specification and JSR reference implementation. The TCK for the Java platform is called Java Compatibility Kit (JCK).
Being a specification, it enables any implementation respecting the specification to cooperate and interoperate with each other. For example, an implementation written in Akka can talk to the Vert.x implementation over the Reactive Streams protocol without any trouble. Adoption is growing and, as we speak, more implementations that conform to the specifications written in different languages are being released:
Figure 3: Reactive Streams Specification/API
The preceding figure clearly shows the Reactive Streams Specification. Some of the important specification rules are as follows:
- The calls from Publisher to Subscriber and from Subscriber to Publisher shouldn't be concurrent in nature.
- The Subscriber can perform its job synchronously or asynchronously but always has to be non-blocking in nature.
- From Publisher to Subscriber there should be an upper bound defined. After that defined bound, buffer overflows occur and could result in errors.
- Apart from NullPointerException (NPE), no other exception can be raised. In the case of NPE, Publisher calls the onError method and Subscriber cancels the Subscription.
In the preceding definition of Reactive Streams, there are some very important terms, namely non-blocking and backpressure, which we'll explore a bit more to understand the core concepts of Reactive Streams.
Non-blocking means threads never block. If a thread would otherwise have to wait, the code is written in such a way that the thread is released and gets notified at the right time, at which point the process continues. Reactive programming lets you implement a non-blocking, declarative, and event-driven architecture.
One of the approaches to writing non-blocking applications is by using messages as the means of sending data. A thread sends the request and, soon after that, the thread is free to be used for something else. When the response is ready, it is delivered back using another thread and the requesting party is notified so that further processing can continue:
Figure 4: Non-blocking
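The request/response hand-off described above can be sketched with CompletableFuture from the Java standard library. This is only a minimal illustration and not how any particular reactive framework is implemented. The class NonBlockingSketch and the methods fetchGreeting and requestGreeting are hypothetical names made up for this example, and the sleep merely stands in for a slow remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NonBlockingSketch {

    // Hypothetical slow service call; the sleep stands in for network or disk latency.
    static String fetchGreeting(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Hello, " + name;
    }

    // Hands the request to a worker thread and returns immediately.
    // thenApply registers a continuation that runs once the response is ready.
    static CompletableFuture<String> requestGreeting(String name, ExecutorService workers) {
        return CompletableFuture
                .supplyAsync(() -> fetchGreeting(name), workers)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> reply = requestGreeting("reactive", workers);
            System.out.println("Request sent; the calling thread is free for other work");
            // join() does block; it is used here only so the demo does not exit early.
            System.out.println(reply.join());
        } finally {
            workers.shutdown();
        }
    }
}
```

In a real non-blocking application, the final join() would be replaced by further callbacks so that no thread waits for the result.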
The non-blocking concept is already implemented by well-known frameworks, such as Node.js and Akka. The approach that Node.js uses is a single thread that handles many requests in a multiplexed fashion.
In telecommunications and computer networks, multiplexing (sometimes contracted to muxing) is a method by which multiple analog or digital signals are combined into one signal over a shared medium. The aim is to share an expensive resource. For more information about multiplexing, you can visit the following link: http://www.icym.edu.my/v13/about-us/our-news/general/722-multiplexing.html.
In an ideal scenario, every message produced by the Producer is passed to the Subscriber as and when the message is produced, without any delay. There is a chance, however, that the Subscriber is unable to handle the messages at the same rate as they are produced, and this can strain its resources.
Backpressure is a method by which the Subscriber can tell the Producer to send messages at a slower rate, giving the Subscriber time to handle these messages properly without putting too much pressure on its resources.
Since this is the first chapter, we are just introducing you to these important reactive concepts. Fuller code examples will be covered in subsequent chapters.
Now that we have a brief idea of Reactive Streams and the Reactive Streams Specification, we will move on to the next important reactive concept in Java, namely Reactive Extensions.
Reactive Extensions (Rx or ReactiveX) (https://msdn.microsoft.com) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Data sequences can take many forms, such as a stream of data from a file or web service, web services requests, system notifications, or a series of events such as user inputs.
As stated in the preceding definition, these are APIs that allow stream composition using the Observer pattern. It's my duty to introduce you to the Observer pattern before going any further. The following is the definition of this pattern and it's quite intuitive:
The Observer pattern defines a provider (also known as a subject or an observable) and zero, one, or more observers (Subscriber). Observers register with the provider, and whenever a predefined condition, event, or state change occurs, the provider automatically notifies all observers by calling one of their methods. For more information about the Observer pattern, you can refer to this link: https://docs.microsoft.com/en-us/dotnet/standard/events/observer-design-pattern.
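To make the pattern concrete, here is a minimal plain-Java sketch of a provider and its observers. It does not use Rx or any other library. The names ObserverSketch, TemperatureProvider, register, and publish are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ObserverSketch {

    // Hypothetical provider (subject) that notifies observers of new readings.
    static class TemperatureProvider {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        // Observers register with the provider.
        void register(Consumer<Integer> observer) {
            observers.add(observer);
        }

        // On a state change, the provider calls every registered observer.
        void publish(int reading) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(reading);
            }
        }
    }

    public static void main(String[] args) {
        TemperatureProvider provider = new TemperatureProvider();
        List<Integer> log = new ArrayList<>();
        provider.register(log::add); // first observer records readings
        provider.register(reading -> System.out.println("Reading: " + reading)); // second observer prints them
        provider.publish(21);
        provider.publish(23);
        System.out.println("Logged: " + log);
    }
}
```

Note that the provider here pushes every reading to every observer as soon as it arrives; observers have no way to slow it down.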
Data can flow in a number of forms, such as streams or events. Reactive Extensions lets you convert this dataflow into observables and aids you in programming reactive code.
Rx is implemented in a variety of languages, including Java (RxJava). A full list of implemented languages and more detail on Rx can be found at http://reactivex.io/.
RxJava is a Java VM implementation of ReactiveX—a library for composing asynchronous and event-based programs by using observable sequences.
RxJava was ported from .NET to the world of Java by Netflix. After almost two years of development, a stable release of the API was made available in 2014. This stable release targets Java (Version 6 and above), Scala, JRuby, Kotlin, and Clojure.
RxJava is a single-JAR, lightweight library and focuses on Observable abstraction. It facilitates integration with a variety of external libraries, making the library align with reactive principles. Some examples are rxjava-jdbc (database calls using JDBC with RxJava Observables) and Camel RX (Camel support for Reactive Extensions using RxJava).
RxJava 2.x is a complete rewrite from its predecessor, RxJava 1.x.
RxJava 1.x was created before the Reactive Streams Specification, and because of this it doesn't implement it. RxJava 2.x, on the other hand, is written on top of the Reactive Streams Specification and fully implements it, while continuing to target Java 6+. The RxJava 1.x types were heavily reworked during the rewrite so that they comply with the specification. It's good to note that there exists a bridge library (https://github.com/ReactiveX/RxJavaReactiveStreams) that bridges between RxJava 1.x types and Reactive Streams, allowing RxJava 1.x to pass the Reactive Streams TCK-compliance tests.
In RxJava 2.x, many concepts remain intact but names have been changed to comply with the spec.
We will not be going deep into RxJava as it is a big topic and there are plenty of books available that dive deep into RxJava.
As part of the concurrency updates to JDK 9 (JEP 266), Reactive Streams was added to the Java standard library. Reactive Streams was initiated in 2013 by some well-known organizations that wanted to standardize the approach by which asynchronous data can be exchanged between software components. Soon, the concept was adopted by the industry and a number of implementations evolved that all had similar core concepts but lacked standard nomenclature and terminology, especially as regards interfaces and package naming. To avoid multiple nomenclatures and to enable interoperability between implementations, JDK 9 included the basic interfaces as part of the Flow Concurrency library. This allows applications that want to implement Reactive Streams to depend on this library without including specific implementations in the code base. Thus it is very easy to swap between implementations without any trouble.
These interfaces are coded as nested static interfaces within the java.util.concurrent.Flow class.
Reactive Streams specifications in Java 9 revolve around just four interfaces: Publisher, Subscriber, Subscription, and Processor. The library also includes a Publisher implementation, SubmissionPublisher. All of these are included within the java.util.concurrent package in the Java standard library. We will touch upon these interfaces in the following subsections.
The definition of the Publisher interface is as follows:
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
As you can see, Publisher allows a Subscriber to subscribe to it so as to receive messages as the Publisher produces them.
The definition of the Subscriber interface is as follows:
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
As you can see, the Subscriber interface's onSubscribe method allows the Subscriber to be notified when the Publisher accepts the Subscription. The onNext method is invoked when new items get published. As the name suggests, the onError method is invoked when there's an error, and the onComplete method gets invoked when the Publisher has completed its function.
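The callback order just described can be observed with a small sketch that uses the JDK's SubmissionPublisher as the Publisher. The class SubscriberSketch and the RecordingSubscriber it contains are hypothetical names for this example. Requesting Long.MAX_VALUE items up front is a simplification that effectively asks for everything.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class SubscriberSketch {

    // Hypothetical subscriber that records every callback it receives.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            // No items are delivered until the subscriber requests them.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable error) {
            events.add("onError:" + error);
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown();
        }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        // Leaving the try block closes the publisher, which signals onComplete.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        }
        try {
            // Delivery is asynchronous, so wait for the terminal signal.
            subscriber.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext:a, onNext:b, onComplete]
    }
}
```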
The definition of the Subscription interface is as follows:
public interface Subscription {
    public void request(long n);
    public void cancel();
}
The request method lets the Subscriber ask the Publisher for up to n more items, and the cancel method lets the Subscriber tell the Publisher to stop sending items, which ends the Subscription.
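The request method is also how backpressure, introduced earlier in this chapter, is expressed in code: the Subscriber only receives as many items as it has asked for. The following is a minimal sketch, assuming the JDK's SubmissionPublisher as the Publisher. BackpressureSketch, OneAtATimeSubscriber, and the limit parameter are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {

    // Hypothetical subscriber that pulls one item at a time and cancels after `limit` items.
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        OneAtATimeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel(); // enough items; stop the flow
                finished.countDown();
            } else {
                subscription.request(1); // ready for exactly one more
            }
        }

        @Override
        public void onError(Throwable error) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    // Publishes the integers 1 to 5 and returns what the subscriber actually received.
    static List<Integer> run(int limit) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // buffered until the subscriber requests it
            }
        }
        try {
            subscriber.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [1, 2, 3]
    }
}
```

Requesting one item at a time keeps the example easy to follow; a real subscriber would more likely request items in batches to reduce signalling overhead.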
The definition of the Processor interface is as follows:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
It inherits from both the Publisher and Subscriber interfaces and therefore inherits all the methods of these interfaces. The key point is that the type of item a Processor consumes as a Subscriber (T) can differ from the type of item it produces as a Publisher (R).
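One common way to build a Processor on the JDK is to extend SubmissionPublisher, which supplies the Publisher half, and implement the Subscriber half by hand. The sketch below takes that approach under simplifying assumptions. ProcessorSketch and MappingProcessor are hypothetical names, and the mapping function (String to its length) is chosen purely for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class ProcessorSketch {

    // Hypothetical processor: consumes T items and republishes them as R items.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        MappingProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // publish the transformed item downstream
            subscription.request(1);    // then ask upstream for the next one
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error); // forward the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // forward completion downstream
        }
    }

    static List<Integer> run() {
        List<Integer> lengths = new CopyOnWriteArrayList<>();
        MappingProcessor<String, Integer> processor =
                new MappingProcessor<String, Integer>(String::length);
        // consume() subscribes a simple consumer and completes when the processor closes.
        CompletableFuture<Void> done = processor.consume(lengths::add);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            source.submit("reactive");
            source.submit("streams");
        }
        done.join(); // wait until the processor has delivered everything
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [8, 7]
    }
}
```

Here the source publishes String items while the downstream consumer sees Integer items, which is exactly the T-to-R change that the Processor interface allows.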