我有一个这样的表,这里有一个名为 input_values 的列,数组中的值转到 ml 模型。每个数组的第一个值转到 model1,第二个值转到 model2 ..等等。
| job_id | timestamp | input_values |
|:---- |:---------- | ----------- |
| job1 | 2023-03-01T19:12:00.000+0000 | [0.12,0.34,0.23] |
| job2 | 2023-03-01T19:13:00.000+0000 | [0.23,0.55,0.12] |
| job3 | 2023-03-01T19:14:00.000+0000 | [0.23,0.12,0.32] |
我还有一张这样的桌子 列有 ml 输出结果
| model_A| model_B | model_C |
|:---- |:------: | -----: |
| 1 | 1 | 1 |
| 1 | 1 | 0 |
| 0 | 1 | 1 |
我想做的是我想将输入值列添加到输出表中。 我将创建一个输入名称列表
input_names = [input_A,input_B,input_C]
这是我最终需要的桌子。
| model_A| model_B | model_C |input_A|input_B|input_C|
|:---- |:------: | -----: |---- |---- |---- |
| 1 | 1 |1 |0.12 |0.34 |0.23 |
| 1 | 1 | 0 |0.23 |0.55 |0.12 |
| 0 | 1 | 1 |0.23| 0.12 |0.32 |
如何使用 pyspark 执行此操作?
任何帮助将不胜感激!
以下是我的假设,我假设列input_values的长度在数据中是相同的。此外,您只想将列中的数组转换为具有下面给出的值的列。
input_names = [input_A,input_B,input_C]
我希望下面的代码对你有用。
import pyspark.sql.functions as func
leng = len(df1.first()['input_values'])
df1.select([func.col('input_values')[i].alias(input_names[i]) for i in range(leng)]).show()
上面的代码应该给你如下输出
+-------+-------+-------+
|input_A|input_B|input_C|
+-------+-------+-------+
| 0.12| 0.34| 0.23|
| 0.23| 0.55| 0.12|
| 0.23| 0.12| 0.32|
+-------+-------+-------+
希望以上是您的期望。
我在我的环境中复制了同样的东西。我得到了这个输出。
创建两个数据框并在行索引上连接两个数据框,选择要包含在输出表中的列。
示例代码:
from pyspark.sql.functions import monotonically_increasing_id
# create sample dataframe for the input table
d1 = [(0.12, 0.34, 0.23), (0.23, 0.55, 0.12), (0.23, 0.12, 0.32)]
d1_cols = ['input_A', 'input_B', 'input_C']
df1 = spark.createDataFrame(d1, d1_cols)
input_df1 = df1.withColumn('id', monotonically_increasing_id())
# create Another dataframe.
op_data = [(1, 1, 1), (1, 1, 0), (0, 1, 1)]
op_cols = ['model_A', 'model_B', 'model_C']
op_df = spark.createDataFrame(op_data, op_cols)
op_df = op_df.withColumn('id', monotonically_increasing_id())
# join the two tables on the row index.
joined_df1 = op_df.join(input_df1, 'id', 'inner')
# select the columns what you want to include in the output table using `select`.
col_select = ['model_A', 'model_B', 'model_C', 'input_A', 'input_B', 'input_C']
result_df = joined_df1.select(col_select)
# show the final table
result_df.show()
输出: