pyspark 相关问题

Spark Python API(PySpark)将apache-spark编程模型暴露给Python。

如何在 fs.azure.account.oauth2.msi.endpoint 中动态设置“api-version”

目前我正在通过 pyspark 库使用 hadoop-azure-3.4.1 连接到 ABFS。根据文档 - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity -...

回答 1 投票 0

如何在特定条件下连接 2 个 DataFrame?

我有这两个数据框: df1: +---+----------+----------+ |id |id_special|date_1 | +---+----------+----------+ |1 |101 |2024-11-01| |2 |102 |2024-11-03| |3 |103 |2024-11-0...

回答 1 投票 0

如何使用pyspark启动独立集群?

我在ubuntu下使用pyspark和python 2.7 我安装它使用 pip 安装 pyspark --用户 并尝试按照说明设置 Spark 集群 我找不到脚本启动大师...

回答 3 投票 0

Glue S3 CSV 负载改进

我们有glue,它使用pyspark load()从s3存储桶读取csv文件。 read_s3_files = Spark.read.format("csv") \ .option("标题", True) \ .选项(“

回答 1 投票 0

Pyspark 中最多两列

这应该很简单,但我还是没有找到方法。我必须计算一个新列,其值为列 col1 和 col2 的最大值。所以如果 col1 是 2 并且 col2 是 4,则 new_col 应该有 4....

回答 1 投票 0

spark_catalog 需要 dbt python 增量模型中的单部分命名空间

说明: 使用允许创建 Python 模型的 dbt 功能,我创建了一个模型,该模型从某些 BigQuery 表中读取,执行一些计算并写回 BigQuery。 我...

回答 1 投票 0

将 Spark 数据帧转换为 pandas 时出错:类型错误:不支持转换为无单位 dtype 'datetime64'。通过例如改为“datetime64[ns]”

我将创建一个演示数据框来重新创建我在数据块中看到的错误。 从 pyspark.sql.types 导入 StructType、StructField、TimestampType、StringType 从日期时间导入日期时间 # 定义...

回答 1 投票 0

利用大 Parquet 数据和高磁盘使用率优化 PySpark 作业

我目前正在优化一个 PySpark 作业,该作业涉及跨大型数据集的一些聚合。我对处理大规模数据相当陌生,并且遇到了磁盘问题...

回答 1 投票 0

并行线程池执行器

我有 PySpark 代码,它很少对外部系统进行 POST API 调用。 对于输入数据帧中的每一行,我需要触发 POST API 请求(使用 Python 代码)以在外部创建一个条目...

回答 1 投票 0

spark-submit 本地模式下的 PySpark 虚拟环境问题

我正在尝试在本地模式下使用spark-submit在python虚拟环境中运行python程序,即使pyspark未安装在虚拟环境中,它仍然可以运行而不会失败。

回答 1 投票 0

根据模式读取 Spark DataFrame 中的数据,但不区分大小写

我在sparkconf中设置了“spark.sql.caseSensitive”,“False”。 这就是我创建架构的方式: 架构={ “类型”:“结构”, “字段”:[ ...

回答 1 投票 0

有什么技术可以解决databricks中的倾斜数据吗?

我创建了倾斜数据来测试加盐方法,并尝试了三种不同的解决方案,但没有一个能够通过显着的运行时间改进来达到预期的结果。你能指导我最好的应用吗...

回答 1 投票 0

使用加载到静态文件中的不同环境变量来测试多个 pyspark 管道

好吧,我有一个问题- 背景信息:我是一名新 QA,在一个使用 pyspark 作为 ETL 引擎的团队中工作。 我使用 pytest 作为我的测试框架。 我们有多个管道(ETL,我将使用这个...

回答 1 投票 0

使用事件中心管理 PySpark Streaming 中的数据封装

使用 PySpark 流式传输数据时,我收到的主要消息封装在名为“body”的键中。 Spark.readStream.format("eventhubs").options(**ehConf).load() 难道是……

回答 1 投票 0

调整 Spark 作业参数

我拥有约 100 个 CPU 节点,每个节点有 192 个内核和 1.5TB RAM。 我正在运行一些大型 Spark 作业(每个作业在 40 个实例上),但我真的不确定调整 Spark 的最佳方法是什么

回答 1 投票 0

在 pyspark 中旋转时跨不同类型的多个列进行聚合

我有一张融化的桌子,其形式为: +------+---------+--------------+------------+---- ----------+ |时间 |频道 |双值 |长值 |值字符串 | +------+---------+--------------+----...

回答 1 投票 0

如何在pySpark中distnct后进行压缩

以下程序在 zip 步骤中失败。 x = sc.parallelize([1, 2, 3, 1, 2, 3]) y = sc.parallelize([1, 2, 3]) z = x.distinct() 打印 x.zip(y).collect() 产生的错误取决于 w...

回答 2 投票 0

如何从iPython界面找出pyspark的内存量?

我用命令启动 IPYTHON=1 MASTER=本地[4] pyspark Spark 向我打招呼 欢迎使用 Spark,版本 1.2.1 SparkContext 可用作 sc。 但是使用sc,我找不到内存...

回答 2 投票 0

(Py) Spark - 在一段时间内按用户分组

我正在处理大量日志文件,我想将作业转移到 Spark,但我不知道如何像在 Pandas 中轻松地那样在基于事件的时间窗口上聚合事件。 这就是确切的...

回答 2 投票 0

对于包含 repartition() 的 Spark 作业,“尚未开始任何任务”

在对 pyspark 作业的“尚未开始任何任务”摸不着头脑一段时间后,问题已被隔离为: 作品: ssc = HiveContext(sc) sqlRdd = ssc.sql(someSql) sqlRdd.collect()

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.