我一直在尝试使用数据流弹性模板从 Kafka 消费者连接到 Bigquery。以下是我的配置 -
gcloud dataflow flex-template run first-kafka --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery --region us-east4 --worker-region us-east4 --subnetwork <subnetwork-url> --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected]
连接到消费者 security.protocol=SASL_SSL 并且它也使用这样的用户名和密码 xyz.conf
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="abcd" password="1234";};
我也有 JKS 信任库文件。
我的问题是如何向数据流模板提供这些配置?
我在未设置这些配置的情况下遇到的错误是 -
java.lang.RuntimeException:org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时
要在连接到 Kafka 时向 Dataflow Flex 模板提供自定义 Kafka 客户端配置(包括
security.protocol
、java.security.auth.login.config
和信任库设置),您可以使用 --parameters
标志传入其他配置值。以下是如何将这些配置包含在 gcloud
命令中:
gcloud dataflow flex-template run first-kafka \
--template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery \
--region us-east4 \
--worker-region us-east4 \
--subnetwork <subnetwork-url> \
--parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected],\
securityProtocol=SASL_SSL,loginConfigFile=xyz.conf,truststoreLocation=<gs://path/to/truststore.jks>
在此命令中,我添加了三个新参数:
securityProtocol
:该参数允许您指定Kafka的security.protocol
配置,例如SASL_SSL
。
loginConfigFile
:此参数用于 java.security.auth.login.config
配置,应指向您的 xyz.conf
文件。
truststoreLocation
:此参数应指向 Google Cloud Storage 中信任库 JKS 文件的位置。确保将 <gs://path/to/truststore.jks>
替换为您的信任库的实际 GCS 路径。
现在,您需要修改 Dataflow 管道代码以读取这些参数并在 Kafka 使用者配置中设置它们。
在 Dataflow 管道代码中(可能在
@Setup
方法中),您可以使用 RuntimeValueProvider
接口访问这些参数。以下是如何检索这些值并在 Kafka 使用者配置中设置它们的示例:
@Setup
public void setup() {
String securityProtocol = options.getSecurityProtocol();
String loginConfigFile = options.getLoginConfigFile();
String truststoreLocation = options.getTruststoreLocation();
Properties kafkaProps = new Properties();
kafkaProps.put("security.protocol", securityProtocol);
kafkaProps.put("java.security.auth.login.config", loginConfigFile);
// Configure truststore (if required)
if (truststoreLocation != null && !truststoreLocation.isEmpty()) {
kafkaProps.put("ssl.truststore.location", truststoreLocation);
// You may need to provide other truststore-related settings here
}
// Create Kafka consumer with the configured properties
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
// Perform other Kafka consumer setup and processing
// ...
}
此代码读取传递给 Dataflow 作业的参数,将它们设置在 Kafka 使用者的属性中,并配置信任库(如果提供)。确保您的数据流管道代码使用
options
对象来访问传递给作业的参数。
完成这些配置后,您的数据流作业应该能够使用所需的安全设置和自定义配置连接到 Kafka。