在我的项目中,我使用 hadoop-hive 和 pyspark。
我的表由此查询创建。
CREATE TABLE target_db.target_table(
id string)
PARTITIONED BY (
user_name string,
category string)
我得到这样的数据。
user_name: john
data list: # id, category
[('1', 'warrior'), ('2', 'warrior'), ('3', 'knight'), ...]
我想将此数据附加到
target_db.target_table
.
首先,我试试这个。
columns = ["account_id", "category"]
# creating a dataframe
df_user_list = spark_session.createDataFrame(id_list, columns)
file_path = f"{host}/target_db.db/target_table/user_name={user_name}"
df_user_list.write.partitionBy('category').mode("overwrite").parquet(file_path)
for category in category_list:
spark_session.sql(f"ALTER TABLE target_db.target_table DROP IF EXISTS PARTITION (user_name='{user_name}', category='{category}')")
spark_session.sql(f"ALTER TABLE target_db.target_table ADD PARTITION (user_name='{user_name}', category='{category}')")
上面的代码是有效的。 但是,在真实数据中,我无法得到
category_list
。另外,我不能使用“覆盖”,我需要“追加”模式。所以我不能使用上面的代码。
现在,我找到了一个解决方案。
columns = ['id', "user_name", "category"]
result = []
for item in id_list:
result.append((item[0], 'test.user', item[1]))
# creating a dataframe
df_user_list = spark_session.createDataFrame(result, columns)
df_user_list.write.mode("append").insertInto('target_db.target_table')
上面的代码是完美的。 但是,我认为添加
user_name
太奇怪了。这是相同的数据,我认为有更有效的解决方案。
我想要这样好的解决方案。
columns = ['id', "category"]
# creating a dataframe
df_user_list = spark_session.createDataFrame(id_list, columns)
df_user_list.write.mode("append")
.selectCategory(['user_name', 'test_user']) # how to do this?
.insertInto('target_db.target_table')
这样的事情怎么样:
columns = ['id', "category"]
# creating a dataframe
df_user_list = spark_session.createDataFrame(id_list, columns)
df_user_list.write.mode("append") \
.withColumn('user_name', F.lit(test_user)) \
.insertInto('target_db.target_table')
P.D.:我还没有测试过。