spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何使用pyspark filestream读取最近一小时内上传的新文件?

我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流。我尝试过 Spark 的 maxFileAge 选项

回答 1 投票 0

Apache Spark 3.5 批处理模式结构化流中的 Kafka 偏移量问题

我正在根据 Kafka 集成指南编写一个使用 Kafka 作为源的批量查询,并希望定期提交该批处理,比如每天一次,以处理已添加的记录

回答 1 投票 0

spark-kafka 3.3.2 失败的结构化流媒体

我有 Spark 结构化流应用程序,可以愉快地与 Spark 3.0.1 一起使用。现在我尝试升级到 Spark 3.3.2 并收到以下异常: > 23/12/15 00:14:45 错误

回答 1 投票 0

如何为databricks集群设置警报?

我想在 Databricks 中设置集群使用警报。 例如:如果驱动程序节点的 CPU 利用率 > 80%,则发送警报。 可以为工作设置电子邮件通知,但我很感兴趣...

回答 1 投票 0

如何在批量用例的结构化流中实现窗口函数?

首先是一些背景知识,在一些业务需求将所有管道合并到一个中心位置之后,我最近不得不将管道从标准读取和写入转移到结构化流。 ...

回答 1 投票 0

Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 2 投票 0

java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理

我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。 我在从位于

回答 2 投票 0

引起:java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.ByteArraySerializer

我正在使用 kafka 和 cassandra 进行 Spark 结构化流处理,当我在命令下运行时出现错误 Spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark...

回答 1 投票 0

消费MSK主题

我正在开发一个 AWS MSK 消费者项目,我希望在部署到 EMR 之前开发/测试我的应用程序。但是我不知道如何配置我的本地以从我的 MSK kafka 中使用

回答 1 投票 0

Spark 结构化流 dropDuplicates 未按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 1 投票 0

如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?

我想使用spark结构化流计算每天的唯一访问次数,所以我使用以下代码 .dropDuplicates("uuid") 第二天,今天维持的状态应该会消失...

回答 4 投票 0

具有 Spark 结构化流的动态过滤器

我正在开发 Spark Streaming 项目,目标是创建一个简单的应用程序,以便在数据流满足条件时通知用户(例如,当股票价格 > x 时发送通知)。 df =...

回答 1 投票 0

通过 Spark Streaming 高效读取 Kafka

我有一个应用程序,它从 Kafka 获取数据并将其保存到数据库中。我的代码如下所示: Spark.readStream .format("卡夫卡") .选项(选项) 。加载() .writeStream .触发(

回答 1 投票 0

我们可以在 PySpark 结构化流中使用 row_number() 吗?

Row_number() 函数上的 PySpark SQL 函数参考说 返回窗口分区内从 1 开始的序列号 暗示该功能仅适用于 Windows。试 df.

回答 2 投票 0

dropDuplicatesWithinWatermark 到底是如何工作的?

我有这个代码: 导入 org.apache.spark.sql.SparkSession 导入 org.apache.spark.sql.execution.streaming.MemoryStream 导入 org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Tr...

回答 1 投票 0

检查点后未使用新的spark.sql.shuffle.partitions值

我有一个 Spark 的结构化流应用程序,带有检查点以在 parquet 中写入输出并使用默认的 Spark.sql.shuffle.partitions = 200。我需要更改随机分区,但是...

回答 2 投票 0

Databricks Autoloader Schema Evolution 抛出 StateSchemaNotCompatible 异常

我正在尝试将 Databricks Autoloader 用于一个非常简单的用例: 从 S3 读取 JSON 并将其加载到增量表中,并进行模式推断和演化。 这是我的代码: 自我火花\ ...

回答 1 投票 0

当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACUUM 和 DELETE 是否安全

我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全

回答 1 投票 0

当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACCUM 和 DELETE 是否安全

我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全

回答 1 投票 0

使用托管身份访问 Spark Streaming 中的 Eventhub

我有一个要求,我需要访问 Spark Streaming(天蓝色数据砖)中的 eventhub 流,但使用托管身份验证。目前我在所有BL中只看到SAS密钥访问...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.