Palantir Foundry 存储库 pyspark @transform 问题

问题描述 投票:0回答:1

我在 pyspark Foundry 存储库转换方面遇到问题。 到目前为止,我一直在使用@transform增量来满足我的需要,它允许我通过添加相关月份来每月将数据库排队到历史“_h”表中。但是,我现在还想做一件事,即如果月份表的“reference_dt”已经存在于历史表中(例如一个月的重新转换),则删除历史记录到该日期和队列的数据通过替换它来获取新数据。

为此,我想使用普通的变换


@transform (
Output_h = Output(table_h)
my_input = Input(table)       
                        )
def my_compute_function(my_input, Output_h):

df = my_input.dataframe()

distinct_year = df.select([F.max(“reference_dt”)]].distinct().collect()[0][0] 

output_prov = output_h.dataframe()
output_prov2 = output_prov.filter(F.col(“reference_dt”) != distinct_year)

if output_prov2.rdd.isEmpty():
    schema = output_prov.schema
    output_prov2 = output_prov.limit(0)
else:
    output_prov2 = output_prov2

output_new = output_prov2.union(df)

Output_h.write_dataframe.mode(“overwrite”)(output_new)

假设“reference_dt”在“my_input”表中始终是唯一的,因此distinct仅返回一个值,它给我带来的问题是无法识别的空模式错误,(无法分辨哪些表)。我想是因为历史 _h 表只有一个月,我试图用它自己覆盖它。 问题出在哪里? 你能帮我吗?抱歉,如果我错过了什么,如果有推荐,我会添加一些规格 谢谢

pyspark repository transform palantir-foundry
1个回答
0
投票

据我了解,您想要一个包含“当前月份”的数据集并每月刷新一次?这是正确的吗?

如果是这样,您可以将此数据集作为此转换输出的一部分。您不能同时在输出和输入中包含数据集,因为这构成了一个循环,这是不允许的。

但是,您可以将数据集作为输出,您可以从中读取数据。因此,严格来说,有效实现相同但没有循环。

那里有很多示例:https://www.palantir.com/docs/foundry/transforms-python/incremental-examples

典型代码是:

@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    current_output_df = processed.dataframe()

    # Do something with your current output, as it were an input
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.