Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
Spark 2.3.1如何设置结构化流内连接的事件时间范围条件
我正在尝试加入流数据集,因为两个数据集都没有时间分量,所以将当前时间戳用于水印。我想在5分钟后清除加入的数据,因为加入不会...
[我正在使用Spark结构化流媒体,以使用kafka主题中的数据,并将数据写入另一个kafka接收器。我想存储两次偏移量-从主题读取一次并搅动...
我正在使用Scala中的Spark结构化流代码使用JSON数据格式使用来自Kafka主题的数据,并将数据写入另一个kafka接收器。我们正在以群集模式提交作业,并且...
假设我们在流中订阅了2个主题,一个主题是avro,另一个主题是字符串,是否可以根据主题名称动态反序列化?
我正在为我的Spark结构化流应用程序构建监视,并且需要让Spark应用程序消耗某个主题的消费者滞后。我相信火花驱动程序必须知道...
我正在尝试可视化结构化流中的流查询。我该怎么办?我应该使用仪表板还是其他工具?我在网上找不到任何类似的东西。 DF =火花\ ...
“格式错误的数据长度为负数,当尝试使用带有Avro数据源的kafka的Spark结构化流式传输时
因此,我一直在尝试使用Kafka和Avro数据来尝试Angel Conde的结构化流,并使用Avro数据进行结构化流式处理Avro然而,似乎其中的嵌套数据使我的数据有点复杂。这是我的代码,...
无法使用火花结构化流反序列化avro消息,其中键已字符串化,值是avro
使用Spark 2.4.0 Confluent schema-Registry接收模式消息Key在Avro中的String和Value中被序列化,因此我试图使用io.confluent.kafka来仅反序列化Value。
如何删除由Spark结构化流(Spark 2.4.5)创建的旧数据?我有Parquet / Avro格式(不是Delta)的HDFS数据,该数据是由Spark结构化流创建的,并由...
如何使用Spark结构化流从EventHub解压缩Gzip文件
是否有一种方法可以从Eventhub中读取gzip文件,并使用Spark结构化流将其解压缩,因此希望一次使用Spark结构化流触发将未压缩的json存储在ADLS中。我是...
[我正在尝试使用水印放置dropDuplicate,问题是水印无法清除状态,我的代码是:def main(args:Array [String]):Unit = {@ @transient lazy val log = LogManager.getRootLogger。 ..
根据官方spark文档,我们可以使用spark-submit --master spark:// IP-ADDRESS:PORT --status SUBMISSION_ID来检查状态,但是当我尝试使用它时却无法...
可以在完成输出模式下的Spark结构化流中丢弃/控制中间状态吗? (Spark 2.4.0)
我有一种情况,我想处理来自kafka主题的数据。我有这个特定的Java代码,可以从kafka主题中以流的形式读取数据。数据集 streamObjs = sparkSession ....
Scala:从Spark结构化流中读取Kafka Avro消息时出错
我一直在尝试从Scala 2.11的Spark结构化流(2.4.4)中读取Kafka的avro序列化消息。为此,我使用了spark-avro(下面的依赖项)。我生成kafka ...
Spark Streaming:从Kafka读取JSON并添加event_time
我正在尝试编写从Kafka读取的有状态Spark结构化流作业。作为要求的一部分,我需要在流中添加“ event_time”作为附加列。我正在尝试...
为什么将RDD更改为DataFrame时会发生Spark不可序列化异常?
我正在使用结构化流,并且以下代码有效val j = new Jedis()//无法序列化的redis客户端。 xx.writeStream.foreachBatch {(batchDF:DataFrame,batchId:Long)=> {j ....
我需要对来自Kafka的流数据进行一些汇总,并每M秒将结果的前10行输出到控制台。 input_df =(spark .readStream .format(“ kafka”)...
对org.apache.spark.streaming.kafka.KafkaUtils的依赖性
我正在尝试将星光流与kafka集成在一起。我无法解决org.apache.spark.streaming.kafka.KafkaUtils的依赖关系。下面是我的build.sbt:名称:=“ StreamingTest”版本:=“ 1.0” ...
通过加水印可以自动删除Apache Spark结构化流中的旧状态数据。在结构化流编程指南.md中,字数示例演示了如何对水印进行加注...