CDH蒸蒸消费者kerberos kafka

问题描述 投票:1回答:1

[有人尝试将CDS的serber-steaming(pyspark)用作kerberos KAFKA的消费者吗?

我搜索CDH,仅找到有关Scala的示例。

这是否意味着CDH不支持此功能?

任何人都可以为此提供帮助???

pyspark apache-kafka spark-streaming kerberos cloudera-cdh
1个回答
0
投票

CDH支持基于Pyspark的结构化流API,也可以连接受Kerberos保护的Kafka群集。即使是我也很难找到示例代码。您可以参考下面的示例代码,这些示例代码在CDH产品环境中经过了很好的测试和实现。

Note:下面的示例代码中要考虑的要点。

根据您的环境调整软件包版本。

提及权限JAAS,spark提交命令中的Keytab文件位置和代码中的配置参数。

该代码已作为示例读取Kerberos启用的Kafka集群主题并写入HDFS位置。

spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.databricks:spark-avro_2.11:3.2.0  --conf spark.ui.port=4055 --files /home/path/spark_jaas,/home/bdpda/bdpda.headless.keytab --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas" pysparkstructurestreaming.py

Pyspark代码:pysparkstructurestreaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
import time

#  Spark Streaming context :

spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)

#  Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'

#  Creating  readstream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "earliest") \
     .option("kafka.security.protocol","SASL_SSL")\
     .option("kafka.client.id" ,"Clinet_id")\
     .option("kafka.sasl.kerberos.service.name","kafka")\
     .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "password_rd") \
     .option("kafka.sasl.kerberos.keytab","/home/path.keytab") \
     .option("kafka.sasl.kerberos.principal","path") \
     .load()

df1 = df.selectExpr( "CAST(value AS STRING)")

#  Creating  Writestream DataFrame :

df1.writeStream \
   .option("path","target_directory") \
   .format("csv") \
   .option("checkpointLocation","chkpint_directory") \
   .outputMode("append") \
   .start()

ssc.awaitTermination()
© www.soinside.com 2019 - 2024. All rights reserved.