Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作
我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...
java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理
我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。 我在从位于
引起:java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.ByteArraySerializer
我正在使用 kafka 和 cassandra 进行 Spark 结构化流处理,当我在命令下运行时出现错误 Spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark...
我正在开发一个 AWS MSK 消费者项目,我希望在部署到 EMR 之前开发/测试我的应用程序。但是我不知道如何配置我的本地以从我的 MSK kafka 中使用
Spark 结构化流 dropDuplicates 未按预期工作
我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...
如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?
我想使用spark结构化流计算每天的唯一访问次数,所以我使用以下代码 .dropDuplicates("uuid") 第二天,今天维持的状态应该会消失...
我正在开发 Spark Streaming 项目,目标是创建一个简单的应用程序,以便在数据流满足条件时通知用户(例如,当股票价格 > x 时发送通知)。 df =...
我有一个应用程序,它从 Kafka 获取数据并将其保存到数据库中。我的代码如下所示: Spark.readStream .format("卡夫卡") .选项(选项) 。加载() .writeStream .触发(
我们可以在 PySpark 结构化流中使用 row_number() 吗?
Row_number() 函数上的 PySpark SQL 函数参考说 返回窗口分区内从 1 开始的序列号 暗示该功能仅适用于 Windows。试 df.
dropDuplicatesWithinWatermark 到底是如何工作的?
我有这个代码: 导入 org.apache.spark.sql.SparkSession 导入 org.apache.spark.sql.execution.streaming.MemoryStream 导入 org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Tr...
检查点后未使用新的spark.sql.shuffle.partitions值
我有一个 Spark 的结构化流应用程序,带有检查点以在 parquet 中写入输出并使用默认的 Spark.sql.shuffle.partitions = 200。我需要更改随机分区,但是...
Databricks Autoloader Schema Evolution 抛出 StateSchemaNotCompatible 异常
我正在尝试将 Databricks Autoloader 用于一个非常简单的用例: 从 S3 读取 JSON 并将其加载到增量表中,并进行模式推断和演化。 这是我的代码: 自我火花\ ...
当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACUUM 和 DELETE 是否安全
我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全
当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACCUM 和 DELETE 是否安全
我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全
使用托管身份访问 Spark Streaming 中的 Eventhub
我有一个要求,我需要访问 Spark Streaming(天蓝色数据砖)中的 eventhub 流,但使用托管身份验证。目前我在所有BL中只看到SAS密钥访问...
Azure databricks 自动加载器 Spark Streaming 无法读取输入文件
我已经使用自动加载器功能设置了流作业,输入位于 azure adls gen2 中,采用 parquet 格式。下面是代码。 df = Spark.readStream.format("cloudFiles")\ .选择...
delta mergeSchema 无法将 MemoryStream 与 Spark 检查点结合使用
我正在使用 Spark 的 MemoryStream 测试 DeltaWriter 类来创建流(而不是 readStream),并且我想使用选项“mergeSchema”将结果作为增量文件写入 s3 上:...
我是结构化流媒体新手,希望根据 json 消息中的日期列创建分区列。 这是示例消息: {“日期”:“2022-03-01”,“c...
将数据从增量表写入事件中心的流作业在数据块中失败 - 超时问题
这里是我的代码,用于将数据从增量表写入事件中心(消费者团队将从这里消费数据): 导入 org.apache.spark.eventhubs._ 导入 org.apache.spark.sql.streaming.Trigger._...
Trigger.AvailableNow 用于 PySpark (Databricks) 中的 Delta 源流查询
Databricks 文档中的所有示例均采用 Scala 语言。无法从 PySpark 找到如何使用此触发器类型。是否有等效的 API 或解决方法?