I have loaded multiple tables (one CSV file per table) into Azure Data Lake, and I want to use Auto Loader to load each of them into a Databricks Delta table. I have Python code in which I use a for loop to build each table's schema, create a df, and then writeStream that df. I also have a function update_insert where I do some data manipulation; it also contains a merge to upsert into the Delta table.
Here is my function code:
import re
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def update_insert(df, epochId, cdm):
    # clean only 100% identical rows
    print("------------------- " + cdm)
    df = df.dropDuplicates()
    # keep only the latest version of each Id
    w = Window.partitionBy("Id").orderBy(F.col("modifiedon").desc())
    df = df.withWatermark("modifiedon", "1 day").withColumn("rn", F.row_number().over(w)).where(F.col("rn") == 1).drop('rn')
    # final =df.join(agg, on=["id", "modifiedon"], how="right")
    dfUpdates = df.withColumnRenamed("id", "BK_id")
    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
    # build the merge (ON) condition from the business-key columns
    string = ''
    for column in list_of_BK_columns:
        string += f'table.{column} = newData.{column} and '
    string = string[:-5]  # drop the trailing " and "
    # map every column to the incoming value for the update/insert clauses
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f'newData.{key}'
    print("printing " + cdm + " columns")
    print(dfUpdates.columns)
    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/D365/{cdm}" + "_autoloader_nodups")
    deltaTable.alias('table') \
        .merge(dfUpdates.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()
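For reference, here is a quick illustration (with hypothetical column names, not taken from my data) of what the generated merge condition and column mapping end up looking like:

    # Suppose dfUpdates.columns == ['BK_id', 'modifiedon', 'name']; then:
    #   list_of_BK_columns -> ['BK_id']
    #   string             -> 'table.BK_id = newData.BK_id'
    #   dictionary         -> {'BK_id': 'newData.BK_id',
    #                          'modifiedon': 'newData.modifiedon',
    #                          'name': 'newData.name'}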
The function above is used inside the Auto Loader's foreachBatch as follows:
from pyspark.sql.types import (StructType, StructField, IntegerType, TimestampType,
                               StringType, LongType, DecimalType)

for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    print(cdm)
    schema = StructType()
    length = len(entity.asDict()['attributes']) - 1
    for index1, attribute in enumerate(entity.asDict()['attributes']):
        if (attribute.asDict()['dataType'] in ('int32', 'time')) and (index1 != length):
            field = StructField(attribute.asDict()['name'], IntegerType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('dateTime') and (index1 != length):
            field = StructField(attribute.asDict()['name'], TimestampType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('string') and (index1 != length):
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('int64') and (index1 != length):
            field = StructField(attribute.asDict()['name'], LongType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] in ('decimal') and (index1 != length):
            field = StructField(attribute.asDict()['name'], DecimalType(38, 20), True)
            schema.add(field)
        elif index1 == length:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
            LastColumnName = attribute.asDict()['name']
            LastColumnDataType = attribute.asDict()['dataType']
        else:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)

    # Define variables
    checkpoint_directory = f"abfss://[email protected]/D365/checkpoints/{cdm}"
    data_source = f"abfss://[email protected]/*/{cdm}/*.csv"
    source_format = "csv"

    # Configure Auto Loader to ingest csv data to a Delta table
    print("schema for " + cdm)
    # print(schema)
    df = (
        spark.readStream
        .option("delimiter", ",")
        .option("quote", '"')
        .option("mode", "permissive")
        .option("lineSep", "\r\n")
        .option("multiLine", "true")
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        # .option("cloudFiles.schemaLocation", checkpoint_directory)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "false")
        .option("escape", '"')
        .schema(schema)
        .load(data_source)
    )

    print("writing " + cdm)
    # print(df.columns)
    df.writeStream \
        .format("delta") \
        .foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)) \
        .option("checkpointLocation", checkpoint_directory) \
        .trigger(availableNow=True) \
        .start()
The problem is that the loop does not work as expected. I added print statements to the code to see which df is created for which table. For example:
print(cdm) (cdm is the table name) outputs msdyn_workorder.
print("schema for " + cdm) outputs schema for msdyn_workorder.
print("writing " + cdm) outputs writing msdyn_workorder.
This is where things go wrong, because the next print should be the one from inside the function, print("------------------- " + cdm). Instead, it prints the next table name from print(cdm), i.e. nrq_customerassetproperty, so the for loop starts over (I only have two tables, so the for loop should run twice).
It then continues with the same sequence of print statements: print("schema for " + cdm) outputs schema for nrq_customerassetproperty, and print("writing " + cdm) outputs writing nrq_customerassetproperty. Only at this point does it start printing what is inside the def, e.g. print("------------------- " + cdm) and print("printing " + cdm + " columns"), with the output printing nrq_customerassetproperty columns.
With the next print, print(dfUpdates.columns), which should show the columns of the df read in that foreachBatch call, it prints the columns of the previous df, in this case the columns of msdyn_workorder.
I don't know what is going wrong here. Is there some issue with streaming data inside for loops?
Screenshot of the print statements: note that it prints printing nrq_customerassetproperty columns, but the columns actually belong to the msdyn_workorder table.
Pass cdm into the foreachBatch function as shown below:

lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)

When you don't bind cdm inside the lambda, the lambda looks cdm up in the enclosing scope only when it is eventually called, so by the time the batch actually runs it sees whatever value the loop variable holds at that moment (typically the last table). Binding it as a default argument (cdm=cdm) captures the value at the time the lambda is created.
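As a minimal, standalone illustration of this Python closure behaviour (plain Python, nothing Spark-specific):

funcs = []
for name in ["msdyn_workorder", "nrq_customerassetproperty"]:
    funcs.append(lambda: print(name))            # looks up name when called
funcs[0]()   # prints nrq_customerassetproperty (the last loop value)

funcs = []
for name in ["msdyn_workorder", "nrq_customerassetproperty"]:
    funcs.append(lambda name=name: print(name))  # binds the current value now
funcs[0]()   # prints msdyn_workorder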
Below is the update_insert function I used for testing:

def update_insert(df, epochId, cdm):
    print(epochId)
    df.show()
    print("------------------- " + cdm)
    print("printing " + cdm + " columns")
    print(df.columns)
I ran your writeStream code. It looks like the print statements of the next loop iteration appear first because the foreachBatch function runs asynchronously and repeatedly, once per micro-batch of the streaming data, whereas the print statements outside foreachBatch execute immediately on the driver as the loop runs.
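In other words, start() returns right away with a StreamingQuery handle and the micro-batches (and therefore update_insert) run later on stream-execution threads. Roughly (variable names as in your loop, shown only for illustration):

query = df.writeStream \
    .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start()

print(query.isActive)  # True immediately, even though the batch may not have run yet
# the driver moves on to the next loop iteration here,
# while update_insert executes later on a background thread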
Here is the output:
msdyn_workorder
schema for msdyn_workorder
writing msdyn_workorder
2023-09-05T09:03:10.400+0000: [GC (Allocation Failure) [PSYoungGen: 1857533K->64491K(1965056K)] 2088300K->295274K(6238720K), 0.0468967 secs] [Times: user=0.09 sys=0.02, real=0.05 secs]
Next Df
nrq_customerassetproperty
schema for nrq_customerassetproperty
writing nrq_customerassetproperty
Next Df
2023-09-05 09:03:16,220 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'publicFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,222 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'privateFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,223 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.UsageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,224 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.ProductLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,225 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.LineageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,226 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.MetricsLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,227 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'dltExecution.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05T09:03:16.237+0000: [GC (Metadata GC Threshold) [PSYoungGen: 782632K->62922K(1994240K)] 1013415K->293722K(6267904K), 0.0367179 secs] [Times: user=0.09 sys=0.01, real=0.04 secs]
2023-09-05T09:03:16.274+0000: [Full GC (Metadata GC Threshold) [PSYoungGen: 62922K->0K(1994240K)] [ParOldGen: 230799K->105180K(4273664K)] 293722K->105180K(6267904K), [Metaspace: 254605K->254253K(1290240K)], 0.2592507 secs] [Times: user=0.56 sys=0.01, real=0.25 secs]
2023-09-05T09:03:21.380+0000: [GC (Allocation Failure) [PSYoungGen: 1843200K->28324K(1985536K)] 1948380K->133525K(6259200K), 0.0179690 secs] [Times: user=0.04 sys=0.00, real=0.02 secs]
0
------------------- nrq_customerassetproperty
printing nrq_customerassetproperty columns
['Col4', 'Col5', 'Col6']
0
------------------- msdyn_workorder
printing msdyn_workorder columns
['Col1', 'Col2', 'Col3']
To make it synchronous, you need to use awaitTermination:

df.writeStream \
    .format("delta") \
    .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()
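Note that awaitTermination() blocks inside the loop, so the tables are processed one after another. If you would rather keep the streams running in parallel and only wait at the end, one option (a sketch based on your loop, not part of the original answer) is to collect the query handles and wait for all of them after the loop:

queries = []
for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    # ... build schema, df and checkpoint_directory exactly as above ...
    q = df.writeStream \
        .format("delta") \
        .foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)) \
        .option("checkpointLocation", checkpoint_directory) \
        .trigger(availableNow=True) \
        .start()
    queries.append(q)

# wait for every table's stream to finish before the cell ends
for q in queries:
    q.awaitTermination()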
Output: