I am trying to connect to a Kafka topic using the Apache Beam Python SDK and submit the pipeline as a Dataflow job.
Here is my code snippet:
import argparse
import os

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Point the Google Cloud client libraries at the service-account key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/gcp_auth.json"

# Separate our own flags from the Beam pipeline flags.
parser = argparse.ArgumentParser()
args, beam_args = parser.parse_known_args()
print(beam_args)

beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my_project',
    job_name='my_job_name',
    temp_location='my_bucket/temp',
    region='my_region',
    network='my_network',
    subnetwork='my_sub_network',
    service_account_email='my_service_account',
)

with beam.Pipeline(options=beam_options) as pipeline:
    # Read raw (key, value) byte pairs from the Kafka topic.
    msg_kv_bytes = pipeline | "read file" >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "ip:port",
            "group.id": "my_group_id",
            "security.protocol": "SSL",
            "ssl.ca.location": "bucket/cacerts.cer",
            "ssl.key.location": "bucket/clientcert.key",
            "ssl.certificate.location": "bucket/clientcert.cer",
            "ssl.keystore.password": "mypass",
            "ssl.key.password": "mypass",
            "enable.auto.commit": True,
            "auto.offset.reset": "earliest",
        },
        topics=["my_topic"],
    )
I run the code like this:
python3 readfromkafka.py --worker_region my_region --runner DataflowRunner --project my_project --network my_network --subnetwork my_subnet --temp_location gcs_bucket/temp --no_use_public_ips --streaming
But I get this error:
Traceback (most recent call last):
  File "/Users/myuser/Documents/dev_box/apache_beam/network-outage/readfromkafka.py", line 36, in <module>
    msg_kv_bytes = pipeline | "read file" >> ReadFromKafka(
                   ^^^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/io/kafka.py", line 177, in __init__
    super().__init__(
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 652, in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
    ^^^^^^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 110, in payload
    return self.build().SerializeToString()
           ^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 147, in build
    schema=schema, payload=RowCoder(schema).encode(row))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prabhan/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 448, in encode
    return self.get_impl().encode(value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 233, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 236, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 1769, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 701, in apache_beam.coders.coder_impl.MapCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 270, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 406, in encode
    return value.encode('utf-8')
           ^^^^^^^^^^^^
AttributeError: 'bool' object has no attribute 'encode'
What could be causing this error? I tried adding different deserializers, and I also tried the parameters available on ReadFromKafka, such as max_num_records and with_metadata (set to both True and False). Nothing helped.
I found the solution.
The problem was the "enable.auto.commit": True entry in the consumer_config I passed to the ReadFromKafka transform. ReadFromKafka is a cross-language transform, and the consumer_config map is serialized for the Java expansion service as string key/value pairs, so RowCoder tries to UTF-8-encode every value; the Python bool True has no encode method, which is exactly what the traceback shows.
Once I removed that entry, the code built successfully.
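If you need to keep auto-commit on, passing the value as the string "true" should also work, since every value in consumer_config has to be a string. A minimal sketch under that assumption; the broker address, group id, and topic are placeholders:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    msg_kv_bytes = pipeline | "read file" >> ReadFromKafka(
        consumer_config={
            # Every value is a plain string so it survives the
            # string-map serialization to the expansion service.
            "bootstrap.servers": "ip:port",     # placeholder broker
            "group.id": "my_group_id",          # placeholder group id
            "enable.auto.commit": "true",       # string "true", not the bool True
            "auto.offset.reset": "earliest",
        },
        topics=["my_topic"],                    # placeholder topic
    )

A quick sanity check before building the pipeline, e.g. all(isinstance(v, str) for v in consumer_config.values()), catches any non-string value early.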