PySpark streaming from a Kafka topic to an Elasticsearch index fails when run in a Jupyter Notebook

Votes: 0 · Answers: 1

I am streaming data from a Kafka topic with PySpark in a Jupyter Notebook, and writing the stream to an Elasticsearch index.

Writing the same stream to HDFS works fine, but writing to Elasticsearch fails with this error:

Driver stacktrace:
24/09/14 04:17:08 ERROR Executor: Exception in task 39.0 in stage 15.0 (TID 193)
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
        at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Spark version: 3.5.0. Elasticsearch: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
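A NoSuchMethodError like the one above typically means the elasticsearch-spark connector was compiled against a different Spark release than the one running it: Spark 3.5 changed the RowEncoder.apply API that the 8.9.0 connector calls. A minimal sanity check in plain Python; the supported-version set below is an illustrative assumption, not an official compatibility matrix:

```python
def minor_version(v: str) -> tuple:
    """Return (major, minor) from a version string like '3.5.0'."""
    major, minor = v.split(".")[:2]
    return int(major), int(minor)

def is_compatible(spark_version: str, supported: set) -> bool:
    """True if Spark's major.minor line is in the connector's supported set."""
    return minor_version(spark_version) in supported

# Assumed supported lines for elasticsearch-spark-30_2.12:8.9.0
# (verify against the es-hadoop compatibility notes).
SUPPORTED = {(3, 2), (3, 3), (3, 4)}

print(is_compatible("3.5.0", SUPPORTED))  # False -> NoSuchMethodError risk
print(is_compatible("3.2.0", SUPPORTED))  # True
```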

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=password#123
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ports:
      - '9200:9200'
      - '9300:9300'

  kibana:
    image: docker.elastic.co/kibana/kibana:8.9.0
    container_name: kibana
    ports:
      - '5601:5601'
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - xpack.security.enabled=false
    depends_on:
      - elasticsearch
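Before digging into the Spark side, it can help to confirm the cluster started by the compose file is actually reachable from the notebook. A small sketch using only the standard library; `check_es` is a hypothetical helper, and the host in the comment is the placeholder address from the question:

```python
import json
import urllib.request

def es_url(host: str, port: int = 9200) -> str:
    """Build the Elasticsearch root URL the connector will hit."""
    return f"http://{host}:{port}"

def check_es(host: str, port: int = 9200, timeout: float = 5.0) -> dict:
    """Fetch cluster info; raises URLError if Elasticsearch is unreachable."""
    with urllib.request.urlopen(es_url(host, port), timeout=timeout) as resp:
        return json.loads(resp.read())

# With a running cluster (host placeholder as given in the question):
# info = check_es("192.XXX.XX.144")
# print(info["version"]["number"])  # should match the image tag, e.g. 8.9.0
```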

PySpark code

# Create Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.9.0") \
    .config("es.nodes", "192.XXX.XX.144") \
    .config("es.port", "9200") \
    .config("es.nodes.wan.only", "true") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()


es_query = person_df.writeStream \
    .format("es") \
    .queryName("writing_to_es") \
    .option("es.nodes", "192.XXX.XX.144:9200") \
    .option("es.resource", "uc_person_plot/_doc") \
    .option("checkpointLocation", "hdfs://namenode:9000/uc/es/checkpoint_dir") \
    .outputMode("append") \
    .start()

Please help me resolve this issue.

apache-spark elasticsearch pyspark
1 Answer
0
votes

In the Jupyter Notebook the runtime was Spark 3.5.0, and that Spark version caused the problem: the elasticsearch-spark 8.9.0 connector calls a RowEncoder.apply signature that Spark 3.5 no longer provides. I switched to Spark 3.2.0 and it runs without issues.
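A sketch of the version alignment the answer describes: pin the Kafka connector to the same minor version as Spark (3.2.0, instead of the mismatched 3.3.0 in the question) while keeping elasticsearch-spark at 8.9.0. The helper name is hypothetical; verify the exact pairing against the es-hadoop compatibility notes:

```python
def connector_packages(spark_version: str, es_spark_version: str = "8.9.0") -> str:
    """Build spark.jars.packages with the Kafka connector pinned to Spark's version."""
    kafka = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version}"
    es = f"org.elasticsearch:elasticsearch-spark-30_2.12:{es_spark_version}"
    return ",".join([kafka, es])

packages = connector_packages("3.2.0")
print(packages)

# Used when creating the session (requires pyspark 3.2.0 installed):
# spark = SparkSession.builder \
#     .appName("KafkaToES") \
#     .config("spark.jars.packages", packages) \
#     .getOrCreate()
```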
