PySpark StreamingQueryException:Elasticsearch Spark 连接器的 java.lang.NoSuchMethodError

问题描述 投票:0回答:1

我正在从事 PySpark 流作业,需要将流数据从 Kafka 写入 Elasticsearch。我正在使用:

Spark版本:3.5.2 Elasticsearch Spark 连接器:

org.elasticsearch:elasticsearch-spark-30_2.13:8.10.2 Scala Version: 2.13

目标是在 PySpark 中处理和关联流数据,并将最终输出写入 Elasticsearch 索引。但是,我在执行过程中遇到以下错误:

Traceback (most recent call last):
  File "/Users/harshvardhanmatta/iceberg_pyspark_kafka_project/correlation/correlation_hotspot_mikrotik_5.py", line 182, in <module>
    main()
  File "/Users/harshvardhanmatta/iceberg_pyspark_kafka_project/correlation/correlation_hotspot_mikrotik_5.py", line 174, in main
    es_write_query.processAllAvailable()
  ...
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 2fb60ac0-7734-404a-b02a-e5812e8b91f5, runId = 3800f064-6b8b-4d7d-b6ad-60f500eddbb2] terminated with exception: Job aborted due to stage failure: Task 20 in stage 13.0 failed 1 times, most recent failure: Lost task 20.0 in stage 13.0 (TID 432) (10.5.49.178 executor driver): java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
    at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)

我正在使用以下配置将最终的流式 DataFrame 写入 Elasticsearch:

    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "correlated-data") \
    .option("es.nodes", "137.59.52.242") \
    .option("es.port", "9200") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", "xxxx") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()

错误分析: 错误 java.lang.NoSuchMethodError 通常表示版本不匹配或 Spark 版本与连接器之间不兼容。我怀疑问题可能是:

Spark 3.5.2、Elasticsearch Spark 连接器或 Scala 版本之间不匹配。 Spark SQL 流 API 和 Elasticsearch 连接器中的方法可能不兼容。

迄今为止采取的行动

  1. 验证Spark和Scala版本一致:使用Scala 2.13。
  2. 将连接器更新为 org.elasticsearch:elasticsearch-spark-30_2.13:8.10.2。
  3. 向 Spark 会话和连接器添加了适当的配置。

问题

  1. 有人在 Spark 3.5.x 和 Elasticsearch 连接器方面遇到过类似的问题吗?
  2. 我应该为 Spark 3.5.2 使用不同的连接器版本吗?
  3. 是否有任何已知的兼容性问题或针对此特定错误的修复?

任何有关如何解决此问题的指导将不胜感激。

scala apache-spark elasticsearch pyspark spark-streaming
1个回答
0
投票

根据elasticsearch的github问题,尚不支持spark 3.5。

我会使用较旧的 Spark 版本(例如 3.4.3)尝试您的工作流程,该版本被提到在同一 github 问题线程中受支持。

© www.soinside.com 2019 - 2024. All rights reserved.