解释Spark Structured Streaming执行程序和Kafka分区之间的映射

问题描述 投票:1回答:2

我已经使用4个分区在Kafka主题上部署了一个包含4个工作者的结构化流。

我假设将为4个分区部署4个工作程序,在worker < - >分区之间进行一对一映射。

但事实并非如此。所有分区都由同一个Executor提供服务。我通过检查thread-id并通过执行程序登录来确认这一点。

是否有任何文档显示Kafka分区和Spark Structured Streams之间的相关性。还有,我们可以调整任何旋钮。

apache-spark spark-structured-streaming
2个回答
0
投票

如果您使用的是DirectStream API,则相关性为1:1(sparkcore:partition)。来自spark streaming guide

Kafka 0.10的Spark Streaming集成在设计上与0.8 Direct Stream方法类似。它提供简单的并行性,Kafka分区和Spark分区之间的1:1对应关系,以及对偏移和元数据的访问


0
投票

相关性为“1:n(执行程序:分区)”:Kafka分区只能由一个执行程序使用,一个执行程序可以使用多个Kafka分区。

这与Spark Streaming一致。


对于结构化流,默认模型是“微批处理模型”,“连续处理模型”仍处于“实验”状态。

对于“微批处理模型”,在“KafkaSource.scala”中有

 *   - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
 *     data from Kafka topic + partition is consistently read by the same executors across
 *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
 *     docs on [[KafkaSourceRDD]] for more details.

在“KafkaSourceRDD”中

/**
 * An RDD that reads data from Kafka based on offset ranges across multiple partitions.
 * Additionally, it allows preferred locations to be set for each topic + partition, so that
 * the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
 * and cached KafkaConsumers (see [[KafkaDataConsumer]] can be used read data efficiently.
 *
 * ...
 */
private[kafka010] class KafkaSourceRDD(

我们知道默认的位置政策是LocationStrategies.PreferConsistent


对于“连续处理模型”,在“KafkaContinuousReader.scala”中

  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
    ...
    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        KafkaContinuousDataReaderFactory(
          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[DataReaderFactory[UnsafeRow]]
    }.asJava
  }

/**
 * A data reader factory for continuous Kafka processing. This will be serialized and transformed
 * into a full reader on executors.
 *
 * @param topicPartition The (topic, partition) pair this task is responsible for.
 * ...
 */
case class KafkaContinuousDataReaderFactory(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}

我们可以知道每个(topic, partition)将包含在一个工厂中,然后将在一个执行器中。

© www.soinside.com 2019 - 2024. All rights reserved.