[有人尝试将CDS的serber-steaming(pyspark)用作kerberos KAFKA的消费者吗?
我搜索CDH,仅找到有关Scala的示例。
这是否意味着CDH不支持此功能?
任何人都可以为此提供帮助???
CDH支持基于Pyspark的结构化流API,也可以连接受Kerberos保护的Kafka群集。即使是我也很难找到示例代码。您可以参考下面的示例代码,这些示例代码在CDH产品环境中经过了很好的测试和实现。
Note:下面的示例代码中要考虑的要点。
根据您的环境调整软件包版本。
提及权限JAAS,spark提交命令中的Keytab文件位置和代码中的配置参数。
该代码已作为示例读取Kerberos启用的Kafka集群主题并写入HDFS位置。
spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.databricks:spark-avro_2.11:3.2.0 --conf spark.ui.port=4055 --files /home/path/spark_jaas,/home/bdpda/bdpda.headless.keytab --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas" pysparkstructurestreaming.py
Pyspark代码:pysparkstructurestreaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
import time
# Spark Streaming context :
spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)
# Kafka Topic Details :
KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'
# Creating readstream DataFrame :
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "earliest") \
.option("kafka.security.protocol","SASL_SSL")\
.option("kafka.client.id" ,"Clinet_id")\
.option("kafka.sasl.kerberos.service.name","kafka")\
.option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
.option("kafka.ssl.truststore.password", "password_rd") \
.option("kafka.sasl.kerberos.keytab","/home/path.keytab") \
.option("kafka.sasl.kerberos.principal","path") \
.load()
df1 = df.selectExpr( "CAST(value AS STRING)")
# Creating Writestream DataFrame :
df1.writeStream \
.option("path","target_directory") \
.format("csv") \
.option("checkpointLocation","chkpint_directory") \
.outputMode("append") \
.start()
ssc.awaitTermination()