使用 Twisted 框架从 Python 应用程序与 Kafka 服务器进行通信

问题描述 投票:0回答:1

我有一个使用 Twisted 框架编写的应用程序,它使用输出插件将各种数据(主要是 JSON 格式的日志条目)传送到各种日志服务器。我也想让它能够将数据发送到 Kafka 服务器 - 但我遇到了某种我不知道如何解决的问题。

如果我使用 Python 和

kafka-python
模块以简单的方式将数据发送到 Kafka 服务器,一切都会正常工作:

from json import dumps
from kafka import KafkaProducer

site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
topic = 'test'

producer = KafkaProducer(
    bootstrap_servers='{}:{}'.format(site, port),
    value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username=username,
    sasl_plain_password=password
)

event = {
    'message': 'Test message'
}

try:
    producer.send(topic, event)
    producer.flush()
    print('Message sent.')
except Exception as e:
    print('Error producing message: {}'.format(e))
finally:
    producer.close()

但是,如果我尝试使用几乎相同的代码从实际的 Twisted 应用程序发送它,它就会挂起:

from json import dumps

from core import output
from kafka import KafkaProducer
from twisted.python.log import msg

class Output(output.Output):

    def start(self):
        site = '<KAFKA SERVER>'
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        self.topic = 'test'
        self.producer = KafkaProducer(
            bootstrap_servers='{}:{}'.format(site, port),
            value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
            sasl_mechanism='SCRAM-SHA-256',
            security_protocol='SASL_SSL',
            sasl_plain_username=username,
            sasl_plain_password=password
        )

    def stop(self):
        self.producer.flush()
        self.producer.close()

    def write(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))

(这不会开箱即用;它只是插件并使用在其他地方定义的通用

Output
类。)

特别是,它挂在

self.producer.send(self.topic, event)
线上。

我认为问题来自于

kafka-python
中的Kafka生产者是同步(阻塞)而Twisted需要异步(非阻塞)代码。有一个异步 Kafka 模块,名为
afkak
- 但它似乎没有提供 Kafka 服务器的身份验证,因此它不适合我的需求。

我的理解是,在 Twisted 中解决此类问题的方法是使用 deferreds。但是,我一直无法理解到底该怎么做。如果我像这样重写

write
方法

    def write(self, event):
        d = threads.deferToThread(self.postentry, event)
        d.addCallback(self.postentryCallback)
        return d

    def postentryCallback(self):
        reactor.stop()

    def postentry(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))

当尝试将数据发送到 Kafka 服务器时,它不再挂起 - 但当应用程序终止并且没有任何内容发送到 Kafka 服务器时,它会挂起(我可以使用用纯 Python 编写的单独的消费者脚本进行验证)。

有什么想法我做错了什么以及如何解决它吗?

Twisted 有一些典型的阻塞代码的异步实现,例如向 Web 服务器发出 GET/POST 请求 - 但 Kafka 不是 Web 服务器,它在端口 9092 上运行,所以我认为我不能使用它。

python apache-kafka twisted
1个回答
0
投票

aiokafka
应该是一个用于与 Kafka 服务器异步通信的 Python 模块 - 但我无法让它工作。

最终,我用

confluent-kafka
模块解决了我的问题。不幸的是,它不是
kafka-python
模块的直接替代品,因此必须稍微重写代码:

from json import dumps

from core import output
from confluent_kafka import Producer
from twisted.python.log import msg


class Output(output.Output):

    def start(self):
        site = <KAFKA SERVER>
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        self.topic = 'test'
        self.producer = Producer({
            'bootstrap.servers': '{}:{}'.format(site, port),
            'sasl.mechanism': 'SCRAM-SHA-256',
            'security.protocol': 'SASL_SSL',
            'sasl.username': username,
            'sasl.password': password
        })

    def stop(self):
        self.producer.flush()

    def write(self, event):
        self.postentry(event)

    def delivery_callback(self, err, message):
        if err:
            msg('Kafka error: {}'.format(err))

    def postentry(self, event):
        try:
            self.producer.produce(
                self.topic,
                bytes(str(dumps(event)).encode('utf-8')),
                callback=self.delivery_callback
            )
            self.producer.poll(0)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))
© www.soinside.com 2019 - 2024. All rights reserved.