Spark session thread safety


I have read that the Spark session and its context are thread-safe, but that this does not hold in every situation.

I have a multi-threaded application organized as follows:

N worker threads that service an event bus and submit some simple Spark jobs.

Now I want to attach a job description to each task so the pipeline is easier to observe, for example:

  def withJobDescription[T](desc: String)(fn: => T)(implicit spark: SparkSession): T = {
    spark.sparkContext.setJobDescription(desc)
    try {
      fn
    } finally {
      spark.sparkContext.setJobDescription("")
    }
  }
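For illustration, the workers might invoke this helper roughly as follows; a minimal sketch assuming a fixed-size thread pool behind the event bus (the pool size, handleEvent, and the query are made up for the example):

  import java.util.concurrent.Executors
  import scala.concurrent.{ExecutionContext, Future}
  import org.apache.spark.sql.SparkSession

  implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

  // the N workers draining the event bus (size is illustrative)
  implicit val workers: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def handleEvent(eventId: String): Future[Long] = Future {
    withJobDescription(s"Handling event $eventId") {
      spark.range(1000L).count()   // stand-in for the real Spark task
    }
  }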

sparkContext looks like global state that cannot be shared between threads. On the other hand, I see the following in the official codebase:

org/apache/spark/util/HadoopFSUtils.scala

    val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)

    val statusMap = try {
      val description = paths.size match {
        case 0 =>
          "Listing leaf files and directories 0 paths"
        case 1 =>
          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
        case s =>
          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
      }
      sc.setJobDescription(description)

....

    } finally {
      sc.setJobDescription(previousJobDescription)
    }

So my question is: what is the canonical way to set the jobDescription in a multi-threaded application, and what is the correct way to use the Spark session in such an environment?

scala apache-spark
1 Answer

The setJobDescription code just calls setLocalProperty:

  /** Set a human readable description of the current job. */
  def setJobDescription(value: String): Unit = {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
  }

setLocalProperty itself carries a handy warning in its docs:
These properties are inherited by child threads spawned from this thread. This may have unexpected consequences when working with thread pools. The standard java implementation of thread pools have worker threads spawn other worker threads. As a result, local properties may propagate unpredictably.

From your description it is not clear whether you are running into this pitfall.
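Putting the two snippets together, one way to make the helper from the question safe in this setup is to set the description only on the worker thread that actually triggers the action, and to restore whatever value was there before instead of clearing it, mirroring the HadoopFSUtils pattern. A sketch, not an official API; the property key is spelled out as a string here because SparkContext.SPARK_JOB_DESCRIPTION is private[spark]:

  import org.apache.spark.sql.SparkSession

  def withJobDescription[T](desc: String)(fn: => T)(implicit spark: SparkSession): T = {
    val sc = spark.sparkContext
    // local properties are per-thread, so this only tags jobs
    // submitted from the current worker thread
    val previous = sc.getLocalProperty("spark.job.description")
    sc.setJobDescription(desc)
    try {
      fn
    } finally {
      // restore instead of clearing; a null value removes the property
      sc.setJobDescription(previous)
    }
  }

Because local properties are inherited by threads created after they are set, it also helps to create the worker pool up front, before any per-job properties are touched, so the workers cannot inherit a stale description.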
