In this chapter, we’ve explored how Node.js streams work, how to create custom streams, and how to compose them into efficient, elegant data processing pipelines. To complete the picture, let’s look at some utilities provided by the node:stream module that simplify working with Readable streams. These utilities are designed to streamline data processing in a streaming fashion and bring a functional programming flavor to stream operations.
All these utilities are methods available for any Readable stream, including Duplex, PassThrough, and Transform streams. Since most of these methods return a new Readable stream, they can be chained together to create expressive, pipeline-like code. Unsurprisingly, many of these methods mirror common operations available in the Array prototype, but they are optimized for handling streaming data.
Here’s a summary of the key methods:
readable.map(fn): Applies a transformation function (fn) to each chunk in the stream, returning a new stream with the transformed data. If fn returns a Promise, the result is awaited before being passed to the output stream.
readable.flatMap(fn): Similar to map(), but allows fn to return streams, iterables, or async iterables, which are then flattened and merged into the output stream.
readable.filter(fn): Filters the stream by applying fn to each chunk. Only chunks for which fn returns a truthy value are included in the output stream. Supports async fn functions.
readable.forEach(fn): Invokes fn for each chunk in the stream. This is typically used for side effects rather than producing a new stream. If fn returns a Promise, it is awaited before processing the next chunk.
readable.some(fn): Checks whether at least one chunk satisfies the condition in fn. Once a truthy value is found, the stream is destroyed, and the returned Promise resolves to true. If no chunk satisfies the condition, it resolves to false.
readable.every(fn): Verifies whether all chunks satisfy the condition in fn. If any chunk fails the condition, the stream is destroyed, and the returned Promise resolves to false. Otherwise, it resolves to true when the stream ends.
readable.find(fn): Returns a Promise that resolves to the value of the first chunk that satisfies the condition in fn. If no chunk meets the condition, the returned Promise resolves to undefined once the stream ends.
readable.drop(n): Skips the first n chunks in the stream, returning a new stream that starts from the (n+1)th chunk.
readable.take(n): Returns a new stream that includes, at most, the first n chunks. Once n chunks are reached, the stream is terminated.
readable.reduce(fn, initialValue): Reduces the stream by applying fn to each chunk, accumulating a result that is returned as a Promise. If no initialValue is provided, the first chunk is used as the initial value.
The official documentation has plenty of examples for all these methods, along with other, less common methods we haven't covered here for brevity. We recommend checking out the docs (nodejsdp.link/stream-iterators) if any of these still feel confusing or you are unsure about when to use them.
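To get a feel for how these helpers compose, here is a minimal sketch (not part of the book's running example) that chains a few of them over an in-memory stream created with Readable.from(). The numbers and the thresholds are arbitrary and chosen purely for illustration.

import { Readable } from 'node:stream'

// An object-mode Readable emitting the numbers 1 to 6
const numbers = Readable.from([1, 2, 3, 4, 5, 6])

const result = await numbers
  .drop(1)                   // skip the first chunk (1)
  .filter(n => n % 2 === 0)  // keep only even numbers (2, 4, 6)
  .map(async n => n * 10)    // async mappers are awaited (20, 40, 60)
  .take(2)                   // stop after two chunks (20, 40)
  .reduce((acc, n) => acc + n, 0)

console.log(result) // 60

Because each intermediate method returns a new Readable stream, the chain reads top to bottom much like an array pipeline, while still processing one chunk at a time.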
To give you a more practical overview, let's re-implement the processing pipeline we used earlier to illustrate filtering and reducing with custom Transform streams, but this time using only the Readable stream utilities. As a reminder, in this example we are parsing a CSV file that contains sales data, and we want to calculate the total profit made from sales in Italy. Every line of the CSV file has three fields: type, country, and profit. The first line contains the CSV headers.
import { createReadStream } from 'node:fs'
import { createInterface } from 'node:readline'
import { Readable, compose } from 'node:stream'
import { createGunzip } from 'node:zlib'

const uncompressedData = compose( // 1
  createReadStream('data.csv.gz'),
  createGunzip()
)

const byLine = Readable.from( // 2
  createInterface({ input: uncompressedData })
)

const totalProfit = await byLine // 3
  .drop(1) // 4
  .map(chunk => { // 5
    const [type, country, profit] = chunk.toString().split(',')
    return { type, country, profit: Number.parseFloat(profit) }
  })
  .filter(record => record.country === 'Italy') // 6
  .reduce((acc, record) => acc + record.profit, 0) // 7

console.log(totalProfit)
Here’s a step-by-step breakdown of what the preceding code does:
1. We use compose() to combine a Readable stream for the file data.csv.gz with a Gunzip Transform stream, giving us a new Readable stream (uncompressedData) that emits the uncompressed contents of the file.
2. We use the createInterface() utility from the node:readline module to wrap our source stream and give us a new Readable stream (byLine) that produces lines from the original stream.
3. Since the last operation of the chain is .reduce(), which returns a Promise, we use await here to wait for the returned Promise to resolve and to capture the final result in the totalProfit variable.
4. The first operation of the chain is .drop(1), which allows us to skip the first line of the uncompressed source data. This line contains the CSV header ("type,country,profit") and no useful data, so it makes sense to skip it. This operation returns a new Readable stream, so we can chain other helper methods.
5. The next operation is .map(). In the mapping function, we provide all the necessary logic to parse a line from the original CSV file and convert it into a record object containing the fields type, country, and profit. This operation returns another Readable stream, so we can keep chaining more helper functions to continue building our processing logic.
6. Then we have .filter(), which we use to retain only records that represent profit associated with the country Italy. Once again, this operation gives us a new Readable stream.
7. Finally, we have .reduce(). We use this helper to aggregate all the filtered records by summing their profit. As we mentioned before, this operation gives us a Promise that resolves to the total profit once the stream completes.
This example shows how to create stream processing pipelines using a more direct approach. In this approach, we chain helper methods, and we have all the transformation logic clearly visible in the same context (assuming we define all the transformation functions inline). This approach can be particularly convenient in situations where the transformation logic is very simple and you don't need to build highly specialized and reusable custom Transform streams.
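As a further, hedged sketch (not part of the book's example), the same line-by-line stream can also answer yes/no questions with the Promise-returning helpers such as .some(). The 10,000 threshold below is an arbitrary value chosen purely for illustration; the file name and the parsing logic are the same as in the main example.

import { createReadStream } from 'node:fs'
import { createInterface } from 'node:readline'
import { Readable, compose } from 'node:stream'
import { createGunzip } from 'node:zlib'

// Same data.csv.gz layout (type,country,profit) as in the main example
const byLine = Readable.from(
  createInterface({
    input: compose(createReadStream('data.csv.gz'), createGunzip())
  })
)

const hasBigSale = await byLine
  .drop(1) // skip the CSV header
  .map(line => {
    const [type, country, profit] = line.toString().split(',')
    return { type, country, profit: Number.parseFloat(profit) }
  })
  .some(record => record.profit > 10_000) // arbitrary threshold

console.log(hasBigSale)

Note that .some() destroys the underlying stream as soon as a match is found, so the whole file is not necessarily read.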
Note that, in this example, we created our own basic way of parsing records out of CSV lines rather than using a dedicated library for it. We did this just to have an excuse to showcase how to use the .drop() and .map() methods. Our implementation is very rudimentary, and it doesn’t handle all the possible edge cases. This is fine because we know there aren’t edge cases (e.g., quoted fields) in our input data, but in real-world projects, we would recommend using a reliable CSV parsing library instead.
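For reference, here is a hypothetical variant of the same pipeline that delegates parsing to the third-party csv-parse package (not used in this example). This is only a sketch, under the assumption that csv-parse is installed (npm install csv-parse) and that the data has the same type,country,profit header as before.

import { createReadStream } from 'node:fs'
import { compose } from 'node:stream'
import { createGunzip } from 'node:zlib'
import { parse } from 'csv-parse' // third-party dependency (assumption)

// With columns: true, the parser emits one object per row,
// keyed by the header names, so no manual splitting is needed
const records = compose(
  createReadStream('data.csv.gz'),
  createGunzip(),
  parse({ columns: true })
)

const totalProfit = await records
  .filter(record => record.country === 'Italy')
  .reduce((acc, record) => acc + Number.parseFloat(record.profit), 0)

console.log(totalProfit)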