Book Image

C++ Reactive Programming

By : Praseed Pai, Peter Abraham
Book Image

C++ Reactive Programming

By: Praseed Pai, Peter Abraham

Overview of this book

Reactive programming is an effective way to build highly responsive applications with an easy-to-maintain code base. This book covers the essential functional reactive concepts that will help you build highly concurrent, event-driven, and asynchronous applications in a simpler and less error-prone way. C++ Reactive Programming begins with a discussion on how event processing was undertaken by different programming systems earlier. After a brisk introduction to modern C++ (C++17), you’ll be taken through language-level concurrency and the lock-free programming model to set the stage for our foray into the Functional Programming model. Following this, you’ll be introduced to RxCpp and its programming model. You’ll be able to gain deep insights into the RxCpp library, which facilitates reactive programming. You’ll learn how to deal with reactive programming using Qt/C++ (for the desktop) and C++ microservices for the Web. By the end of the book, you will be well versed with advanced reactive programming concepts in modern C++ (C++17).
Table of Contents (20 chapters)
Title Page
Copyright and Credits
Packt Upsell
Contributors
Preface
Index

Pull-versus push-based reactive programming


Reactive programs can be classified as push-based and pull-based. The pull-based system waits for a demand to push the data streams to the requestor (or subscriber in our case). This is the classic case where the data source is actively polled for more information. This employs the iterator pattern, and IEnumerable <T>/IEnumerator <T> interfaces are specifically designed for such scenarios that are synchronous in nature (the application can block while pulling data). On the other hand, a push-based system aggregates events and pushes through a signal network to achieve the computation. In this case, unlike the pull-based system, data and related updates are handed to the subscriber from the source (Observable sequences in this case). This asynchronous nature is achieved by not blocking the subscriber, but rather making it react to the changes. As you can see, employing this push pattern is more beneficial in rich UI environments where you wouldn't want to block the main UI thread while waiting for some events. This becomes ideal, thus making reactive programs responsive.

The IEnumerable/IObservable duality

If you take a closer look, there is only a subtle difference between these two patterns. IEnumerable<T> can be considered the pull-based equivalent of the push-based IObservable<T>. In fact, they are duals. When two entities exchange information, one entity's pull corresponds to another entity pushing the information. This duality is illustrated in the following diagram:

Let's understand this duality by looking at this sample code, a number sequence generator:

Note

We have striven to use classic C++ constructs to write programs for this particular chapter as there are chapters on Modern C++ language features, language level concurrency, lock-free programming, and related topics for implementing Reactive constructs in Modern C++.

#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class ConcreteEnumberable : public IEnumerable<int>
{
      int *numberlist,_count;
public:
      ConcreteEnumberable(int numbers[], int count):
            numberlist(numbers),_count(count){}
      ~ConcreteEnumberable() {}

      class Enumerator : public IEnumerator<int>
      {
      int *inumbers, icount, index;
      public:
      Enumerator(int *numbers,
            int count):inumbers(numbers),icount(count),index(0) {}
      bool HasMore() { return index < icount; }
      //---------- ideally speaking, the next function should throw
      //---------- an exception...instead it just returns -1 when the 
      //---------- bound has reached
      int next() { return (index < icount) ?
                   inumbers[index++] : -1; }
      ~Enumerator() {}
      };
      IEnumerator<int> *GetEnumerator()
            { return new Enumerator(numberlist, _count); }
};

The preceding class takes an array of integers as a parameter and we can enumerate over the elements as we have implemented the IEnumerable<T> interface. The Enumeration logic is implemented by the nested class, which implements the IEnumerator<T> interface:

int main()
{
      int x[] = { 1,2,3,4,5 };
      //-------- Has used Raw pointers on purpose here as we have
      //------- not introduced unique_ptr,shared_ptr,weak_ptr yet
      //-------- using auto_ptr will be confusting...otherwise
      //-------- need to use boost library here... ( an overkill)
      ConcreteEnumberable *t = new ConcreteEnumberable(x, 5);
      IEnumerator<int> * numbers = t->GetEnumerator();
      while (numbers->HasMore())
            cout << numbers->next() << endl;
      delete numbers;delete t;
      return 0;
}

The main program instantiates an implementation of the ConcreteEnuerable class and walks through each element.

We will write an even number sequence generator to demonstrate how these data types work together in converting a pull-based program to a push program. The robustness aspect is given low priority to keep the listing terse:

#include "stdafx.h"
#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class EvenNumberObservable : IObservable<int>{
      int *_numbers,_count;
public:
      EvenNumberObservable(int numbers[],
            int count):_numbers(numbers),_count(count){}
      bool Subscribe(IObserver<int>& observer){
            for (int i = 0; i < _count; ++i)
                  if (_numbers[i] % 2 == 0)
                        observer.OnNext(_numbers[i]);
            observer.OnCompleted();
            return true;
      }
};

The preceding program takes an array of integers, filters out of the odd numbers, and notifies Observer<T> if an even integer is encountered. In this particular case, the data source is pushing data to observer. The implementation of Observer<T> is given as follows:

class SimpleObserver : public IObserver<int>{
public:
      void OnNext(int value) { cout << value << endl; }
      void OnCompleted() { cout << _T("hello completed") << endl; }
      void OnError( CustomException * ex) {}
};

The SimpleObserver class implements the IObserver<T> interface and it has the capability to receive notifications and react to them:

int main()
{
      int x[] = { 1,2,3,4,5 };
      EvenNumberObservable *t = new EvenNumberObservable(x, 5);
      IObserver<int>> *xy = new SimpleObserver();
      t->Subscribe(*xy);
      delete xy; delete t;
      return 0;
}

From the preceding example, you see how one can naturally subscribe for even numbers from an Observable sequence of natural numbers. The system will automatically push (publish) the values to the observer (subscriber) when an even number is detected. The code gives explicit implementations for key interfaces so that one can understand, or speculate what really happens under the hood.