Book Image

Learning RxJava

By : Thomas Nield
Book Image

Learning RxJava

By: Thomas Nield

Overview of this book

RxJava is a library for composing asynchronous and event-based programs using Observable sequences for the JVM, allowing developers to build robust applications in less time. Learning RxJava addresses all the fundamentals of reactive programming to help readers write reactive code, as well as teach them an effective approach to designing and implementing reactive libraries and applications. Starting with a brief introduction to reactive programming concepts, there is an overview of Observables and Observers, the core components of RxJava, and how to combine different streams of data and events together. You will also learn simpler ways to achieve concurrency and remain highly performant, with no need for synchronization. Later on, we will leverage backpressure and other strategies to cope with rapidly-producing sources to prevent bottlenecks in your application. After covering custom operators, testing, and debugging, the book dives into hands-on examples using RxJava on Android as well as Kotlin.
Table of Contents (21 chapters)
Title Page
Credits
About the Author
Acknowledgements
About the Reviewers
www.PacktPub.com
Customer Feedback
Preface

A quick exposure to RxJava


 Before we dive deep into the reactive world of RxJava, here is a quick exposure to get your feet wet first. In ReactiveX, the core type you will work with is the Observable. We will be learning more about the Observable throughout the rest of this book. But essentially, an Observable pushes things. A given Observable<T>pushes things of type T through a series of operators until it arrives at an Observer that consumes the items. For instance, create a new Launcher.java file in your project and put in the following code:

import io.reactivex.Observable;
public class Launcher {
      public static void main(String[] args) {
        Observable<String> myStrings =
          Observable.just("Alpha", "Beta", "Gamma", "Delta", 
"Epsilon");
      }
}

In our main() method,  we have an Observable<String> that will push five string objects. An Observable can push data or events from virtually any source, whether it is a database query or live Twitter feeds. In this case, we are quickly creating an Observable using Observable.just(), which will emit a fixed set of items.

Note

In RxJava 2.0, most types you will use are contained in the io.reactivex package. In RxJava 1.0, the types are contained in the rx package.

However, running this main() method is not going to do anything other than declare Observable<String>. To make this Observable actually push these five strings (which are called emissions), we need an Observer to subscribe to it and receive the items. We can quickly create and connect an Observer by passing a lambda expression that specifies what to do with each string it receives:

import io.reactivex.Observable;

public class Launcher {
      public static void main(String[] args)  {
        Observable<String> myStrings =
          Observable.just("Alpha", "Beta", "Gamma", "Delta", 
"Epsilon");

        myStrings.subscribe(s -> System.out.println(s));
      }
}

 When we run this code, we should get the following output:

    Alpha 
    Beta 
    Gamma 
    Delta 
    Epsilon

What happened here is that our Observable<String> pushed each string object one at a time to our Observer, which we shorthanded using the lambda expression s -> System.out.println(s). We pass each string through the parameter s (which I arbitrarily named) and instructed it to print each one. Lambdas are essentially mini functions that allow us to quickly pass instructions on what action to take with each incoming item. Everything to the left of the arrow -> are arguments (which in this case is a string we named s), and everything to the right is the action (which is System.out.println(s)).   If you are unfamiliar with lambda expressions, turn to Appendix, to learn more about how they work. If you want to invest extra time in understanding lambda expressions, I highly recommend that you read at least the first few chapters of Java 8 Lambdas (O'Reilly) (http://shop.oreilly.com/product/0636920030713.do) by Richard Warburton. Lambda expressions are a critical topic in modern programming and have become especially relevant to Java developers since their adoption in Java 8. We will be using lambdas constantly in this book, so definitely take some time getting comfortable with them.

 We can also use several operators between Observable and Observer to transform each pushed item or manipulate them in some way. Each operator returns a new Observable derived-off the previous one but reflects that transformation. For example, we can use map() to turn each string emission into its length(), and each length integer will then be pushed to Observer , as shown in the following code snippet:

import io.reactivex.Observable;

public class Launcher {
 public static void main(String[] args) {

   Observable<String> myStrings =
     Observable.just("Alpha", "Beta", "Gamma", "Delta",
      "Epsilon");

     myStrings.map(s -> s.length()).subscribe(s -> 
     System.out.println(s));
  }
}

When we run this code, we should get the following output:

    5
    4
    5
    5
    7

If you have used Java 8 Streams or Kotlin sequences, you might be wondering how Observable is any different. The key difference is that Observable pushes the items while Streams and sequences pull the items. This may seem subtle, but the impact of a push-based iteration is far more powerful than a pull-based one. As we saw earlier, you can push not only data, but also events. For instance, Observable.interval() will push a consecutive Long at each specified time interval, as shown in the following code snippet. This Long emission is not only data, but also an event! Let's take a look:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
      public static void main(String[] args) {
        Observable<Long> secondIntervals =
          Observable.interval(1, TimeUnit.SECONDS);

        secondIntervals.subscribe(s -> System.out.println(s));

        /* Hold main thread for 5 seconds
        so Observable above has chance to fire */
        sleep(5000);
      }

      public static void sleep(long millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
}

When we run this code, we should get the following output:


0 
1 
2 
3 
4 

When you run the preceding code, you will see that a consecutive emission fires every second. This application will run for about five seconds before it quits, and you will likely see emissions 0 to 4 fired, each separated by a just a second's gap. This simple idea that data is a series of events over time will unlock new possibilities in how we tackle programming.

On a side note, we will get more into concurrency later, but we had to create a sleep() method because this Observable fires emissions on a computation thread when subscribed to. The main thread used to launch our application is not going to wait on this Observable since it fires on a computation thread, not the main thread. Therefore, we use sleep() to pause the main thread for 5000 milliseconds and then allow it to reach the end of the main() method (which will cause the application to terminate). This gives Observable.interval() a chance to fire for a five second window before the application quits.

Throughout this book, we will uncover many mysteries about Observable and the powerful abstractions it takes care of for us. If you've conceptually understood what is going on here so far, congrats! You are already becoming familiar with how reactive code works. To emphasize again, emissions are pushed one at a time all the way to Observer. Emissions represent both data and an event, which can be emitted over time. Of course, beyond map(), there are hundreds of operators in RxJava, and we will learn about the key ones in this book. Learning which operators to use for a situation and how to combine them is the key to mastering RxJava. In the next chapter, we will cover Observable and Observer much more comprehensively. We will also demystify events and data being represented in Observable a bit more.