在 Java 中可以这样声明和折叠无限流
List<Integer> collect = Stream.iterate(0, i -> i + 2)
.map(i -> i * 3)
.filter(i -> i % 2 == 0)
.limit(10)
.collect(Collectors.toList());
// -> [0, 6, 12, 18, 24]
在 JavaScript 中,我可以使用生成器函数来生成和传播值流。
// Limit the value in generator
let generator = (function* () {
for (let i=0; i<10; i++) {
yield i
}
})()
[ ...generator ]
.map(i => i * 3)
.filter(i => i % 2 === 0)
// -> [0, 6, 12, 18, 24]
但是我如何流式传输和折叠无限流呢?我知道我可以使用
for (n of generator)
循环迭代和限制流。但是使用 Java 示例等流畅的 API 是否可行?
这是给定答案的另一种方法。
首先创建一个函数式API。
const itFilter = p => function* (ix) {
for (const x of ix)
if (p(x))
yield x;
};
const itMap = f => function* (ix) {
for (const x of ix)
yield f(x);
};
const itTake = n => function* (ix) {
let m = n;
for (const x of ix) {
if (m-- === 0)
break;
yield x;
}
};
const comp3 = f => g => h => x =>
f(g(h(x))); const xs = [1,2,3,4,5,6,7,8,9,10];
const stream = comp3(itTake(3))
(itFilter(x => x % 2 === 0))
(itMap(x => x * 3));
console.log(
Array.from(stream(xs))
);
接下来,定义一个
Box
类型以允许任意功能 API 的方法链接。
function Box(x) {
return new.target ? (this.x = x, this) : new Box(x)
}
Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};
最后,使用新的
Box
类型来链接方法。
const itFilter = p => function* (ix) {
for (const x of ix)
if (p(x))
yield x;
};
const itMap = f => function* (ix) {
for (const x of ix)
yield f(x);
};
const itTake = n => function* (ix) {
let m = n;
for (const x of ix) {
if (m-- === 0)
break;
yield x;
}
};
const xs = [1,2,3,4,5,6,7,8,9,10];
function Box(x) {
return new.target ? (this.x = x, this) : new Box(x)
}
Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};
const stream = Box(xs)
.map(itMap(x => x * 3))
.map(itFilter(x => x % 2 === 0))
.map(itTake(3))
.fold(x => x);
console.log(
Array.from(stream)
);
Box
免费为您提供流畅的 API。
这是一个例子 -
// a terminating generator
const range = function* (from, to)
{ while (from < to)
yield from++
}
// higher-order generator
const G =
range(0, 100).filter(isEven).map(square)
for (const x of G)
console.log(x)
// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...
我们可以通过扩展生成器原型来实现这样的事情 -
const Generator =
Object.getPrototypeOf(function* () {})
Generator.prototype.map = function* (f, context)
{ for (const x of this)
yield f.call(context, x)
}
Generator.prototype.filter = function* (f, context)
{ for (const x of this)
if (f.call(context, x))
yield x
}
展开下面的代码片段以在您的浏览器中验证我们的进度 -
const Generator =
Object.getPrototypeOf(function* () {})
Generator.prototype.map = function* (f, context)
{ for (const x of this)
yield f.call(context, x)
}
Generator.prototype.filter = function* (f, context)
{ for (const x of this)
if (f.call(context, x))
yield x
}
// example functions
const square = x =>
x * x
const isEven = x =>
(x & 1) === 0
// an terminating generator
const range = function* (from, to)
{ while (from < to)
yield from++
}
// higher-order generator
for (const x of range(0, 100).filter(isEven).map(square))
console.log(x)
// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...
继续,像
fold
或 collect
之类的东西假设流最终终止,否则它无法返回值 -
Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
acc = f.call(context, acc, x)
return acc
}
const result =
range(0, 100) // <- a terminating stream
.filter(isEven)
.map(square)
.fold(add, 0) // <- assumes the generator terminates
console.log(result)
// 161700
如果必须折叠无限流,可以实现
limit
-
Generator.prototype.limit = function* (n)
{ for (const x of this)
if (n-- === 0)
break // <-- stop the stream
else
yield x
}
// an infinite generator
const range = function* (x = 0)
{ while (true)
yield x++
}
// fold an infinite stream using limit
const result =
range(0) // infinite stream, starting at 0
.limit(100) // limited to 100 values
.filter(isEven) // only pass even values
.map(square) // square each value
.fold(add, 0) // fold values together using add, starting at 0
console.log(result)
// 161700
展开下面的代码片段以验证浏览器中的结果 -
const Generator =
Object.getPrototypeOf(function* () {})
Generator.prototype.map = function* (f, context)
{ for (const x of this)
yield f.call(context, x)
}
Generator.prototype.filter = function* (f, context)
{ for (const x of this)
if (f.call(context, x))
yield x
}
Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
acc = f.call(context, acc, x)
return acc
}
Generator.prototype.limit = function* (n)
{ for (const x of this)
if (n-- === 0)
break // <-- stop the stream
else
yield x
}
const square = x =>
x * x
const isEven = x =>
(x & 1) === 0
const add = (x, y) =>
x + y
// an infinite generator
const range = function* (x = 0)
{ while (true)
yield x++
}
// fold an infinite stream using limit
const result =
range(0) // starting at 0
.limit(100) // limited to 100 values
.filter(isEven) // only pass even values
.map(square) // square each value
.fold(add, 0) // fold values together using add, starting at 0
console.log(result)
// 161700
在上面,请注意将
limit
的顺序更改为 after filter
表达式如何改变结果 -
const result =
range(0) // starting at 0
.filter(isEven) // only pass even values
.limit(100) // limited to 100 values
.map(square) // square each value
.fold(add, 0) // fold values together using add, starting at 0
console.log(result)
// 1313400
在第一个节目中 -
(0, 1, 2, 3, 4, ...)
(0, 1, 2, 3, 4, ...,97, 98, 99)
(0, 2, 4, ...94, 96, 98)
(0, 4, 16, ..., 8836, 9216, 9604)
(0 + 0 + 4 + 16 + ..., + 8836 + 9216 + 9604)
161700
在第二个节目中-
(0, 1, 2, 3, 4, ...)
(0, 2, 4, ...)
(0, 2, 4, 6, 8, ...194, 196, 198)
(0, 4, 16, 36, 64, ..., 37636, 38416, 29304)
(0 + 4 + 16 + 36 + 64 + ..., + 37636+ 38416 + 29304)
1313400
最后我们实现了
collect
,与fold
不同,它不需要初始累加器。相反,第一个值是从流中手动抽取并用作初始累加器。流恢复,将每个值与前一个值折叠 -
Generator.prototype.collect = function (f, context)
{ let { value } = this.next()
for (const x of this)
value = f.call(context, value, x)
return value
}
const toList = (a, b) =>
[].concat(a, b)
range(0,100).map(square).collect(toList)
// [ 0, 1, 2, 3, ..., 97, 98, 99 ]
range(0,100).map(square).collect(add)
// 4950
并注意双重消耗您的流! JavaScript 没有为我们提供持久迭代器,因此一旦消耗了流,您就无法可靠地调用流上的其他高阶函数 -
// create a stream
const stream =
range(0)
.limit(100)
.filter(isEven)
.map(square)
console.log(stream.fold(add, 0)) // 161700
console.log(stream.fold(add, 0)) // 0 (stream already exhausted!)
// create another stream
const stream2 =
range(0)
.limit(100)
.filter(isEven)
.map(square)
console.log(stream2.fold(add, 0)) // 161700
console.log(stream2.fold(add, 0)) // 0 (stream2 exhausted!)
当您执行类似
merge
- 之类的操作时,很可能会发生这种情况
const r =
range (0)
r.merge(r, r).limit(3).fold(append, [])
// double consume! bug!
// [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6, 7, 8 ] ]
// expected:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]
// fresh range(0) each time
range(0).merge(range(0), range(0)).limit(3).fold(append, [])
// correct:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]
每次使用fresh生成器(
range(0)...
)可以避免这个问题 -
const stream =
range(0)
.merge
( range(0).filter(isEven)
, range(0).filter(x => !isEven(x))
, range(0).map(square)
)
.limit(10)
console.log ('natural + even + odd + squares = ?')
for (const [ a, b, c, d ] of stream)
console.log (`${ a } + ${ b } + ${ c } + ${ d } = ${ a + b + c + d }`)
// natural + even + odd + squares = ?
// 0 + 0 + 1 + 0 = 1
// 1 + 2 + 3 + 1 = 7
// 2 + 4 + 5 + 4 = 15
// 3 + 6 + 7 + 9 = 25
// 4 + 8 + 9 + 16 = 37
// 5 + 10 + 11 + 25 = 51
// 6 + 12 + 13 + 36 = 67
// 7 + 14 + 15 + 49 = 85
// 8 + 16 + 17 + 64 = 105
// 9 + 18 + 19 + 81 = 127
这是为我们的生成器使用参数的关键原因:它会让您考虑正确地重用它们。因此,我们的流不应该将
stream
定义为上面的 const
,而应该始终 是函数,即使是无效的 -
// streams should be a function, even if they don't accept arguments
// guarantees a fresh iterator each time
const megaStream = (start = 0, limit = 1000) =>
range(start) // natural numbers
.merge
( range(start).filter(isEven) // evens
, range(start).filter(x => !isEven(x)) // odds
, range(start).map(square) // squares
)
.limit(limit)
const print = s =>
{ for (const x of s)
console.log(x)
}
print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]
print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]
我们可以将
merge
实现为 -
Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
while (river.every(([ _, { done } ]) => done === false))
{ yield river.map(([ _, { value } ]) => value)
river = river.map(([ s, _ ]) => [ s, s.next() ])
}
}
展开下面的代码片段以验证浏览器中的结果 -
const Generator =
Object.getPrototypeOf(function* () {})
Generator.prototype.map = function* (f, context)
{ for (const x of this)
yield f.call(context, x)
}
Generator.prototype.filter = function* (f, context)
{ for (const x of this)
if (f.call(context, x))
yield x
}
Generator.prototype.limit = function* (n)
{ for (const x of this)
if (n-- === 0)
break // <-- stop the stream
else
yield x
}
Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
while (river.every(([ _, { done } ]) => done === false))
{ yield river.map(([ _, { value } ]) => value)
river = river.map(([ s, _ ]) => [ s, s.next() ])
}
}
const isEven = x =>
(x & 1) === 0
const square = x =>
x * x
const range = function* (x = 0)
{ while (true)
yield x++
}
// streams should be functions, even if they don't have parameters
const megaStream = (start = 0, limit = 1000) =>
range(start) // natural numbers
.merge
( range(start).filter(isEven) // evens
, range(start).filter(x => !isEven(x)) // odds
, range(start).map(square) // squares
)
.limit(limit)
// for demo only
const print = s =>
{ for (const x of s) console.log(x) }
print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]
print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]
我将添加另一个可能是您正在寻找的答案。我是 scramjet 的作者,这是一个基于流的框架,它为转换添加了流畅的 API。用它可以很容易地实现你想要的:
import {DataStream} from "scramjet";
let i = 0;
const out = await (
DataStream.from(function*() { let n = 2; while (true) yield n++; })
.map(n => n+2)
.filter(i -> i % 2 == 0)
.until(() => i++ === 10)
.toArray()
);
我主要是为了异步操作而构建它(所以你可以用异步函数替换任何这些函数,它的工作原理完全相同)。所以如果这可能的话,答案是肯定的。
但需要注意的是:它所基于的node.js流中有一些缓冲区,因此生成器可能会比until方法允许的迭代次数更多。
随着 ECMAScript 2025 中引入迭代器辅助方法,我们现在可以:
function* iterate(val, cb) {
while (true) {
yield val;
val = cb(val);
}
}
const collect = iterate(0, i => i + 2)
.map(i => i * 3)
.filter(i => i % 2 == 0)
.take(5) // equivalent of limit
.toArray();
console.log(collect); // -> [0, 6, 12, 18, 24]
这里,
map
、filter
、take
和toArray
是迭代器辅助方法,允许惰性求值,形成“管道”。 toArray
方法开始消费值,take
方法停止消费,从上一个迭代器开始,一旦产生了5个值,整个过程就完成了。