Learning Concurrency in Python

By: Elliot Forbes

Overview of this book

Python is a very high-level, general-purpose language that is used heavily in fields such as data science and research, and it is one of the top choices for general-purpose programming around the world. It features a wide range of powerful high-level and low-level libraries and frameworks that complement its delightful syntax and enable Python programmers to create. This book introduces some of the most popular of these libraries and frameworks and goes in depth into how you can leverage them for your own highly concurrent, highly performant Python programs. We'll cover the fundamental concepts of concurrency needed to write your own concurrent and parallel software systems in Python. The book will guide you down the path to mastering Python concurrency, giving you all the necessary hardware and theoretical knowledge. We'll cover concepts such as debugging and exception handling, as well as some of the most popular libraries and frameworks that allow you to create event-driven and reactive systems. By the end of the book, you'll have learned the techniques to write efficient concurrent systems that follow best practices.

Reactive programming


Reactive programming is very similar to event-driven programming, but instead of revolving around events, it focuses on data. More specifically, it deals with streams of data and reacts to specific changes in that data.

ReactiveX - RxPy

RxPy is the Python implementation of the very popular ReactiveX framework. If you've ever done any programming in Angular 2 or later versions, then you will have used its JavaScript counterpart, RxJS, when interacting with HTTP services. This framework is a combination of the observer pattern, the iterator pattern, and functional programming. We essentially subscribe to different streams of incoming data, and then create observers that listen for specific events being triggered. When these observers are triggered, they run the code that corresponds to what has just happened.
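To make this combination of patterns more concrete, here is a minimal sketch in plain Python with no RxPy dependency. The `Stream` class and its `subscribe`, `emit`, and `where` methods are hypothetical names invented for illustration and are not part of the RxPy API. The sketch only shows the idea of subscribers being notified as data arrives (observer pattern) and of deriving a filtered stream from an existing one (functional style).

```python
class Stream:
    """A toy push-based stream (hypothetical, not the RxPy API)."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        # Observer pattern: remember who wants to be told about new data.
        self._callbacks.append(callback)

    def emit(self, value):
        # Push a new value to every subscriber.
        for callback in self._callbacks:
            callback(value)

    def where(self, predicate):
        # Functional style: derive a new stream that only carries
        # the values for which predicate(value) is true.
        filtered = Stream()
        self.subscribe(lambda v: filtered.emit(v) if predicate(v) else None)
        return filtered


readings = Stream()
hot_readings = []

# React only to readings above an illustrative limit of 6 degrees.
readings.where(lambda t: t > 6).subscribe(hot_readings.append)

for temperature in [3, 7, 5, 9]:
    readings.emit(temperature)

print(hot_readings)
```

Only the readings that pass the predicate reach the final subscriber; the others are dropped by the intermediate stream.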

We'll take a data center as a good example of how reactive programming can be utilized. Imagine this data center has thousands of server racks, all constantly performing millions upon millions of calculations. One of the biggest challenges in these data centers is keeping all these tightly packed server racks cool enough so that they don't damage themselves. We could set up multiple thermometers throughout our data center to ensure that we aren't getting too hot anywhere, and send the readings from these thermometers to a central computer as a continuous stream.

Within our central control station, we could set up an RxPy program that observes this continuous stream of temperature information. Within these observers, we could then define a series of conditions to listen out for, and then react whenever one of these conditions is met.

One such example would be an event that only triggers if the temperature for a specific part of the data center gets too warm. When this event is triggered, we could then automatically react and increase the flow of any cooling system to that particular area, and thus bring the temperature back down again:

import rx
from rx import Observable, Observer

# Note: this listing is written against the RxPY 1.x API;
# later major versions reorganized these imports.

# Here we define our custom observer which
# contains an on_next method, an on_error method
# and an on_completed method
class temperatureObserver(Observer):

    # Every time we receive a temperature reading
    # this method is called
    def on_next(self, x):
        print("Temperature is: %s degrees centigrade" % x)
        if x > 6:
            print("Warning: Temperature Is Exceeding Recommended Limit")
        if x == 9:
            print("DataCenter is shutting down. Temperature is too high")

    # If we were to receive an error message
    # we would handle it here
    def on_error(self, e):
        print("Error: %s" % e)

    # This is called when the stream is finished
    def on_completed(self):
        print("All Temps Read")

# Publish some fake temperature readings
xs = Observable.from_iterable(range(10))
# Subscribe to these temperature readings
d = xs.subscribe(temperatureObserver())

Breaking it down

The first two lines of our code import the rx module and then, from it, both Observable and Observer.

We then go on to create a temperatureObserver class that extends Observer. This class contains three methods:

  • on_next: This is called every time our observer observes something new
  • on_error: This acts as our error-handler function; every time we observe an error, this function will be called
  • on_completed: This is called when our observer meets the end of the stream of information it has been observing
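The order in which these three methods fire can be sketched without RxPy at all. The `feed` helper and the `RecordingObserver` class below are hypothetical names used only for illustration. They mimic the contract described above (zero or more `on_next` calls, followed by either `on_error` or `on_completed`) and are not how RxPy is implemented internally.

```python
class RecordingObserver:
    """Records every callback it receives (illustrative only)."""

    def __init__(self):
        self.calls = []

    def on_next(self, x):
        self.calls.append(("next", x))

    def on_error(self, e):
        self.calls.append(("error", str(e)))

    def on_completed(self):
        self.calls.append(("completed",))


def feed(iterable, observer):
    """Push each item to the observer, then signal how the stream ended."""
    try:
        for item in iterable:
            observer.on_next(item)
    except Exception as exc:
        # Something went wrong while reading or handling the stream:
        # report the error and do not signal completion.
        observer.on_error(exc)
    else:
        # The stream ran out of items normally.
        observer.on_completed()


def faulty_sensor():
    # A stream that yields one reading and then fails.
    yield 4
    raise RuntimeError("sensor offline")


ok = RecordingObserver()
feed([4, 5], ok)

broken = RecordingObserver()
feed(faulty_sensor(), broken)

print(ok.calls)
print(broken.calls)
```

In this sketch a stream that fails never reaches `on_completed`, so an observer can treat the two endings as mutually exclusive.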

In the on_next method, we print out the current temperature and check whether the temperature we receive exceeds a set of limits. If the temperature matches one of our conditions, then we handle it slightly differently and print out a descriptive warning about what has happened.
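The threshold checks can also be pulled out into a small pure function, which makes them easy to test separately from the stream. This is a sketch rather than the book's code: the function name `temperature_alerts` and the constants `WARNING_LIMIT` and `SHUTDOWN_TEMPERATURE` are hypothetical, with values taken from the listing above (a warning above 6 and a shutdown message at exactly 9).

```python
WARNING_LIMIT = 6         # readings above this trigger a warning
SHUTDOWN_TEMPERATURE = 9  # this exact reading triggers the shutdown message


def temperature_alerts(x):
    """Return the alert labels that apply to a single reading."""
    alerts = []
    if x > WARNING_LIMIT:
        alerts.append("warning")
    if x == SHUTDOWN_TEMPERATURE:
        alerts.append("shutdown")
    return alerts


for reading in (5, 7, 9):
    print(reading, temperature_alerts(reading))
```

An on_next method could call a function like this and react to each returned label. Note that the equality check mirrors the original listing; with real sensor data, a comparison such as `>=` would probably be more robust, since a reading could jump past the shutdown value without ever equalling it.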

After our class declaration, we create a fake observable that emits 10 separate values (0 through 9) using Observable.from_iterable(). Finally, the last line of the preceding code subscribes an instance of our new temperatureObserver class to this observable.