我被要求研究使用 Apache Spark 的 MemoryStream 在 Java Spring Boot 服务中模拟 Kafka 流。文档/在线社区在这个主题上有点小,所以我正在寻求帮助。
这是我的实现代码。
final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);
final Dataset<Row> df = kafkaDataStreamReader.load();
return df.writeStream().foreachBatch((batch, batchId) -> {
// Process a batch of data received from Kafka
updateTargets(name, customizerFunction, avroSchema, batch);
DataStreamReader
,这可能就是我努力创建 MemoryStream
的原因。@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {
@Autowired
SparkSession sparkSession;
@SneakyThrows
@Override
public DataStreamReader getStream(SparkSession sparkSession, Map<String,Object> options) {
options = Map.of();
MemoryStream<String> stream = null;
try {
stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
String jsonString = "{..}";
Seq<String> seq = JavaConverters
.collectionAsScalaIterableConverter(List.of(jsonString))
.asScala()
.toSeq();
Offset currentOffset = stream.addData(seq);
stream.commit(currentOffset);
} catch (Exception e){
log.warn("Error creating MemoryStream: ", e);
return new DataStreamReader(sparkSession);
}
Dataset<Row> data = stream.toDF();
log.debug("Stream enabled [t/f]: {}", data.isStreaming());
return data
.sqlContext()
.readStream();
.format("kafka")
.option("kafka.bootstrap.servers", "test-servers")
.option("subscribe", "test-data");
}
当我运行集成测试时,会调用 ItKafkaLoader
,因此 ActiveProfiles 在这里设置为 it
,这也是我努力创建 MemoryStream 的地方。因为我的实现代码期望返回类型为 DataStreamReader
的对象,所以我相信我需要调用 readStream()
,因为它的类型为 DataStreamReader
?但是,当我尝试readStream()
时,Spark 会抛出有关我的路径未定义的异常。
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError
在搜索此错误时,我倾向于发现我需要将格式设置为 Kafka。然后,Spark 需要一个主题,然后是一个代理。我希望,自从我使用 MemoryStream 以来,Spark 就能识别出这是一个虚拟的 Kafka 集群和主题,并通过我的 MemoryStream 启动我的模拟 Kafka 流。但这并没有发生,当我运行集成测试时,我收到这些错误。
- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers
理想情况下,我想弄清楚如何修复 ItKafkaLoader 中的
getStream()
,但是我有一种轻微的感觉,我不明白 MemoryStream 的真正用途,可能需要做一些不同的事情。
更新: 我发现在较新版本的 Spark 中,您只需将格式设置为内存即可,但是,我的 Spark 版本 v2.12 似乎不支持这一点。我也没有获得升级 Spark 版本的许可。
好吧,我想出了一些事情
getStream()
函数的响应更改为 Dataset
类型,而不是 DataStreamReader
这是我的代码更改
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {
@SneakyThrows
@Override
public Dataset<Row> getStream(SparkSession sparkSession, Map<String, String> options) {
MemoryStream<Data> stream;
try {
Encoder<Data> encoder = Encoders.bean(Data.class);
stream = new MemoryStream<>(1, sparkSession.sqlContext(),null, encoder);
List<Data> data = getData();
Dataset<Data> df = sparkSession.createDataset(data, encoder);
Seq<Data> seqT = JavaConverters
.asScalaIteratorConverter(df.toLocalIterator())
.asScala()
.toSeq();
stream.addData(seqT);
} catch (Exception e) {
log.warn("Error creating MemoryStream: ", e);
return sparkSession.emptyDataFrame();
}
Dataset<Row> data = stream.toDF();
log.debug("Stream enabled [t/f]: {}", data.isStreaming());
return data;
}
一些需要指出的事情:
MemoryStream<String>
那么我的架构将只有一个默认列 value
,其中一行是整个 json 对象,而不是我的自定义 Data
对象内所需的属性。
Data
addData()
函数需要 Seq 类型
Seq<Data> seq = JavaConverters
.asScalaIteratorConverter(<list / collection here ...>)
.asScala()
.toSeq();`