Book Image

Clojure Reactive Programming

Book Image

Clojure Reactive Programming

Overview of this book

Table of Contents (19 chapters)
Clojure Reactive Programming
Credits
About the Author
Acknowledgments
About the Reviewers
www.PacktPub.com
Preface
Bibliography
Index

Backpressure


Another issue we might be faced with is the one of observables that produce items faster than we can consume. The problem that arises in this scenario is what to do with this ever-growing backlog of items.

As an example, think about zipping two observables together. The zip operator (or map in RxClojure) will only emit a new value when all observables have emitted an item.

So if one of these observables is a lot faster at producing items than the others, map will need to buffer these items and wait for the others, which will most likely cause an error, as shown here:

(defn fast-producing-obs []
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS)))

(defn slow-producing-obs []
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS)))

(rx/subscribe (->> (rx/map vector
                           (fast-producing-obs)
                           (slow-producing-obs))
                   (rx/map (fn [[x y]]
                             (+ x y)))
                  ...