Hands-On Reactive Programming with Python

By: Romain Picard

Overview of this book

Reactive programming is central to many concurrent systems, but it's famous for its steep learning curve, which makes most developers feel like they're hitting a wall. With this book, you will get to grips with reactive programming by steadily exploring various concepts. This hands-on guide gets you started with Reactive Programming (RP) in Python. You will learn about the principles and benefits of using RP, which can be leveraged to build powerful concurrent applications. As you progress through the chapters, you will be introduced to the paradigm of Functional and Reactive Programming (FaRP), observables and observers, and concurrency and parallelism. The book will then take you through the implementation of an audio transcoding server and introduce you to a library that helps you write FaRP code. You will understand how to use third-party services and dynamically reconfigure an application. By the end of the book, you will also have learned how to deploy and scale your applications with Docker and Traefik, explored the significant potential behind the reactive streams concept, and got to grips with a comprehensive set of best practices.
Table of Contents (16 chapters)

Introduction to ReactiveX and RxPY

ReactiveX is a library that aims to make asynchronous programming easy. As the header of the project's website says, it is "The Observer pattern done right." ReactiveX is based on the idea of observable streams. A stream is an entity that emits zero, one, or several items over a period of time. Other entities that are interested in these items can observe the stream, receive its items, and manipulate them. This simple idea is the basis of what has become an incredibly successful way of doing asynchronous programming.

As stated in the very first paragraph of this book, asynchronous programming is a very active field. ReactiveX is a typical example of a technology that did not exist a few years ago but is now heavily used. It was originally one of the components of the Volta project at Microsoft. This project consisted of a set of developer tools to help with developing the client and server parts of web applications. The Volta project was suspended in 2008, but ReactiveX continued to be developed until it was publicly released for the .NET platform in 2010. The library was very successful, with a growing community and big companies such as Netflix and GitHub using it. In 2012, implementations for .NET, JavaScript, and C++ were published as open source projects. Since that time, ReactiveX has even influenced the standardization of some programming languages. ReactiveX now has official implementations for almost 20 programming languages and was the foundation of the Java reactive streams (http://www.reactive-streams.org) standard and the ECMAScript observable (https://github.com/tc39/proposal-observable) API. Nowadays, many other libraries heavily inspired by ReactiveX are available for virtually any programming language.

All of this is based on concepts that have existed for many years, such as the observer design pattern, the iterator design pattern, and some principles from functional programming. The ingenuity lies in combining them in a way that avoids callback hell. Even better, this approach is equally suited to frontend applications that deal with user events and GUI widgets, and to backend applications that work with network and database requests.

ReactiveX principles

ReactiveX is based on two entities: observables and observers. These are the only things that one needs to understand to be able to start writing code. Everything else is based on the behavior of one of these two entities.

Observables represent a source of events. An observable is an entity that can emit zero, one, or several items. An observable has an explicit lifetime with a start and an end. When an observable completes or faces an error, it cannot send items anymore; its lifetime has ended. An observable may also never end, in which case it is an infinite source of events. Observables are a way to manage sequences of items asynchronously. Table 1.1, which follows, compares the ways of accessing items synchronously and asynchronously. As you can see, observables fill a gap: they make it possible to operate on multiple items asynchronously.

                Single item    Multiple items
Synchronous     Getter         Iterable
Asynchronous    Future         Observable
Table 1.1: Accessing an asynchronous sequence of items is possible
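To make the four cells of Table 1.1 concrete, here is a minimal sketch in plain Python with asyncio. The `observable` function is a hypothetical toy push source written for this illustration, not RxPY's API: it schedules each item on the event loop and then signals completion, so the caller receives several items asynchronously instead of pulling them.

```python
import asyncio


def getter():
    # Synchronous, single item: the value is returned immediately.
    return 42


def iterable():
    # Synchronous, multiple items: the caller pulls them one by one.
    return [1, 2, 3]


async def future():
    # Asynchronous, single item: the value becomes available later.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, 42)
    return await fut


def observable(loop, on_next, on_completed):
    # Asynchronous, multiple items (hypothetical toy, not RxPY):
    # each item is pushed later from the event loop, then completion is signaled.
    for item in [1, 2, 3]:
        loop.call_soon(on_next, item)
    loop.call_soon(on_completed)


async def main():
    loop = asyncio.get_running_loop()
    received = []
    done = asyncio.Event()
    observable(loop, received.append, done.set)
    await done.wait()  # the items arrive while we wait
    return getter(), iterable(), await future(), received


print(asyncio.run(main()))  # (42, [1, 2, 3], 42, [1, 2, 3])
```

Note that `main` never asks for the next item: the toy observable decides when to deliver each one, which is the property that distinguishes it from the iterable.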

Observables work in push mode, as opposed to the pull mode of an iterable. Each time a new item is available, the observable pushes it to its observer. Table 1.2 shows the difference between the pull mode of an iterator and the push mode of an observable. This is what makes the behavior reactive and easy to handle with asynchronous code: whether items are emitted immediately or later does not matter to the observer receiving them, and the code semantics are very similar to those of synchronous code:

Event            Iterable (pull)        Observable (push)
Retrieve data    for i in collection    on_next
Error            Exception is raised    on_error
Complete         End of loop            on_completed

Table 1.2: Observables are push-based
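The rows of Table 1.2 can be mirrored in plain Python. In the sketch below, `pull` uses an ordinary loop with try/except, while `push_items` is a hypothetical toy producer (not part of RxPY) that calls `on_next`, `on_error`, and `on_completed` callbacks. Both produce the same sequence of events, but on the push side the error arrives through a callback, just like items and completion.

```python
def pull(collection):
    # Pull mode: the consumer drives the iteration with a for loop.
    events = []
    try:
        for i in collection:                  # retrieve data
            events.append(("item", i))
    except Exception as e:                    # error: an exception is raised
        events.append(("error", str(e)))
    else:
        events.append(("completed",))         # complete: end of the loop
    return events


def push_items(items, on_next, on_error, on_completed):
    # Push mode (toy producer, not RxPY): the source drives the consumer
    # by calling one of three callbacks for every event.
    iterator = iter(items)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            on_completed()                    # complete
            return
        except Exception as e:
            on_error(e)                       # error, delivered like any other event
            return
        on_next(item)                         # retrieve data


def push(items):
    # Collects the pushed events so they can be compared with pull().
    events = []
    push_items(
        items,
        on_next=lambda i: events.append(("item", i)),
        on_error=lambda e: events.append(("error", str(e))),
        on_completed=lambda: events.append(("completed",)),
    )
    return events


def failing():
    yield 1
    yield 2
    raise ValueError("boom")


print(pull(failing()))  # [('item', 1), ('item', 2), ('error', 'boom')]
print(push(failing()))  # [('item', 1), ('item', 2), ('error', 'boom')]
```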

Observers are the receivers of the items. An observer subscribes to an observable so that it can receive the items emitted by this observable. Just as the observable emits items one after another, an observer receives them one after another. The observable informs the observer of the end of the sequence, either by indicating that the observable has completed (successfully) or by indicating that an error has occurred. These two kinds of completion are notified in a similar way, and so can be handled in a similar way. With ReactiveX, error management is not a special case, but on a par with the management of items and completion. In contrast to iterables, which rely on exceptions, success and failure are not handled in radically different ways.

The implementation of RxPY (as well as all other implementations of ReactiveX) involves two other entities: a subscription function and a disposable object. Figure 1.5 shows a simplified representation of these entities. The AnonymousObservable class is almost always used to create an observable (directly or via a subclass). This class contains two methods to manage the lifetime of the observable and its observer. The first one, __init__, is not really a method but the constructor of the class. It takes a subscription function as an input argument. This subscription function will be called when an observer subscribes to this observable. The second method of the AnonymousObservable class is subscribe. This is the method used to attach an observer to an observable and start the observable; that is, to make it start emitting items. It returns a disposable function that can be called to free all resources used by the observable and the observer. The AnonymousObservable class can be used directly, but in many cases using an existing RxPY AnonymousObservable subclass is easier. This is typically the case when you need to create an observable from an iterable object or a single object.
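The following is a toy re-implementation of the structure just described, written only to show how the pieces fit together; it is not RxPY's source code, and the class name is reused purely for readability. The constructor stores the subscription function, `subscribe` calls it with the observer and returns a dispose function, and `from_iterable` is a hypothetical factory standing in for the ready-made helpers that build an observable from an iterable.

```python
class AnonymousObservable:
    # Toy model (not RxPY's source): an observable built from a subscription function.
    def __init__(self, subscription):
        # The constructor only stores the subscription function; nothing is emitted yet.
        self._subscription = subscription

    def subscribe(self, observer):
        # Attaching an observer starts the observable: the subscription function
        # runs now and may return a cleanup function.
        cleanup = self._subscription(observer)

        def dispose():
            # Frees whatever the subscription function allocated.
            if cleanup is not None:
                cleanup()

        return dispose


def from_iterable(iterable):
    # Hypothetical convenience factory: wraps an iterable in an observable.
    def subscription(observer):
        for item in iterable:
            observer.on_next(item)
        observer.on_completed()
        return None  # nothing to clean up: emission is already finished

    return AnonymousObservable(subscription)


class ListObserver:
    # Records every notification it receives.
    def __init__(self):
        self.events = []

    def on_next(self, item):
        self.events.append(item)

    def on_error(self, error):
        self.events.append(("error", error))

    def on_completed(self):
        self.events.append("completed")


observer = ListObserver()
dispose = from_iterable("abc").subscribe(observer)
dispose()
print(observer.events)  # ['a', 'b', 'c', 'completed']
```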

The Observer class is a base class that contains three methods, which correspond to the behavior explained previously. This class must be subclassed to implement these three methods. The on_next method is called each time an item is emitted by the observable. The on_completed method is called when the observable completes successfully. Finally, the on_error method is called when the observable completes because of an error. The on_next method will never be called after the on_completed or the on_error methods.
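The guarantee in the last sentence is something an implementation has to enforce. The sketch below shows one way to do it, using a hypothetical `SafeObserver` wrapper that drops any notification arriving after the sequence has ended. This is an illustration of the rule, not RxPY's internal mechanism.

```python
class Observer:
    # Toy base class with the three notification methods.
    def on_next(self, item):
        raise NotImplementedError

    def on_error(self, error):
        raise NotImplementedError

    def on_completed(self):
        raise NotImplementedError


class CollectingObserver(Observer):
    # Concrete subclass that just stores what it receives.
    def __init__(self):
        self.items, self.error, self.completed = [], None, False

    def on_next(self, item):
        self.items.append(item)

    def on_error(self, error):
        self.error = error

    def on_completed(self):
        self.completed = True


class SafeObserver(Observer):
    # Hypothetical wrapper: once on_completed or on_error has been delivered,
    # every later notification is silently ignored.
    def __init__(self, inner):
        self._inner = inner
        self._stopped = False

    def on_next(self, item):
        if not self._stopped:
            self._inner.on_next(item)

    def on_error(self, error):
        if not self._stopped:
            self._stopped = True
            self._inner.on_error(error)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True
            self._inner.on_completed()


inner = CollectingObserver()
observer = SafeObserver(inner)
observer.on_next(1)
observer.on_completed()
observer.on_next(2)                      # ignored: the sequence has ended
observer.on_error(RuntimeError("late"))  # ignored as well
print(inner.items, inner.completed, inner.error)  # [1] True None
```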

The subscription entity is a function that takes an observer as input parameter. This function is called when the subscribe method of the observable is called, and this is where the emission of items is implemented. Items can be emitted either synchronously or asynchronously. They are emitted synchronously if the subscription function directly calls the on_next method of the observer. But items can also be emitted asynchronously if the observer instance is saved and used later (after the subscription function returns). The subscription function can return a Disposable object or function, which will be called when the observable is disposed.
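The difference between the two emission styles can be sketched with plain functions. Below, `sync_subscription` emits everything before returning, whereas `clicks_subscription` saves the observer's `on_next` in a handler list of a hypothetical `Button` event source and only emits when the button is clicked later. Neither function is RxPY code; they only follow the subscription-function shape described above.

```python
class Recorder:
    # Minimal observer that logs what it receives.
    def __init__(self):
        self.log = []

    def on_next(self, item):
        self.log.append(item)

    def on_error(self, error):
        self.log.append(("error", error))

    def on_completed(self):
        self.log.append("completed")


def sync_subscription(observer):
    # Synchronous: every item is emitted before this function returns.
    for item in (1, 2, 3):
        observer.on_next(item)
    observer.on_completed()
    return None


class Button:
    # Hypothetical event source: handlers are called on each click.
    def __init__(self):
        self.handlers = []

    def click(self, label):
        for handler in list(self.handlers):
            handler(label)


def clicks_subscription(button):
    # Builds a subscription function that emits asynchronously: the observer
    # is saved (captured by the handler) and used after the function returns.
    def subscription(observer):
        handler = observer.on_next
        button.handlers.append(handler)

        def dispose():
            button.handlers.remove(handler)

        return dispose

    return subscription


sync_observer = Recorder()
sync_subscription(sync_observer)
print(sync_observer.log)   # [1, 2, 3, 'completed']

button = Button()
async_observer = Recorder()
dispose = clicks_subscription(button)(async_observer)
print(async_observer.log)  # []
button.click("ok")
button.click("cancel")
dispose()
button.click("ignored")    # not delivered: the handler was removed
print(async_observer.log)  # ['ok', 'cancel']
```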

Finally, the Disposable class and its associated dispose function are used to clean up any resources used by an observer or a subscription. In the case of an asynchronous observable, this is how the subscription function is notified that it must stop emitting items, because the observer is no longer valid after that. The following figure shows these components:

Figure 1.5: RxPY components
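For an asynchronous observable, disposal is the signal to stop producing items. The sketch below assumes a hypothetical `ticker_subscription` that emits increasing integers from a background thread; its dispose function sets a `threading.Event` and joins the thread, so the observer is never called once dispose has returned. It illustrates the role of the disposable, not how RxPY schedules work internally.

```python
import threading
import time


class ListObserver:
    # Minimal observer that stores received items.
    def __init__(self):
        self.items = []
        self.error = None

    def on_next(self, item):
        self.items.append(item)

    def on_error(self, error):
        self.error = error

    def on_completed(self):
        pass


def ticker_subscription(observer, interval=0.01):
    # Hypothetical asynchronous subscription function: a background thread
    # emits 0, 1, 2, ... until the returned dispose function is called.
    stop = threading.Event()

    def run():
        n = 0
        while not stop.is_set():
            observer.on_next(n)
            n += 1
            stop.wait(interval)  # sleeps, but wakes up early on dispose

    thread = threading.Thread(target=run, daemon=True)
    thread.start()

    def dispose():
        # Tells the producer to stop, then waits until it has really stopped.
        stop.set()
        thread.join()

    return dispose


observer = ListObserver()
dispose = ticker_subscription(observer)
time.sleep(0.05)
dispose()
count = len(observer.items)
time.sleep(0.05)
print(count == len(observer.items))  # True: nothing arrives after dispose
```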

Let's try to make more sense of all these definitions. The following figure shows a sequence diagram of how these calls are organized when an observable is created, subscribed, and finally disposed:

Figure 1.6: RxPY dynamics. An example of the creation of a synchronous observable and its subscription and disposal

First, the application creates an AnonymousObservable and provides the subscription function associated with this observable. It then creates an observer object (actually an instance of an Observer subclass). After that, the application subscribes to the observable, passing the observer object as an input parameter. During the call to subscribe, the subscription function is called. In this example, the subscription function is synchronous: it emits three items (the integers 1, 2, and 3) and completes the observable. When the subscription function returns, the observable has already completed. At that point, the subscribe method of AnonymousObservable returns the Disposable function to the application. Finally, the application calls this dispose function to clean up any resources still used by the subscription and the observer.
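The call sequence of Figure 1.6 can be reproduced with a small toy model that records each step in a `trace` list. The classes below are hypothetical stand-ins written for this illustration, not RxPY's implementation; the point is the order of the calls, in particular that all items and the completion are delivered before `subscribe` returns.

```python
trace = []


class AnonymousObservable:
    # Hypothetical toy, not RxPY: stores the subscription function.
    def __init__(self, subscription):
        trace.append("AnonymousObservable created")
        self._subscription = subscription

    def subscribe(self, observer):
        trace.append("subscribe called")
        cleanup = self._subscription(observer)
        trace.append("subscribe returns disposable")

        def dispose():
            trace.append("dispose called")
            if cleanup is not None:
                cleanup()

        return dispose


class MyObserver:
    def on_next(self, item):
        trace.append(f"on_next({item})")

    def on_error(self, error):
        trace.append(f"on_error({error!r})")

    def on_completed(self):
        trace.append("on_completed")


def subscription(observer):
    # Synchronous subscription function, as in Figure 1.6.
    trace.append("subscription function called")
    for i in (1, 2, 3):
        observer.on_next(i)
    observer.on_completed()
    return lambda: trace.append("subscription resources released")


observable = AnonymousObservable(subscription)  # 1. create the observable
observer = MyObserver()                         # 2. create the observer
dispose = observable.subscribe(observer)        # 3. subscribe: emits synchronously
dispose()                                       # 4. clean up
print("\n".join(trace))
```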

Operators

By now, the principles of observables and observers should be clearer. However, you may wonder: how can this make the development of asynchronous code easier? After all, what was described in the previous section is almost exactly the description of the observer design pattern, with additions for the management of completion and errors. The answer is that this is not the whole story, but the foundation of an extensible framework. What makes ReactiveX different from other frameworks is its inspiration from functional programming, with many operators that can be chained in a pipeline. In the RxPY implementation, operators are methods of the Observable class. In some other implementations of ReactiveX, they are implemented as functions, which makes adding new operators very easy. Look at the following pseudo-code example:

(Observable.from_(...)
    .filter(...)
    .distinct()
    .take(20)
    .map(...))

This is what ReactiveX code looks like. It is a succession of operators that transform the items flowing through them. An operator can be seen as a monad: a construction that encapsulates program logic instead of data. (I apologize to functional programmers for this very simplistic definition.) An operator takes an observable as input and returns another observable. So, we can consider the observable data type as an abstract type that is used to compose functions together. Some operators accept other input parameters to provide additional program logic that will be executed on each item received by the operator. For example, the map operator, which will be used later, takes a function as a parameter. This function contains the code logic that will be executed on each item sent on the input observable.
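To show what "an operator takes an observable and returns another observable" means in practice, here is a runnable toy version of the pseudo-code pipeline, using the function-based style mentioned above rather than RxPY's methods. In this sketch, an observable is simply a function taking `on_next`, `on_error`, and `on_completed` callbacks; `from_iterable`, `filter_`, `distinct`, `take`, `map_`, and `pipe` are all hypothetical helpers written for the illustration, without real disposal or error routing.

```python
def from_iterable(items):
    # Toy observable: pushes every item, then completes.
    def subscribe(on_next, on_error, on_completed):
        for item in items:
            on_next(item)
        on_completed()
    return subscribe


def map_(fn):
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            source(lambda x: on_next(fn(x)), on_error, on_completed)
        return subscribe
    return operator


def filter_(predicate):
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            def next_(x):
                if predicate(x):
                    on_next(x)
            source(next_, on_error, on_completed)
        return subscribe
    return operator


def distinct():
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            seen = set()
            def next_(x):
                if x not in seen:
                    seen.add(x)
                    on_next(x)
            source(next_, on_error, on_completed)
        return subscribe
    return operator


def take(count):
    # A real take would also unsubscribe from the source; this toy
    # simply ignores whatever the source emits after enough items.
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            taken = 0
            def next_(x):
                nonlocal taken
                if taken < count:
                    taken += 1
                    on_next(x)
                    if taken == count:
                        on_completed()  # complete as soon as enough items arrived
            def error(e):
                if taken < count:
                    on_error(e)
            def completed():
                if taken < count:       # otherwise completion was already sent
                    on_completed()
            source(next_, error, completed)
        return subscribe
    return operator


def pipe(source, *operators):
    # Each operator turns an observable into a new observable.
    for op in operators:
        source = op(source)
    return source


pipeline = pipe(
    from_iterable([1, 2, 2, 3, 4, 4, 5, 6, 7, 8]),
    filter_(lambda x: x % 2 == 0),
    distinct(),
    take(3),
    map_(lambda x: x * 10),
)
out = []
pipeline(out.append, print, lambda: out.append("done"))
print(out)  # [20, 40, 60, 'done']
```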

Operators are not limited to processing items. Since they work on the observable data type, they can also be used to manage the completion and errors of observables. For example, some operators use completion events to chain observables one after another. Other operators use error events to handle these errors gracefully.
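Both ideas can be sketched with the same callback-based toy model, where an observable is a function taking three callbacks. The hypothetical `concat` subscribes to the next source only when the previous one completes, and the hypothetical `catch` replaces a failing source with a fallback observable when an error arrives. These names echo common ReactiveX operators, but the code is only an illustration, not RxPY's implementation.

```python
def from_iterable(items):
    # Toy observable: pushes every item, then completes.
    def subscribe(on_next, on_error, on_completed):
        for item in items:
            on_next(item)
        on_completed()
    return subscribe


def throw(error):
    # Toy observable that fails immediately.
    def subscribe(on_next, on_error, on_completed):
        on_error(error)
    return subscribe


def concat(*sources):
    # Uses completion events to chain observables one after another.
    def subscribe(on_next, on_error, on_completed):
        def run(index):
            if index == len(sources):
                on_completed()
            else:
                sources[index](on_next, on_error, lambda: run(index + 1))
        run(0)
    return subscribe


def catch(handler):
    # Uses the error event to switch to a fallback observable built by handler.
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            source(
                on_next,
                lambda e: handler(e)(on_next, on_error, on_completed),
                on_completed,
            )
        return subscribe
    return operator


events = []
observable = catch(lambda e: from_iterable([f"recovered from {e}"]))(
    concat(from_iterable([1, 2]), throw(ValueError("boom")), from_iterable([3]))
)
observable(events.append, lambda e: events.append(f"error: {e}"),
           lambda: events.append("completed"))
print(events)  # [1, 2, 'recovered from boom', 'completed']
```

The third source, `from_iterable([3])`, is never subscribed: the error ends the concatenated sequence, and the fallback takes over from there.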

The RxPY implementation contains about 140 operators. We will cover the most commonly used ones, which is about half of them. As you write RxPY code, you will also write your own operators, even if they will not be directly usable in the kind of pipeline that we saw earlier. As we will see, writing a custom operator is the way to factor code and keep functions simple when using ReactiveX.
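A custom operator can often be built by composing existing ones. The sketch below reuses the callback-based toy model (an observable is a function taking three callbacks) and a hypothetical `compose` helper to package three steps into one reusable `normalize_lines` operator. All names here are illustrative assumptions, not RxPY functions.

```python
def from_iterable(items):
    # Toy observable (not RxPY): pushes every item, then completes.
    def subscribe(on_next, on_error, on_completed):
        for item in items:
            on_next(item)
        on_completed()
    return subscribe


def map_(fn):
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            source(lambda x: on_next(fn(x)), on_error, on_completed)
        return subscribe
    return operator


def filter_(predicate):
    def operator(source):
        def subscribe(on_next, on_error, on_completed):
            def next_(x):
                if predicate(x):
                    on_next(x)
            source(next_, on_error, on_completed)
        return subscribe
    return operator


def compose(*operators):
    # Builds a new operator from existing ones, applied left to right.
    def operator(source):
        for op in operators:
            source = op(source)
        return source
    return operator


# Hypothetical custom operator: strips lines, drops empty ones, upper-cases the rest.
normalize_lines = compose(
    map_(str.strip),
    filter_(lambda line: line != ""),
    map_(str.upper),
)

out = []
normalize_lines(from_iterable(["  hello ", "", " rx "]))(out.append, print, lambda: None)
print(out)  # ['HELLO', 'RX']
```

Once defined, `normalize_lines` can be applied to any observable of strings, which keeps each call site short.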

Installing RxPY

RxPY is available as a Python package published on PyPI (https://pypi.org/), so it can be installed with pip. Depending on your operating system, you may already have pip installed. Otherwise, refer to the pip installation documentation (https://pip.pypa.io/en/stable/installing/).

By following the examples in this book, you will install several libraries and sample packages that you may not want to keep on your system. To avoid cluttering your environment, you should run all these examples in a virtualenv. virtualenv is a tool that allows you to create isolated Python environments. With it, you can create many different Python environments, potentially using different Python interpreters, that are independent of each other. This is a great tool when developing in Python because it allows you to:

  • Test libraries without installing them on your system
  • Test some code with different versions of Python interpreters
  • Test some code with different versions of dependency packages
  • Have one independent and reproducible execution environment per project

If you are not already familiar with it, I encourage you to use it when running the example code in this book. virtualenv is easy to install from PyPI, using the following command:

pip3 install virtualenv

Creating a new virtual environment is also quite easy: it takes a single command with three parameters:

virtualenv --system-site-packages -p /usr/local/bin/python3 venv-rx

The --system-site-packages option indicates that we want to use system packages when they are already installed. If you omit this parameter, you will create a complete Python environment from scratch. This would be better for isolation but requires much more space, because each dependency will be reinstalled in the virtualenv. In our case, we do not need strict isolation from the system; we only want to avoid installing new packages in our system. The -p option indicates which Python interpreter to use. You should adapt it to your environment: the /usr/local/bin value corresponds to a macOS system, while on Linux the CPython interpreter is usually located in /usr/bin. Finally, the venv-rx parameter is the name of the virtualenv to create. After running this command, you will see a new directory named venv-rx.

Once the virtualenv is created, you can enter it and leave it. While you are inside the virtualenv, all actions done by the Python interpreter take place in this isolated environment. You enter the virtualenv by sourcing a script in its bin directory, as shown in the following code:

$ source venv-rx/bin/activate
(venv-rx)$

When you are inside the virtualenv, its name is printed in parentheses before the shell prompt. This is how you know whether you are inside an isolated environment or in your system environment. You leave the virtualenv by executing the deactivate function, as shown in the following code:

(venv-rx)$ deactivate
$
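Besides the shell prompt, you can also check from Python itself whether the interpreter runs inside a virtual environment. The helper below is a small sketch (the function name is hypothetical): environments created by the standard venv module and by virtualenv 20 or later make `sys.prefix` differ from `sys.base_prefix`, while older virtualenv releases set `sys.real_prefix` instead, so both cases are checked.

```python
import sys


def in_virtualenv():
    # venv and virtualenv 20+ point sys.prefix at the environment while
    # sys.base_prefix keeps pointing at the base interpreter.
    # Older virtualenv releases (before 20) set sys.real_prefix instead.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix) or hasattr(sys, "real_prefix")


print("inside a virtualenv" if in_virtualenv() else "system environment")
```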

Inside the virtualenv, the rx package can be installed with the pip tool, as shown in the following code:

$ source venv-rx/bin/activate
(venv-rx)$ pip install rx

If you want to install the latest development version, you can also install it directly from the GitHub sources. First clone the repository, and then install RxPY from the local sources, as demonstrated in the following code:

(venv-rx)$ git clone https://github.com/ReactiveX/RxPY.git
(venv-rx)$ cd RxPY
(venv-rx)$ pip install .