spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark 流 - withWatermark() 具有重复行为

我正在从 Kafka 读取数据(最早开始偏移)并在控制台上写入数据进行一些测试。水印持续时间为 10 秒。遵循 Spark 文档 - https://spark.apache.org/

回答 1 投票 0

如何使用 PySpark 正常关闭 StructureStreaming?

作为主题,如何使用 PySpark 优雅地关闭 StructureStreaming?当我调用 Spark.streams.awaitAnyTermination(timeout=100) 时,它总是返回错误:ERROR MicroBatchExecution: Query [id = 97f...

回答 1 投票 0

Spark 如何结构化流式处理微批次 - 一个接一个或同时

我有一个关于如何处理微批次的问题。会火花 为所有可用的执行器获取多个微批次并并行处理多个微批次或 它只会得到一批...

回答 1 投票 0

Spark 流 leftOuter 无法与第三个流连接一起使用

我被这个流 leftOuter join 困住了。我能够流连接 2 个数据帧,并且在水印时间到期后可以获得空值。但如果我加入 3 个数据框,我就无法实现。 ...

回答 1 投票 0

流数据帧/数据集不支持非基于时间的窗口,如何解决这个问题?

在我的代码中有一个特定的要求,因此我必须在流数据帧上使用分区。 但给了我以下错误 流式 DataF 不支持非基于时间的窗口...

回答 1 投票 0

即使没有新数据,我们如何强制 Databricks 结构化流每隔几秒从事件中心读取数据?

我们使用 Databricks 结构化流从 azure 事件中心读取数据,并使用 forEachBatch 将数据更新插入到 writeStream 部分中的增量表中 问题是 - 事件...

回答 1 投票 0

Stream-Static Join:如何定期刷新(取消持久化/持久化)静态数据帧

我正在构建一个 Spark 结构化流应用程序,我正在其中进行批处理流连接。并且批量数据的来源会定期更新。 所以,我打算做一个坚持/取消坚持...

回答 2 投票 0

使用 foreachBatch 进行自动加载器模式演变

我在工作过程中遇到了一些模式演变,但我找不到让它发挥作用的方法。 上周,我在 ERP 系统中启用了 5 列,业务需要在...

回答 1 投票 0

Spark Streaming 正在从 Kafka 主题读取以及如何将嵌套 Json 格式转换为 dataframe

我能够从 Kafka 主题读取数据,并能够使用 Spark Streaming 在控制台上打印数据。 我希望数据采用数据帧格式。 这是我的代码: 火花 = SparkSession \ .

回答 2 投票 0

PySpark 数据帧从_json 转换

我有以下 JSON,我正在从 Kafka 读取它,然后尝试使用 from_json 函数转换为 StructType。 schema_session_start = StructType([ StructField("ID", StringType()),...

回答 1 投票 0

Spark UI 的阶段终止是否会导致数据重新处理?

在 Spark UI 中,具有杀死活动运行阶段的功能: 当使用此按钮杀死阶段时,与该阶段关联的任务将被重新处理?或者他们会

回答 1 投票 0

来自 Kafka 源的 Pyspark 结构化流中的异常

环境:Spark 3.5.0、Scala 2.12.18、openjdk 11.0.20.1 我正在尝试从 Kafka 源流式传输数据,但从 Spark 中得到异常。 看来这通常是由于依赖版本所致

回答 1 投票 0

Spark 结构化流无法正确加载 csv 文件

我在 s3 中有管道分隔的 csv 文件,我正在尝试将其加载到 Databricks 中。当使用下面的代码读取时,文件被正确读取: df_test = Spark.read.option("标题", True).

回答 1 投票 0

将一列中的 json 数据从 Apache Kafka 流数据扩展到数据帧 PySpark 中的多列时出现问题

亲爱的, 我练习流数据。我决定创建 Apache Kafka。 我想从 KafkaProducer 读取数据,然后我创建了一个用于流查询的 Kafka 源: df = 火花 \ 。读 \ .表格...

回答 1 投票 0

PySpark(Spark v3.4.1)结构化流如何实现累积聚合数据写入spark sink?

我正在从事 pyspark 结构化流编程,以生成流数据的一些累积聚合。我已经使用mongodb和kafka作为spark读取流数据。我已经尝试过 foreachBatch...

回答 1 投票 0

PySpark 数据框转换 pyspark

我有一个下面的数据框,我需要将其转换如下。 我正在使用 PySpark 3.4.1 。 +------------------------+------------------------ -------------------------------------------------- ---------...

回答 1 投票 0

Spark 结构化流因 Kubernetes 中执行器内存不足而失败

我正在运行一个 Spark 结构化流作业,从对象存储桶中读取数据,进行一些转换和过滤,调用 groupby 和聚合。将写入内容发布到对象存储中我...

回答 1 投票 0

结构化流+Kafka集成numInputRows为0但偏移量增加

结构化流+ Kafka集成,spark版本3.3,我发现了一些奇怪的东西,一开始我可以得到正确的numInputRows,但是大约5次获取numInputRows将是0,bue endOff...

回答 1 投票 0

pyspark 中的用例流

我正在 Azure 上使用 Databricks,我的数据托管在 ADLS2 上。 当前运行时版本是 10.4 LTS(如果需要我可以升级) 我有一个表 Pimproduct: ID 姓名 行动 dlk_last_modified

回答 1 投票 0

Spark 流使用 Abris 从 Kafka 读取,并且最新的架构注册表不会同步

我有一个 Spark 流,它从 Kafka Avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来执行此操作,看起来就像这样。 导入 za.co.absa...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.