Book Image

C++ Reactive Programming

By : Praseed Pai, Peter Abraham
Book Image

C++ Reactive Programming

By: Praseed Pai, Peter Abraham

Overview of this book

Reactive programming is an effective way to build highly responsive applications with an easy-to-maintain code base. This book covers the essential functional reactive concepts that will help you build highly concurrent, event-driven, and asynchronous applications in a simpler and less error-prone way. C++ Reactive Programming begins with a discussion on how event processing was undertaken by different programming systems earlier. After a brisk introduction to modern C++ (C++17), you’ll be taken through language-level concurrency and the lock-free programming model to set the stage for our foray into the Functional Programming model. Following this, you’ll be introduced to RxCpp and its programming model. You’ll be able to gain deep insights into the RxCpp library, which facilitates reactive programming. You’ll learn how to deal with reactive programming using Qt/C++ (for the desktop) and C++ microservices for the Web. By the end of the book, you will be well versed with advanced reactive programming concepts in modern C++ (C++17).
Table of Contents (20 chapters)
Title Page
Copyright and Credits
Packt Upsell
Contributors
Preface
Index

Converting events to IObservable<T>


We have now understood how one can convert an IEnumerable<T>-based pull program to an IObservable<T>/IObserver<T>-based push program. In real life, the event source is not as simple as we found in the number stream example given earlier. Let us see how we can convert a MouseMove event into a stream with a small MFC program:

Note

We have chosen MFC for this particular implementation because we have a chapter dedicated to Qt-based reactive programming. In that chapter, we will be implementing Reactive programs in idiomatic asynchronous push-based streams. In this MFC program, we simply do a filtering operation to see whether the mouse is moving in a bounding rectangle and, if so, notify the observer. We are using synchronous dispatch here. This example is synchronous too:

#include "stdafx.h"
#include <afxwin.h>
#include <afxext.h>
#include <math.h>
#include <vector>
#include "../Common2.h"

using namespace std;
class CMouseFrame :public CFrameWnd,IObservable<CPoint>
{
private:
      RECT _rect;
      POINT _curr_pos;
      vector<IObserver<CPoint> *> _event_src;
public:
      CMouseFrame(){
            HBRUSH brush =
                  (HBRUSH)::CreateSolidBrush(RGB(175, 238, 238));
            CString mywindow = AfxRegisterWndClass(
                  CS_HREDRAW | CS_VREDRAW | CS_DBLCLKS,
                  0, brush, 0);
            Create(mywindow, _T("MFC Clock By Praseed Pai"));
      }

The preceding part of the code defines a Frame class that derives from the MFC library the CFrameWnd class and also implements the IObservable<T> interface to force the programmer to implement the Subscribe method. A vector of IObserver<T> will store the list of observers or Subscribers. For this example, we will have only one observer. There is no restriction on the number of observer in the code:

      virtual bool Subscribe(IObserver<CPoint>& observer) {
            _event_src.push_back(&observer);
            return true;
      }

The Subscribe method just stores the reference to the observer onto a vector and returns true: when the mouse is moved, we get notification from the MFC library and if it is in a rectangular area, observer will be notified (the notification code is as follows):

      bool FireEvent(const CPoint& pt) {
            vector<IObserver<CPoint> *>::iterator it =
                  _event_src.begin();
            while (it != _event_src.end()){
                  IObserver<CPoint> *observer = *it;
                  observer->OnNext(pt);
                  //---------- In a Real world Rx programs there is a 
                  //--------- sequence stipulated to call methods...
                  //--------- OnCompleted will be called only when 
                  //--------- all the data is processed...this code
                  //--------- is written to demonstrate the call schema
                  observer->OnCompleted();
                  it++;
            }
            return true;
      }

The FireEvent method walks through the observer's and calls the OnNext method of the observer. It also calls the OnCompleted method of each instance of Observer's: The Rx dispatching mechanism follows certain rules while calling the observer methods. If OnComplete method is called, no more OnNext will be called on the same observer. Similarly, if OnError is called, no further messages will be dispatched to the observer. If we need to follow the conventions stipulated by the Rx model here, the listing will get complicated. The purpose of the code given here is to show how the Rx programming model works in a schematic manner.

      int OnCreate(LPCREATESTRUCT l){
            return CFrameWnd::OnCreate(l);
      }
      void SetCurrentPoint(CPoint pt) {
            this->_curr_pos = pt;
            Invalidate(0);
      }

The SetCurrentPoint method is invoked by observer to set the current point where the text has to be drawn. The Invalidate method is invoked to trigger a WM_PAINT message and the MFC subsystem will route it to OnPaint (as it is wired in the Message maps):

      void OnPaint()
      {
            CPaintDC d(this);
            CBrush b(RGB(100, 149, 237));
            int x1 = -200, y1 = -220, x2 = 210, y2 = 200;
            Transform(&x1, &y1); Transform(&x2, &y2);
            CRect rect(x1, y1, x2, y2);
            d.FillRect(&rect, &b);
            CPen p2(PS_SOLID, 2, RGB(153, 0, 0));
            d.SelectObject(&p2);

            char *str = "Hello Reactive C++";
            CFont f;
            f.CreatePointFont(240, _T("Times New Roman"));
            d.SelectObject(&f);
            d.SetTextColor(RGB(204, 0, 0));
            d.SetBkMode(TRANSPARENT);
            CRgn crgn;
            crgn.CreateRectRgn(rect.left,rect.top,
            rect.right ,rect.bottom);
            d.SelectClipRgn(&crgn);
            d.TextOut(_curr_pos.x, _curr_pos.y,
            CString(str), strlen(str));
      }

The OnPaint method is invoked by the MFC framework when the Invalidate call is made. The method draws the literal string, Hello Reactive C++, on the screen:

      void Transform(int *px, int *py) {
            ::GetClientRect(m_hWnd, &_rect);
            int width = (_rect.right - _rect.left) / 2,
            height = (_rect.bottom - _rect.top) / 2;
           *px = *px + width; *py = height - *py;
      }

The Transform method computes the bound of the client area of the Frame and converts Cartesian coordinates to devise coordinates. This computation can be better done through world coordinate transformations:

      void OnMouseMove(UINT nFlags, CPoint point)
      {
            int x1 = -200,y1= -220, x2 = 210,y2 = 200;
            Transform(&x1, &y1);Transform(&x2, &y2);
            CRect rect(x1, y1, x2, y2);
            POINT pts;
            pts.x = point.x; pts.y = point.y;
            rect.NormalizeRect();
            //--- In a real program, the points will be aggregated
            //---- into a list (stream)
            if (rect.PtInRect(point)) {
                  //--- Ideally speaking this notification has to go
                  //--- through a non blocking call
                  FireEvent(point);
            }
      }

The OnMouseMove method checks whether the mouse position is within a rectangle centered inside the screen and fires the notification to the observer:

      DECLARE_MESSAGE_MAP();
};

BEGIN_MESSAGE_MAP(CMouseFrame, CFrameWnd)
      ON_WM_CREATE()
      ON_WM_PAINT()
      ON_WM_MOUSEMOVE()
END_MESSAGE_MAP()
class WindowHandler : public IObserver<CPoint>
{
private:
      CMouseFrame *window;
public:
      WindowHandler(CMouseFrame *win) : window(win) { }
      virtual ~WindowHandler() { window = 0; }
      virtual void OnCompleted() {}
      virtual void OnError(CustomException *exception) {}
      virtual void OnNext(CPoint value) {
            if (window) window->SetCurrentPoint(value);
      }
};

The preceding class WindowHandler implements the IObserver<T> interface and handles the event notified by CMouseFrame, which implements the IObservable<CPoint> interface. In this canned example, we set the current point by invoking the SetCurrentPoint method to draw the string at the mouse position:

class CMouseApp :public CWinApp
{
      WindowHandler *reactive_handler;
public:
      int InitInstance(){
            CMouseFrame *p = new CMouseFrame();
            p->ShowWindow(1);
            reactive_handler = new WindowHandler(p);
            //--- Wire the observer to the Event Source
            //--- which implements IObservable<T>
            p->Subscribe(*reactive_handler);
            m_pMainWnd = p;
            return 1;
      }
      virtual ~CMouseApp() {
            if (reactive_handler) {
                  delete reactive_handler;
                  reactive_handler = 0;
           }
      }
};

CMouseApp a;