apache-spark 相关问题

Apache Spark是一个用Scala编写的开源分布式数据处理引擎,为用户提供统一的API和分布式数据集。 Apache Spark的用例通常与机器/深度学习,图形处理有关。

Spark 流“initialPosition”与“startingPosition”?

spark Streaming 中的initialPosition 和startingPosition 有什么区别?我已经阅读了 Spark 文档、Delta 表文档、O'Reilly 指南,...他们提到了两者,但没有提到区别...

回答 1 投票 0

如何从 PySpark DataFrame 批量处理项目

我有一个 PySpark 数据框,对于每条(批次)记录,我想调用一个 API。所以基本上说我有 100000k 条记录,我想将项目批量分成 1000 条组并调用 API。怎么...

回答 3 投票 0

如何将非常大的 Spark 数据帧写入 AWS S3 中的单个 csv 文件?

我有一个非常大的 Spark DataFrame,我需要将其作为单个 CSV 文件写入 AWS S3 存储桶(我使用 pySpark)。 我无法使用标准 csv_df.coalesce(1).write.csv() 方法,因为文件是...

回答 1 投票 0

Pyspark 错误:“EMR 7.0.0 中未找到类 org.apache.hadoop.fs.s3a.S3AFileSystem”

我使用的是EMR 7.0.0版本,AWS中有python 3.9,spark 3.5.0,Hadoop 3.3.6。 我收到错误: 文件“/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/

回答 1 投票 0

为什么我需要使用数据框来处理数据块中的查询? (pyspark、sparksql)

我正在和一个朋友学习databricks,有一件事我真的不明白。 我正在尝试在azure中存储帐户中的json文件中使用pyspark和spark sql进行查询。 丝路...

回答 2 投票 0

Pyspark 自加入需要大量时间

我有 pyspark df,我基于 2 列自行加入 cluster_id 具有不同计数的不同簇,unique_id 在每一行中都是唯一的。 df_filtered.repartition('簇...

回答 1 投票 0

从 SQS 驱动的 Pyspark 结构化流检索路径

我有一个 DMS 生成的 s3 数据湖,并设置 SQS 来跟踪生成的文件。现在我想将其流式传输到我的 EMR 集群中,为此我在此处找到了 Spark Streaming s3 连接器 https://git...

回答 1 投票 0

Spark 结构化流:流-流连接与不写入的聚合

平台:Databricks Notebooks|语言:PySpark 上下文:我正在尝试在流数据管道中构建一个节点,该节点评估所有存在的行数(例如 count(*) == Expected_co...

回答 1 投票 0

Spark 执行器标准输出到 Kubernetes 标准输出

我的 Spark 应用程序在 Spark Worker 中运行,将执行程序日志输出到特定文件路径:“/worker_home_directory/app-xxxxxxxx/0/stdout” 我使用 log4j.properties 来重定向日志...

回答 1 投票 0

写入Delta表时如何添加新列?

我正在使用 delta-rs 写入 Delta Lake 中的 Delta 表。这是我的代码: 导入时间 将 numpy 导入为 np 将 pandas 导入为 pd 将 pyarrow 导入为 pa 从 deltalake.writer 导入 write_deltalake

回答 2 投票 0

如何确保我的 Python 逻辑仅在 Apache Ray Worker 节点上运行?

我正在使用 Apache Ray 创建一个自定义集群来运行我的逻辑。但是,当我使用 ray.remote 提交任务时,它们是在驱动程序节点而不是工作节点上执行我

回答 1 投票 0

以下 Databricks SQL 的等效 Py Spark 代码是什么

我有下面的Databricks SQL代码(带有过滤条件)并想转换为Py Spark代码但无法得到任何想法。 我搜索了谷歌,但只得到了 Py Spark 过滤器条件,但我......

回答 1 投票 0

如何在 Spark SQL 中解析嵌套的 JSON 对象?

我有一个架构,如下所示。如何解析嵌套对象? 根 |-- apps: 数组 (nullable = true) | |-- 元素:结构(containsNull = true) | | |-- appName:字符串(可为空...

回答 5 投票 0

在 Spark 中读取 Avro 文件

我已将 avro 文件读入 Spark RDD,并需要将其转换为 sql 数据帧。我该怎么做。 这就是我到目前为止所做的。 导入 org.apache.avro.generic.GenericRecord 导入 org.apache.a...

回答 3 投票 0

spark-3.5.0-bin-without-hadoop :: 无法启动 thriftserver.sh

在 RHEL-8 Linux 服务器上, Hadoop 3.3.6 jdk 1.8 和 Spark-3.5.0-bin-without-hadoop, 当尝试从spark-3.5.0-bin-without-hadoop目录启动./sbin/start-thriftserver.sh时,它抛出b...

回答 1 投票 0

Spark skewedPartitionThresholdInBytes 未得到强制执行

对 Spark 比较陌生,对 PySpark 的行为感到好奇,其中, 使用 PySpark 执行内部联接(根据执行计划:SortMergeJoin)时,使用以下参数 e...

回答 1 投票 0

pyspark.errors.exceptions.base.PySparkRuntimeError:[JAVA_GATEWAY_EXITED] Java网关进程在发送其端口号之前退出

我已经下载了spark(https://spark.apache.org/downloads.html)并通过pip命令安装了pyspark。我也尝试了几乎所有在线建议的解决方案,但问题仍然存在

回答 1 投票 0

在误差范围条件下对 Spark 数据集进行分组

我试图通过使用数据集来熟悉 Spark。该数据集具有以下列: [“邮政编码”、“城市”、“地点”、“流行音乐”、“州”}...

回答 1 投票 0

如何缓存 Spark 会话终止后保留的数据帧

我是数据工程新手,所以这可能是一个基本问题。但我还没能澄清 - 语境 - 我有一个由 Azure 数据工厂管道每 10 分钟执行一次的 Spark 作业。 在每个

回答 1 投票 0

Spark 错误消息建议设置检查点,但即使添加后错误仍然存在

我有一个用 Scala 编写的 Spark 作业,偶尔会失败并显示以下消息: org.apache.spark.SparkException:作业由于阶段失败而中止: 输出不确定的随机映射阶段是

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.