Another issue we might face is observables that produce items faster than we can consume them. The problem in this scenario is what to do with the ever-growing backlog of items.
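To make the size of such a backlog concrete, here is a minimal sketch in plain Clojure. It assumes a producer and a consumer that each run at a fixed rate; the function name backlog-size and the rates used are hypothetical, chosen only for illustration, and nothing here is part of RxClojure or RxJava.

```clojure
;; Hypothetical helper, not part of RxClojure: estimates how many items are
;; still waiting after `elapsed-ms`, assuming the producer emits one item
;; every `produce-ms` and the consumer takes one item every `consume-ms`.
(defn backlog-size [produce-ms consume-ms elapsed-ms]
  (let [produced (quot elapsed-ms produce-ms)
        ;; the consumer can never take more items than were produced
        consumed (min produced (quot elapsed-ms consume-ms))]
    (- produced consumed)))

;; Backlog after 1, 5 and 10 seconds for a 1 ms producer and a 500 ms consumer.
(map #(backlog-size 1 500 %) [1000 5000 10000])
;; => (998 4990 9980)
```

Under these assumptions the backlog grows roughly linearly with time, so any fixed-size buffer will eventually fill up.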
As an example, think about zipping two observables together. The zip operator (or map in RxClojure) only emits a new value once every observable has emitted an item. So if one of these observables produces items much faster than the others, map has to buffer those items while it waits for the others, which will most likely cause an error, as shown here:
(defn fast-producing-obs []
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS)))

(defn slow-producing-obs []
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS)))

(rx/subscribe (->> (rx/map vector
                           (fast-producing-obs)
                           (slow-producing-obs))
                   (rx/map (fn [[x y]]
                             (+ x y)))
                   ...