We have now seen how to convert an IEnumerable<T>-based pull program to an IObservable<T>/IObserver<T>-based push program. In real life, the event source is rarely as simple as the number stream in the earlier example. Let us see how we can convert a MouseMove event into a stream with a small MFC program:
Note
We have chosen MFC for this particular implementation because a separate chapter is dedicated to Qt-based reactive programming. In that chapter, we will implement reactive programs as idiomatic, asynchronous, push-based streams. In this MFC program, we simply perform a filtering operation to check whether the mouse is moving inside a bounding rectangle and, if so, notify the observer. The dispatch in this example is synchronous:
#include "stdafx.h"
#include <afxwin.h>
#include <afxext.h>
#include <math.h>
#include <vector>
#include "../Common2.h"
using namespace std;

class CMouseFrame : public CFrameWnd, IObservable<CPoint>
{
private:
    RECT _rect;
    POINT _curr_pos;
    vector<IObserver<CPoint> *> _event_src;
public:
    CMouseFrame() {
        HBRUSH brush = (HBRUSH)::CreateSolidBrush(RGB(175, 238, 238));
        CString mywindow = AfxRegisterWndClass(
            CS_HREDRAW | CS_VREDRAW | CS_DBLCLKS, 0, brush, 0);
        Create(mywindow, _T("MFC Clock By Praseed Pai"));
    }
The preceding part of the code defines a frame class, CMouseFrame, that derives from the MFC library's CFrameWnd class and also implements the IObservable<T> interface, which forces the programmer to implement the Subscribe method. A vector of IObserver<T> pointers stores the list of observers (subscribers). For this example, we will have only one observer, although the code places no restriction on the number of observers:
    virtual bool Subscribe(IObserver<CPoint>& observer) {
        _event_src.push_back(&observer);
        return true;
    }
The Subscribe method simply stores the address of the observer in a vector and returns true. When the mouse is moved, we get a notification from the MFC library and, if the position lies within a rectangular area, the observers are notified. The notification code is as follows:
    bool FireEvent(const CPoint& pt) {
        vector<IObserver<CPoint> *>::iterator it = _event_src.begin();
        while (it != _event_src.end()) {
            IObserver<CPoint> *observer = *it;
            observer->OnNext(pt);
            //--- In real-world Rx programs there is a stipulated
            //--- sequence for calling these methods.
            //--- OnCompleted is called only when all the data
            //--- has been processed; it is called here only to
            //--- demonstrate the call schema
            observer->OnCompleted();
            it++;
        }
        return true;
    }
The FireEvent method walks through the observers and calls the OnNext method of each one. It also calls the OnCompleted method of each observer. The Rx dispatching mechanism follows certain rules when calling the observer methods. Once OnCompleted is called, OnNext will not be called again on the same observer. Similarly, once OnError is called, no further messages will be dispatched to the observer. If we were to follow the conventions stipulated by the Rx model here, the listing would get complicated. The purpose of the code given here is to show how the Rx programming model works in a schematic manner.
    int OnCreate(LPCREATESTRUCT l) {
        return CFrameWnd::OnCreate(l);
    }

    void SetCurrentPoint(CPoint pt) {
        this->_curr_pos = pt;
        Invalidate(0);
    }
The SetCurrentPoint method is invoked by the observer to set the current point at which the text has to be drawn. The Invalidate method is invoked to trigger a WM_PAINT message, and the MFC subsystem will route it to OnPaint (as wired in the message map):
    void OnPaint() {
        CPaintDC d(this);
        CBrush b(RGB(100, 149, 237));
        int x1 = -200, y1 = -220, x2 = 210, y2 = 200;
        Transform(&x1, &y1);
        Transform(&x2, &y2);
        CRect rect(x1, y1, x2, y2);
        d.FillRect(&rect, &b);
        CPen p2(PS_SOLID, 2, RGB(153, 0, 0));
        d.SelectObject(&p2);
        // A string literal must be bound to a pointer-to-const
        const char *str = "Hello Reactive C++";
        CFont f;
        f.CreatePointFont(240, _T("Times New Roman"));
        d.SelectObject(&f);
        d.SetTextColor(RGB(204, 0, 0));
        d.SetBkMode(TRANSPARENT);
        CRgn crgn;
        crgn.CreateRectRgn(rect.left, rect.top, rect.right, rect.bottom);
        d.SelectClipRgn(&crgn);
        d.TextOut(_curr_pos.x, _curr_pos.y, CString(str), strlen(str));
    }
The OnPaint method is invoked by the MFC framework when the Invalidate call is made. The method draws the literal string, Hello Reactive C++, on the screen:
    void Transform(int *px, int *py) {
        ::GetClientRect(m_hWnd, &_rect);
        int width = (_rect.right - _rect.left) / 2,
            height = (_rect.bottom - _rect.top) / 2;
        *px = *px + width;
        *py = height - *py;
    }
The Transform method computes the bounds of the client area of the frame and converts Cartesian coordinates to device coordinates. This computation could be better done through world coordinate transformations.
    void OnMouseMove(UINT nFlags, CPoint point) {
        int x1 = -200, y1 = -220, x2 = 210, y2 = 200;
        Transform(&x1, &y1);
        Transform(&x2, &y2);
        CRect rect(x1, y1, x2, y2);
        POINT pts;
        pts.x = point.x;
        pts.y = point.y;
        rect.NormalizeRect();
        //--- In a real program, the points will be aggregated
        //--- into a list (stream)
        if (rect.PtInRect(point)) {
            //--- Ideally speaking this notification has to go
            //--- through a non blocking call
            FireEvent(point);
        }
    }
The OnMouseMove method checks whether the mouse position is within a rectangle roughly centered in the window and, if so, fires the notification to the observer. The message map and the observer class follow:
    DECLARE_MESSAGE_MAP();
};

BEGIN_MESSAGE_MAP(CMouseFrame, CFrameWnd)
    ON_WM_CREATE()
    ON_WM_PAINT()
    ON_WM_MOUSEMOVE()
END_MESSAGE_MAP()

class WindowHandler : public IObserver<CPoint>
{
private:
    CMouseFrame *window;
public:
    WindowHandler(CMouseFrame *win) : window(win) { }
    virtual ~WindowHandler() { window = 0; }
    virtual void OnCompleted() {}
    virtual void OnError(CustomException *exception) {}
    virtual void OnNext(CPoint value) {
        if (window) window->SetCurrentPoint(value);
    }
};
The preceding WindowHandler class implements the IObserver<T> interface and handles the events notified by CMouseFrame, which implements the IObservable<CPoint> interface. In this canned example, we set the current point by invoking the SetCurrentPoint method to draw the string at the mouse position:
class CMouseApp : public CWinApp
{
    WindowHandler *reactive_handler;
public:
    int InitInstance() {
        CMouseFrame *p = new CMouseFrame();
        p->ShowWindow(1);
        reactive_handler = new WindowHandler(p);
        //--- Wire the observer to the Event Source
        //--- which implements IObservable<T>
        p->Subscribe(*reactive_handler);
        m_pMainWnd = p;
        return 1;
    }
    virtual ~CMouseApp() {
        if (reactive_handler) {
            delete reactive_handler;
            reactive_handler = 0;
        }
    }
};

CMouseApp a;