spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark watermark api 问题

对于结构化流媒体水印,在 api 中设置为 1 小时。 现在我在 Streaming Listener 中使用下面的这个 api: **事件:StreamingQueryListener.QueryProgressEvent** triggerTime = Instant.parse(event.pr...

回答 0 投票 0

Spark StructuredStreaming - 水印未按预期工作

我有一个触发时间为 10 分钟的结构化流式传输作业,我正在使用水印来说明迟到的数据。 但是,水印不起作用 - 而不是单个记录......

回答 1 投票 0

在 Azure 事件中心的单个消费者组上是否可以有多个具有自己检查点的读取进程

在我们的场景中——我们正在轮询一个本地数据库,并使用 REST 端点每 2 分钟将数据发布到事件中心 我们正在尝试将多个实体推送到同一个事件中心......

回答 0 投票 0

Spark Structured Streaming:时间窗口语义和 Available-now micro-batch

我不需要持续运行的集群来处理我的数据,所以我想,正如 Spark 文档所建议的那样,使用 available-now 触发器: 这在您想要定期发布的场景中很有用...

回答 0 投票 0

在 spark 结构化流中刷新静态数据帧

我写了一个 Spark Structured Streaming Job,它读取和写入 Kafka 队列。 在作业中,我在加入静态 DataFrame 后将输入复制到数百行。这是我...

回答 0 投票 0

Spark Structured Stream - Kinesis 作为数据源

我正在尝试使用 psypark 结构化流来消耗运动数据流记录。 我正在尝试在 aws 胶水批处理作业中运行此代码。我的目标是使用检查点并将检查点和数据保存到...

回答 1 投票 0

Spark Structured Streaming - 忽略水印,输出旧数据

我有一个示例 Spark 结构化代码,我正在尝试实施/测试水印以说明迟到的数据。 不知何故,水印被忽略了,旧数据被发布了......

回答 1 投票 0

如何在 Spark 结构化流中正确地将消息映射到具有 `schema` 和 `payload` 的对象?

我希望在 Spark 结构化流式传输期间将消息映射到内部具有架构和有效负载的对象。 这是我的原始代码 val input_schema = new StructType() .add("时间戳&quo...

回答 1 投票 0

如何在 Spark 结构化流中正确地将消息映射到具有 `schema` 和 `payload` 的结构?

我希望在 Spark 结构化流传输期间将消息映射到内部具有架构和有效负载的结构。 这是我的原始代码 val input_schema = new StructType() .add("时间戳&quo...

回答 0 投票 0

Spark Structured Streaming with Kafka client 3.4.0 找不到主题,它曾与 spark 2.2 和 Kafka client 0.10.2.1 一起工作

我正在从 spark 2.2 升级到 3.4.0,我的应用程序也使用 kafka stream,并且对于 spark-3.4.0,我必须同时更新 kafka-client。我在某处读到 kafka-3.4.0 不需要

回答 0 投票 0

Databricks Autoloader 如何在微批中拆分数据?

基于此,Databricks Runtime >= 10.2 支持“availableNow”触发器,可用于以较小的不同微批次执行批处理,其大小可以是

回答 0 投票 0

Spark 最后一个窗口在追加模式下不刷新

问题很简单,当你使用带有追加模式的TUMBLING窗口时,只有当下一条消息到达时窗口才会关闭(+水印逻辑)。 在当前的实现中,如果你停止

回答 1 投票 0

当某些偏移量没有任何数据时从kafka读取时出现空指针异常

我正在阅读火花流中的 kafka 主题,在我们开始在 kafka 中获得空抵消之前,工作一直很好。 [错误] org.apache.spark.executor.Executor - 阶段任务 0.0 异常 ...

回答 0 投票 0

底层方法抛出异常时的SparkStream

我有一个连续从 Kafka 读取的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。虽然一直

回答 1 投票 0

PySpark 从“Kafka 值”中分离出嵌套的 json 列,用于 Spark 结构化流

我已经能够编写控制台来控制我想要处理的 json 文件。拜托,我如何将“值”列分成数据列,如 json 中的那样,并写入 delta lake 以获取 sql que ...

回答 2 投票 0

使用 Kafka 进行 Spark Structured Streaming 时我们什么时候需要检查点?在 ReadStream 或 Write Stream 中?

我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我明白...

回答 1 投票 0

有没有办法在我只读的表上防止 ConcurrentAppendException?

我有一个结构化流,它从增量表会话中读取并写入增量表记录。目标是让流继续运行(这就是我认为流应该工作的方式),

回答 0 投票 0

在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果

输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......

回答 0 投票 0

Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka

我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....

回答 1 投票 0

对 databricks autolader 寄予厚望

我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.