用于数据流项目的JavaScript转换器

问题描述 投票:1回答:1

在我当前的项目中,我正在处理大量的数字数据和必须以数据流可编程方式对它们进行的转换。

我偶然发现了换能器的概念,它有望解决在大型阵列上处理多种转换的难题。换能器似乎不完全适合我在此要解决的问题。

我正在寻找一种换能器的模式/概念,该换能器仅收集所需的回溯量然后处理结果。类似于浏览器版本的tensorflow,reaktor,max-msp(输入输出,流程图,基于节点的可视化编程)

这些模块中的大多数应连接到源,但也应能够充当将那些模块链接到其他模块的源

source ( a stream ) =[new-value]|=> module1 => module2 => ...
                                |=> module3 => module4 // branch off here to a new chain

根据我的理解,大多数博客中解释的换能器采用整个阵列,并通过选择的变压器提供每个单独的值。

然而,我的模块/变压器并不需要那么多数据来工作,以简单的移动平均线为例,其回顾了4步。

我想象该模块收集足够的数据,直到它开始输出为止。我也不需要将整个数组保存在内存中,我应该只处理所需的确切数量。结果/输出可以选择存储在数据库中。

stream =[sends-1-value]=> module[collects-values-until-processing-starts] =[sends-one-value]=>...

也应该有可能将多个信号源连接到一个模块中(传感器似乎没有提供该模块。

此处的换能器模式是否仍然适用,或者那里还有其他内容?

老实说,每个程序员都会有一个想法来完成这项工作,但是我要求某种确定的方法来完成这项工作,就像换能器一样。

javascript functional-programming stream
1个回答
0
投票

transducer模式在这里肯定适用。您可以创建一个带有传感器和正确数据结构配对的浮点处理器。我给您一个基线示例,其中有一个假设:

  1. 您正在使用的流将实现Symbol.asyncIterator

考虑一个简单的队列

function SimpleQueue({ size }) {
  this.size = size
  this.buffer = []
}

SimpleQueue.prototype.push = function(item) {
  this.buffer.push(item)
  if (this.buffer.length > this.size) {
    this.buffer.shift()
  }
  return this
}

SimpleQueue.prototype[Symbol.iterator] = function*() {
  for (const item of this.buffer) {
    yield item
  }
}

我们的简单队列有一个方法push,它将项目推入其内部缓冲区(数组)。简单队列也是可迭代的,因此您可以执行for (const x of simpleQueue) {/* stuff */}

我们现在将在浮点处理器中使用我们的SimpleQueue。>>

const average = iterable => {
  let sum = 0, count = 0
  for (const item of iterable) {
    sum += item
    count += 1
  }
  return sum / count
}

const floatingPointAverage = ({ historySize }) => {
  const queue = new SimpleQueue({ size: historySize })
  return item => {
    queue.push(item)
    const avg = average(queue)
    console.log(queue, avg) // this shows the average as the process runs
    return avg
  }
}

[floatingPointAverage获取一个项目,将其推入我们的SimpleQueue,然后返回队列中项目的当前平均值。

最后,我们可以实现并使用我们的传感器

const { pipe, map, transform } = require('rubico')

const numbersStream = {
  [Symbol.asyncIterator]: async function*() {
    for (let i = 0; i < 1000; i++) yield i
  },
}

transform(
  pipe([
    map(floatingPointAverage({ historySize: 4 })),
    /* transducers that do stuff with floating point average here */
  ]),
  null,
)(numbersStream)

<在这种情况下是map(floatingPointAverage({ historySize: 4 }))。该转换器由提供,这是我为解决自己的异步问题而编写的库。我在rubico here

背景下写了关于换能器的文章您的输出应该看起来像这样SimpleQueue { size: 4, buffer: [ 0 ] } 0 SimpleQueue { size: 4, buffer: [ 0, 1 ] } 0.5 SimpleQueue { size: 4, buffer: [ 0, 1, 2 ] } 1 SimpleQueue { size: 4, buffer: [ 0, 1, 2, 3 ] } 1.5 SimpleQueue { size: 4, buffer: [ 1, 2, 3, 4 ] } 2.5 SimpleQueue { size: 4, buffer: [ 2, 3, 4, 5 ] } 3.5 SimpleQueue { size: 4, buffer: [ 3, 4, 5, 6 ] } 4.5 SimpleQueue { size: 4, buffer: [ 4, 5, 6, 7 ] } 5.5 SimpleQueue { size: 4, buffer: [ 5, 6, 7, 8 ] } 6.5 SimpleQueue { size: 4, buffer: [ 6, 7, 8, 9 ] } 7.5
© www.soinside.com 2019 - 2024. All rights reserved.