Book Image

Clojure Reactive Programming

Book Image

Clojure Reactive Programming

Overview of this book

Table of Contents (19 chapters)
Clojure Reactive Programming
Credits
About the Author
Acknowledgments
About the Reviewers
www.PacktPub.com
Preface
Bibliography
Index

Backpressure


The main mechanism by which core.async allows for coordinating backpressure is buffering. core.async doesn't allow unbounded buffers as this can be a source of bugs and a resource hog.

Instead, we are required to think hard about our application's unique needs and choose an appropriate buffering strategy.

Fixed buffer

This is the simplest form of buffering. It is fixed to a chosen number n, allowing producers to put items in the channel without having to wait for consumers:

(def result (chan (buffer 5)))
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go  (doseq [n (range 5)]
       (>! result n))
     (prn "Done putting values!")
     (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Got value: " 3
;; "Got value: " 4

In the preceding example, we created a buffer of size 5 and started a go loop to consume values from it. The go loop uses a timeout channel...