Dark Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

sgmonda/streamfu

Repository files navigation

streamfu

Streams should feel like arrays. Now they do.

Functional stream utilities for JavaScript & TypeScript

Website * Getting Started * API * Why streamfu? * Contributing


Why streamfu?

Streams are one of the most powerful primitives in JavaScript. They handle infinite data, backpressure, and async flows -- things arrays simply can't do.

But the standard API makes you pay for that power with boilerplate, footguns, and unreadable code.

The problem: Native streams are painful

Here's a real scenario -- read a stream of numbers, keep only even ones, double them, and collect the results:

// Native Web Streams -- imperative, verbose, error-prone
const reader = readable.getReader()
const results: number[] = []

while (true) {
const { done, value } = await reader.read()
if (done) break
if (value % 2 === 0) {
results.push(value * 2)
}
}

reader.releaseLock()

Manual reader management. Mutable state. An infinite while (true) loop. And this is the simple case.

Need to split a stream? Native tee() only gives you two copies. Want to merge streams? Build your own. Want to zip? Good luck.

// Native -- splitting a stream into 4 branches
const [a, rest1] = stream.tee()
const [b, rest2] = rest1.tee()
const [c, d] = rest2.tee()
// Hope you got the order right...

The solution: streamfu

// streamfu -- declarative, composable, readable
import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"

const readable = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

const stream = pipe(
readable,
(r) => filter(r, (n) => n % 2 === 0),
(r) => map(r, (n) => n * 2),
)
const results = await list(stream)

Same result. No manual readers. No mutable state. No while (true). Just pure transformations.

// streamfu -- branch into any number of copies
import { branch } from "@sgmonda/streamfu"

const [a, b, c, d] = branch(stream, 4)

Side-by-side comparison

Task Native Streams streamfu
Transform each chunk pipeThrough(new TransformStream({...})) map(stream, fn)
Filter chunks Manual reader loop + condition filter(stream, fn)
Reduce to value Manual reader loop + accumulator reduce(stream, fn, init)
Combine streams Manual reader orchestration zip(s1, s2, s3)
Concatenate streams Complex async pull logic concat(s1, s2, s3)
Split stream Nested .tee() chains branch(stream, n)
Get element at index Manual counter + reader at(stream, i)
Check if value exists Manual loop + early exit includes(stream, val)
Chain operations Deeply nested pipeThrough pipe(stream, f1, f2, f3)

If you know Array.prototype, you already know streamfu.


Getting Started

Install

npm / yarn / pnpm / bun
npx jsr add @sgmonda/streamfu # npm
yarn dlx jsr add @sgmonda/streamfu # yarn
pnpm dlx jsr add @sgmonda/streamfu # pnpm
bunx jsr add @sgmonda/streamfu # bun
Deno
deno add jsr:@sgmonda/streamfu

Quick start

import { createReadable, filter, map, pipe, reduce } from "@sgmonda/streamfu"

// Create a stream from any iterable
const numbers = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

// Compose transformations with pipe
const stream = pipe(
numbers,
(r) => filter(r, (n) => n % 2 === 0), // keep even: 2, 4, 6, 8, 10
(r) => map(r, (n) => n * 2), // double: 4, 8, 12, 16, 20
)
const sumOfDoubledEvens = await reduce(stream, (a, b) => a + b, 0) // sum: 60

console.log(sumOfDoubledEvens) // 60

API Reference

Stream creation

Function Description
createReadable(iterable) Create a stream from arrays, generators, sets, strings -- any iterable
createWritable(fn) Create a writable stream from a callback function
range(min, max, step?) Generate a stream of numbers in a range
words(chars, length) Generate a stream of random strings

Transformations (non-consuming)

These return a new stream -- the original is not consumed.

Function Description
map(stream, ...fns) Transform each chunk (supports chaining multiple transforms)
filter(stream, fn) Keep only chunks matching a predicate
flat(stream, depth?) Flatten a stream of arrays
flatMap(stream, fn) Map + flatten in one step
slice(stream, start, end?) Extract a portion of the stream
splice(stream, start, count, ...items) Remove/insert chunks at a position
concat(...streams) Concatenate multiple streams sequentially
zip(...streams) Combine streams into a stream of tuples
pipe(stream, ...fns) Chain multiple stream operations
branch(stream, n) Split a stream into n independent copies

Consumers (consuming)

These consume the stream -- it cannot be reused afterward.

Function Description
reduce(stream, fn, init) Reduce to a single value
list(stream) Collect all chunks into an array
some(stream, fn) Check if any chunk matches
every(stream, fn) Check if all chunks match
forEach(stream, fn) Execute a function for each chunk
includes(stream, value) Check if a value exists in the stream
at(stream, index) Get the chunk at a specific index
indexOf(stream, value) Find the index of a value

Consuming vs non-consuming

Rule of thumb: If it returns a ReadableStream, it's non-consuming. If it returns a Promise, it consumes the stream.

Need to consume a stream multiple times? Use branch() first:

const [forSum, forCount] = branch(numbers, 2)

const sum = await reduce(forSum, (a, b) => a + b, 0)
const count = await reduce(forCount, (acc) => acc + 1, 0)

Streams the Hard Way vs streamfu

Every example below shows a real task done the hard way with native Web Streams, then the easy way with streamfu.

1. Transform every chunk -- Parse CSV lines and uppercase names

Before -- native Web Streams:

const transform1 = new TransformStream({
transform(line, ctrl) {
ctrl.enqueue(line.split(","))
},
})
const transform2 = new TransformStream({
transform(cols, ctrl) {
ctrl.enqueue({ name: cols[0].toUpperCase(), age: Number(cols[1]) })
},
})

const reader = csvStream.pipeThrough(transform1).pipeThrough(transform2).getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()

After -- streamfu:

import { createReadable, list, map } from "@sgmonda/streamfu"

const stream = map(
csvStream,
(line) => line.split(","),
(cols) => ({ name: cols[0].toUpperCase(), age: Number(cols[1]) }),
)
const results = await list(stream)
2. Filter and collect -- Keep only active users from a stream

Before -- native Web Streams:

const filter = new TransformStream({
transform(user, ctrl) {
if (user.status === "active") ctrl.enqueue(user)
},
})

const reader = usersStream.pipeThrough(filter).getReader()
const active = []
while (true) {
const { done, value } = await reader.read()
if (done) break
active.push(value)
}
reader.releaseLock()

After -- streamfu:

import { filter, list } from "@sgmonda/streamfu"

const active = await list(filter(usersStream, (u) => u.status === "active"))
3. Reduce to a single value -- Sum a stream of numbers

Before -- native Web Streams:

const reader = numbersStream.getReader()
let total = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
total += value
}
reader.releaseLock()

After -- streamfu:

import { reduce } from "@sgmonda/streamfu"

const total = await reduce(numbersStream, (sum, n) => sum + n, 0)
4. Multi-step pipeline -- Filter, transform, and collect in one chain

Before -- native Web Streams:

const filterStep = new TransformStream({
transform(n, ctrl) {
if (n % 2 === 0) ctrl.enqueue(n)
},
})
const doubleStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(n * 2)
},
})
const toStringStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(`Value: ${n}`)
},
})

const reader = numbersStream
.pipeThrough(filterStep)
.pipeThrough(doubleStep)
.pipeThrough(toStringStep)
.getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()

After -- streamfu:

import { filter, list, map, pipe } from "@sgmonda/streamfu"

const results = await list(pipe(
numbersStream,
(r) => filter(r, (n) => n % 2 === 0),
(r) => map(r, (n) => n * 2),
(r) => map(r, (n) => `Value: ${n}`),
))
5. Concatenate multiple sources -- Merge API pages into one stream and take the first 50

Before -- native Web Streams:

const sources = [page1Stream, page2Stream, page3Stream]
const reader1 = sources[0].getReader()
const reader2 = sources[1].getReader()
const reader3 = sources[2].getReader()
const all = []

for (const reader of [reader1, reader2, reader3]) {
while (true) {
const { done, value } = await reader.read()
if (done) break
all.push(value)
}
reader.releaseLock()
}
const first50 = all.slice(0, 50)

After -- streamfu:

import { concat, list, slice } from "@sgmonda/streamfu"

const first50 = await list(slice(concat(page1Stream, page2Stream, page3Stream), 0, 50))
6. Zip parallel streams -- Pair names with scores into labeled strings

Before -- native Web Streams:

const readerA = namesStream.getReader()
const readerB = scoresStream.getReader()
const leaderboard = []

while (true) {
const [a, b] = await Promise.all([readerA.read(), readerB.read()])
if (a.done || b.done) break
leaderboard.push(`${a.value}: ${b.value}`)
}
readerA.releaseLock()
readerB.releaseLock()

After -- streamfu:

import { list, map, zip } from "@sgmonda/streamfu"

const leaderboard = await list(map(zip(namesStream, scoresStream), ([name, score]) => `${name}: ${score}`))
7. Consume a stream twice -- Get both the sum and the max from the same stream

Before -- native Web Streams:

const [copy1, copy2] = numbersStream.tee()

const reader1 = copy1.getReader()
let sum = 0
while (true) {
const { done, value } = await reader1.read()
if (done) break
sum += value
}
reader1.releaseLock()

const reader2 = copy2.getReader()
let max = -Infinity
while (true) {
const { done, value } = await reader2.read()
if (done) break
if (value > max) max = value
}
reader2.releaseLock()

After -- streamfu:

import { branch, reduce } from "@sgmonda/streamfu"

const [forSum, forMax] = branch(numbersStream, 2)

const [sum, max] = await Promise.all([
reduce(forSum, (a, b) => a + b, 0),
reduce(forMax, (a, b) => (b > a ? b : a), -Infinity),
])
8. Flatten paginated results -- Expand arrays of items into individual chunks

Before -- native Web Streams:

// Each chunk is an array like [item1, item2, item3] from a paginated API
const flatten = new TransformStream({
transform(page, ctrl) {
for (const item of page) ctrl.enqueue(item)
},
})
const label = new TransformStream({
transform(item, ctrl) {
ctrl.enqueue(`#${item.id}: ${item.title}`)
},
})

const reader = pagesStream.pipeThrough(flatten).pipeThrough(label).getReader()
const items = []
while (true) {
const { done, value } = await reader.read()
if (done) break
items.push(value)
}
reader.releaseLock()

After -- streamfu:

import { flatMap, list, map } from "@sgmonda/streamfu"

const items = await list(map(flatMap(pagesStream, (page) => page), (item) => `#${item.id}: ${item.title}`))
9. Quick queries on a stream -- Answer five questions about the data without five manual loops

Before -- native Web Streams:

// Need 5 copies -- tee() only gives 2, so we chain:
const [s1, rest1] = dataStream.tee()
const [s2, rest2] = rest1.tee()
const [s3, rest3] = rest2.tee()
const [s4, s5] = rest3.tee()

// Does it contain 42?
let hasFortyTwo = false
const r1 = s1.getReader()
while (true) {
const { done, value } = await r1.read()
if (done) break
if (value === 42) {
hasFortyTwo = true
break
}
}
r1.releaseLock()

// Are all values positive?
let allPositive = true
const r2 = s2.getReader()
while (true) {
const { done, value } = await r2.read()
if (done) break
if (value <= 0) {
allPositive = false
break
}
}
r2.releaseLock()

// Is any value greater than 100?
let anyOver100 = false
const r3 = s3.getReader()
while (true) {
const { done, value } = await r3.read()
if (done) break
if (value > 100) {
anyOver100 = true
break
}
}
r3.releaseLock()

// Where is 7?
let indexOf7 = -1
let idx = 0
const r4 = s4.getReader()
while (true) {
const { done, value } = await r4.read()
if (done) break
if (value === 7) {
indexOf7 = idx
break
}
idx++
}
r4.releaseLock()

// What's the third element?
let third = undefined
let count = 0
const r5 = s5.getReader()
while (true) {
const { done, value } = await r5.read()
if (done) break
if (count === 2) {
third = value
break
}
count++
}
r5.releaseLock()

After -- streamfu:

import { at, branch, every, includes, indexOf, some } from "@sgmonda/streamfu"

const [s1, s2, s3, s4, s5] = branch(dataStream, 5)

const [hasFortyTwo, allPositive, anyOver100, indexOf7, third] = await Promise.all([
includes(s1, 42),
every(s2, (n) => n > 0),
some(s3, (n) => n > 100),
indexOf(s4, 7),
at(s5, 2),
])
10. Generate, splice, and process -- Create a range, replace elements, and log results

Before -- native Web Streams:

// Generate 1..10 manually
const numbers = new ReadableStream({
start(ctrl) {
for (let i = 1; i <= 10; i++) ctrl.enqueue(i)
ctrl.close()
},
})

// Splice: remove 3 items at index 3, insert 99 and 100
let idx = 0
const removed = 3
const spliceTransform = new TransformStream({
transform(chunk, ctrl) {
if (idx === 3 + removed) {
ctrl.enqueue(99)
ctrl.enqueue(100)
}
if (idx < 3 || idx >= 3 + removed) {
ctrl.enqueue(chunk)
}
idx++
},
flush(ctrl) {
if (idx <= 3 + removed) {
ctrl.enqueue(99)
ctrl.enqueue(100)
}
},
})

// Double each value
const doubleTransform = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(n * 2)
},
})

const reader = numbers.pipeThrough(spliceTransform).pipeThrough(doubleTransform).getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(value)
}
reader.releaseLock()

After -- streamfu:

import { forEach, map, pipe, range, splice } from "@sgmonda/streamfu"

await forEach(
pipe(
range(1, 10),
(r) => splice(r, 3, 3, 99, 100),
(r) => map(r, (n) => n * 2),
),
(value) => console.log(value),
)

Design principles

  • Functional & pure -- No side effects, no mutations. Every operation returns a new stream.
  • Familiar API -- Mirrors Array.prototype methods. Zero learning curve.
  • Universal -- Built on the Web Streams API. Works in Node.js, Deno, Bun, and browsers.
  • Type-safe -- Full TypeScript support with precise generics.
  • Tested -- 100% code coverage. Every function, every edge case.

Error handling

One of the trickiest parts of working with streams is error handling. The traditional approach relies on listening to events, which splits your logic across multiple callbacks and makes the control flow hard to follow:

// Event-based error handling: scattered logic, easy to forget a listener
stream.on("data", (chunk) => {/* process chunk */})
stream.on("error", (err) => {/* handle error */})
stream.on("end", () => {/* cleanup */})

This pattern has several problems:

  • Error handling is disconnected from the processing logic
  • Forgetting the error listener can cause unhandled exceptions that crash your process
  • Coordinating end and error to know when the stream is truly done requires extra state
  • It doesn't compose well with async/await code

In streamfu, errors propagate automatically through chained operations. If a map() transformer or a filter() predicate throws, the error bubbles up and rejects the promise returned by any consuming operation (list(), reduce(), every(), etc.). This means you can use a standard try/catch block:

// Errors propagate through the entire chain
const stream = pipe(
createReadable(data),
(r) => map(r, transformFn), // if this throws...
(r) => filter(r, predicateFn),
)

try {
const results = await list(stream) // ...the error rejects here
} catch (err) {
console.error("Something failed:", err)
}

No special error listeners, no extra plumbing. Errors flow naturally through map(), filter(), and any other chained transformation, all the way to whichever consumer ends the pipeline.

Contributing

Contributions welcome! Fork the repo, make your changes, and submit a PR.

deno task test

Requirements:

Publishing

Published to JSR automatically via GitHub CI when version changes in deno.json on main.


MIT License * Made with care by @sgmonda and contributors

About

Functional stream utilities for JavaScript & TypeScript

Topics

Resources

Readme

License

MIT license

Code of conduct

Code of conduct

Contributing

Contributing

Stars

Watchers

Forks

Contributors