How to gracefully stop a thread from a Spark foreachBatch callback

Problem description

I am using a thread from the threading package to start a function that runs a Spark streaming query. I want to stop the thread from inside the process function when a condition is met.

import threading
import asyncio
import functools
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

async def process(df, df_id):
    if df_id == 2:
        # I want to stop the thread here
        pass

def process_wrapper(df, df_id):
    # Bridge Spark's synchronous foreachBatch callback to the async process()
    asyncio.run(process(df, df_id))

def streaming_to_consumer_wrapper():
    asyncio.run(streaming_to_consumer())

async def streaming_to_consumer():
    df = spark.readStream \
        .format("iceberg") \
        .load("local.db.table")

    query = df \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(functools.partial(process_wrapper)) \
        .trigger(processingTime="0.5 seconds") \
        .start()

    query.awaitTermination(2)

threading.Thread(target=streaming_to_consumer_wrapper).start()

python multithreading pyspark python-multithreading spark-structured-streaming
1 Answer

0 votes

I solved this by adding an event = threading.Event() and passing the event as an argument to process. The thread that starts the streaming query then waits for the event in a while loop. Once df_id == 2, process sets the event, the waiting thread exits the loop, and the streaming query is stopped with query.stop().

import threading
import asyncio
import functools
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

async def process(df, df_id, event):
    if df_id == 2:
        # Signal the waiting thread that the stop condition was reached
        event.set()
        return

def process_wrapper(event, df, df_id):
    # foreachBatch invokes this with (df, epoch_id); event is bound via functools.partial
    asyncio.run(process(df, df_id, event))

def streaming_to_consumer_wrapper():
    asyncio.run(streaming_to_consumer())

async def streaming_to_consumer():
    df = spark.readStream \
        .format("iceberg") \
        .load("local.db.table")

    event = threading.Event()

    query = df \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(functools.partial(process_wrapper, event)) \
        .trigger(processingTime="0.5 seconds") \
        .start()

    # Wait up to 2 seconds in case the query terminates on its own
    query.awaitTermination(2)

    # Poll until process() sets the event, then stop the streaming query
    while not event.is_set():
        time.sleep(1)

    query.stop()

threading.Thread(target=streaming_to_consumer_wrapper).start()
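
For reference, the coordination pattern itself is independent of Spark. Below is a minimal, runnable sketch of the same idea with plain threads (the worker function and timings are illustrative, not part of the original answer): one thread polls event.is_set() while another calls event.set() to tell it to stop.

import threading
import time

def worker(event):
    # Stand-in for the streaming loop: keep working until signalled
    while not event.is_set():
        time.sleep(0.5)
    print("stop signal received, exiting")

event = threading.Event()
t = threading.Thread(target=worker, args=(event,))
t.start()

time.sleep(2)
event.set()   # plays the role of event.set() inside process()
t.join()

Note that threading.Event also provides event.wait(timeout), which blocks until the event is set; it could replace the sleep-based polling loop in both this sketch and the answer above.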
