spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

从 SQS 驱动的 Pyspark 结构化流检索路径

我有一个 DMS 生成的 s3 数据湖,并设置 SQS 来跟踪生成的文件。现在我想将其流式传输到我的 EMR 集群中,为此我在此处找到了 Spark Streaming s3 连接器 https://git...

回答 1 投票 0

Spark 结构化流:流-流连接与不写入的聚合

平台:Databricks Notebooks|语言:PySpark 上下文:我正在尝试在流数据管道中构建一个节点,该节点评估所有存在的行数(例如 count(*) == Expected_co...

回答 1 投票 0

在 Spark 结构化流处理期间重命名 Spark UI 中的 jobId

我能够使用 setJobDescription 重命名 SparkUI 中的作业描述,仅购买我的预处理数据被重命名(缓存数据集),但主要作业/阶段未重命名 例如: 我的缓存...

回答 1 投票 0

从Databricks中的UDF内部查询Delta Lake

需要在结构化流中对 UDF 内的表执行一些查询。问题是,在 UDF 内部,如果我尝试使用 Spark.sql,我会收到空指针异常。最好的方法是什么

回答 1 投票 0

由于 60 秒内未收到任何更新而强制终止查询 xxxxxx

我正在 Databricks 中使用结构化流将批处理文件加载到 UC 表中。但是它正在工作,如果 foreachBatch 没有在 60 秒内完成,则会产生以下错误: ...

回答 1 投票 0

spark drop 重复项中的序列化错误

我在 Spark 中使用 dropDuplicates 函数时遇到序列化问题。这是我正在使用的代码: 覆盖 def innerTransform(dataFrames: Map[ReaderKey, DataFrame]): DataFrame = { ...

回答 1 投票 0

spark 结构化流 - 使用 availableNow 触发器从 kafka 读取

我尝试使用 Spark Stream API 从 Kafka 读取数据并将结果作为增量表写入 S3。对我来说,在 S3 上放置更少的对象很重要,因此我使用 coalesce(2) 在每个批次中创建两个对象。

回答 1 投票 0

错误SparkContext:无法添加spark-streaming-kafka-0-10_2.13-3.5.2.jar

错误 SparkContext:无法将 home/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-streaming-kafka-0-10_2.13-3.5.2.jar 添加到 Spark 环境 导入日志记录 从 pyspark.sql 导入

回答 1 投票 0

如何为 Spark 结构化流选择正确的 Spark.sql.shuffle.partitions 大小?

我正在使用 Spark Structured Streaming,特别是 Databricks Autoloader,来处理 S3 存储桶中的数百万条小记录,并将它们索引到 Delta Lake 表中。有

回答 1 投票 0

使用 Databricks 自动加载器读取以“§”作为分隔符的 CSV

我对 Spark Streaming 和自动加载器非常陌生,并且询问如何让自动加载器读取以“§”作为分隔符的文本文件。下面我尝试将文件读取为...

回答 1 投票 0

Spark 3.0 无法将非空数据写入iceberg

我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下 ...

回答 1 投票 0

将一个查询的结果馈送到同一 Spark 结构化流应用程序中的另一个查询

我刚刚开始研究 Spark 结构化流并提出了一个实现问题。 所以我正在使用 Apache Pulsar 来传输数据,并想知道是否可以运行不同的...

回答 1 投票 0

使用 WHEN | 时出现意外行为否则

我们开发了一种流处理,它使用许多其他增量表来丰富最终的数据产品。 我们将其称为 FinalDataProduct,插入数据的增量表,semiLayout a

回答 1 投票 0

使用 foreachBatch 的结构化流编写器不尊重 shuffle.partitions 参数

我们正在使用 foreachBatch 功能在结构化流上运行重复数据删除操作。 然而,写操作似乎并不尊重随机分区的数量t...

回答 1 投票 0

如何使用 Databricks 中结构化流的最大记录数来限制输入速率?

我正在尝试使用最大记录数来限制结构化流查询的输入速率。 但是,文档表示仅支持 maxFilesPerTrigger 或 maxBytesPerTrigger。 难道是……

回答 1 投票 0

pyspark - 结构化流式传输到弹性搜索

我正在编写一个代码,其中我尝试使用 pySpark 的结构化流将数据流式传输到弹性搜索中。 火花版本:3.0.0 安装模式:pip 查询 = inpJoinDF.writeStream \ .输出...

回答 2 投票 0

Databricks Spark.readstream 格式差异

我对 Databricks 中以下代码的差异感到困惑 Spark.readStream.format('json') 与 Spark.readStream.format('cloudfiles').option('cloudFiles.format', 'json') 我知道

回答 2 投票 0

使用pyspark结构化流解析Kafka但得到null

我尝试使用下面的代码解析Kafka: 从 pyspark.sql 导入 SparkSession 从 pyspark.sql.functions 导入 * 从 pyspark.sql.types 导入 * 火花 = SparkSession \ .builder \ .ap...

回答 1 投票 0

Spark 结构化流:由于缺少水印,将 DataFrame 写入 CSV 失败

我正在使用 火花,版本 3.4.1 PySpark,版本 3.4.1 Python,版本 3.11 使用 Spark 结构化流我想将 DataFrame 编写为 CSV 文件。 logsDF 是一个 pyspark.sql.dataframe.DataFrame w...

回答 2 投票 0

将附加参数传递给 pyspark 中的 foreachBatch

我在 pyspark 结构化流中使用 foreachBatch,使用 JDBC 将每个微批次写入 SQL Server。我需要对多个表使用相同的过程,并且我想重用同一个编写器

回答 3 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.