我正在从事 PySpark 流作业,需要将流数据从 Kafka 写入 Elasticsearch。我正在使用:
Spark版本:3.5.2 Elasticsearch Spark 连接器:
org.elasticsearch:elasticsearch-spark-30_2.13:8.10.2 Scala Version: 2.13
目标是在 PySpark 中处理和关联流数据,并将最终输出写入 Elasticsearch 索引。但是,我在执行过程中遇到以下错误:
Traceback (most recent call last):
File "/Users/harshvardhanmatta/iceberg_pyspark_kafka_project/correlation/correlation_hotspot_mikrotik_5.py", line 182, in <module>
main()
File "/Users/harshvardhanmatta/iceberg_pyspark_kafka_project/correlation/correlation_hotspot_mikrotik_5.py", line 174, in main
es_write_query.processAllAvailable()
...
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 2fb60ac0-7734-404a-b02a-e5812e8b91f5, runId = 3800f064-6b8b-4d7d-b6ad-60f500eddbb2] terminated with exception: Job aborted due to stage failure: Task 20 in stage 13.0 failed 1 times, most recent failure: Lost task 20.0 in stage 13.0 (TID 432) (10.5.49.178 executor driver): java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)
我正在使用以下配置将最终的流式 DataFrame 写入 Elasticsearch:
.outputMode("append") \
.format("org.elasticsearch.spark.sql") \
.option("es.resource", "correlated-data") \
.option("es.nodes", "137.59.52.242") \
.option("es.port", "9200") \
.option("es.net.http.auth.user", "elastic") \
.option("es.net.http.auth.pass", "xxxx") \
.option("checkpointLocation", "/path/to/checkpoint") \
.start()
错误分析: 错误 java.lang.NoSuchMethodError 通常表示版本不匹配或 Spark 版本与连接器之间不兼容。我怀疑问题可能是:
Spark 3.5.2、Elasticsearch Spark 连接器或 Scala 版本之间不匹配。 Spark SQL 流 API 和 Elasticsearch 连接器中的方法可能不兼容。
迄今为止采取的行动:
问题:
任何有关如何解决此问题的指导将不胜感激。
根据elasticsearch的github问题,尚不支持spark 3.5。
我会使用较旧的 Spark 版本(例如 3.4.3)尝试您的工作流程,该版本被提到在同一 github 问题线程中受支持。