Reactive programming is better, scalable, and a faster way to build applications. Reactive Programing can be done with OO languages, however, they make a lot of sense with FP languages. When FP is married to Reactive Programing, we get something called Functional Reactive Programing (FRP). Scala FRP can be used for many purposes like GUI, Robotics, and Music, because it gives you a better model to model time. Reactive programming is a new technique, which works with Streams(also known as Data Flows). Streams is a way to think and code applications in a way which can express data transformations and flow. The main idea is to propagate changes through a circuit or flow. Yes, we are talking about a new way to do async programming.
The main library for Reactive Programing is called Reactive Extensions (Rx) - http://reactivex.io/), originally built for .NET by Eric Meijer. It combines the best ideas from the Observer and Iterator Patterns, and FP. Rx has implementations for many languages like Scala, Java, Python, .NET, PHP, and others (https://github.com/ReactiveX). Coding with Rx is easy, and you can create Streams, combine with query-like operators, and also listen (subscribe) to any observable Streams to perform data transformations. Rx is used by many successful companies today like Netflix, GitHub, Microsoft, SoundCloud, Couchbase, Airbnb, Trello, and several others. In this book, we will use RxScala, which is the Scala implementation of the Reactive Streams.
The following table shows the main class/concepts you need to know in order to work with Rx.
Term / Class |
Concept |
Observable |
Create async composable Streams from sources. |
Observer |
A callback function type. |
Subscription |
The bound between the Subscriber and the Observable. Receives notifications from Observables. |
Reactive Streams is also the name of a common specification trying to consolidate and standardize the reactive stream processing, There are several implementations such as RxJava/RxScala, Reactor, Akka, Slick, and Vert.x. You can find more at https://github.com/reactive-streams/reactive-streams-jvm.
Back to the Observables -- we can perform all kinds of operations with observables. For instance, we can filter, select, aggregate, compose, perform time-based operations, and apply backpressure. There are two big wins with Observables instead of callbacks. First of all, Observables are not opinionated about how low-level I/O and threading happens, and secondly, when you are doing complex code, callbacks tend to be nested, and that is when things get ugly and hard to read. Observables have a simple way to do composition thanks to FP.
Observables push values to consumers whenever values are available, which is great because then the values can arrive in sync or async fashion. Rx provides a series of collection operators to do all sorts of data transformations you may need. Let's see some code now. We will use RxScala version 0.26.1, which is compatible with RxJava version 1.1.1+. RxScala is just a wrapper for RxJava (Created by Netflix). Why not use RxJava straight? Because the syntax won't be pleasant; with RxScala, we can have a fluent Scala experience. RxJava is great, however, Java syntax for this is not pleasant - as Scala is, in fact, pretty ugly.
package scalabook.rx.chap1 import rx.lang.scala.Observable import scala.concurrent.duration._ object SimpleRX extends App { val o = Observable. interval(100 millis). take(5) o.subscribe( x => println(s"Got it: $x") ) Thread.sleep(1000) Observable. just(1, 2, 3, 4). reduce(_+_). subscribe( r => println(s"Sum 1,2,3,4 is $r in a Rx Way")) }
If you run this preceding Scala program, you will see the following output:
Got it: 0 Got it: 1 Got it: 2 Got it: 3 Got it: 4 Sum 1,2,3,4 is 10 in a Rx Way
If you try to run this code in the Scala REPL, it will fail, because we need the RxScala and RxJava dependencies. For this, we will need SBT and dependency management. Do not worry, we will cover how to work with SBT in our Scala application in the next chapter.
Going back to the observables, we need to import the Scala Observable. Make sure you get it from the Scala package, because if you get the Java one, you will have issues: in the very first part of the code, we will get numbers starting from 0 each 100 milliseconds, and this code would run forever. To avoid this, we use the take function to put a limit into the collection, so we will get the first five values. Then, later, we subscribe to the observer, and when data is ready, our code will run. For the first sample, it's pretty easy, we are just printing the values we have got. There is a thread sleep in this program, otherwise, the program would terminate, and you would not see any value on the console.
The second part of the code does something more interesting. First of all, it creates an Observable from a static list of values, which are 1,2,3, and 4. We apply a reduce function into the elements, which will sum all the elements with each other, and then we subscribe and print the result.
package scalabook.rx.chap1 import rx.lang.scala.Observable object ComplexRxScala extends App { Observable. just(1,2,3,4,5,6,7,8,9,10). // 1,2,3,4,5,6,7,8,9,10 filter( x => x%2==0). // 2,4,6,8,10 take(2). // 2,4 reduce(_+_). // 6 subscribe( r => println(s"#1 $r")) val o1 = Observable. just(1,2,3,4,5,6,7,8,9,10). // 1,2,3,4,5,6,7,8,9,10 filter( x => x%2==0). // 2, 4, 6, 8, 10 take(3). // 2, 4 ,6 map( n => n * n) // 4, 16, 36 val o2 = Observable. just(1,2,3,4,5,6,7,8,9,10). // 1,2,3,4,5,6,7,8,9,10 filter( x => x%2!=0). // 1, 3, 5, 7, 9 take(3). // 1, 3, 5 map( n => n * n) // 1, 9, 25 val o3 = o1. merge(o2). // 2,16, 36, 1, 9, 25 subscribe( r => println(s"#2 $r")) }
The preceding first part of the code creates an Observable with numbers from 1 to 10, and then applies a filter
function, which will get only the even numbers. It then reduces them, calculates their sum, and lastly, prints the solution. You can visualize it as depicted in the following image:
For the second part of the code, we create two different observables. The first one is with even numbers and the second one is with odd numbers. These two observables are decoupled from each other; you can control as many observables you want. Later on, the code uses a merge function to join these two observables into a third and new observable containing the content of the first and second observables.
There are many functions and options, and you can see the whole list at http://rxmarbles.com/ and https://github.com/ReactiveX/RxScala. For the sake of simplicity, for now, we are just working with numbers. Later, we will use this to do more advance compositions including database calls and external web services calls.