spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

处理 Databricks 上 Delta Live 表中联接表的增量数据加载和 SCD 类型 2

我正在开发一个利用 Databricks 上的 Delta Live Tables 的项目,其中我需要创建一个具有缓慢变化的维度类型 2 的维度(Kimball 样式)。该维度是连接 b 的结果...

回答 1 投票 0

捕获 foreachBatch 函数中引发的异常

我正在使用 Pyspark 结构化流处理 Databricks,并且希望捕获我自己在作为“.foreachBatch”函数传递到流的函数中引发的异常。 这是我的前任...

回答 1 投票 0

Spark 中的 StreamQueryListener 不执行 onQueryProgress() 中的代码

我正在从 Databricks 增量表作为流读取数据并将其写入另一个增量表(使用屏幕截图中的控制台以便于调试),我想使用 StreamingQueryListener(...

回答 3 投票 0

Spark 结构化流可用,失败时触发结束偏移?

我想知道使用AvailableNow Trigger并且查询期间出现查询失败时spark结构化流应该有什么行为?更具体地说,会发生什么......

回答 1 投票 0

Autoloader 未在流模式下拾取 .text 文件

我正在使用 Databricks Autoloader 以流(微批量)模式处理文件。源文件采用.text 格式。虽然创建了检查点并且流没有失败,但 Delta ta...

回答 1 投票 0

从 SQS 驱动的 Pyspark 结构化流检索路径

我有一个 DMS 生成的 s3 数据湖,并设置 SQS 来跟踪生成的文件。现在我想将其流式传输到我的 EMR 集群中,为此我在此处找到了 Spark Streaming s3 连接器 https://git...

回答 1 投票 0

Spark 结构化流:流-流连接与不写入的聚合

平台:Databricks Notebooks|语言:PySpark 上下文:我正在尝试在流数据管道中构建一个节点,该节点评估所有存在的行数(例如 count(*) == Expected_co...

回答 1 投票 0

在 Spark 结构化流处理期间重命名 Spark UI 中的 jobId

我能够使用 setJobDescription 重命名 SparkUI 中的作业描述,仅购买我的预处理数据被重命名(缓存数据集),但主要作业/阶段未重命名 例如: 我的缓存...

回答 1 投票 0

从Databricks中的UDF内部查询Delta Lake

需要在结构化流中对 UDF 内的表执行一些查询。问题是,在 UDF 内部,如果我尝试使用 Spark.sql,我会收到空指针异常。最好的方法是什么

回答 1 投票 0

由于 60 秒内未收到任何更新而强制终止查询 xxxxxx

我正在 Databricks 中使用结构化流将批处理文件加载到 UC 表中。但是它正在工作,如果 foreachBatch 没有在 60 秒内完成,则会产生以下错误: ...

回答 1 投票 0

spark drop 重复项中的序列化错误

我在 Spark 中使用 dropDuplicates 函数时遇到序列化问题。这是我正在使用的代码: 覆盖 def innerTransform(dataFrames: Map[ReaderKey, DataFrame]): DataFrame = { ...

回答 1 投票 0

spark 结构化流 - 使用 availableNow 触发器从 kafka 读取

我尝试使用 Spark Stream API 从 Kafka 读取数据并将结果作为增量表写入 S3。对我来说,在 S3 上放置更少的对象很重要,因此我使用 coalesce(2) 在每个批次中创建两个对象。

回答 1 投票 0

错误SparkContext:无法添加spark-streaming-kafka-0-10_2.13-3.5.2.jar

错误 SparkContext:无法将 home/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-streaming-kafka-0-10_2.13-3.5.2.jar 添加到 Spark 环境 导入日志记录 从 pyspark.sql 导入

回答 1 投票 0

如何为 Spark 结构化流选择正确的 Spark.sql.shuffle.partitions 大小?

我正在使用 Spark Structured Streaming,特别是 Databricks Autoloader,来处理 S3 存储桶中的数百万条小记录,并将它们索引到 Delta Lake 表中。有

回答 1 投票 0

使用 Databricks 自动加载器读取以“§”作为分隔符的 CSV

我对 Spark Streaming 和自动加载器非常陌生,并且询问如何让自动加载器读取以“§”作为分隔符的文本文件。下面我尝试将文件读取为...

回答 1 投票 0

Spark 3.0 无法将非空数据写入iceberg

我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下 ...

回答 1 投票 0

将一个查询的结果馈送到同一 Spark 结构化流应用程序中的另一个查询

我刚刚开始研究 Spark 结构化流并提出了一个实现问题。 所以我正在使用 Apache Pulsar 来传输数据,并想知道是否可以运行不同的...

回答 1 投票 0

使用 WHEN | 时出现意外行为否则

我们开发了一种流处理,它使用许多其他增量表来丰富最终的数据产品。 我们将其称为 FinalDataProduct,插入数据的增量表,semiLayout a

回答 1 投票 0

使用 foreachBatch 的结构化流编写器不尊重 shuffle.partitions 参数

我们正在使用 foreachBatch 功能在结构化流上运行重复数据删除操作。 然而,写操作似乎并不尊重随机分区的数量t...

回答 1 投票 0

如何使用 Databricks 中结构化流的最大记录数来限制输入速率?

我正在尝试使用最大记录数来限制结构化流查询的输入速率。 但是,文档表示仅支持 maxFilesPerTrigger 或 maxBytesPerTrigger。 难道是……

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.