Retrieving file paths from an SQS-driven PySpark Structured Streaming source


I have an S3 data lake generated by DMS, with SQS set up to track the files it produces. Now I want to stream those files into my EMR cluster, and for that I found the Spark Streaming SQL S3 connector: https://github.com/aws-samples/spark-streaming-sql-s3-connector/blob/main/src/test/scala/pt/spark/sql/streaming/connector/DataConsumer.scala

I have more than 300 tables across more than 100 databases, so I want SQS to track every file and have Spark process each one, converting it to Delta. However, it seems that I need to define which table it is and what its schema is before reading from SQS.
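To illustrate what I mean, here is a hypothetical sketch of peeking the queue to work out which table an incoming file belongs to (the boto3 calls are standard, but the S3 event layout and the key structure are assumptions on my part):

import json
import boto3

# Hypothetical: peek one message from the queue and parse the S3 object key
# to figure out which database/schema/table the new file belongs to.
sqs = boto3.client("sqs", region_name="eu-west-1")
resp = sqs.receive_message(
    QueueUrl="https://sqs.eu-west-1.amazonaws.com/1234567890/EMRProcessingQueue",
    MaxNumberOfMessages=1,
    WaitTimeSeconds=5,
)
for msg in resp.get("Messages", []):
    body = json.loads(msg["Body"])
    # Assumes an S3 event notification whose object key looks like
    # "dms-output/my_db/public/address_coordinates/LOAD00000001.parquet"
    key = body["Records"][0]["s3"]["object"]["key"]
    database, schema_name, table = key.split("/")[1:4]
    print(database, schema_name, table)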

I don't know whether there is a way to read the incoming file's path dynamically like this and feed it into readStream. Below is my code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, DoubleType, IntegerType

import os

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RealTimeDataLakeToDeltaLakeConversion") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.1") \
    .config("spark.jars",
            "s3a://my-config-dev/spark-streaming-sql-s3-connector-0.0.1.jar") \
    .getOrCreate()

# AWS SQS queue URL for the processing queue
SQS_QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/1234567890/EMRProcessingQueue"

# Configure the S3 connector options
connector_options = {
    "spark.s3conn.queueRegion": "eu-west-1",
    "spark.s3conn.fileFormat": "parquet",
    "maxFilesPerTrigger": "500",
    "maxFileAge": "15d",
    "spark.s3conn.queueUrl": SQS_QUEUE_URL,
    "queueFetchWaitTimeoutSeconds": "10",
    "sqsLongPollingWaitTimeSeconds": "5",
    "sqsVisibilityTimeoutSeconds": "60",
    "pathGlobFilter": "*.parquet",
    "partitionColumns": "valPartition",
    "basePath": "s3://my-data-extract-dev/dms-output/my_db/public/address_coordinates/"
}

table_name = 'address_coordinates'
# Define the schema for the streaming data
test_schema = StructType([
    StructField("valString", StringType(), nullable=True),
    StructField("valBoolean", BooleanType(), nullable=True),
    StructField("valDouble", DoubleType(), nullable=True),
    StructField("valInt", IntegerType(), nullable=True),
    StructField("valPartition", StringType(), nullable=False)
])
# Read from the S3 streaming source. Before this point I need to know which table the file belongs to; otherwise I can't pick the schema dynamically
input_df = spark.readStream \
    .format("s3-connector") \
    .schema(test_schema)

# Apply each option from the dictionary to the streaming DataFrame
for key, value in connector_options.items():
    print(f"Applying option: {key} = {value}")
    input_df = input_df.option(key, value)

# Load the streaming DataFrame
input_df = input_df.load()

# For now, pass the input through unchanged
output_df = input_df

# Write output stream
query = output_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation",
            os.path.join("s3a://my-data-extract-dev/checkpoints/my_db/", table_name)) \
    .option("truncate", "false") \
    .option("path", 's3a://my-data-extract-dev/sqs-bronze/my_db/public/address_coordinates/') \
    .start()

query.awaitTermination()
Tags: apache-spark, pyspark, amazon-sqs, spark-structured-streaming
1 Answer

To answer my own question: in the end I handled this with a queue, simply filtering the S3 files with a glob filter.
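A rough sketch of how that can look, assuming one stream per table with basePath and pathGlobFilter pointing at that table's prefix; the table list and paths below are illustrative, and test_schema and SQS_QUEUE_URL are reused from the question's code:

# Illustrative: start one SQS-driven stream per table, filtered by glob.
tables = {"address_coordinates": test_schema}  # table name -> schema

for tbl, schema in tables.items():
    stream = (spark.readStream
              .format("s3-connector")
              .schema(schema)
              .option("spark.s3conn.queueUrl", SQS_QUEUE_URL)
              .option("spark.s3conn.queueRegion", "eu-west-1")
              .option("spark.s3conn.fileFormat", "parquet")
              .option("pathGlobFilter", "*.parquet")
              .option("basePath", f"s3://my-data-extract-dev/dms-output/my_db/public/{tbl}/")
              .load())

    (stream.writeStream
     .format("delta")
     .outputMode("append")
     .option("checkpointLocation", f"s3a://my-data-extract-dev/checkpoints/my_db/{tbl}")
     .option("path", f"s3a://my-data-extract-dev/sqs-bronze/my_db/public/{tbl}/")
     .start())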
