The main mechanism by which core.async allows for coordinating backpressure is buffering. core.async doesn't allow unbounded buffers, as they can be a source of bugs and a resource hog.
Instead, we are required to think hard about our application's unique needs and choose an appropriate buffering strategy.
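To make the idea concrete, here is a minimal sketch of how a bounded buffer pushes back on a producer. It assumes a recent core.async release that provides the non-blocking offer! and poll! functions. The channel name work, the buffer size of 2, and the keyword values are arbitrary choices made for illustration.

```clojure
(require '[clojure.core.async :refer [chan offer! poll! close!]])

;; A channel whose buffer holds at most 2 items
;; (the size is an arbitrary choice for this sketch).
(def work (chan 2))

;; offer! never blocks: it returns true when the value is accepted
;; immediately and nil when the producer would have to wait.
(def accepted
  (mapv #(boolean (offer! work %)) [:a :b :c]))
;; accepted => [true true false], the third put finds the buffer full

;; Once a consumer takes a value, there is room in the buffer again.
(def taken (poll! work))                 ; => :a
(def retry (boolean (offer! work :c)))   ; => true

(close! work)
```

With a regular >! inside a go block, the third put would park until a consumer made room rather than return nil; that waiting is the backpressure the buffer provides.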
A fixed buffer is the simplest form of buffering. Its size is fixed to a chosen number n, allowing producers to put up to n items in the channel without having to wait for consumers:
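(require '[clojure.core.async :as async
           :refer [chan buffer go go-loop <! >! close!]])

(def result (chan (buffer 5)))

;; Consumer: wait one second before each take; stop once the channel is closed
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

;; Producer: all five puts fit in the buffer, so none of them has to wait
(go (doseq [n (range 5)]
      (>! result n))
    (prn "Done putting values!")
    (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Got value: " 3
;; "Got value: " 4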
In the preceding example, we created a buffer of size 5 and started a go loop to consume values from it. The go loop uses a timeout channel...