Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
Spark Structured Streaming with Kafka client 3.4.0 找不到主题,它曾与 spark 2.2 和 Kafka client 0.10.2.1 一起工作
我正在从 spark 2.2 升级到 3.4.0,我的应用程序也使用 kafka stream,并且对于 spark-3.4.0,我必须同时更新 kafka-client。我在某处读到 kafka-3.4.0 不需要
Databricks Autoloader 如何在微批中拆分数据?
基于此,Databricks Runtime >= 10.2 支持“availableNow”触发器,可用于以较小的不同微批次执行批处理,其大小可以是
问题很简单,当你使用带有追加模式的TUMBLING窗口时,只有当下一条消息到达时窗口才会关闭(+水印逻辑)。 在当前的实现中,如果你停止
我正在阅读火花流中的 kafka 主题,在我们开始在 kafka 中获得空抵消之前,工作一直很好。 [错误] org.apache.spark.executor.Executor - 阶段任务 0.0 异常 ...
我有一个连续从 Kafka 读取的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。虽然一直
PySpark 从“Kafka 值”中分离出嵌套的 json 列,用于 Spark 结构化流
我已经能够编写控制台来控制我想要处理的 json 文件。拜托,我如何将“值”列分成数据列,如 json 中的那样,并写入 delta lake 以获取 sql que ...
使用 Kafka 进行 Spark Structured Streaming 时我们什么时候需要检查点?在 ReadStream 或 Write Stream 中?
我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我明白...
有没有办法在我只读的表上防止 ConcurrentAppendException?
我有一个结构化流,它从增量表会话中读取并写入增量表记录。目标是让流继续运行(这就是我认为流应该工作的方式),
在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果
输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......
Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka
我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....
我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。
无法使用 pyspark readstream 从 kafka 主题读取记录数组
我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 然而,实际...
我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...
使用 Spark streaming + Kafka 时如何修复过期批次?
我正在尝试使用 foreachBatch() 从 kafka 主题读取数据,如下所示。 def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict): 问题...
在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢
我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...
使用 toTable 在 Databricks 中写入流不会执行 foreachBatch
下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...
使用 Kafka 驱动程序从 Azure 事件中心读取似乎没有获得任何数据
我在 Azure Databricks python 笔记本中运行以下代码: 主题 = "myeventhub" BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093" EH_SASL = "kafkas...
Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加
在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“ou ...
writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。
我试图使用模式转换json,并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了适当的数据。数据我给kafka主题...{"股票":。
在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?
我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...