I am using a thread from the threading package to start a function that runs a Spark Structured Streaming query. I want to stop that thread from inside the process function when a condition is met.
import threading
import asyncio
import functools
from pyspark.sql import SparkSession

# Assumed: the Spark session is created elsewhere, e.g.
# spark = SparkSession.builder.getOrCreate()

async def process(df, df_id):
    if df_id == 2:
        # I want to stop the thread here
        pass

def streaming_to_consumer_wrapper():
    asyncio.run(streaming_to_consumer())

async def streaming_to_consumer():
    df = spark.readStream \
        .format("iceberg") \
        .load("local.db.table")

    query = df \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(functools.partial(process_wrapper)) \
        .trigger(processingTime="0.5 seconds") \
        .start()

    query.awaitTermination(2)

threading.Thread(target=streaming_to_consumer_wrapper).start()
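The post does not show process_wrapper. Since foreachBatch expects a synchronous callback while process is async, it presumably bridges the two. A minimal sketch of that assumed helper (only the name and call site come from the snippet above; the body is a guess):

def process_wrapper(df, df_id):
    # Assumed bridge: foreachBatch calls this synchronously with
    # (batch DataFrame, batch id); run the async process to completion.
    asyncio.run(process(df, df_id))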
I solved this by adding event = threading.Event() and passing event as an argument to process. I also put the thread that starts the streaming query into a while loop that waits for the event. Once df_id == 2, process sets the event; the waiting thread then exits the loop and stops the streaming query with query.stop().
import threading
import asyncio
import functools
import time
from pyspark.sql import SparkSession

# Assumed: the Spark session is created elsewhere, e.g.
# spark = SparkSession.builder.getOrCreate()

async def process(df, df_id, event):
    if df_id == 2:
        # Signal the streaming thread that it should stop
        event.set()
        return

def process_wrapper(event, df, df_id):
    # Assumed helper (not shown in the original post): foreachBatch calls a
    # synchronous function, so run the async process to completion here.
    asyncio.run(process(df, df_id, event))

def streaming_to_consumer_wrapper():
    asyncio.run(streaming_to_consumer())

async def streaming_to_consumer():
    df = spark.readStream \
        .format("iceberg") \
        .load("local.db.table")

    event = threading.Event()

    query = df \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(functools.partial(process_wrapper, event)) \
        .trigger(processingTime="0.5 seconds") \
        .start()

    # awaitTermination(2) returns after at most 2 seconds if the query is still running
    query.awaitTermination(2)

    # Block until process() sets the event, then stop the streaming query
    while not event.is_set():
        time.sleep(1)
    query.stop()

threading.Thread(target=streaming_to_consumer_wrapper).start()
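The same stop-signal pattern can be seen in isolation, without Spark: one thread polls the Event and exits once another thread sets it. This is only a sketch of the mechanism used above; the names worker and stop_event are illustrative.

import threading
import time

def worker(stop_event):
    # Stand-in for the streaming loop: poll the event and exit once it is set
    while not stop_event.is_set():
        time.sleep(0.1)
    print("worker: event received, stopping")

stop_event = threading.Event()
t = threading.Thread(target=worker, args=(stop_event,))
t.start()

time.sleep(0.5)   # stand-in for the condition df_id == 2 being reached
stop_event.set()  # signal the worker to stop
t.join()

Here the main thread plays the role of process: it sets the event, the worker exits its loop, and in the answer above the equivalent step is streaming_to_consumer leaving its while loop and calling query.stop().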