配置Apache Spark的MemoryStream来模拟Kafka流

问题描述 投票:0回答:1

我被要求研究使用 Apache Spark 的 MemoryStream 在 Java Spring Boot 服务中模拟 Kafka 流。文档/在线社区在这个主题上有点小,所以我正在寻求帮助。

这是我的实现代码。

final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);

final Dataset<Row> df = kafkaDataStreamReader.load();

return df.writeStream().foreachBatch((batch, batchId) -> {
    // Process a batch of data received from Kafka
    updateTargets(name, customizerFunction, avroSchema, batch);
  • KafkaLoader 是一个类,根据配置文件(it/prod),它将以不同的方式配置 Kafka Stream。它返回一个
    DataStreamReader
    ,这可能就是我努力创建
    MemoryStream
    的原因。
  • 接下来,在 writeStream 中我将写入我的源目的地。
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {
  @Autowired
  SparkSession sparkSession;

  @SneakyThrows
  @Override
  public DataStreamReader getStream(SparkSession sparkSession, Map<String,Object> options) {
    options = Map.of();
    MemoryStream<String> stream = null;
    try {
      stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
      String jsonString = "{..}";

      Seq<String> seq = JavaConverters
        .collectionAsScalaIterableConverter(List.of(jsonString))
        .asScala()
        .toSeq();

      Offset currentOffset = stream.addData(seq);
      stream.commit(currentOffset);
    } catch (Exception e){
      log.warn("Error creating MemoryStream: ", e);
      return new DataStreamReader(sparkSession);
    }
    Dataset<Row> data = stream.toDF();
    log.debug("Stream enabled [t/f]: {}", data.isStreaming());
    return data
      .sqlContext()
      .readStream();
      .format("kafka")
      .option("kafka.bootstrap.servers", "test-servers")
      .option("subscribe", "test-data");
  }
当我运行集成测试时,会调用

ItKafkaLoader
,因此 ActiveProfiles 在这里设置为
it
,这也是我努力创建 MemoryStream 的地方。因为我的实现代码期望返回类型为
DataStreamReader
的对象,所以我相信我需要调用
readStream()
,因为它的类型为
DataStreamReader
?但是,当我尝试
readStream()
时,Spark 会抛出有关我的路径未定义的异常。

java.lang.IllegalArgumentException: 'path' is not specified
    at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError

在搜索此错误时,我倾向于发现我需要将格式设置为 Kafka。然后,Spark 需要一个主题,然后是一个代理。我希望,自从我使用 MemoryStream 以来,Spark 就能识别出这是一个虚拟的 Kafka 集群和主题,并通过我的 MemoryStream 启动我的模拟 Kafka 流。但这并没有发生,当我运行集成测试时,我收到这些错误。

- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers

理想情况下,我想弄清楚如何修复 ItKafkaLoader 中的

getStream()
,但是我有一种轻微的感觉,我不明白 MemoryStream 的真正用途,可能需要做一些不同的事情。

更新: 我发现在较新版本的 Spark 中,您只需将格式设置为内存即可,但是,我的 Spark 版本 v2.12 似乎不支持这一点。我也没有获得升级 Spark 版本的许可。

apache-spark apache-kafka spark-structured-streaming memorystream spark-java
1个回答
0
投票

好吧,我想出了一些事情

  • 将我的 MemoryStream 配置为 DataStreamReader 并不是这样的举动。
    • startStream需要一个格式,而一个格式导致需要一个路径
    • 如果我想这样做,我应该使用内存格式,但是,该选项在我的 Spark 2.12 版本中不可用
  • 解决方法是将数据添加到 MemoryStream,然后在模拟流上调用 writeStream。
    • 我将
      getStream()
      函数的响应更改为
      Dataset
      类型,而不是
      DataStreamReader

这是我的代码更改

@Profile("it")
public class ItKafkaLoader extends KafkaLoader {

  @SneakyThrows
  @Override
  public Dataset<Row> getStream(SparkSession sparkSession, Map<String, String> options) {
    MemoryStream<Data> stream;
    try {
      Encoder<Data> encoder = Encoders.bean(Data.class);

      stream = new MemoryStream<>(1, sparkSession.sqlContext(),null, encoder);

      List<Data> data = getData();

      Dataset<Data> df = sparkSession.createDataset(data, encoder);

      Seq<Data> seqT = JavaConverters
        .asScalaIteratorConverter(df.toLocalIterator())
        .asScala()
        .toSeq();

      stream.addData(seqT);

    } catch (Exception e) {
      log.warn("Error creating MemoryStream: ", e);
      return sparkSession.emptyDataFrame();
    }
    Dataset<Row> data = stream.toDF();
    log.debug("Stream enabled [t/f]: {}", data.isStreaming());
    return data;
  }

一些需要指出的事情:

  • 如果我只是创建一个
    MemoryStream<String>
    那么我的架构将只有一个默认列
    value
    ,其中一行是整个 json 对象,而不是我的自定义
    Data
    对象内所需的属性。
    • 我需要定义一个编码器,以便添加到 MemoryStream 的数据可以携带与我的自定义对象相同的架构
      Data
  • Java Spark 中的
    addData()
    函数需要 Seq 类型
    • 我发现实现此目的的最简单方法是使用下面的代码将 Collection 转换为 Seq
    • 我能够使用它将列表和数据集/数据帧转换为序列
Seq<Data> seq = JavaConverters
        .asScalaIteratorConverter(<list / collection here ...>)
        .asScala()
        .toSeq();`
© www.soinside.com 2019 - 2024. All rights reserved.