spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

pyspark - 结构化流式传输到弹性搜索

我正在编写一个代码,其中我尝试使用 pySpark 的结构化流将数据流式传输到弹性搜索中。 火花版本:3.0.0 安装模式:pip 查询 = inpJoinDF.writeStream \ .输出...

回答 2 投票 0

Databricks Spark.readstream 格式差异

我对 Databricks 中以下代码的差异感到困惑 Spark.readStream.format('json') 与 Spark.readStream.format('cloudfiles').option('cloudFiles.format', 'json') 我知道

回答 2 投票 0

使用pyspark结构化流解析Kafka但得到null

我尝试使用下面的代码解析Kafka: 从 pyspark.sql 导入 SparkSession 从 pyspark.sql.functions 导入 * 从 pyspark.sql.types 导入 * 火花 = SparkSession \ .builder \ .ap...

回答 1 投票 0

Spark 结构化流:由于缺少水印,将 DataFrame 写入 CSV 失败

我正在使用 火花,版本 3.4.1 PySpark,版本 3.4.1 Python,版本 3.11 使用 Spark 结构化流我想将 DataFrame 编写为 CSV 文件。 logsDF 是一个 pyspark.sql.dataframe.DataFrame w...

回答 2 投票 0

将附加参数传递给 pyspark 中的 foreachBatch

我在 pyspark 结构化流中使用 foreachBatch,使用 JDBC 将每个微批次写入 SQL Server。我需要对多个表使用相同的过程,并且我想重用同一个编写器

回答 3 投票 0

在 Databricks 中调用一次 Trigger 来处理 Kinesis Stream

我正在寻找一种方法来触发我的 Databricks 笔记本一次来处理 Kinesis Stream 并使用以下模式 导入 org.apache.spark.sql.streaming.Trigger // 加载您的流数据帧 维...

回答 3 投票 0

PySpark 结构化流作业的正常关闭会引发 Py4JNetworkError

我正在开发一个 PySpark 结构化流作业,该作业从 Kafka 主题读取数据并实时处理它。我想使用信号处理实现正常关闭,但我遇到了......

回答 1 投票 0

Spark 结构化流 - 流 - 静态连接:如何更新静态 DataFrame

我的问题几乎与此相同:Stream-Static Join: How tofresh (unpersist/persist) static Dataframeperiodic 然而@Michael Heil 的解决方案并不适合我的代码。 另一个类似...

回答 1 投票 0

多个 Spark 结构化流作业使用相同 Kafka 主题的问题

我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...

回答 1 投票 0

Spark 自定义流数据源?

我正在尝试找出当前推荐的使用 Spark 结构化流最新版本创建自定义流数据源的方法。 我找到了以下教程 https://aamargaj...

回答 1 投票 0

从 PySpark 结构化流将聚合数据写入 MongoDB 的问题

我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置: 我有一个 Kafka 主题,我正在其中使用 JSON 消息。 我是...

回答 1 投票 0

使用 Spark scala 加速将数据保存到 S3 存储桶

我正在寻找一些指针,通过它们可以加快数据持久保存到 S3 的速度。因此,我目前根据以下示例路径将数据持久保存到 s3 存储桶 s3://

回答 1 投票 0

在 Spark 结构化流中将 mergeSchema 设置为 true 时不会添加其他列

我有一个 Spark 结构化流作业(Kafka)。我想用任何额外的传入列更新表。我已将 mergeSchema 设置为 true,但默认情况下我的列不会更新。 我的我...

回答 1 投票 0

如何优雅地停止 Spark foreachBatch 回调中的线程

我正在使用线程包中的线程来启动执行火花流的函数。我想在满足条件时停止进程函数内的线程。 导入线程 小鬼...

回答 1 投票 0

为什么要使用Spark结构化流AvailableNow而不仅仅是普通的批处理数据帧?

我正在学习 Spark 结构化流,事情还有点模糊......我没有得到的一件事是使用批处理模式(AvailableNow = True)相对于普通模式的优势......

回答 1 投票 0

Spark消费者使用docker运行时找不到kafka主题分区

当我提交连接到 kafka 代理的 Spark 应用程序时,它会执行 kafka 查询,但不会将任何内容返回到控制台。找不到主题分区。 这是我的日志

回答 1 投票 0

使用PySpark结构化流,如何通过WebSocket将处理后的数据发送到客户端

我在应用程序中使用 PySpark 结构化流,其中使用 readStream 从 Apache Iceberg 表中读取附加数据。在 PySpark 框架内处理数据后,我...

回答 1 投票 0

(为什么)Spark Structured Streaming 会重新编译每个小批量的代码

我有一个 Spark 结构化流作业,从 Kafka 读取数据,解析 avro,分解列,计算一些额外的列作为现有列的简单组合(总和/乘积/除法),然后写...

回答 1 投票 0

DataBricks 自动加载器与输入源文件删除检测

在连续从源 s3 存储桶中提取文件时,我希望能够检测到文件被删除的情况。据我所知,自动加载器无法处理检测...

回答 1 投票 0

Spark 结构化流 - 检查点元数据无限增长

我使用spark结构流3.1.2。我需要使用 s3 来存储检查点元数据(我知道,这不是检查点元数据的最佳存储)。压缩间隔是10(默认),我设置了spar...

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.