streamfu
Streams should feel like arrays. Now they do.
Functional stream utilities for JavaScript & TypeScript
Website * Getting Started * API * Why streamfu? * Contributing
Why streamfu?
Streams are one of the most powerful primitives in JavaScript. They handle infinite data, backpressure, and async flows -- things arrays simply can't do.
But the standard API makes you pay for that power with boilerplate, footguns, and unreadable code.
The problem: Native streams are painful
Here's a real scenario -- read a stream of numbers, keep only even ones, double them, and collect the results:
const reader = readable.getReader()
const results: number[] = []
while (true) {
const { done, value } = await reader.read()
if (done) break
if (value % 2 === 0) {
results.push(value * 2)
}
}
reader.releaseLock()
Manual reader management. Mutable state. An infinite while (true) loop. And this is the simple case.
Need to split a stream? Native tee() only gives you two copies. Want to merge streams? Build your own. Want to zip? Good luck.
const [a, rest1] = stream.tee()
const [b, rest2] = rest1.tee()
const [c, d] = rest2.tee()
// Hope you got the order right...
The solution: streamfu
import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"
const readable = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
const stream = pipe(
readable,
(r) => filter(r, (n) => n % 2 === 0),
(r) => map(r, (n) => n * 2),
)
const results = await list(stream)
Same result. No manual readers. No mutable state. No while (true). Just pure transformations.
import { branch } from "@sgmonda/streamfu"
const [a, b, c, d] = branch(stream, 4)
Side-by-side comparison
| Task | Native Streams | streamfu |
|---|---|---|
| Transform each chunk | pipeThrough(new TransformStream({...})) |
map(stream, fn) |
| Filter chunks | Manual reader loop + condition | filter(stream, fn) |
| Reduce to value | Manual reader loop + accumulator | reduce(stream, fn, init) |
| Combine streams | Manual reader orchestration | zip(s1, s2, s3) |
| Concatenate streams | Complex async pull logic | concat(s1, s2, s3) |
| Split stream | Nested .tee() chains |
branch(stream, n) |
| Get element at index | Manual counter + reader | at(stream, i) |
| Check if value exists | Manual loop + early exit | includes(stream, val) |
| Chain operations | Deeply nested pipeThrough |
pipe(stream, f1, f2, f3) |
If you know Array.prototype, you already know streamfu.
Getting Started
Install
npm / yarn / pnpm / bun
yarn dlx jsr add @sgmonda/streamfu # yarn
pnpm dlx jsr add @sgmonda/streamfu # pnpm
bunx jsr add @sgmonda/streamfu # bun
Deno
Quick start
// Create a stream from any iterable
const numbers = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
// Compose transformations with pipe
const stream = pipe(
numbers,
(r) => filter(r, (n) => n % 2 === 0), // keep even: 2, 4, 6, 8, 10
(r) => map(r, (n) => n * 2), // double: 4, 8, 12, 16, 20
)
const sumOfDoubledEvens = await reduce(stream, (a, b) => a + b, 0) // sum: 60
console.log(sumOfDoubledEvens) // 60
API Reference
Stream creation
| Function | Description |
|---|---|
createReadable(iterable) |
Create a stream from arrays, generators, sets, strings -- any iterable |
createWritable(fn) |
Create a writable stream from a callback function |
range(min, max, step?) |
Generate a stream of numbers in a range |
words(chars, length) |
Generate a stream of random strings |
Transformations (non-consuming)
These return a new stream -- the original is not consumed.
| Function | Description |
|---|---|
map(stream, ...fns) |
Transform each chunk (supports chaining multiple transforms) |
filter(stream, fn) |
Keep only chunks matching a predicate |
flat(stream, depth?) |
Flatten a stream of arrays |
flatMap(stream, fn) |
Map + flatten in one step |
slice(stream, start, end?) |
Extract a portion of the stream |
splice(stream, start, count, ...items) |
Remove/insert chunks at a position |
concat(...streams) |
Concatenate multiple streams sequentially |
zip(...streams) |
Combine streams into a stream of tuples |
pipe(stream, ...fns) |
Chain multiple stream operations |
branch(stream, n) |
Split a stream into n independent copies |
Consumers (consuming)
These consume the stream -- it cannot be reused afterward.
| Function | Description |
|---|---|
reduce(stream, fn, init) |
Reduce to a single value |
list(stream) |
Collect all chunks into an array |
some(stream, fn) |
Check if any chunk matches |
every(stream, fn) |
Check if all chunks match |
forEach(stream, fn) |
Execute a function for each chunk |
includes(stream, value) |
Check if a value exists in the stream |
at(stream, index) |
Get the chunk at a specific index |
indexOf(stream, value) |
Find the index of a value |
Consuming vs non-consuming
Rule of thumb: If it returns a
ReadableStream, it's non-consuming. If it returns aPromise, it consumes the stream.
Need to consume a stream multiple times? Use branch() first:
const sum = await reduce(forSum, (a, b) => a + b, 0)
const count = await reduce(forCount, (acc) => acc + 1, 0)
Streams the Hard Way vs streamfu
Every example below shows a real task done the hard way with native Web Streams, then the easy way with streamfu.
1. Transform every chunk -- Parse CSV lines and uppercase names
Before -- native Web Streams:
transform(line, ctrl) {
ctrl.enqueue(line.split(","))
},
})
const transform2 = new TransformStream({
transform(cols, ctrl) {
ctrl.enqueue({ name: cols[0].toUpperCase(), age: Number(cols[1]) })
},
})
const reader = csvStream.pipeThrough(transform1).pipeThrough(transform2).getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()
After -- streamfu:
const stream = map(
csvStream,
(line) => line.split(","),
(cols) => ({ name: cols[0].toUpperCase(), age: Number(cols[1]) }),
)
const results = await list(stream)
2. Filter and collect -- Keep only active users from a stream
Before -- native Web Streams:
transform(user, ctrl) {
if (user.status === "active") ctrl.enqueue(user)
},
})
const reader = usersStream.pipeThrough(filter).getReader()
const active = []
while (true) {
const { done, value } = await reader.read()
if (done) break
active.push(value)
}
reader.releaseLock()
After -- streamfu:
const active = await list(filter(usersStream, (u) => u.status === "active"))
3. Reduce to a single value -- Sum a stream of numbers
Before -- native Web Streams:
let total = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
total += value
}
reader.releaseLock()
After -- streamfu:
const total = await reduce(numbersStream, (sum, n) => sum + n, 0)
4. Multi-step pipeline -- Filter, transform, and collect in one chain
Before -- native Web Streams:
transform(n, ctrl) {
if (n % 2 === 0) ctrl.enqueue(n)
},
})
const doubleStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(n * 2)
},
})
const toStringStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(`Value: ${n}`)
},
})
const reader = numbersStream
.pipeThrough(filterStep)
.pipeThrough(doubleStep)
.pipeThrough(toStringStep)
.getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()
After -- streamfu:
const results = await list(pipe(
numbersStream,
(r) => filter(r, (n) => n % 2 === 0),
(r) => map(r, (n) => n * 2),
(r) => map(r, (n) => `Value: ${n}`),
))
5. Concatenate multiple sources -- Merge API pages into one stream and take the first 50
Before -- native Web Streams:
const reader1 = sources[0].getReader()
const reader2 = sources[1].getReader()
const reader3 = sources[2].getReader()
const all = []
for (const reader of [reader1, reader2, reader3]) {
while (true) {
const { done, value } = await reader.read()
if (done) break
all.push(value)
}
reader.releaseLock()
}
const first50 = all.slice(0, 50)
After -- streamfu:
const first50 = await list(slice(concat(page1Stream, page2Stream, page3Stream), 0, 50))
6. Zip parallel streams -- Pair names with scores into labeled strings
Before -- native Web Streams:
const readerB = scoresStream.getReader()
const leaderboard = []
while (true) {
const [a, b] = await Promise.all([readerA.read(), readerB.read()])
if (a.done || b.done) break
leaderboard.push(`${a.value}: ${b.value}`)
}
readerA.releaseLock()
readerB.releaseLock()
After -- streamfu:
const leaderboard = await list(map(zip(namesStream, scoresStream), ([name, score]) => `${name}: ${score}`))
7. Consume a stream twice -- Get both the sum and the max from the same stream
Before -- native Web Streams:
const reader1 = copy1.getReader()
let sum = 0
while (true) {
const { done, value } = await reader1.read()
if (done) break
sum += value
}
reader1.releaseLock()
const reader2 = copy2.getReader()
let max = -Infinity
while (true) {
const { done, value } = await reader2.read()
if (done) break
if (value > max) max = value
}
reader2.releaseLock()
After -- streamfu:
const [forSum, forMax] = branch(numbersStream, 2)
const [sum, max] = await Promise.all([
reduce(forSum, (a, b) => a + b, 0),
reduce(forMax, (a, b) => (b > a ? b : a), -Infinity),
])
8. Flatten paginated results -- Expand arrays of items into individual chunks
Before -- native Web Streams:
const flatten = new TransformStream({
transform(page, ctrl) {
for (const item of page) ctrl.enqueue(item)
},
})
const label = new TransformStream({
transform(item, ctrl) {
ctrl.enqueue(`#${item.id}: ${item.title}`)
},
})
const reader = pagesStream.pipeThrough(flatten).pipeThrough(label).getReader()
const items = []
while (true) {
const { done, value } = await reader.read()
if (done) break
items.push(value)
}
reader.releaseLock()
After -- streamfu:
const items = await list(map(flatMap(pagesStream, (page) => page), (item) => `#${item.id}: ${item.title}`))
9. Quick queries on a stream -- Answer five questions about the data without five manual loops
Before -- native Web Streams:
const [s1, rest1] = dataStream.tee()
const [s2, rest2] = rest1.tee()
const [s3, rest3] = rest2.tee()
const [s4, s5] = rest3.tee()
// Does it contain 42?
let hasFortyTwo = false
const r1 = s1.getReader()
while (true) {
const { done, value } = await r1.read()
if (done) break
if (value === 42) {
hasFortyTwo = true
break
}
}
r1.releaseLock()
// Are all values positive?
let allPositive = true
const r2 = s2.getReader()
while (true) {
const { done, value } = await r2.read()
if (done) break
if (value <= 0) {
allPositive = false
break
}
}
r2.releaseLock()
// Is any value greater than 100?
let anyOver100 = false
const r3 = s3.getReader()
while (true) {
const { done, value } = await r3.read()
if (done) break
if (value > 100) {
anyOver100 = true
break
}
}
r3.releaseLock()
// Where is 7?
let indexOf7 = -1
let idx = 0
const r4 = s4.getReader()
while (true) {
const { done, value } = await r4.read()
if (done) break
if (value === 7) {
indexOf7 = idx
break
}
idx++
}
r4.releaseLock()
// What's the third element?
let third = undefined
let count = 0
const r5 = s5.getReader()
while (true) {
const { done, value } = await r5.read()
if (done) break
if (count === 2) {
third = value
break
}
count++
}
r5.releaseLock()
After -- streamfu:
const [s1, s2, s3, s4, s5] = branch(dataStream, 5)
const [hasFortyTwo, allPositive, anyOver100, indexOf7, third] = await Promise.all([
includes(s1, 42),
every(s2, (n) => n > 0),
some(s3, (n) => n > 100),
indexOf(s4, 7),
at(s5, 2),
])
10. Generate, splice, and process -- Create a range, replace elements, and log results
Before -- native Web Streams:
const numbers = new ReadableStream({
start(ctrl) {
for (let i = 1; i <= 10; i++) ctrl.enqueue(i)
ctrl.close()
},
})
// Splice: remove 3 items at index 3, insert 99 and 100
let idx = 0
const removed = 3
const spliceTransform = new TransformStream({
transform(chunk, ctrl) {
if (idx === 3 + removed) {
ctrl.enqueue(99)
ctrl.enqueue(100)
}
if (idx < 3 || idx >= 3 + removed) {
ctrl.enqueue(chunk)
}
idx++
},
flush(ctrl) {
if (idx <= 3 + removed) {
ctrl.enqueue(99)
ctrl.enqueue(100)
}
},
})
// Double each value
const doubleTransform = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(n * 2)
},
})
const reader = numbers.pipeThrough(spliceTransform).pipeThrough(doubleTransform).getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(value)
}
reader.releaseLock()
After -- streamfu:
await forEach(
pipe(
range(1, 10),
(r) => splice(r, 3, 3, 99, 100),
(r) => map(r, (n) => n * 2),
),
(value) => console.log(value),
)
Design principles
- Functional & pure -- No side effects, no mutations. Every operation returns a new stream.
- Familiar API -- Mirrors
Array.prototypemethods. Zero learning curve. - Universal -- Built on the Web Streams API. Works in Node.js, Deno, Bun, and browsers.
- Type-safe -- Full TypeScript support with precise generics.
- Tested -- 100% code coverage. Every function, every edge case.
Error handling
One of the trickiest parts of working with streams is error handling. The traditional approach relies on listening to events, which splits your logic across multiple callbacks and makes the control flow hard to follow:
stream.on("data", (chunk) => {/* process chunk */})
stream.on("error", (err) => {/* handle error */})
stream.on("end", () => {/* cleanup */})
This pattern has several problems:
- Error handling is disconnected from the processing logic
- Forgetting the
errorlistener can cause unhandled exceptions that crash your process - Coordinating
endanderrorto know when the stream is truly done requires extra state - It doesn't compose well with
async/awaitcode
In streamfu, errors propagate automatically through chained operations. If a map() transformer or a filter() predicate throws, the error bubbles up and rejects the promise returned by any consuming operation (list(), reduce(), every(), etc.). This means you can use a standard try/catch block:
const stream = pipe(
createReadable(data),
(r) => map(r, transformFn), // if this throws...
(r) => filter(r, predicateFn),
)
try {
const results = await list(stream) // ...the error rejects here
} catch (err) {
console.error("Something failed:", err)
}
No special error listeners, no extra plumbing. Errors flow naturally through map(), filter(), and any other chained transformation, all the way to whichever consumer ends the pipeline.
Contributing
Contributions welcome! Fork the repo, make your changes, and submit a PR.
Requirements:
- Follow Conventional Commits
- Maintain 100% test coverage
- Include JSDoc comments on all exports
Publishing
Published to JSR automatically via GitHub CI when version changes in deno.json on main.
MIT License * Made with care by @sgmonda and contributors