JavaScript:折叠无限流(生成器函数)

问题描述 投票:0回答:4

在 Java 中可以这样声明和折叠无限流

List<Integer> collect = Stream.iterate(0, i -> i + 2)
    .map(i -> i * 3)
    .filter(i -> i % 2 == 0)
    .limit(10)
    .collect(Collectors.toList());

// -> [0, 6, 12, 18, 24]

在 JavaScript 中,我可以使用生成器函数来生成和传播值流。

// Limit the value in generator
let generator = (function* () {
    for (let i=0; i<10; i++) {
        yield i
    }
})()

[ ...generator ]
    .map(i => i * 3)
    .filter(i => i % 2 === 0)

// -> [0, 6, 12, 18, 24]

但是我如何流式传输和折叠无限流呢?我知道我可以使用

for (n of generator)
循环迭代和限制流。但是使用 Java 示例等流畅的 API 是否可行?

javascript ecmascript-6 functional-programming stream generator
4个回答
2
投票

这是给定答案的另一种方法。

1.函数式API

首先创建一个函数式API。

const itFilter = p => function* (ix) {
  for (const x of ix)
    if (p(x))
      yield x;
};

const itMap = f => function* (ix) {
  for (const x of ix)
    yield f(x);
};

const itTake = n => function* (ix) {
  let m = n;
  
  for (const x of ix) {
    if (m-- === 0)
      break;

    yield x;
  }
};

const comp3 = f => g => h => x =>
  f(g(h(x)));    const xs = [1,2,3,4,5,6,7,8,9,10];

const stream = comp3(itTake(3))
  (itFilter(x => x % 2 === 0))
    (itMap(x => x * 3));

console.log(
  Array.from(stream(xs))
);

2.盒式

接下来,定义一个

Box
类型以允许任意功能 API 的方法链接。

function Box(x) {
  return new.target ? (this.x = x, this) : new Box(x)
}

Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};

3.方法链接

最后,使用新的

Box
类型来链接方法。

const itFilter = p => function* (ix) {
  for (const x of ix)
    if (p(x))
      yield x;
};

const itMap = f => function* (ix) {
  for (const x of ix)
    yield f(x);
};

const itTake = n => function* (ix) {
  let m = n;
  
  for (const x of ix) {
    if (m-- === 0)
      break;
      
    yield x;
  }
};

const xs = [1,2,3,4,5,6,7,8,9,10];

function Box(x) {
  return new.target ? (this.x = x, this) : new Box(x)
}

Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};

const stream = Box(xs)
  .map(itMap(x => x * 3))
  .map(itFilter(x => x % 2 === 0))
  .map(itTake(3))
  .fold(x => x);
  
 console.log(
   Array.from(stream)
 );

Box
免费为您提供流畅的 API。


2
投票

这是一个例子 -

// a terminating generator
const range = function* (from, to)
{ while (from < to)
    yield from++
}

// higher-order generator
const G =
  range(0, 100).filter(isEven).map(square)

for (const x of G)
  console.log(x)

// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...

我们可以通过扩展生成器原型来实现这样的事情 -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

展开下面的代码片段以在您的浏览器中验证我们的进度 -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

// example functions
const square = x =>
  x * x

const isEven = x =>
  (x & 1) === 0
  
// an terminating generator
const range = function* (from, to)
{ while (from < to)
    yield from++
}

// higher-order generator
for (const x of range(0, 100).filter(isEven).map(square))
  console.log(x)

// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...

继续,像

fold
collect
之类的东西假设流最终终止,否则它无法返回值 -

Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
    acc = f.call(context, acc, x)
  return acc
}

const result =
  range(0, 100)      // <- a terminating stream
    .filter(isEven)
    .map(square)
    .fold(add, 0)    // <- assumes the generator terminates

console.log(result)
// 161700

如果必须折叠无限流,可以实现

limit
-

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

// an infinite generator
const range = function* (x = 0)
{ while (true)
    yield x++
}

// fold an infinite stream using limit
const result =
  range(0)          // infinite stream, starting at 0
    .limit(100)     // limited to 100 values
    .filter(isEven) // only pass even values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 161700

展开下面的代码片段以验证浏览器中的结果 -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
    acc = f.call(context, acc, x)
  return acc
}

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

const square = x =>
  x * x

const isEven = x =>
  (x & 1) === 0
  
const add = (x, y) =>
  x + y

// an infinite generator
const range = function* (x = 0)
{ while (true)
    yield x++
}

// fold an infinite stream using limit
const result =
  range(0)          // starting at 0
    .limit(100)     // limited to 100 values
    .filter(isEven) // only pass even values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 161700

在上面,请注意将

limit
的顺序更改为 after
filter
表达式如何改变结果 -

const result =
  range(0)          // starting at 0
    .filter(isEven) // only pass even values
    .limit(100)     // limited to 100 values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 1313400

在第一个节目中 -

  1. 从无限范围开始
    (0, 1, 2, 3, 4, ...)
  2. 限制为 100 个值
    (0, 1, 2, 3, 4, ...,97, 98, 99)
  3. 仅传递偶数值
    (0, 2, 4, ...94, 96, 98)
  4. 对每个值进行平方
    (0, 4, 16, ..., 8836, 9216, 9604)
  5. 使用 add 折叠值,从 0 开始,
    (0 + 0 + 4 + 16 + ..., + 8836 + 9216 + 9604)
  6. 结果
    161700

在第二个节目中-

  1. 从无限范围开始
    (0, 1, 2, 3, 4, ...)
  2. 仅传递偶数值
    (0, 2, 4, ...)
  3. 限制为 100 个值
    (0, 2, 4, 6, 8, ...194, 196, 198)
  4. 对每个值进行平方
    (0, 4, 16, 36, 64, ..., 37636, 38416, 29304)
  5. 使用 add 折叠值,从 0 开始,
    (0 + 4 + 16 + 36 + 64 + ..., + 37636+ 38416 + 29304)
  6. 结果
    1313400

最后我们实现了

collect
,与
fold
不同,它不需要初始累加器。相反,第一个值是从流中手动抽取并用作初始累加器。流恢复,将每个值与前一个值折叠 -

Generator.prototype.collect = function (f, context)
{ let { value } = this.next()
  for (const x of this)
    value = f.call(context, value, x)
  return value
}

const toList = (a, b) =>
  [].concat(a, b)

range(0,100).map(square).collect(toList)
// [ 0, 1, 2, 3, ..., 97, 98, 99 ]

range(0,100).map(square).collect(add)
// 4950

并注意双重消耗您的流! JavaScript 没有为我们提供持久迭代器,因此一旦消耗了流,您就无法可靠地调用流上的其他高阶函数 -

// create a stream
const stream  =
  range(0)
    .limit(100)
    .filter(isEven)
    .map(square)

console.log(stream.fold(add, 0)) // 161700
console.log(stream.fold(add, 0)) // 0 (stream already exhausted!)

// create another stream
const stream2  =
  range(0)
    .limit(100)
    .filter(isEven)
    .map(square)

console.log(stream2.fold(add, 0)) // 161700
console.log(stream2.fold(add, 0)) // 0 (stream2 exhausted!)

当您执行类似

merge
-

之类的操作时,很可能会发生这种情况
const r =
  range (0)

r.merge(r, r).limit(3).fold(append, [])
// double consume! bug!
// [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6, 7, 8 ] ]
// expected:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]

// fresh range(0) each time
range(0).merge(range(0), range(0)).limit(3).fold(append, [])
// correct:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]

每次使用fresh生成器(

range(0)...
)可以避免这个问题 -

const stream =
  range(0)
    .merge
      ( range(0).filter(isEven)
      , range(0).filter(x => !isEven(x))
      , range(0).map(square)
      )
    .limit(10)

console.log ('natural + even + odd + squares = ?')
for (const [ a, b, c, d ] of stream)
  console.log (`${ a } + ${ b } + ${ c } + ${ d } = ${ a + b + c + d }`)

// natural + even + odd + squares = ?
// 0 + 0 + 1 + 0 = 1
// 1 + 2 + 3 + 1 = 7
// 2 + 4 + 5 + 4 = 15
// 3 + 6 + 7 + 9 = 25
// 4 + 8 + 9 + 16 = 37
// 5 + 10 + 11 + 25 = 51
// 6 + 12 + 13 + 36 = 67
// 7 + 14 + 15 + 49 = 85
// 8 + 16 + 17 + 64 = 105
// 9 + 18 + 19 + 81 = 127

这是为我们的生成器使用参数的关键原因:它会让您考虑正确地重用它们。因此,我们的流不应该将

stream
定义为上面的
const
,而应该始终 是函数,即使是无效的 -

// streams should be a function, even if they don't accept arguments
// guarantees a fresh iterator each time
const megaStream = (start = 0, limit = 1000) =>
  range(start) // natural numbers
    .merge
      ( range(start).filter(isEven) // evens
      , range(start).filter(x => !isEven(x)) // odds
      , range(start).map(square) // squares
      )
    .limit(limit)

const print = s =>
{ for (const x of s)
    console.log(x)
}

print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]

print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]

我们可以将

merge
实现为 -

Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
  while (river.every(([ _, { done } ]) => done === false))
  { yield river.map(([ _, { value } ]) => value)
    river = river.map(([ s, _ ]) => [ s, s.next() ])
  }
}

展开下面的代码片段以验证浏览器中的结果 -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
  while (river.every(([ _, { done } ]) => done === false))
  { yield river.map(([ _, { value } ]) => value)
    river = river.map(([ s, _ ]) => [ s, s.next() ])
  }
}

const isEven = x =>
  (x & 1) === 0

const square = x =>
  x * x

const range = function* (x = 0)
{ while (true)
    yield x++
}

// streams should be functions, even if they don't have parameters
const megaStream = (start = 0, limit = 1000) =>
  range(start) // natural numbers
    .merge
      ( range(start).filter(isEven) // evens
      , range(start).filter(x => !isEven(x)) // odds
      , range(start).map(square) // squares
      )
    .limit(limit)

// for demo only
const print = s =>
{ for (const x of s) console.log(x) }

print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]

print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]


0
投票

我将添加另一个可能是您正在寻找的答案。我是 scramjet 的作者,这是一个基于流的框架,它为转换添加了流畅的 API。用它可以很容易地实现你想要的:

import {DataStream} from "scramjet";
let i = 0;
const out = await (
    DataStream.from(function*() { let n = 2; while (true) yield n++; })
        .map(n => n+2)
        .filter(i -> i % 2 == 0)
        .until(() => i++ === 10)
        .toArray()
);

我主要是为了异步操作而构建它(所以你可以用异步函数替换任何这些函数,它的工作原理完全相同)。所以如果这可能的话,答案是肯定的。

但需要注意的是:它所基于的node.js流中有一些缓冲区,因此生成器可能会比until方法允许的迭代次数更多。


0
投票

随着 ECMAScript 2025 中引入迭代器辅助方法,我们现在可以:

function* iterate(val, cb) {
    while (true) {
        yield val;
        val = cb(val);
    }
}

const collect = iterate(0, i => i + 2)
    .map(i => i * 3)
    .filter(i => i % 2 == 0)
    .take(5) // equivalent of limit
    .toArray();

console.log(collect); // -> [0, 6, 12, 18, 24]

这里,

map
filter
take
toArray
是迭代器辅助方法,允许惰性求值,形成“管道”。
toArray
方法开始消费值,
take
方法停止消费,从上一个迭代器开始,一旦产生了5个值,整个过程就完成了。

© www.soinside.com 2019 - 2024. All rights reserved.