spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

使用事件中心管理 PySpark Streaming 中的数据封装

使用 PySpark 流式传输数据时,我收到的主要消息封装在名为“body”的键中。 Spark.readStream.format("eventhubs").options(**ehConf).load() 难道是……

回答 1 投票 0

Spark 结构化流:流-流连接与不写入的聚合

平台:Databricks Notebooks|语言:PySpark 上下文:我正在尝试在流数据管道中构建一个节点,该节点评估所有存在的行数(例如 count(*) == Expected_co...

回答 1 投票 0

PySpark 流代码中的动态键/列重命名

我正在尝试动态重命名流数据中的列,但标准方法似乎不适用于流。 我的代码如下。 json_df = Spark.readStream.format("前夕...

回答 1 投票 0

在 Spark 结构化流处理期间重命名 Spark UI 中的 jobId

我能够使用 setJobDescription 重命名 SparkUI 中的作业描述,仅购买我的预处理数据被重命名(缓存数据集),但主要作业/阶段未重命名 例如: 我的缓存...

回答 1 投票 0

静态和流数据帧之间的 Spark 结构化流连接

我正在阅读有关 Spark 结构化流连接的信息,并遇到了静态到流数据集支持的连接类型。我有一个问题,为什么不支持它,因为据我所知...

回答 1 投票 0

隐藏 Spark 属性,使其不显示在 Spark Web UI 中,而不实现安全过滤器

位于 http://:4040 的应用程序 Web UI 在“环境”选项卡中列出了 Spark 属性。 将显示通过spark-defaults.conf、SparkConf 或命令行显式指定的所有值。 嗬...

回答 2 投票 0

在apache Iceberg 上的同一个表中合并多个流的问题

我在不同字段的同一个表中进行了多个火花流写入。 Iceberg 文档说了以下内容:Iceberg 支持使用乐观并发进行多个并发写入...

回答 1 投票 0

Spark 流作为事件处理/处理解决方案(微服务)

Spark 批处理为我们的业务带来了很多价值,因为它非常容易水平扩展(我们将 AWS EMR 与 YARN 结合使用)。 然而,我们最新的专有技术带来了新的挑战

回答 1 投票 0

PySpark StreamingQueryException:Elasticsearch Spark 连接器的 java.lang.NoSuchMethodError

我正在从事 PySpark 流作业,需要将流数据从 Kafka 写入 Elasticsearch。我正在使用: 火花版本:3.5.2 Elasticsearch Spark 连接器:org.elasticsearch:elasticsea...

回答 1 投票 0

错误 SparkContext:无法添加文件 java.io.FileNotFoundException:找不到 Spark 的 Jar

请根据我使用的代码帮助我修复上述错误 proccesing_data.py代码用于使用spark-streaming处理数据 导入日志记录 从 pyspark.sql 导入 SparkSession 来自 pys...

回答 1 投票 0

spark 结构化流 - 使用 availableNow 触发器从 kafka 读取

我尝试使用 Spark Stream API 从 Kafka 读取数据并将结果作为增量表写入 S3。对我来说,在 S3 上放置更少的对象很重要,因此我使用 coalesce(2) 在每个批次中创建两个对象。

回答 1 投票 0

如何用Spark高效读取多个parquet小文件?有CombineParquetInputFormat吗?

Spark 生成了多个小 parquet 文件。如何在生产者和消费者 Spark 作业上有效处理少量 parquet 文件。

回答 2 投票 0

DLT 水印名称“窗口”未定义

我正在从流表中读取: df = Spark.readStream.option("ignoreChanges", "true").table(层次结构) 为了简单起见,我们只是说我需要获取列...

回答 1 投票 0

使用 Databricks 自动加载器读取以“§”作为分隔符的 CSV

我对 Spark Streaming 和自动加载器非常陌生,并且询问如何让自动加载器读取以“§”作为分隔符的文本文件。下面我尝试将文件读取为...

回答 1 投票 0

Spark 3.0 无法将非空数据写入iceberg

我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下 ...

回答 1 投票 0

使用 WHEN | 时出现意外行为否则

我们开发了一种流处理,它使用许多其他增量表来丰富最终的数据产品。 我们将其称为 FinalDataProduct,插入数据的增量表,semiLayout a

回答 1 投票 0

设置结构化流中的每个微批次数据计数

我们在 Databricks 中利用结构化流,使用 foreach 功能进行转换和操作,并最终将数据写入 Delta 表。我们的数据来源...

回答 1 投票 0


以JSON格式的字符串访问数组中的特定元素

我有一些流数据,可以像这样最小化地减少: { “数据”:[ { “钥匙”=1, “val”=“a” }, { ...

回答 1 投票 0

如何使用apache Spark进行视频流中的人脸检测

这是我试图解决的问题的背景: 我有一个视频文件(MPEG-2 编码)位于某个远程服务器上。 我的工作是编写一个程序来对这个v进行人脸检测...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.