pyspark 相关问题

Spark Python API(PySpark)将apache-spark编程模型暴露给Python。

如何从特定事件中心分区 Spark-Streaming 结构读取

我有一个具有 32 个分区的事件中心。 我需要使用 Pyspark 从事件中心读取分区 1 。 这是我现有的代码 # 配置 连接字符串 = "端点=sb://abcd" 事件中心名称...

回答 1 投票 0

如何优化 pyspark 中的链式多重连接

快要跑好几天了,连跑都不跑 df = ( A .join(b, on='aa', how='left') .join(c, on='aa', how='left') .join(d, on='aa', how='left') .joi...

回答 1 投票 0

将自定义模块导入 AWS EMR

我有一个包含“main.py”文件的 s3 存储库,其中包含我构建的自定义模块(在“Cache”和“Helpers”内): 我的“main.py”文件如下所示: 从 pyspark.sql 导入 SparkSession 来自

回答 1 投票 0

Tweepy:Twitter 错误响应:状态代码 = 403:对 Azure 解决方案笔记本 (Spark) 中的推文提取进行故障排除

我在使用 Tweepy v3.10.0 和 Twitter API 提取推文时遇到 403 错误。搜索 API 在 Postman 中工作正常,表明笔记本 Ingest_Process_ 中存在身份验证问题...

回答 1 投票 0

在增量表上执行 Z ORDERING 时,列的顺序重要吗?

我有一个巨大的(批量数据)增量表,想对 2 列执行 Z 排序。 TRASACTION_ID - 高基数 PRODUCT_ID - 基数相对较少 而 Z ORDERING 哪一列应该...

回答 1 投票 0

Azure databricks pyspark 中自动生成且唯一的 bigint 类型 id

我想创建一个自动生成的ID,它必须是唯一的。如果我使用 monotonically_increasing_id 它会生成唯一的 id,但仅适用于特定的作业运行,当下一批运行时,它会...

回答 1 投票 0

运行Python/PySpark脚本时环境变量错误

有没有简单的方法来修复此错误: 缺少 Python 可执行文件“python3”,SPARK_HOME 环境变量默认为“C:\Users\user1\Anaconda3\Lib\site-packages\pyspark in\..”。请在...

回答 2 投票 0

如何用递增值桥接空行?

我在 Databricks 中工作,我有一个数据框(我们称之为“df”),其中一列中有几个空行: +----+------------+--------+ |编号 |日期 |价值| +----+------------+--------+ |...

回答 1 投票 0

为什么我在spark中的峰值执行内存是0?

数据文件(CSV)说明: 磁盘上 12.1GiB 序列化内存 - 1.9 GiB 我运行以下代码: ###进口 导入pyspark 从 pyspark.sql 导入 SparkSession 从 pyspark.sql 导入函数...

回答 2 投票 0

从 Pyspark 中的字符串类型转换为时间戳

如何在 Pyspark 中将字符串类型转换为时间戳 string类型数据是这样的2024-04-02-19.02.20.000000。我需要时间戳中的此列数据。 列是String类型,需要转换...

回答 1 投票 0

如何使用 Py-spark 从 Fabric 中的 Azure 事件中心读取压缩数据

我正在将压缩数据发送到事件中心,以克服 Azure 事件中心中的 1 MB 硬限制。 我还必须在 Py-spark 中阅读此内容并更新增量表。 发送到事件中心的压缩数据已完成...

回答 1 投票 0

IllegalArgumentException:java.net.URISyntaxException:通过 PySpark 访问 s3 存储桶数据时

我正在尝试通过 Jupyter 笔记本中的 PySpark 读取镶木地板。 sc = SparkSession.builder.getOrCreate() conf = SparkConf().setAppName("分类器") sc._jsc.hadoopConfiguration().set(...

回答 1 投票 0

Databricks 合并到 - 添加插入另一个表的条件

我有一个名为“结果”的增量表,我有一个需要按顺序对结果执行的操作列表。 给定操作列表:['op_1', 'op_2', 'op_3'] op_1 修改 va...

回答 1 投票 0

在 Windows 中初始化 SparkSession 时出错

我在 Windows 系统中初始化 SparkSession 时收到以下错误消息。 我已按照以下步骤操作,并将相应的路径添加为 Env Varaiable: 安装 Java 11 并设置

回答 1 投票 0

从 kafka 读取的 Spark 结构化流作业未显示在 kafka 消费者组中

我使用 pyspark 创建了一个 Spark 流作业,它使用 readStream 从 kafka 主题读取数据,并使用 writeStream 写入 Oracle 数据库中的表。 作业可以成功读取...

回答 1 投票 0

如何使用 PySpark 计算 ADLS 中的目录大小?

我想计算包含子文件夹和子文件的目录(例如XYZ)大小。 我想要所有文件和 XYZ 内所有内容的总大小。 我可以找到

回答 6 投票 0

将字符串转换为带有毫秒和时区的日期时间 - Pyspark

我有以下3个时间记录(最初它是以字符串形式出现)要转换为时间戳。 创建日期 2022-01-24 08:37:21.6670097-06:00 2022-01-26 14:11:54.1950238-06:00 2022-12-28 09:0...

回答 1 投票 0

尝试在单个 pyspark 数据帧中进行多个连接

在不涉及太多细节的情况下,我试图将一堆行合并到一个条目中。我在下面简化了“源”和“目标”数据框。 来源DF: 我...

回答 1 投票 0

Synapse Notebook 中的 Spark DataFrame 分区覆盖问题:空分区未覆盖

我在 Synapse Notebook 环境中实现 Spark DataFrame 分区覆盖时遇到了问题。尽管成功地从分区中删除了指定的帐号,但如果...

回答 1 投票 0

如果条件为真则合并 PySpark 表

如果外部数据加载满足特定条件,我们会尝试更新我们的表。如果我们尝试使用我们的解决方案,代码会遇到几个错误。 输出应该更新我们的 Spark t...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.