如何将数组中的值分配给具有从列表中命名的新列的表

问题描述 投票:0回答:2

我有一个这样的表,这里有一个名为 input_values 的列,数组中的值转到 ml 模型。每个数组的第一个值转到 model1,第二个值转到 model2 ..等等。

| job_id | timestamp                          | input_values      |
|:----   |:----------                         | -----------       |                     
| job1   | 2023-03-01T19:12:00.000+0000       | [0.12,0.34,0.23]  |
| job2   | 2023-03-01T19:13:00.000+0000       | [0.23,0.55,0.12]  |
| job3   | 2023-03-01T19:14:00.000+0000       | [0.23,0.12,0.32]  |

我还有一张这样的桌子 列有 ml 输出结果

    | model_A| model_B | model_C |
    |:---- |:------:   | -----:  |
    | 1    | 1         |  1      |
    | 1    | 1         |  0      |
    | 0    | 1         |  1      |

我想做的是我想将输入值列添加到输出表中。 我将创建一个输入名称列表

input_names = [input_A,input_B,input_C]

这是我最终需要的桌子。

| model_A| model_B | model_C |input_A|input_B|input_C|
|:---- |:------:   | -----:  |----   |----   |----   |
| 1    | 1         |1        |0.12 |0.34  |0.23  |
| 1    | 1         | 0       |0.23 |0.55   |0.12 |
| 0    | 1         | 1       |0.23| 0.12  |0.32   |

如何使用 pyspark 执行此操作?

任何帮助将不胜感激!

pyspark databricks azure-databricks
2个回答
0
投票

以下是我的假设,我假设列input_values的长度在数据中是相同的。此外,您只想将列中的数组转换为具有下面给出的值的列。

input_names = [input_A,input_B,input_C]

我希望下面的代码对你有用。

import pyspark.sql.functions as func

leng = len(df1.first()['input_values'])

df1.select([func.col('input_values')[i].alias(input_names[i]) for i in range(leng)]).show()

上面的代码应该给你如下输出

+-------+-------+-------+
|input_A|input_B|input_C|
+-------+-------+-------+
|   0.12|   0.34|   0.23|
|   0.23|   0.55|   0.12|
|   0.23|   0.12|   0.32|
+-------+-------+-------+

希望以上是您的期望。


0
投票

我在我的环境中复制了同样的东西。我得到了这个输出。

创建两个数据框并在行索引上连接两个数据框,选择要包含在输出表中的列。

示例代码:

from pyspark.sql.functions import monotonically_increasing_id   
# create sample dataframe for the input table
d1 = [(0.12, 0.34, 0.23), (0.23, 0.55, 0.12), (0.23, 0.12, 0.32)]
d1_cols = ['input_A', 'input_B', 'input_C']

df1 = spark.createDataFrame(d1, d1_cols)
input_df1 = df1.withColumn('id', monotonically_increasing_id())
 

# create Another dataframe.
op_data = [(1, 1, 1), (1, 1, 0), (0, 1, 1)]
op_cols = ['model_A', 'model_B', 'model_C']
op_df = spark.createDataFrame(op_data, op_cols)
op_df = op_df.withColumn('id', monotonically_increasing_id())
  

# join the two tables on the row index.
joined_df1 = op_df.join(input_df1, 'id', 'inner')

 
# select the columns what you want to include in the output table using `select`.
col_select = ['model_A', 'model_B', 'model_C', 'input_A', 'input_B', 'input_C']
result_df = joined_df1.select(col_select)
   

# show the final table
result_df.show()

enter image description here

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.