我有 PySpark 代码,它很少对外部系统进行 POST API 调用。 对于输入数据帧中的每一行,我需要触发 POST API 请求(使用 Python 代码)以在外部系统中创建一个条目。鉴于数据集很大,这个过程需要相当长的时间。
为了提高性能,计划使用 Python 的 ThreadPoolExecutor 基于可用内核并行(多线程)处理行(即 POST API)。
from concurrent.futures import ThreadPoolExecutor, as_completed
num_cores = spark.sparkContext.defaultParallelism
def process_all_rows(input_df):
results = []
with ThreadPoolExecutor(max_workers=num_cores) as executor: # Adjust max_workers based on needs
futures = {executor.submit(process_row, row): row for row in input_df.collect()}
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Error in thread execution: {e}")
return results
在回顾这一点时,我被告知ThreadPoolExecutor主要执行上下文切换。因此,如果输入 DataFrame 有 100 行,并且 num_cores 设置为 8(即集群有 8 个核心),则代码将仅使用一个核心(并非所有可用的 8 个核心),通过上下文切换顺序触发 POST 请求,即。触发一个 POST API 请求,然后触发下一个,依此类推。这是正确的理解吗? ThreadPoolExecutor 会并行使用所有 8 个核心吗?
请考虑以下内容以利用 Spark 内置并行性。
requests
库创建 UDF。考虑使用
异步库(例如 asyncio 或 aiohttp)可实现并发
要求更好的性能。