spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark 结构化流 - 流 - 静态连接:如何更新静态 DataFrame

我的问题几乎与此相同:Stream-Static Join: How tofresh (unpersist/persist) static Dataframeperiodic 然而@Michael Heil 的解决方案并不适合我的代码。 另一个类似...

回答 1 投票 0

多个 Spark 结构化流作业使用相同 Kafka 主题的问题

我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...

回答 1 投票 0

Spark 自定义流数据源?

我正在尝试找出当前推荐的使用 Spark 结构化流最新版本创建自定义流数据源的方法。 我找到了以下教程 https://aamargaj...

回答 1 投票 0

从 PySpark 结构化流将聚合数据写入 MongoDB 的问题

我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置: 我有一个 Kafka 主题,我正在其中使用 JSON 消息。 我是...

回答 1 投票 0

使用 Spark scala 加速将数据保存到 S3 存储桶

我正在寻找一些指针,通过它们可以加快数据持久保存到 S3 的速度。因此,我目前根据以下示例路径将数据持久保存到 s3 存储桶 s3://

回答 1 投票 0

在 Spark 结构化流中将 mergeSchema 设置为 true 时不会添加其他列

我有一个 Spark 结构化流作业(Kafka)。我想用任何额外的传入列更新表。我已将 mergeSchema 设置为 true,但默认情况下我的列不会更新。 我的我...

回答 1 投票 0

如何优雅地停止 Spark foreachBatch 回调中的线程

我正在使用线程包中的线程来启动执行火花流的函数。我想在满足条件时停止进程函数内的线程。 导入线程 小鬼...

回答 1 投票 0

为什么要使用Spark结构化流AvailableNow而不仅仅是普通的批处理数据帧?

我正在学习 Spark 结构化流,事情还有点模糊......我没有得到的一件事是使用批处理模式(AvailableNow = True)相对于普通模式的优势......

回答 1 投票 0

Spark消费者使用docker运行时找不到kafka主题分区

当我提交连接到 kafka 代理的 Spark 应用程序时,它会执行 kafka 查询,但不会将任何内容返回到控制台。找不到主题分区。 这是我的日志

回答 1 投票 0

使用PySpark结构化流,如何通过WebSocket将处理后的数据发送到客户端

我在应用程序中使用 PySpark 结构化流,其中使用 readStream 从 Apache Iceberg 表中读取附加数据。在 PySpark 框架内处理数据后,我...

回答 1 投票 0

(为什么)Spark Structured Streaming 会重新编译每个小批量的代码

我有一个 Spark 结构化流作业,从 Kafka 读取数据,解析 avro,分解列,计算一些额外的列作为现有列的简单组合(总和/乘积/除法),然后写...

回答 1 投票 0

DataBricks 自动加载器与输入源文件删除检测

在连续从源 s3 存储桶中提取文件时,我希望能够检测到文件被删除的情况。据我所知,自动加载器无法处理检测...

回答 1 投票 0

Spark 结构化流 - 检查点元数据无限增长

我使用spark结构流3.1.2。我需要使用 s3 来存储检查点元数据(我知道,这不是检查点元数据的最佳存储)。压缩间隔是10(默认),我设置了spar...

回答 2 投票 0

如何从特定事件中心分区 Spark-Streaming 结构读取

我有一个具有 32 个分区的事件中心。 我需要使用 Pyspark 从事件中心读取分区 1 。 这是我现有的代码 # 配置 连接字符串 = "端点=sb://abcd" 事件中心名称...

回答 1 投票 0

登录 Spark 结构化流

我能够开发一个管道,从 kafka 读取数据并进行一些转换,并将输出写入 kafka 接收器以及 parque 接收器。我想添加有效的日志记录来记录

回答 1 投票 0

从 kafka 读取的 Spark 结构化流作业未显示在 kafka 消费者组中

我使用 pyspark 创建了一个 Spark 流作业,它使用 readStream 从 kafka 主题读取数据,并使用 writeStream 写入 Oracle 数据库中的表。 作业可以成功读取...

回答 1 投票 0

Python/PySpark - 以编程方式将 json_string 列发送到 REST API

我有一个数据帧,我使用 Spark Structured Streaming .readStream() 进行流式传输: ID json_数据 123 {颜色:“红色”,值:“#f00”} 125 {颜色:“蓝色”,值:“...

回答 1 投票 0

spark 结构化流作业如何处理流 - 静态 DataFrame 连接?

我有一个 Spark 结构化流作业,它从 cassandra 和 deltalake 读取映射表并与流 df 连接。我想了解这里的确切机制。火花会击中这些吗

回答 1 投票 0

如何在 Spark Streaming 作业中查找数据帧的大小

我正在尝试查找每个批次中 Spark 流作业中数据帧的大小。我能够成功地找到批处理作业的大小,但是当涉及到流式传输时,我无法做到......

回答 2 投票 0

Spark 结构化流的分组和排序

我有一个用例,其中有流数据集,例如手机号码、开始时间和通话持续时间。 我需要对手机号码进行分组,并根据开始时间对组进行排序并过滤掉呼叫

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.