我有一个使用 Twisted 框架编写的应用程序,它使用输出插件将各种数据(主要是 JSON 格式的日志条目)传送到各种日志服务器。我也想让它能够将数据发送到 Kafka 服务器 - 但我遇到了某种我不知道如何解决的问题。
如果我使用 Python 和
kafka-python
模块以简单的方式将数据发送到 Kafka 服务器,一切都会正常工作:
from json import dumps
from kafka import KafkaProducer
site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
topic = 'test'
producer = KafkaProducer(
bootstrap_servers='{}:{}'.format(site, port),
value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password
)
event = {
'message': 'Test message'
}
try:
producer.send(topic, event)
producer.flush()
print('Message sent.')
except Exception as e:
print('Error producing message: {}'.format(e))
finally:
producer.close()
但是,如果我尝试使用几乎相同的代码从实际的 Twisted 应用程序发送它,它就会挂起:
from json import dumps
from core import output
from kafka import KafkaProducer
from twisted.python.log import msg
class Output(output.Output):
def start(self):
site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
self.topic = 'test'
self.producer = KafkaProducer(
bootstrap_servers='{}:{}'.format(site, port),
value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password
)
def stop(self):
self.producer.flush()
self.producer.close()
def write(self, event):
try:
self.producer.send(self.topic, event)
self.producer.flush()
except Exception as e:
msg('Kafka error: {}'.format(e))
(这不会开箱即用;它只是插件并使用在其他地方定义的通用
Output
类。)
特别是,它挂在
self.producer.send(self.topic, event)
线上。
我认为问题来自于
kafka-python
中的Kafka生产者是同步(阻塞)而Twisted需要异步(非阻塞)代码。有一个异步 Kafka 模块,名为 afkak
- 但它似乎没有提供 Kafka 服务器的身份验证,因此它不适合我的需求。
我的理解是,在 Twisted 中解决此类问题的方法是使用 deferreds。但是,我一直无法理解到底该怎么做。如果我像这样重写
write
方法
def write(self, event):
d = threads.deferToThread(self.postentry, event)
d.addCallback(self.postentryCallback)
return d
def postentryCallback(self):
reactor.stop()
def postentry(self, event):
try:
self.producer.send(self.topic, event)
self.producer.flush()
except Exception as e:
msg('Kafka error: {}'.format(e))
当尝试将数据发送到 Kafka 服务器时,它不再挂起 - 但当应用程序终止并且没有任何内容发送到 Kafka 服务器时,它会挂起(我可以使用用纯 Python 编写的单独的消费者脚本进行验证)。
有什么想法我做错了什么以及如何解决它吗?
Twisted 有一些典型的阻塞代码的异步实现,例如向 Web 服务器发出 GET/POST 请求 - 但 Kafka 不是 Web 服务器,它在端口 9092 上运行,所以我认为我不能使用它。
aiokafka
应该是一个用于与 Kafka 服务器异步通信的 Python 模块 - 但我无法让它工作。
最终,我用
confluent-kafka
模块解决了我的问题。不幸的是,它不是 kafka-python
模块的直接替代品,因此必须稍微重写代码:
from json import dumps
from core import output
from confluent_kafka import Producer
from twisted.python.log import msg
class Output(output.Output):
def start(self):
site = <KAFKA SERVER>
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
self.topic = 'test'
self.producer = Producer({
'bootstrap.servers': '{}:{}'.format(site, port),
'sasl.mechanism': 'SCRAM-SHA-256',
'security.protocol': 'SASL_SSL',
'sasl.username': username,
'sasl.password': password
})
def stop(self):
self.producer.flush()
def write(self, event):
self.postentry(event)
def delivery_callback(self, err, message):
if err:
msg('Kafka error: {}'.format(err))
def postentry(self, event):
try:
self.producer.produce(
self.topic,
bytes(str(dumps(event)).encode('utf-8')),
callback=self.delivery_callback
)
self.producer.poll(0)
self.producer.flush()
except Exception as e:
msg('Kafka error: {}'.format(e))