Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
如何使用pyspark filestream读取最近一小时内上传的新文件?
我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流。我尝试过 Spark 的 maxFileAge 选项
Apache Spark 3.5 批处理模式结构化流中的 Kafka 偏移量问题
我正在根据 Kafka 集成指南编写一个使用 Kafka 作为源的批量查询,并希望定期提交该批处理,比如每天一次,以处理已添加的记录
我有 Spark 结构化流应用程序,可以愉快地与 Spark 3.0.1 一起使用。现在我尝试升级到 Spark 3.3.2 并收到以下异常: > 23/12/15 00:14:45 错误
我想在 Databricks 中设置集群使用警报。 例如:如果驱动程序节点的 CPU 利用率 > 80%,则发送警报。 可以为工作设置电子邮件通知,但我很感兴趣...
首先是一些背景知识,在一些业务需求将所有管道合并到一个中心位置之后,我最近不得不将管道从标准读取和写入转移到结构化流。 ...
Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作
我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...
java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理
我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。 我在从位于
引起:java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.ByteArraySerializer
我正在使用 kafka 和 cassandra 进行 Spark 结构化流处理,当我在命令下运行时出现错误 Spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark...
我正在开发一个 AWS MSK 消费者项目,我希望在部署到 EMR 之前开发/测试我的应用程序。但是我不知道如何配置我的本地以从我的 MSK kafka 中使用
Spark 结构化流 dropDuplicates 未按预期工作
我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...
如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?
我想使用spark结构化流计算每天的唯一访问次数,所以我使用以下代码 .dropDuplicates("uuid") 第二天,今天维持的状态应该会消失...
我正在开发 Spark Streaming 项目,目标是创建一个简单的应用程序,以便在数据流满足条件时通知用户(例如,当股票价格 > x 时发送通知)。 df =...
我有一个应用程序,它从 Kafka 获取数据并将其保存到数据库中。我的代码如下所示: Spark.readStream .format("卡夫卡") .选项(选项) 。加载() .writeStream .触发(
我们可以在 PySpark 结构化流中使用 row_number() 吗?
Row_number() 函数上的 PySpark SQL 函数参考说 返回窗口分区内从 1 开始的序列号 暗示该功能仅适用于 Windows。试 df.
dropDuplicatesWithinWatermark 到底是如何工作的?
我有这个代码: 导入 org.apache.spark.sql.SparkSession 导入 org.apache.spark.sql.execution.streaming.MemoryStream 导入 org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Tr...
检查点后未使用新的spark.sql.shuffle.partitions值
我有一个 Spark 的结构化流应用程序,带有检查点以在 parquet 中写入输出并使用默认的 Spark.sql.shuffle.partitions = 200。我需要更改随机分区,但是...
Databricks Autoloader Schema Evolution 抛出 StateSchemaNotCompatible 异常
我正在尝试将 Databricks Autoloader 用于一个非常简单的用例: 从 S3 读取 JSON 并将其加载到增量表中,并进行模式推断和演化。 这是我的代码: 自我火花\ ...
当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACUUM 和 DELETE 是否安全
我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全
当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACCUM 和 DELETE 是否安全
我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全
使用托管身份访问 Spark Streaming 中的 Eventhub
我有一个要求,我需要访问 Spark Streaming(天蓝色数据砖)中的 eventhub 流,但使用托管身份验证。目前我在所有BL中只看到SAS密钥访问...