并行线程池执行器

问题描述 投票:0回答:1

我有 PySpark 代码,它很少对外部系统进行 POST API 调用。 对于输入数据帧中的每一行,我需要触发 POST API 请求(使用 Python 代码)以在外部系统中创建一个条目。鉴于数据集很大,这个过程需要相当长的时间。

为了提高性能,计划使用 Python 的 ThreadPoolExecutor 基于可用内核并行(多线程)处理行(即 POST API)。

from concurrent.futures import ThreadPoolExecutor, as_completed

num_cores = spark.sparkContext.defaultParallelism

def process_all_rows(input_df):
    results = []
    
    with ThreadPoolExecutor(max_workers=num_cores) as executor:  # Adjust max_workers based on needs
        futures = {executor.submit(process_row, row): row for row in input_df.collect()}
        
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)  
            except Exception as e:
                logger.error(f"Error in thread execution: {e}")
    
    return results

在回顾这一点时,我被告知ThreadPoolExecutor主要执行上下文切换。因此,如果输入 DataFrame 有 100 行,并且 num_cores 设置为 8(即集群有 8 个核心),则代码将仅使用一个核心(并非所有可用的 8 个核心),通过上下文切换顺序触发 POST 请求,即。触发一个 POST API 请求,然后触发下一个,依此类推。这是正确的理解吗? ThreadPoolExecutor 会并行使用所有 8 个核心吗?

python python-3.x apache-spark pyspark
1个回答
0
投票

请考虑以下内容以利用 Spark 内置并行性。

  • 使用 python
    requests
    库创建 UDF。考虑使用 异步库(例如 asyncio 或 aiohttp)可实现并发 要求更好的性能。
  • foreachPartition:对于较大的数据集或当您想要处理时 批量数据,foreachPartition 可以更高效。它允许 您可以在每个分区内并行处理数据。
© www.soinside.com 2019 - 2024. All rights reserved.