我正在尝试使用 Pyspark
中的
AWS Glue
本机方式加载 MongoDB Atlas数据。我可以通过 Ec2 实例连接到数据库并查看那里的数据。
版本:
5.0.26
mongo-spark-connector_2.13-10.3.0.jar
我还将此 JAR 的路径包含在 Glue 作业设置的
dependent-jars-file
路径中。下面是我的代码:
df = (
self.spark.read.format("mongodb")
.option(
"connection.uri",
"mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>",
)
.option("collection", "")
.load()
.limit(5)
)
但它给了我以下内容
error
:
Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o116.showString. Invalid field: 'metadata.dateButtonInstalled'. The dataType 'timestamp' is invalid for 'BsonString{value='2018-07-25T21:47:23.195Z'}'.
我已经检查和测试了其他集合,效果很好。我无法更改源数据。那么我可以使用任何配置吗?
您可以尝试以下方法:
1。显式指定架构
处理此问题最简单的方法之一是显式定义模式。这样,您就可以准确地告诉 Spark 如何处理每个字段。这是一个简单的例子:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, to_timestamp
# Define your schema
schema = StructType([
StructField("metadata", StructType([
StructField("dateButtonInstalled", StringType(), True)
])),
# Add other fields here as necessary
])
# Read data with the schema
df = (
spark.read.format("mongodb")
.option("connection.uri", "mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>")
.option("collection", "<your_collection>")
.schema(schema)
.load()
)
# Convert the string to a timestamp
df = df.withColumn("metadata.dateButtonInstalled", to_timestamp(col("metadata.dateButtonInstalled"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
# Show the first 5 rows
df.show(5)
2。调整 Spark 会话配置 您还可以调整一些 Spark 配置来帮助解析时间戳:
from pyspark.sql import SparkSession
# Build your Spark session
spark = SparkSession.builder \
.appName("MongoDBIntegration") \
.config("spark.mongodb.input.uri", "mongodb+srv://<something>.<some-thing>.mongodb.net/<db>?authSource=<user>") \
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
.getOrCreate()
# Read the data
df = (
spark.read.format("mongodb")
.option("collection", "<your_collection>")
.load()
)
# Convert the string to a timestamp
df = df.withColumn("metadata.dateButtonInstalled", to_timestamp(col("metadata.dateButtonInstalled"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
# Show the first 5 rows
df.show(5)