# Data source
DataSource = Generator[int, Any, None]
def create_data_source() -> DataSource
...
# Data processing - should be able to run in the background until being shut down
async def data_processor(source: DataSource) -> None:
try:
for item in source:
print("Working")
except Exception as e:
print("Exception", e)
raise
finally:
print("Cancelled")
async def main():
# Run the function
data_source = create_data_source()
processing_task = asyncio.create_task(data_processor(data_source))
# Do other stuff
await asyncio.sleep(3)
# Shut the function down
processing_task.cancel()
asyncio.run(main())
asyncio.Event
向其发送关闭信号,甚至尝试使用
concurrent.futures
重写该函数,但没有任何可行。我想念什么?
注: 我搜索了以前的Q&AS,但没有找到解决方案。由于依赖性原因,这是在Python 3.11上运行的,但是如果较新版本具有解决方案,那么我也许可以更新。
import threading
import time
class MyTask:
def __init__(self):
self.thread = None
self.stop_event = threading.Event()
def _wrapper(self, func, *args, **kwargs):
"""Wrapper function to run the task and check for stop signal."""
try:
func(self.stop_event, *args, **kwargs)
except Exception as e:
print(f"Task encountered an error: {e}")
def start_task(self, func, *args, **kwargs):
"""Starts the function in a separate thread."""
if self.thread and self.thread.is_alive():
print("Task is already running.")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._wrapper, args=(func, *args), kwargs=kwargs)
self.thread.start()
def stop_task(self):
"""Stops the running task."""
if self.thread and self.thread.is_alive():
self.stop_event.set() # Signal the function to stop
print("Stopping the task...")
else:
print("No active task to stop.")
def example_task(stop_event):
"""Example function that stops when the stop_event is set."""
for i in range(10):
if stop_event.is_set():
print("Task stopping early.")
return
print(f"Running... {i}")
time.sleep(1)
print("Task completed.")
if __name__ == "__main__":
task = MyTask()
task.start_task(example_task)
time.sleep(3)
task.stop_task()