Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
我正在使用 Spark 2.1 并尝试优雅地停止流查询。 StreamingQuery.stop() 是否是一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息: ...
当我尝试通过偏移量的时间戳从 kafka 主题检索数据时,作业失败并出现错误: 原因是:java.lang.AssertionError:断言失败:没有与 topic-par 的请求匹配的偏移量...
配置Apache Spark的MemoryStream来模拟Kafka流
我被要求研究使用 Apache Spark 的 MemoryStream 在 Java Spring Boot 服务中模拟 Kafka 流。文档/在线社区在这个主题上有点小,所以我...
includeExistingFiles: false 在 Databricks Autoloader 中不起作用
使用自动加载器从 adls gen2 获取文件。但是,我只想摄取新文件。使用以下配置仍然无法阻止现有文件被摄取。还有人吗...
我正在使用徽章架构创建 DLT 管道。在 Silver 中,我使用 CDC/SCD1 按日期获取最新的 id,工作正常,但我对 @dlt.view 包装器有疑问。 我现在的
“固定间隔微批次”和“AvailableNow”触发器之间的根本区别是什么? 我发现有关这些内容的文档令人困惑。 根本不同吗...
如果对流数据帧数据进行分组,是否可以在 Spark 结构化流中以单独的单个微批次处理每个组?像这样的东西: dfs = ... dfs.groupBy(...).writestrea...
在 Spark 结构化流中对 foreachBatch 操作应用定义的函数时出现 STREAMING_CONNECT_SERIALIZATION_ERROR
我正在使用 Spark 结构化流,但偶然发现了一个问题,但我看不到问题的根本原因和解决方案。 我定义了一个包含函数的 Reader 类
有状态 Spark Streaming 的 SST 文件数量无限增长
我们正在 Databricks 上运行一个非常简单的 Apache Spark Streaming 应用程序。它使用来自 Apache Kafka 的消息,基于 1 小时水印进行重复数据删除,并写入输出...
Kafka 删除(逻辑删除)未更新 Spark 结构化流中的最大聚合
我正在对 Spark 结构化流 (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算最大日期和所有时间的最大百分比(否
使用 TriggerAvailableNow 和 Eventhubs 进行 Spark 结构化流处理
我一直在尝试将数据块中的增量表中的事件写入事件集线器,并且在尝试使其与 Trigger availableNow=True 一起使用时遇到了问题, 我基本上是在收集...
在 UDF 中,我想将增量表读入数据帧,根据其内容更新应用 UDF 的实际数据帧的行,然后更新增量表。我会用...
PySpark 结构化流每批 2 个 SQL(长 addBatch 执行)
我有一个 Pyspark 结构化流应用程序(3.3.2),它需要使用微批次从 Kafka 读取输入,执行复杂的逻辑,其中包括连接来自几个数据帧的数据。 该应用程序是
为什么spark既需要预写日志又需要检查点? 为什么我们不能只使用检查点?另外使用预写日志有什么好处? 存储的数据有什么区别...
我希望在流中存储 PySpark DataFrame,针对每个批次对其进行更改,然后使用 foreachBatch 再次保存更新的 DataFrame。实现这一目标的最简单方法是什么。 我...
我是结构化流媒体新手,并尝试在结构化流媒体中读取数据进行性能测试 我想测试不同的场景,例如,不同的集群大小、不同的数量......
Apache Spark 结构化流文档中的“检查点”和“预写日志”是什么意思? 我怎样才能更好地理解这些概念? 有什么先决条件吗...
无法在google colab中使用pyspark从Kafka读取流数据
我正在 google colab 上运行 pyspark。我已经设置了 Kafka 并在主题中添加了一个 csv 文件。如果我不使用结构化流从 kafka 读取数据,我就可以读取数据并打印它。 然而,...
在 Databricks 结构化流中,是否可以使用 writeStream API 将流写入统一目录管理表? 我能够将流写入 ADLS Gen2 中的外部表,...
如何将Azure ADLS存储帐户名称和容器名称传递给spark.readStream
我在同一资源组下有两个存储帐户(STORAGE_ACCOUNT_A 和 STORAGE_ACCOUNT_B),并且我已经设置了 Spark Streaming 作业(自动加载器)。 df = Spark.readStream.format("cloudFiles&