spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka

我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....

回答 1 投票 0

对 databricks autolader 寄予厚望

我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。

回答 1 投票 0

无法使用 pyspark readstream 从 kafka 主题读取记录数组

我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 然而,实际...

回答 1 投票 0

包含多个连接的 Spark 流式查询没有输出

我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...

回答 0 投票 0

使用 Spark streaming + Kafka 时如何修复过期批次?

我正在尝试使用 foreachBatch() 从 kafka 主题读取数据,如下所示。 def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict): 问题...

回答 1 投票 0

在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢

我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...

回答 0 投票 0

使用 toTable 在 Databricks 中写入流不会执行 foreachBatch

下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...

回答 1 投票 0

使用 Kafka 驱动程序从 Azure 事件中心读取似乎没有获得任何数据

我在 Azure Databricks python 笔记本中运行以下代码: 主题 = "myeventhub" BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093" EH_SASL = "kafkas...

回答 2 投票 0

Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加

在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“ou ...

回答 0 投票 0

writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。

我试图使用模式转换json,并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了适当的数据。数据我给kafka主题...{"股票":。

回答 1 投票 0

在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?

我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...

回答 1 投票 0

Spark结构化流媒体实时聚合

是否可以在每次触发时,在聚合时间窗口结束前,输出聚合数据?上下文。我正在开发一个应用程序,从Kafka主题读取数据,处理数据,... ...

回答 1 投票 0

火花匕首的意外排序

我写了下面的代码,我想从kafka读取并写入按年、月、日和小时分区的parquet文件。在dag中我看到一个排序操作(如下图)。这个排序操作是不是要...

回答 1 投票 0

连接spark结构化流+kafka时出错。

我试图连接我的结构化流火花2.4.5与kafka,但所有的时间,我试图这个数据源提供者错误的出现,按照我的scala代码和我的sbt构建:和错误是:和我的sbt.build是:谢谢你!。按照我的scala代码和我的sbt构建:导入org......

回答 1 投票 0

跳过的阶段对Spark作业有什么性能影响吗?

我正在运行一个spark结构化流作业,其中包括创建一个空的数据框架,使用每个微批更新它,如下所示。随着每一个微批处理的执行,阶段数增加......。

回答 1 投票 0

火花重新分区

什么是需要修复的数据。如何决定spark中Reparation的大小。修复的概念是否适用于spark流和结构化流。DF.Repartion(num)

回答 1 投票 0

PySpark结构化流+Kafka错误 (原因是:java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport )

我试图运行Python Spark结构化流+Kafka,当我运行Master@MacBook-Pro命令 spark-3.0.0-preview2-bin-hadoop2.7 % binspark-submit --packages org.apache.spark:spark-sql-...

回答 2 投票 0

使用Spark Accumulators与结构化流的关系

在我的结构化流作业中,我在updateAcrossEvents方法中更新Spark Accumulators,但是当我试图在StreamingListener中打印它们时,它们总是0。下面是代码。....

回答 1 投票 0

如何在kafka中摄取两个生产者的数据并使用spark结构化流加入?

我试图从两个kafka主题中读取数据,但我无法加入并找到最终的数据帧。我的kafka主题是CSVStreamRetail和OrderItems。 val spark = SparkSession .builder ...

回答 1 投票 1

我们如何将spark结构化流连接到redis?

我的目标是从redis中摄取流式数据并进行处理,如何通过spark结构化流连接和处理数据?我如何通过spark结构化流连接和处理数据?

回答 1 投票 3

© www.soinside.com 2019 - 2024. All rights reserved.