pyspark 相关问题

Spark Python API(PySpark)将apache-spark编程模型暴露给Python。

打破数据框值并添加新的下一条记录

我有一个数据框 df 如下:一列(Column_1)和相应的 5 条记录如下 第_1栏 0000099598|000000|-1|0.00|需要映射到EDW|需要映射到EDW|break|006||0000099598|000000|

回答 1 投票 0

Pyspark 以分布式方式循环遍历值

我有以下使用Azure databricks(pyspark)开发的要求。 调用 Azure SQL 数据库以从表中读取列。 对于每个列值,请阅读上面的内容,调用 REST API,其中

回答 1 投票 0

为什么元数据消耗大量存储以及如何优化?

我在基于 HDFS 的数据湖上使用 PySpark 和 Apache Iceberg,并且遇到了严重的存储问题。我的应用程序每秒都会摄取实时数据。大约2小时后,我...

回答 1 投票 0

使用 Databricks 将 Spark 数据帧写入 Azure SQL Server 数据库时处理重复记录

问题陈述: 我们有一个 Databricks 作业,其中多个任务并行运行。每个任务都将 Spark 数据帧写入 Azure SQL 数据库表。每个任务都会写入自己的目标表......

回答 1 投票 0

使用 pyspark 列出 s3 存储桶中文件夹内的所有文件

我有一个 s3 存储桶“bucket1”,其中有一个名为“dir1”的目录。该目录内有多个文件。我只想创建...中所有文件名的列表

回答 1 投票 0

pyspark中的阅读词典专栏

我在 pyspark 数据框(字典)中有一个复杂的列。每行有三个键,string_value、timestamp 和 user_property。 User_property 包含其他数组中对应值的名称。

回答 1 投票 0

在 Pyspark SQL 查询中使用列名作为参数

在一个特定的 DataFrame 上,我有一个 SQL 查询,我想使用它两次,一次生成每日结果,一次获取每月结果。 我不能只是汇总每日信息,因为我没有-

回答 1 投票 0

如何使用pyspark VectorAssembler

我正在尝试使用pyspark的VectorAssembler函数,但它似乎无法正常工作。 我有一个 Twitter 数据的数据框,其中每个主题标签为一行,每一天为一列

回答 2 投票 0

无法将写入的kafka主题中的数据推送到Postgres表

我正在尝试将写入Kafka主题的数据加载到Postgres表中。我可以看到该主题每秒都在接收新消息,而且数据看起来不错。 然而,当我使用 b...

回答 1 投票 0

如何使用类似递归的操作在 PySpark 中计算累积衰减和?

我有一个 PySpark DataFrame,如下所示: ID 编号2 id3 h_生成 衰减因子 h_总计 1 164 1 149.8092121 1 164 2 1417.298433 0.944908987 1558.854504 1 164 3 3833.995761 0.886920437 5216.

回答 1 投票 0

如何使用 dbutils 获取 adls gen2 中文件的创建时间

我正在尝试获取存储在 ADLS gen2 中的文件的创建时间。该文件由下游进程生成。在databricks中,通过读取文件创建数据框,我需要创建的tim...

回答 1 投票 0

Apache Spark 警告“在 RowBasedKeyValueBatch 上调用溢出()”的含义

我正在使用 Apache Spark 本地模式运行 pyspark 2.2.0 作业,并看到以下警告: 警告 RowBasedKeyValueBatch:在 RowBasedKeyValueBatch 上调用溢出()。不会溢出但返回0。 ...

回答 4 投票 0

无法通过pyspark格式化kafka主题数据

我尝试使用以下代码通过 KSQL 将数据推送到 Kafka 主题: 创建流测试01(KEY_COL VARCHAR KEY、COL1 INT、COL2 VARCHAR) with (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT...

回答 1 投票 0

从元组列表创建 Spark DataFrame

我正在使用 CIFAR10 数据,并创建了包含以下数据的元组列表: (6.0, [0.23137255, 0.24313726, 0.24705882, 0.16862746, 0.18039216, 0.1764706, 0.19607843, .....

回答 1 投票 0

如何在Databricks上的纯sql中将字符串拆分为相等长度的字符串数组

我在表中有一个列,其中包含可变长度的字符串: |价值| |-------------| |abcdefgh | |1234567891011| 我需要将字符串拆分为字符串数组,其中每个字符串......

回答 1 投票 0

无法通过Kafka、JDBC源连接器和pyspark获取正确格式的postgres数据

我在Postgres中创建了一个表: 如果不存在则创建表 public.sample_a ( id 文本 COLLATE pg_catalog."default" NOT NULL, is_active 布尔值 NOT NULL, is_deleted 布尔值 ...

回答 1 投票 0

数据帧中的计数与从该数据帧创建的临时视图之间的差异

步骤1 我有一个从增量表创建的数据框。 df=spark.read.format(delta).load(路径) 步骤2 我正在从该数据帧创建一个临时视图 df.createorreplacetempbiew(dfview) 现在当我

回答 1 投票 0

将数据插入到具有更改架构的增量表中

如何将数据插入到 Databricks 中更改架构的增量表中。 在 Databricks Scala 中,我分解了 Map 列并将其加载到增量表中。我有一个预定义的 delta 模式...

回答 2 投票 0

如何在 PySpark 列中搜索值序列

我有一个带有“时间”列和“值”列的数据框。例子: 从 pyspark.sql 导入 SparkSession Spark = SparkSession.builder.appName("示例").getOrCrea...

回答 1 投票 0

在pyspark中使用IOUtils.setByteArrayMaxOverride

我遇到了错误 “org.apache.poi.util.RecordFormatException:尝试分配长度为 100,335,238 的数组,但此记录类型的最大长度为 100,000,000。” 当尝试...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.