我需要从S3异步调用2个读取操作,然后解析来自S3的数据。 两个文件的总记录约为 24k 尝试使用异步协程,但每个进程在一分钟后触发 例如:
开始从 S3 读取文件 1 - 于 10:29:53 开始 解析文件 1 的数据 - 从 10:29:53 开始
开始从 S3 读取文件 2 - 于 10:30:53 开始 解析文件 2 的数据 - 从 10:30:53 开始
我们如何并行从 S3 操作获取和解析所有文件,以便它们同时开始执行,即 10:29:53
fun process() {
runBlocking {
async { parseFile("File1") }.await()
async { parseFile("File2") }.await()
}
}
private fun parseFile(fileName: String) {
lon.info {"Initiate read from S3 for $fileName "}
val getRecord = s3RepositoryClient.getObjectContentInputStream(fileName)
val parseToString=parse(
objectMapper, getRecord,"test"
)
parseToString.parallelStream().forEach {
dynamoDbClient.save(it)
}
}
fun parseToString(mapper: ObjectMapper, records: InputStream, textName: String): List<Billing> {
var response: List<Billing> = emptyList()
val reader = BufferedReader(records.reader())
try {
var line = reader.readLine()
while (line != null) {
response = response.plus(
Billing.from(
mapper.readValue(line, Employee::class.java),
line,
textName
)
)
line = reader.readLine()
}
} finally {
reader.close()
}
return response
}
有两个问题导致此情况。
首先,您先
await()
拨打第一个电话,然后再拨打第二个电话。这意味着我们只有在第一个完全完成后才能开始第二个。要同时运行它们,我们必须启动它们然后等待:
val deferred1 = async { parseFile("File1") }
val deferred2 = async { parseFile("File2") }
val result1 = deferred1.await()
val result2 = deferred2.await()
在您的具体情况下,您甚至不使用这些函数的结果,因此您不必等待它们,我们可以/应该使用
launch()
而不是 async()
:
runBlocking {
launch { parseFile("File1") }
launch { parseFile("File2") }
}
第二个问题是,默认情况下
runBlocking()
使用单个线程,并且由于您的操作会阻塞 I/O,因此它一次只能运行其中一个。要利用多个线程,请使用 Dispatchers.IO
:
runBlocking(Dispatchers.IO) {
launch { parseFile("File1") }
launch { parseFile("File2") }
}