input_list=[输入A,输入B,输入C]
我下面有一张这样的桌子
job_id | timestamp | input_values |
|:---- |:---------------------------------- | ------------------|
| job1 | 2023-03-01T19:12:00.000+0000 | [0.12,0.34,0.23] |
| job2 | 2023-03-01T19:13:00.000+0000 | [0.23,0.55,0.12] |
| job3 | 2023-03-01T19:14:00.000+0000 | [0.23,0.12,0.32] |
我想把它变成这样的桌子
job_id | timestamp | input_values |inputA|inputB|inputC|
|:---- |:------------------------------| ------------------|------|------|------|
| job1 | 2023-03-01T19:12:00.000+0000 | [0.12,0.34,0.23] |0.12 |0.34 |0.23 |
| job2 | 2023-03-01T19:13:00.000+0000 | [0.23,0.55,0.12] |0.23 |0.55 |0.12 |
| job3 | 2023-03-01T19:14:00.000+0000 | [0.23,0.12,0.32] |0.23 |0.12 |0.32 |
如何使用 Pyspark 执行此操作?谢谢你!还需要性能调优,而且这个表太大了。
您可以创建以列表中的值命名的列,并使用 withColumn 方法从 PySpark 中的数组分配值。这是一个例子:
from pyspark.sql import functions as F
input_list = ["inputA", "inputB", "inputC"]
df = df.select("*", *[F.col("input_values")[i].alias(c) for i, c in enumerate(input_list)])
此代码将创建名为 inputA、inputB 和 inputC 的新列,并将 input_values 数组列中的值分配给这些新列。
更通用的解决方案
输入数据框
data = [("job1", "2023-03-01T19:12:00.000+0000", [0.12,0.34,0.23]),
("job2", "2023-03-01T19:13:00.000+0000", [0.23,0.55,0.12]),
("job3", "2023-03-01T19:14:00.000+0000", [0.23,0.12,0.32])]
df = spark.createDataFrame(data, ["job_id", "timestamp", "input_values"])
df.show(truncate=False)
+------+----------------------------+------------------+
|job_id|timestamp |input_values |
+------+----------------------------+------------------+
|job1 |2023-03-01T19:12:00.000+0000|[0.12, 0.34, 0.23]|
|job2 |2023-03-01T19:13:00.000+0000|[0.23, 0.55, 0.12]|
|job3 |2023-03-01T19:14:00.000+0000|[0.23, 0.12, 0.32]|
+------+----------------------------+------------------+
用
input_values
列的值创建字典,我们可以使用collect_list()
函数创建所有input_values
的数组,然后使用create_map()
将该数组转换为字典。 input_values
列表中的列在input_values_dict
中创建新列。 有关详细信息,请参见下面的实现-
from pyspark.sql.functions import *
import string
input_values_dict = df.select(create_map(lit("input_values"), collect_list("input_values")).alias("dict")).collect()[0]["dict"]
# type(input_values_dict)
# print(input_values_dict)
# Create new columns for each element in input_values list
num_cols = len(input_values_dict['input_values'][0])
for i in range(num_cols):
col_name = f"input{string.ascii_uppercase[i]}"
df = df.withColumn(col_name, col('input_values')[i])
df.show(truncate=False)
+------+----------------------------+------------------+------+------+------+
|job_id|timestamp |input_values |inputA|inputB|inputC|
+------+----------------------------+------------------+------+------+------+
|job1 |2023-03-01T19:12:00.000+0000|[0.12, 0.34, 0.23]|0.12 |0.34 |0.23 |
|job2 |2023-03-01T19:13:00.000+0000|[0.23, 0.55, 0.12]|0.23 |0.55 |0.12 |
|job3 |2023-03-01T19:14:00.000+0000|[0.23, 0.12, 0.32]|0.23 |0.12 |0.32 |
+------+----------------------------+------------------+------+------+------+