我有几个具有下一个结构的 csv 文件 {“unixTimestampMs”:1609531200000,“timezoneId”:“欧洲/莫斯科”,“有效负载”:...}
我需要按时间条件读取 zstream 中的所有内容。 (例如,从第一个文件中取出 10 条记录,然后从第二个文件中取出 5 条记录,从第三个文件中取出 15 条记录,依此类推,按时间排序)
以前我有单个文件,就像这样:
def fileStream: Stream[Throwable, Result] =
for {
res <-
fileSource(appConf.sourceFilePath)
.mapZIO(transactionService.mkMsgFromString)
.collectSome
.groupAdjacentBy(_.unixTimestampMs)
.map(evenlyDistributeMessages)
.flatMap(ZStream.fromChunk(_))
.mapZIO(sleepIfRequired)
.mapZIOPar(proxyConf.maxConnections, proxyConf.maxConnections)(emitMessage(_, uriToSendMessages))
} yield res
哪里
def fileSource(file: String): Stream[Throwable, String] =
ZStream
.acquireReleaseWith(transactionsFileService.getFileAsSource(file))(releaseFile)
.flatMap(f => ZStream.fromIterator(f.getLines().drop(1), 256))
so fileSource 给了我我期望的流。 如何针对多个文件情况重写它并按时间顺序合并到 zstream 中?
Finally I found solution:
def fileStream: Stream[Throwable, Result] =
for {
res <- appConf.sourceFileList
.map(fileSource)
.map(_.mapZIO(transactionService.mkMsgFromString))
.map(_.collectSome)
.reduce(_ mergeSorted _)
.groupAdjacentBy(_.unixTimestampMs)
.map(evenlyDistributeMessages)
.flatMap(ZStream.fromChunk(_))
.mapZIO(sleepIfRequired)
.mapZIOPar(proxyConf.maxConnections, proxyConf.maxConnections)(emitMessage(_, uriToSendMessages))
} yield res
appConf.sourceFileList 是文件名列表。
并且还使用 mergeSorted 我实现了 Ordering[T]