If we look back at the HTTP request method that we used in Chapter 6, Using Concurrency and Parallelism with Schedulers and Chapter 5, Combinators, Conditionals, and Error Handling, it has this signature:
Observable<Map> requestJson(HttpAsyncClient client, String url).
Instead of just calling a method that makes a request to a URL and returns the response as JSON, we create a
HttpAsyncClient instance, have to start the it and pass it to the
requestJson() method. But there is more: we need to close the client after we read the result, and because the observable is asynchronous, we need to wait for its
OnCompleted notification and then to do the closing. This is very complex and should be changed. The
Observable, which read from files, need to create streams/readers/channels and close them when all the subscribers are unsubscribed. The
Observable, emitting data from a database should set up and then close all the connections, statements, and result sets that are used...