Error using ReadFromKafka with Apache Beam and Dataflow


I am trying to connect to a Kafka topic using Apache Beam's Python SDK and submit the pipeline as a Dataflow job.

Here is my code snippet:


import argparse
import os

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path/to/gcp_auth.json"

parser = argparse.ArgumentParser()
args, beam_args = parser.parse_known_args()
print(beam_args)



beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my_project',
    job_name='my_job_name',
    temp_location='my_bucket/temp',
    region='my_region',
    network='my_network',
    subnetwork='my_sub_network',
    service_account_email='my_service_account'
)



with beam.Pipeline(options=beam_options) as pipeline:
  msg_kv_bytes = pipeline | "read file" >> ReadFromKafka(
     consumer_config={
      "bootstrap.servers": "ip:port",
      "group.id": "my_group_id",
      "security.protocol": "SSL",
        "ssl.ca.location": "bucket/cacerts.cer",
        "ssl.key.location":"bucket/clientcert.key",
        "ssl.certificate.location":"bucket/clientcert.cer",
        "ssl.keystore.password":"mypass",
        "ssl.key.password": "mypass",
        "enable.auto.commit": True,
        "auto.offset.reset": "earliest"
    },
    topics=["my_topic"]
  )

I run the code like this:

python3 readfromkafka.py --worker_region my_region --runner DataflowRunner --project my_project --network my_network --subnetwork my_subnet --temp_location gcs_bucket/temp  --no_use_public_ips --streaming

But I get this error:

Traceback (most recent call last):
  File "/Users/myuser/Documents/dev_box/apache_beam/network-outage/readfromkafka.py", line 36, in <module>
    msg_kv_bytes = pipeline | "read file" >> ReadFromKafka(
                                             ^^^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/io/kafka.py", line 177, in __init__
    super().__init__(
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 652, in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
    ^^^^^^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 110, in payload
    return self.build().SerializeToString()
           ^^^^^^^^^^^^
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 147, in build
    schema=schema, payload=RowCoder(schema).encode(row))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prabhan/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 448, in encode
    return self.get_impl().encode(value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 233, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 236, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 1769, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 701, in apache_beam.coders.coder_impl.MapCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 270, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/Users/myuser/Documents/dev_box/apache_beam/beam_env/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 406, in encode
    return value.encode('utf-8')
           ^^^^^^^^^^^^
AttributeError: 'bool' object has no attribute 'encode'

What could be causing this error? I have tried adding different deserializers, and I have also tried the parameters available on ReadFromKafka, such as `max_num_records` and `with_metadata`, setting them to both True and False. Nothing seems to help.
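The traceback itself points at the cause: the `consumer_config` dict is serialized for the cross-language Kafka transform as a map of string keys to string values, so every value is pushed through a UTF-8 string coder that calls `.encode('utf-8')` on it. A minimal sketch of that failure mode (the helper below is a simplified stand-in for Beam's internal string coder, not Beam's actual API):

```python
def encode_config_value(value):
    # Simplified stand-in for the string coder Beam applies to each
    # consumer_config value before sending it to the expansion service.
    return value.encode("utf-8")

# A str value encodes fine:
print(encode_config_value("earliest"))  # b'earliest'

# A Python bool has no .encode method, which is exactly the
# "AttributeError: 'bool' object has no attribute 'encode'" in the traceback:
try:
    encode_config_value(True)
except AttributeError as e:
    print(e)
```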

google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io apache-beam-kafkaio
1 Answer

I found the solution.

The problem was the `"enable.auto.commit": True` entry I had in the `consumer_config` of my ReadFromKafka transform.

Once I removed that config entry, I was able to submit the pipeline.
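If you still want auto-commit enabled, note that `consumer_config` expects every value to be a string (the properties are handed to the Java Kafka consumer as strings), so passing the string `"true"` instead of the Python bool `True` should also avoid the coder error. A hedged sketch, with the broker address and group id as placeholders:

```python
# Every value is a str, so Beam can encode the map for the expansion service.
consumer_config = {
    "bootstrap.servers": "ip:port",
    "group.id": "my_group_id",
    "enable.auto.commit": "true",    # the string "true", not the bool True
    "auto.offset.reset": "earliest",
}

# Quick sanity check before handing the dict to ReadFromKafka:
assert all(isinstance(v, str) for v in consumer_config.values())
```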
