因此,在我开始之前,我想说的是,我知道这个问题可能看起来重复或重复,但是那里可用的答案与我的要求无关。它们主要用于Java和Scala,但仅适用于python。
所以我有一个在gcp上运行的受Kerberos保护的kafka集群。我通过使用kafka-python包创建生产者和消费者来检查运行的两次,它运行良好。
但是当我尝试使用pyspark通过我的spark应用程序连接到该群集时,它将无法正常工作。我的spark应用程序如下所示:-
def application(topic, batchTime, appName, **kwargs):
import os
try:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars <full-path-to-spark>/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar pyspark-shell'
conf = SparkConf().setAppName(appName).setMaster('local[*]')
sc = SparkContext(conf=conf)
stream_context = StreamingContext(sparkContext=sc, batchDuration=batchTime)
kafka_stream = KafkaUtils.createDirectStream(ssc=stream_context, topics=[topic],
kafkaParams={"metadata.broker.list":"broker1:9092",
"ssl.context": 'context',
'sasl.plain.username': '****',
'sasl.plain.password': '*********',
'sasl.mechanism': 'PLAIN',
'security.protocol': "SASL_PLAINTEXT"})
lines = kafka_stream.map(lambda x: json.loads(x[1]))
final_obj = lines.map(lambda line: SparkHelper.get_app_type(line, line['app_type']))
final_obj.foreachRDD(handler)
final_obj.pprint()
当我运行它时,它显示错误,如:-
19/11/26 19:11:59 WARN Utils: Your hostname, openstack-inspiron-3543 resolves to a loopback address: 127.0.1.1; using 10.10.0.25 instead (on interface wlp6s0)
19/11/26 19:11:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/26 19:11:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/11/26 19:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.mechanism is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.password is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.username is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property security.protocol is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property ssl.context is not valid
然后控制台卡在那儿。它不终止也不运行任何东西。是的,仅当我的本地kafka集群正在运行时,才会发生这种情况。如果我关闭它并运行spark应用程序,它将显示“ NoBrokerAvailable”。这不应该发生吧?当我尝试一起连接到其他群集时,本地kafka群集是否正在运行并不重要。
我尝试安装其他的spark-stream-kafka-assembly jar文件,但它们均无效。我从https://jar-download.com/?search_box=spark-streaming-kafka-assembly下载了jar文件。
我正在使用的当前版本是...
火花:2.4.4,Hadoop:2.7,api_version:0.10
我正在使用融合平台5.3.1来运行kafka集群。
我不明白问题出在哪里。请指出我正在犯的任何错误,或者如果没有什么我应该更改以使它起作用。感谢您的提前答复!
代替DSstream方法,您可以尝试Pyspark结构化流,因为它具有传递SSL功能并易于处理的选项。请在下面的堆栈溢出链接中参考经过测试的示例代码。Failed to find leader for topics; java.lang.NullPointerException NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress