我想使用 databricks 结构化流处理 24/7 的数据。
但在处理所有数据之前,我需要对其进行一些转换。
因此我使用
foreach
。下面是我的def
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace
from datetime import datetime
def flatten_nested_df_2(df_nested, columns_to_flatten, epochId ):
"""
Cleans and flattens a nested DataFrame by extracting and parsing JSON from a specified column,
then iteratively creating new columns out of original columns which are structs.
The new column names get the original parent column name as prefix.
:param df_nested: The input DataFrame with nested JSON.
:param columns_to_flatten: List of column names to flatten.
:param json_column_name: The column name containing JSON data to be cleaned and parsed.
:return: A flattened DataFrame.
"""
#----------------------------------------- Part1: Cleaning and Flattening --------------------------------------------------------------
# Step 1: Clean the JSON string in the specified column
json_column_name="data"
clean_col = col(json_column_name)
for pattern, replacement in [
('`', ''),
("'", ''),
# ('-', '_'),
(' ', '_'),
('\\.', '_'),
('á', 'a'),
('\\$', ''),
('\\n', '_')]:
clean_col = regexp_replace(clean_col, pattern, replacement)
filtered_df = df_nested.filter(col("type").like("%YardOrder%"))
filtered_df = filtered_df.select("data")
df_cleaned = filtered_df.withColumn(json_column_name, clean_col)
# Step 2: Parse the JSON string into a DataFrame
schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema
df_with_json = df_cleaned.withColumn(json_column_name, from_json(json_column_name, schema=schema))
# Step 3: Flatten the nested DataFrame
stack = [((), df_with_json)]
column_names = []
while len(stack) > 0:
parents, df = stack.pop()
flat_column_names = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if (c[1][:6] != "struct") and c[0] in columns_to_flatten
]
nested_column_names = [c[0] for c in df.dtypes if c[1][:6] == "struct" and c[0] in columns_to_flatten]
column_names.extend(flat_column_names)
for nested_column_name in nested_column_names:
projected_df = df.select(nested_column_name + ".*")
stack.append((parents + (nested_column_name,), projected_df))
flattened_df = df_with_json.select(column_names)
# Rename the column 'data_id' to 'bk_data_id'
flattened_df = flattened_df.withColumnRenamed("data_id", "bk_data_id")
#------------------------------------------------------------ Part 2: Upsert to silver-------------------------------------------------
deltaTable = DeltaTable.forPath(spark,f"abfss://[email protected]/shippingUnit")
list_of_columns = flattened_df.columns
list_of_BK_columns = ['bk_data_id']
string = ''
for column in list_of_BK_columns:
string += f'table.{column} = newData.{column}'
string_insert = ''
for column in list_of_BK_columns:
string_insert += f'table.{column} = newData.{column} and '
string_insert[:-4]
dictionary = {}
for key in list_of_columns:
dictionary[key] = f'newData.{key}'
# Executing the merge function itself
print(f"batch {epochId} starting merge now at {datetime.now()}")
deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/shippingUnit")
deltaTable.alias('table') \
.merge(flattened_df.alias("newData"), string) \
.whenMatchedUpdate(set=dictionary) \
.whenNotMatchedInsert(values=dictionary) \
.execute()
print(f"batch {epochId} done at {datetime.now()}")
我就是这样写的
# print(f"Merge initiated at {datetime.now()}")
df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId)).option("checkpointLocation", checkpoint_directory).start()
print(f"Merge done at {datetime.now()}")
但是我收到错误消息:
[CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
我认为这个问题是由这一步造成的
#Step 2: Parse the JSON string into a DataFrame schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema
。这是不可能以连续方式运行的。
我使用单用户集群,运行时 15.3 Beta
非常感谢任何帮助
您可以根据您的数据解决此问题,无论 json 列是否位于所有行中的常量相同字段中。
如果
json_column_name
中的数据是恒定的,并且每行之间的字段没有变化,那么您可以使用 schema_of_json
函数。
使用下面的代码。
json_string = df.rdd.map(lambda row: row['json_column_name']).collect()[0]
df_with_json = df.withColumn('json_column_name', from_json('json_column_name', schema=schema_of_json(json_string)))
df_with_json.display()
输出:
id | 名字 | json_列名称 |
---|---|---|
1 | 贾亚 | [{"键":"值"}] |
2 | 亚历克斯 | [{"key":null}] |
3 | 萨姆 | [{"key":null}] |
如果数据中的字段在行之间不断变化,那么您需要为其定义标准架构并将该架构传递给批处理函数以使用它。
df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten,standard_json_schema ,epochId)).option("checkpointLocation", checkpoint_directory).start()
并在函数中使用它来解析数据。
df_with_json = df.withColumn('json_column_name', from_json('json_column_name', schema=standard_json_schema))
lambda 表达式对类变量的使用是这个问题的根源。 Pyspark 在内部腌制它遇到的每个项目,包括具有对 Spark 上下文的引用的对象。 你必须从 lambda 中取出类引用才能解决这个问题:
正如错误消息所示 “SparkContext 只能在驱动程序上使用,不能在工作程序上运行的代码中使用”
当 pyspark.sql.DataFrame.foreach() 在驱动程序上执行时,它只是将该工作委托给工作人员,以并行执行。为此,它将可调用/函数的代码(在您的情况下是
lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId)
)传递给工作人员。每个工作人员(有自己的操作系统/可执行文件/进程/内存)在其运行时环境中编译并运行此代码。
没有从worker到driver的RPC回调来执行driver上的函数。因此,在工作线程上运行的代码无法使用驱动程序上可用的任何状态信息。变量
spark
仅存在于司机身上,而不存在于工人身上。
也许使用Python的json模块或pyspark.sql.function
中的
json函数之一,例如
schema_of_json()
或from_json()
或get_json_object()
。而不是spark.read.json()
。
另请参阅: