无法通过Glue中的pyspark jdbc加载MongoDB atlas数据

问题描述 投票:0回答:1

我正在尝试使用 Pyspark 中的

AWS Glue
本机方式加载
MongoDB Atlas
数据。我可以通过 Ec2 实例连接到数据库并查看那里的数据。

版本

  1. MongoDB
    5.0.26
  2. JAR
    mongo-spark-connector_2.13-10.3.0.jar

我还将此 JAR 的路径包含在 Glue 作业设置的

dependent-jars-file
路径中。下面是我的代码:

df = (
            self.spark.read.format("mongodb")
            .option(
                "connection.uri",
                "mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>",
            )
            .option("collection", "")
            .load()
            .limit(5)
        )  

但它给了我以下内容

error

Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o116.showString. Invalid field: 'metadata.dateButtonInstalled'. The dataType 'timestamp' is invalid for 'BsonString{value='2018-07-25T21:47:23.195Z'}'.

我已经检查和测试了其他集合,效果很好。我无法更改源数据。那么我可以使用任何配置吗?

mongodb amazon-web-services apache-spark pyspark aws-glue
1个回答
0
投票

您可以尝试以下方法:

1。显式指定架构

处理此问题最简单的方法之一是显式定义模式。这样,您就可以准确地告诉 Spark 如何处理每个字段。这是一个简单的例子:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, to_timestamp

# Define your schema
schema = StructType([
    StructField("metadata", StructType([
        StructField("dateButtonInstalled", StringType(), True)
    ])),
    # Add other fields here as necessary
])

# Read data with the schema
df = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>")
    .option("collection", "<your_collection>")
    .schema(schema)
    .load()
)

# Convert the string to a timestamp
df = df.withColumn("metadata.dateButtonInstalled", to_timestamp(col("metadata.dateButtonInstalled"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

# Show the first 5 rows
df.show(5)

2。调整 Spark 会话配置 您还可以调整一些 Spark 配置来帮助解析时间戳:

from pyspark.sql import SparkSession

# Build your Spark session
spark = SparkSession.builder \
    .appName("MongoDBIntegration") \
    .config("spark.mongodb.input.uri", "mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# Read the data
df = (
    spark.read.format("mongodb")
    .option("collection", "<your_collection>")
    .load()
)

# Convert the string to a timestamp
df = df.withColumn("metadata.dateButtonInstalled", to_timestamp(col("metadata.dateButtonInstalled"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

# Show the first 5 rows
df.show(5)
© www.soinside.com 2019 - 2024. All rights reserved.