在 kotlin 中并行化 api 调用

问题描述 投票:0回答:1

我需要从S3异步调用2个读取操作,然后解析来自S3的数据。 两个文件的总记录约为 24k 尝试使用异步协程,但每个进程在一分钟后触发 例如:

开始从 S3 读取文件 1 - 于 10:29:53 开始 解析文件 1 的数据 - 从 10:29:53 开始

开始从 S3 读取文件 2 - 于 10:30:53 开始 解析文件 2 的数据 - 从 10:30:53 开始

我们如何并行从 S3 操作获取和解析所有文件,以便它们同时开始执行,即 10:29:53

fun process() {
    runBlocking {
        async { parseFile("File1") }.await()
        async { parseFile("File2") }.await()

    }
}

private fun parseFile(fileName: String) {
    lon.info {"Initiate read from S3 for $fileName "}
    val getRecord = s3RepositoryClient.getObjectContentInputStream(fileName)
    val parseToString=parse(
        objectMapper, getRecord,"test"
    )
    parseToString.parallelStream().forEach {
        dynamoDbClient.save(it)
    }
}


fun parseToString(mapper: ObjectMapper, records: InputStream, textName: String): List<Billing> {
    var response: List<Billing> = emptyList()
    val reader = BufferedReader(records.reader())
    try {
        var line = reader.readLine()
        while (line != null) {
            response = response.plus(
                Billing.from(
                    mapper.readValue(line, Employee::class.java),
                    line,
                    textName
                )
            )
            line = reader.readLine()
        }
    } finally {
        reader.close()
    }
    return response
}
multithreading kotlin asynchronous parallel-processing kotlin-coroutines
1个回答
0
投票

有两个问题导致此情况。

首先,您先

await()
拨打第一个电话,然后再拨打第二个电话。这意味着我们只有在第一个完全完成后才能开始第二个。要同时运行它们,我们必须启动它们然后等待:

val deferred1 = async { parseFile("File1") }
val deferred2 = async { parseFile("File2") }
val result1 = deferred1.await()
val result2 = deferred2.await()

在您的具体情况下,您甚至不使用这些函数的结果,因此您不必等待它们,我们可以/应该使用

launch()
而不是
async()

runBlocking {
    launch { parseFile("File1") }
    launch { parseFile("File2") }
}

第二个问题是,默认情况下

runBlocking()
使用单个线程,并且由于您的操作会阻塞 I/O,因此它一次只能运行其中一个。要利用多个线程,请使用
Dispatchers.IO
:

runBlocking(Dispatchers.IO) {
    launch { parseFile("File1") }
    launch { parseFile("File2") }
}
© www.soinside.com 2019 - 2024. All rights reserved.