如何在 Kafka-Bigquery Dataflow Flex 模板中设置参数

问题描述 投票:0回答:1

我一直在尝试使用数据流弹性模板从 Kafka 消费者连接到 Bigquery。以下是我的配置 -

gcloud dataflow flex-template run first-kafka --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery --region us-east4 --worker-region us-east4 --subnetwork <subnetwork-url> --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected]

连接到消费者 security.protocol=SASL_SSL 并且它也使用这样的用户名和密码 xyz.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="abcd" password="1234";};

我也有 JKS 信任库文件。

我的问题是如何向数据流模板提供这些配置?

  1. 安全协议= SASL_SSL
  2. java.security.auth.login.config = xyz.conf
  3. 如果需要客户端信任库 JKS,那么我该如何提供呢? (目前我的云存储桶中有这些) 注意:我的客户端计算机在 AWS MSK 上运行

我在未设置这些配置的情况下遇到的错误是 -

java.lang.RuntimeException:org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时

google-cloud-platform apache-kafka google-bigquery google-cloud-dataflow aws-msk
1个回答
0
投票

要在连接到 Kafka 时向 Dataflow Flex 模板提供自定义 Kafka 客户端配置(包括

security.protocol
java.security.auth.login.config
和信任库设置),您可以使用
--parameters
标志传入其他配置值。以下是如何将这些配置包含在
gcloud
命令中:

gcloud dataflow flex-template run first-kafka \
    --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery \
    --region us-east4 \
    --worker-region us-east4 \
    --subnetwork <subnetwork-url> \
    --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected],\
    securityProtocol=SASL_SSL,loginConfigFile=xyz.conf,truststoreLocation=<gs://path/to/truststore.jks>

在此命令中,我添加了三个新参数:

  1. securityProtocol
    :该参数允许您指定Kafka的
    security.protocol
    配置,例如
    SASL_SSL

  2. loginConfigFile
    :此参数用于
    java.security.auth.login.config
    配置,应指向您的
    xyz.conf
    文件。

  3. truststoreLocation
    :此参数应指向 Google Cloud Storage 中信任库 JKS 文件的位置。确保将
    <gs://path/to/truststore.jks>
    替换为您的信任库的实际 GCS 路径。

现在,您需要修改 Dataflow 管道代码以读取这些参数并在 Kafka 使用者配置中设置它们。

在 Dataflow 管道代码中(可能在

@Setup
方法中),您可以使用
RuntimeValueProvider
接口访问这些参数。以下是如何检索这些值并在 Kafka 使用者配置中设置它们的示例:

@Setup
public void setup() {
    String securityProtocol = options.getSecurityProtocol();
    String loginConfigFile = options.getLoginConfigFile();
    String truststoreLocation = options.getTruststoreLocation();

    Properties kafkaProps = new Properties();
    kafkaProps.put("security.protocol", securityProtocol);
    kafkaProps.put("java.security.auth.login.config", loginConfigFile);

    // Configure truststore (if required)
    if (truststoreLocation != null && !truststoreLocation.isEmpty()) {
        kafkaProps.put("ssl.truststore.location", truststoreLocation);
        // You may need to provide other truststore-related settings here
    }

    // Create Kafka consumer with the configured properties
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
    
    // Perform other Kafka consumer setup and processing
    // ...
}

此代码读取传递给 Dataflow 作业的参数,将它们设置在 Kafka 使用者的属性中,并配置信任库(如果提供)。确保您的数据流管道代码使用

options
对象来访问传递给作业的参数。

完成这些配置后,您的数据流作业应该能够使用所需的安全设置和自定义配置连接到 Kafka。

© www.soinside.com 2019 - 2024. All rights reserved.