spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

如何用Spark高效读取多个parquet小文件?有CombineParquetInputFormat吗?

Spark 生成了多个小 parquet 文件。如何在生产者和消费者 Spark 作业上有效处理少量 parquet 文件。

回答 2 投票 0

DLT 水印名称“窗口”未定义

我正在从流表中读取: df = Spark.readStream.option("ignoreChanges", "true").table(层次结构) 为了简单起见,我们只是说我需要获取列...

回答 1 投票 0

使用 Databricks 自动加载器读取以“§”作为分隔符的 CSV

我对 Spark Streaming 和自动加载器非常陌生,并且询问如何让自动加载器读取以“§”作为分隔符的文本文件。下面我尝试将文件读取为...

回答 1 投票 0

Spark 3.0 无法将非空数据写入iceberg

我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下 ...

回答 1 投票 0

使用 WHEN | 时出现意外行为否则

我们开发了一种流处理,它使用许多其他增量表来丰富最终的数据产品。 我们将其称为 FinalDataProduct,插入数据的增量表,semiLayout a

回答 1 投票 0

设置结构化流中的每个微批次数据计数

我们在 Databricks 中利用结构化流,使用 foreach 功能进行转换和操作,并最终将数据写入 Delta 表。我们的数据来源...

回答 1 投票 0


以JSON格式的字符串访问数组中的特定元素

我有一些流数据,可以像这样最小化地减少: { “数据”:[ { “钥匙”=1, “val”=“a” }, { ...

回答 1 投票 0

如何使用apache Spark进行视频流中的人脸检测

这是我试图解决的问题的背景: 我有一个视频文件(MPEG-2 编码)位于某个远程服务器上。 我的工作是编写一个程序来对这个v进行人脸检测...

回答 1 投票 0

pyspark - 结构化流式传输到弹性搜索

我正在编写一个代码,其中我尝试使用 pySpark 的结构化流将数据流式传输到弹性搜索中。 火花版本:3.0.0 安装模式:pip 查询 = inpJoinDF.writeStream \ .输出...

回答 2 投票 0

状态失败,403 禁止

我这个程序有问题,我使用 Spark Streaming 从 Twitter 获取帖子,但它无法获取帖子并出现此错误。执行的时候出现这个错误。 获取推文时出错:403

回答 1 投票 0

在 Dremio 中读取冰山表失败,原因是“不是 Parquet 文件”和“预期的幻数”

我有一个 Spark 结构化流作业,它从 Kafka 读取数据并将其作为 Apache Iceberg 表(通过 Nessie 目录)写入 S3(NetApp StorageGRID 设备,本地)。 然后我同意...

回答 1 投票 0

Spark 结构化流:由于缺少水印,将 DataFrame 写入 CSV 失败

我正在使用 火花,版本 3.4.1 PySpark,版本 3.4.1 Python,版本 3.11 使用 Spark 结构化流我想将 DataFrame 编写为 CSV 文件。 logsDF 是一个 pyspark.sql.dataframe.DataFrame w...

回答 2 投票 0

使用Python Spark Streaming从http下载数据

我是 PySpark 的新手,我在 Ubuntu 14.04 上安装了 Kafka 单节点和单代理。 安装后,我使用 kafka-console- Producer 和

回答 1 投票 0

如何在 Flink 的 SQL CLI 中保存查询结果

我正在使用 Flink SQL CLI 运行 SQL 查询, 我需要保存“SELECT ...NOW()...FROM ...”等查询的结果 其中包括时间戳。 我想保留 NOW() 为

回答 1 投票 0

无法使用 Pyspark 从 Azure 事件中心检索数据

我正在尝试使用 pysprak 从 Azure 事件中心检索数据。代码继续运行但不显示任何数据 EH_CONN_STR = '端点=sb://event-hub-18-jul.servicebus.windows.net/;

回答 1 投票 0

如何在spark结构流中并行写入?

我有多个数据帧,最后我将这些 DF 写入增量表中。 我需要并行写入 5 个增量表中的 5 个数据帧。 我们可以在笔记本上做这个吗? 我是...

回答 1 投票 0

尝试将spark写入mongodb时出错

我正在尝试使用 Spark Streaming 将数据附加到 mongodb,但遇到了一些问题。这是我的代码: def write_to_db(df, epoch_id): df.select(“id”,“生产公司”).sho...

回答 1 投票 0

EventHub Spark 使用证书身份验证的结构化流

我想在 Azure Synapse 中使用 pyspark 使用结构化流进行 EventHub 消息消费,有人指出我这个项目 https://github.com/alexott/databricks-playground/tree/main/kafka-

回答 1 投票 0

在 DELTA 到 DELTA 流式传输期间,列 `_rescued_data` 已经存在

我正在开发一个 Spark Streaming 作业,该作业从源 Delta 表读取数据并将其写入具有 SCD2 逻辑的目标 Delta 表。但是,我遇到了与“

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.