I have an S3 data lake generated by DMS, with SQS set up to track the files it produces. Now I want to stream those files into my EMR cluster, and for that I found the Spark Streaming S3 connector here: https://github.com/aws-samples/spark-streaming-sql-s3-connector/blob/main/src/test/scala/pt/spark/sql/streaming/connector/DataConsumer.scala
I have more than 300 tables across more than 100 databases, so I would like SQS to track every file and have Spark process each one, converting it into a Delta file. However, it seems I have to define the table and its schema before I can read from SQS.
I don't know whether there is a way to read the incoming file's path dynamically and feed it into readStream. Here is my code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, DoubleType, IntegerType
import os
# Initialize Spark session.
# Delta is resolved as a Maven coordinate; the connector jar is loaded directly from S3.
spark = SparkSession.builder \
    .appName("RealTimeDataLakeToDeltaLakeConversion") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.1") \
    .config("spark.jars", "s3a://my-config-dev/spark-streaming-sql-s3-connector-0.0.1.jar") \
    .getOrCreate()
# AWS SQS queue URL for the processing queue
SQS_QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/1234567890/EMRProcessingQueue"

# Configure the S3 connector options
connector_options = {
    "spark.s3conn.queueRegion": "eu-west-1",
    "spark.s3conn.fileFormat": "parquet",
    "maxFilesPerTrigger": "500",
    "maxFileAge": "15d",
    "spark.s3conn.queueUrl": SQS_QUEUE_URL,
    "queueFetchWaitTimeoutSeconds": "10",
    "sqsLongPollingWaitTimeSeconds": "5",
    "sqsVisibilityTimeoutSeconds": "60",
    "pathGlobFilter": "*.parquet",
    "partitionColumns": "valPartition",
    "basePath": "s3://my-data-extract-dev/dms-output/my_db/public/address_coordinates/"
}
table_name = 'address_coordinates'
# Define the schema for the streaming data
test_schema = StructType([
    StructField("valString", StringType(), nullable=True),
    StructField("valBoolean", BooleanType(), nullable=True),
    StructField("valDouble", DoubleType(), nullable=True),
    StructField("valInt", IntegerType(), nullable=True),
    StructField("valPartition", StringType(), nullable=False)
])
# Read from the S3 streaming source. Before this point I want to know which table it is,
# otherwise I can't define the schema dynamically.
input_df = spark.readStream \
    .format("s3-connector") \
    .schema(test_schema)

# Apply each option from the dictionary to the streaming reader
for key, value in connector_options.items():
    print(f"Applying option: {key} = {value}")
    input_df = input_df.option(key, value)

# Load the streaming DataFrame
input_df = input_df.load()
# Setting the output directly to the input for now
output_df = input_df

# Write the output stream
query = output_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation",
            os.path.join("s3a://my-data-extract-dev/checkpoints/my_db/", table_name)) \
    .option("truncate", "false") \
    .option("path", "s3a://my-data-extract-dev/sqs-bronze/my_db/public/address_coordinates/") \
    .start()

query.awaitTermination()
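For the schema part specifically, the closest workaround I can see is to infer each table's schema with a one-off batch read of files DMS has already written under that table's prefix, instead of hard-coding a StructType per table. This is only a sketch, and sample_table_path is a hypothetical placeholder for the table's S3 prefix:

# Sketch only: infer a table's schema from files DMS has already landed,
# via a one-off batch read, and reuse it for the stream.
sample_table_path = "s3a://my-data-extract-dev/dms-output/my_db/public/address_coordinates/"
inferred_schema = spark.read.parquet(sample_table_path).schema

# The inferred schema can then replace test_schema above:
# spark.readStream.format("s3-connector").schema(inferred_schema)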
To answer my own question: I ended up managing this with a queue, and simply filtered the S3 files with a glob filter.
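Roughly, that amounts to one streaming query per table, each with its own basePath, pathGlobFilter, checkpoint and output path. The sketch below only illustrates the idea; the table list, the path layout and the choice to keep sharing a single queue are assumptions on my part rather than the exact setup, and whether one shared SQS queue behaves well across many concurrent streams is worth validating against the connector's message handling.

# Sketch only: one streaming query per table. Table names and path conventions
# here are placeholders, and the single shared queue is an assumption.
tables = ["address_coordinates"]  # in practice this list would come from config

queries = []
for table in tables:
    base_path = f"s3://my-data-extract-dev/dms-output/my_db/public/{table}/"

    # Infer the schema from files already written by DMS (batch read).
    schema = spark.read.parquet(base_path.replace("s3://", "s3a://")).schema

    # Reuse the shared options, overriding the per-table bits.
    options = {
        **connector_options,
        "basePath": base_path,
        "pathGlobFilter": "*.parquet",
    }

    reader = spark.readStream.format("s3-connector").schema(schema)
    for key, value in options.items():
        reader = reader.option(key, value)

    query = reader.load().writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("checkpointLocation", f"s3a://my-data-extract-dev/checkpoints/my_db/{table}") \
        .option("path", f"s3a://my-data-extract-dev/sqs-bronze/my_db/public/{table}/") \
        .start()
    queries.append(query)

# Wait on all per-table queries instead of a single one.
spark.streams.awaitAnyTermination()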