从多个文件中读取并按条件将它们合并到流中

问题描述 投票:0回答:1

我有几个具有下一个结构的 csv 文件 {“unixTimestampMs”:1609531200000,“timezoneId”:“欧洲/莫斯科”,“有效负载”:...}

我需要按时间条件读取 zstream 中的所有内容。 (例如,从第一个文件中取出 10 条记录,然后从第二个文件中取出 5 条记录,从第三个文件中取出 15 条记录,依此类推,按时间排序)

以前我有单个文件,就像这样:

def fileStream: Stream[Throwable, Result] =
    for {
      res <-
        fileSource(appConf.sourceFilePath)
          .mapZIO(transactionService.mkMsgFromString)
          .collectSome
          .groupAdjacentBy(_.unixTimestampMs)
          .map(evenlyDistributeMessages)
          .flatMap(ZStream.fromChunk(_))
          .mapZIO(sleepIfRequired)
          .mapZIOPar(proxyConf.maxConnections, proxyConf.maxConnections)(emitMessage(_, uriToSendMessages))
    } yield res

哪里

def fileSource(file: String): Stream[Throwable, String] =
    ZStream
      .acquireReleaseWith(transactionsFileService.getFileAsSource(file))(releaseFile)
      .flatMap(f => ZStream.fromIterator(f.getLines().drop(1), 256))

so fileSource 给了我我期望的流。 如何针对多个文件情况重写它并按时间顺序合并到 zstream 中?

scala zio zio-streams
1个回答
0
投票
Finally I found solution:

def fileStream: Stream[Throwable, Result] =
    for {
      res <- appConf.sourceFileList
          .map(fileSource)
          .map(_.mapZIO(transactionService.mkMsgFromString))
          .map(_.collectSome)
          .reduce(_ mergeSorted _)
          .groupAdjacentBy(_.unixTimestampMs)
          .map(evenlyDistributeMessages)
          .flatMap(ZStream.fromChunk(_))
          .mapZIO(sleepIfRequired)
          .mapZIOPar(proxyConf.maxConnections, proxyConf.maxConnections)(emitMessage(_, uriToSendMessages))
    } yield res

appConf.sourceFileList 是文件名列表。

并且还使用 mergeSorted 我实现了 Ordering[T]

© www.soinside.com 2019 - 2024. All rights reserved.