spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark Structured Streaming with Kafka client 3.4.0 找不到主题,它曾与 spark 2.2 和 Kafka client 0.10.2.1 一起工作

我正在从 spark 2.2 升级到 3.4.0,我的应用程序也使用 kafka stream,并且对于 spark-3.4.0,我必须同时更新 kafka-client。我在某处读到 kafka-3.4.0 不需要

回答 0 投票 0

Databricks Autoloader 如何在微批中拆分数据?

基于此,Databricks Runtime >= 10.2 支持“availableNow”触发器,可用于以较小的不同微批次执行批处理,其大小可以是

回答 0 投票 0

Spark 最后一个窗口在追加模式下不刷新

问题很简单,当你使用带有追加模式的TUMBLING窗口时,只有当下一条消息到达时窗口才会关闭(+水印逻辑)。 在当前的实现中,如果你停止

回答 1 投票 0

当某些偏移量没有任何数据时从kafka读取时出现空指针异常

我正在阅读火花流中的 kafka 主题,在我们开始在 kafka 中获得空抵消之前,工作一直很好。 [错误] org.apache.spark.executor.Executor - 阶段任务 0.0 异常 ...

回答 0 投票 0

底层方法抛出异常时的SparkStream

我有一个连续从 Kafka 读取的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。虽然一直

回答 1 投票 0

PySpark 从“Kafka 值”中分离出嵌套的 json 列,用于 Spark 结构化流

我已经能够编写控制台来控制我想要处理的 json 文件。拜托,我如何将“值”列分成数据列,如 json 中的那样,并写入 delta lake 以获取 sql que ...

回答 2 投票 0

使用 Kafka 进行 Spark Structured Streaming 时我们什么时候需要检查点?在 ReadStream 或 Write Stream 中?

我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我明白...

回答 1 投票 0

有没有办法在我只读的表上防止 ConcurrentAppendException?

我有一个结构化流,它从增量表会话中读取并写入增量表记录。目标是让流继续运行(这就是我认为流应该工作的方式),

回答 0 投票 0

在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果

输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......

回答 0 投票 0

Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka

我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....

回答 1 投票 0

对 databricks autolader 寄予厚望

我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。

回答 1 投票 0

无法使用 pyspark readstream 从 kafka 主题读取记录数组

我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 然而,实际...

回答 1 投票 0

包含多个连接的 Spark 流式查询没有输出

我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...

回答 0 投票 0

使用 Spark streaming + Kafka 时如何修复过期批次?

我正在尝试使用 foreachBatch() 从 kafka 主题读取数据,如下所示。 def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict): 问题...

回答 1 投票 0

在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢

我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...

回答 0 投票 0

使用 toTable 在 Databricks 中写入流不会执行 foreachBatch

下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...

回答 1 投票 0

使用 Kafka 驱动程序从 Azure 事件中心读取似乎没有获得任何数据

我在 Azure Databricks python 笔记本中运行以下代码: 主题 = "myeventhub" BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093" EH_SASL = "kafkas...

回答 2 投票 0

Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加

在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“ou ...

回答 0 投票 0

writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。

我试图使用模式转换json,并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了适当的数据。数据我给kafka主题...{"股票":。

回答 1 投票 0

在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?

我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.