Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka
我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....
我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。
无法使用 pyspark readstream 从 kafka 主题读取记录数组
我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 然而,实际...
我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...
使用 Spark streaming + Kafka 时如何修复过期批次?
我正在尝试使用 foreachBatch() 从 kafka 主题读取数据,如下所示。 def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict): 问题...
在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢
我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...
使用 toTable 在 Databricks 中写入流不会执行 foreachBatch
下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...
使用 Kafka 驱动程序从 Azure 事件中心读取似乎没有获得任何数据
我在 Azure Databricks python 笔记本中运行以下代码: 主题 = "myeventhub" BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093" EH_SASL = "kafkas...
Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加
在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“ou ...
writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。
我试图使用模式转换json,并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了适当的数据。数据我给kafka主题...{"股票":。
在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?
我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...
是否可以在每次触发时,在聚合时间窗口结束前,输出聚合数据?上下文。我正在开发一个应用程序,从Kafka主题读取数据,处理数据,... ...
我写了下面的代码,我想从kafka读取并写入按年、月、日和小时分区的parquet文件。在dag中我看到一个排序操作(如下图)。这个排序操作是不是要...
我试图连接我的结构化流火花2.4.5与kafka,但所有的时间,我试图这个数据源提供者错误的出现,按照我的scala代码和我的sbt构建:和错误是:和我的sbt.build是:谢谢你!。按照我的scala代码和我的sbt构建:导入org......
我正在运行一个spark结构化流作业,其中包括创建一个空的数据框架,使用每个微批更新它,如下所示。随着每一个微批处理的执行,阶段数增加......。
什么是需要修复的数据。如何决定spark中Reparation的大小。修复的概念是否适用于spark流和结构化流。DF.Repartion(num)
我试图运行Python Spark结构化流+Kafka,当我运行Master@MacBook-Pro命令 spark-3.0.0-preview2-bin-hadoop2.7 % binspark-submit --packages org.apache.spark:spark-sql-...
在我的结构化流作业中,我在updateAcrossEvents方法中更新Spark Accumulators,但是当我试图在StreamingListener中打印它们时,它们总是0。下面是代码。....
如何在kafka中摄取两个生产者的数据并使用spark结构化流加入?
我试图从两个kafka主题中读取数据,但我无法加入并找到最终的数据帧。我的kafka主题是CSVStreamRetail和OrderItems。 val spark = SparkSession .builder ...
我的目标是从redis中摄取流式数据并进行处理,如何通过spark结构化流连接和处理数据?我如何通过spark结构化流连接和处理数据?