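I need to access the full contents of output_df so that I can compare it with input_df. I checked the transaction views of input_df and output_df, and both are snapshots. output_df currently has 250 rows. I tried reading it with the 'current', 'added' and 'previous' modes, but in preview mode all three return 0 rows.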
from pyspark.sql.functions import col, to_timestamp
from transforms.api import Input, Output, configure, incremental, transform


@configure(profile=['KUBERNETES_NO_EXECUTORS'])
@incremental(semantic_version=17)
@transform(
    input_df=Input('ri.foundry.lava-catalog.dataset.c855c91b-3e73-4803-84bd-7d35c45f724c'),
    output_df=Output('ri.foundry.lava-catalog.dataset.4802782e-4436-4bdf-87f3-5457245574c1')
)
def incremental_filter(input_df, output_df):
    # Rows added to the input since the last build
    df_new = input_df.dataframe('added')
    df_new = df_new.withColumn('Start', to_timestamp(col('Start'), 'yyyy-MM-dd HH:mm:ss'))
    df_new = df_new.withColumn('End', to_timestamp(col('End'), 'yyyy-MM-dd HH:mm:ss'))
    print("df_new columns are {}".format(df_new.columns))
    print('----------------------------------')
    # Load previous dataframe
    print(output_df.dataframe('current', schema=schema).localCheckpoint().count())
    df_previous = output_df.dataframe('current', schema=schema)
    print("df_previous current count: ", df_previous.count())  # 0 rows
    df_previous = output_df.dataframe('previous', schema=schema)
    print("df_previous previous count: ", df_previous.count())  # 0 rows
    df_previous = output_df.dataframe('added', schema=schema)
    print("df_previous added count: ", df_previous.count())  # 0 rows
    # Doing some comparisons here
    # ...................
    # --------------------------
    mode = 'replace'
    output_df.set_mode(mode)
    # Write the output dataframe
    output_df.write_dataframe(df_union)
How can I get the full contents of output_df in Code Repositories?
Note that the Code Repositories preview feature will always run transforms in non-incremental mode. This is true even when require_incremental=True is passed into the incremental() decorator.
If you run a full build instead of a preview, it should work.
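If the same code also has to behave sensibly in preview, one option is to check whether the run is actually incremental before reading the previous output. This is only a minimal sketch: it assumes the compute function is declared with the transform context as its first parameter (`ctx`), that `ctx.is_incremental` reflects how the run was started, and that `schema` is the output schema from the question. `load_previous_output` is a hypothetical helper name, not part of the Foundry API.

```python
def load_previous_output(ctx, output_df, schema):
    """Return the previously written output, or None for a non-incremental run.

    Assumptions (not from the original post): `ctx` is the transform context
    passed as the first argument of the compute function, and `schema` is the
    schema of the output dataset.
    """
    if not ctx.is_incremental:
        # Preview and snapshot runs have no previous output to read.
        return None
    # In an incremental run, 'previous' is the output as of the last build.
    return output_df.dataframe('previous', schema=schema)
```

Inside `incremental_filter(ctx, input_df, output_df)` the result can be tested for `None`, and the comparison against the earlier output skipped when the run is not incremental (which, per the note above, is always the case in preview).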