如何在同一浮士德应用程序(使用kafka和python)中创建不同的消费者群 我是浮士德的新手,我需要我的应用来使用不同的消费者组来处理kakfa消息。我需要在同一计划中有两个不同的消费者组,但是由于消费者组...

问题描述 投票:0回答:1
任何想法如何使用同一命令初始化两个应用程序?或者,使用同一应用程序,如何为每个主题创建另一个消费者组?有没有选择做类似的操作

topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')

topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')

或有什么方法可以通过命令行在脚本中启动两个应用程序?我正在尝试

faust -A test_faust:app1,app2 worker

但是给出了错误

ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier


thanks

我找到了一个解决方案。关键是将两个应用程序传递给工人:

import asyncio
import nest_asyncio
nest_asyncio.apply()

async def start_worker(worker: faust.Worker) -> None:
    await worker.start()

def manage_loop():
    loop = asyncio.get_event_loop_policy().get_event_loop()
    try:
        worker = faust.Worker(*[app1, app2], loop=loop)
        loop.run_until_complete(start_worker(worker))
        loop.run_forever()
    finally:
        worker.stop_and_shutdown()
        worker.close()

if __name__=='__main__':
    manage_loop()

然后开始启动应用程序,您需要调用

python test_faust.py worker
python apache-kafka faust
1个回答
0
投票

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.