收集流中的数据,直到id被改变?

问题描述 投票:0回答:1

我有管线的流。

pipeline(readStream, transformStream, writeStream);

readStream传递给transformStream(在每个 "数据 "事件中) 像这样的对象。

{
   id: 1,
   name: 'John',
   phone: 1000000,
}

我需要将电话存储在transform流中,直到该对象的id被改变, 然后我应该将这样的对象推送到流缓冲区。

{
   id: 1,
   name: 'John',
   phones: [1000000, 1000001, 1000002],
}

所以在转换流后的数组:

[
  {
   id: 1,
   name: 'John',
   phone: 1000000,
  },
  {
   id: 1,
   name: 'John',
   phone: 1000001,
  },
  {
   id: 2,
   name: 'Ray',
   phone: 1000002,
  },
  {
   id: 3,
   name: 'Santa',
   phone: 1000003,
  },
]

变换流后的数组将是:

[
  {
   id: 1,
   name: 'John',
   phones: [1000000, 1000001],
  },
  {
   id: 2,
   name: 'Ray',
   phones: [1000002],
  },
  {
   id: 3,
   name: 'Santa',
   phones: [1000003],
  },
]

我如何实现这个?

javascript node.js stream
1个回答
0
投票

这是一个简单的例子。主要的想法是跟踪之前的条目块,并将其与当前的条目进行比较,以决定我们是否应该更新的 phones 属性或将数据推送到下一个流。

const streamify = require('stream-array'); // using this package for testing purposes only
const Stream = require('stream');

const input = [
  {
    id: 1,
    name: 'John',
    phone: 1000000,
  },
  {
    id: 1,
    name: 'John',
    phone: 1000001,
  },
  {
    id: 2,
    name: 'Ray',
    phone: 1000002,
  },
  {
    id: 3,
    name: 'Santa',
    phone: 1000003,
  },
];

function createObjectMergeStream() {
  let previousEntry = null;

  return new Stream.Transform({
    writableObjectMode: true,
    transform: transformFunc,
    flush(callback) {
      callback(null, JSON.stringify(previousEntry)); // stringifying for demonstration only
    }
  });

  function transformFunc(currentEntry, encoding, callback){
    if (previousEntry === null) {
      // if this is the first the stream is receiving
      previousEntry = {
        id: currentEntry.id,
        name: currentEntry.name,
        phones: [currentEntry.phone]
      }

      callback();
    }
    else if (previousEntry.id === currentEntry.id) {
      // if the id's match, only update the phones array
      previousEntry.phones.push(currentEntry.phone);
      callback();
    }
    else {
      // if this entry does not match the id of the previous entry

      // stringifying for demonstration only
      const output = JSON.stringify(previousEntry) + '\n';

      previousEntry = {
        id: currentEntry.id,
        name: currentEntry.name,
        phones: [currentEntry.phone]
      }

      callback(null, output);
    }
  }
}


// turn the `input` array into a readable  stream
const inputStream = streamify(input);

// create our transform stream
const transformStream = createObjectMergeStream();

// take objects from input array, process the objects in our transform stream then print them to the console
inputStream.pipe(transformStream).pipe(process.stdout);

输出

{"id":1,"name":"John","phones":[1000000,1000001]}
{"id":2,"name":"Ray","phones":[1000002]}
{"id":3,"name":"Santa","phones":[1000003]}
© www.soinside.com 2019 - 2024. All rights reserved.