I had been using Databricks Runtime 10.4 LTS and writing to Azure Cosmos DB without any problems. I had to upgrade to the newer Runtime 12.2 LTS, and since then I get random errors when writing to Cosmos DB.
My workflow is very simple: I load CSV files into Spark, rename one column to id so it can serve as the partition key, and then write to Cosmos DB. Before writing I raise the throughput (40K RU/s), and afterwards I lower it again (10K RU/s).
cfg = {
    "spark.cosmos.accountEndpoint": f"https://{cosmosdb_name}.documents.azure.com:443/",
    "spark.cosmos.accountKey": cosmosdb.primary_master_key,
    "spark.cosmos.database": database_name,
    "spark.cosmos.container": table_name,
}

# Throughput control so the bulk write does not saturate the container's RU/s
spark.conf.set("spark.cosmos.throughputControl.globalControl.database", database_name)
spark.conf.set("spark.cosmos.throughputControl.enabled", "true")
spark.conf.set("spark.cosmos.throughputControl.name", "SourceContainerThroughputControl")
spark.conf.set("spark.cosmos.throughputControl.targetThroughputThreshold", "0.95")

# Load the CSV, rename the partition-key column to "id", and drop rows without an id
data = (
    spark.read.format("csv")
    .options(header="True", inferSchema="True", delimiter=";")
    .load(spark_file_path)
    .withColumnRenamed("my_col", "id")
    .na.drop(subset=["id"])
)
data = data.withColumn("id", data["id"].cast("string"))
data = data.dropDuplicates(["id"])

data.write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()
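The RU/s change mentioned above happens outside this snippet. A minimal sketch of how it could look with the azure-cosmos Python SDK (an assumption on my part: the SDK is installed on the cluster, the same cosmosdb_name, database_name and table_name variables are reused, and the container has its own provisioned throughput):

from azure.cosmos import CosmosClient

# Sketch only: adjust the container's provisioned RU/s around the Spark write.
cosmos_client = CosmosClient(
    f"https://{cosmosdb_name}.documents.azure.com:443/",
    credential=cosmosdb.primary_master_key,
)
container = (
    cosmos_client.get_database_client(database_name)
    .get_container_client(table_name)
)

container.replace_throughput(40000)  # raise RU/s before the bulk write
# ... run the Spark write shown above ...
container.replace_throughput(10000)  # lower RU/s again afterwards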
An error occurred while calling o76.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 37.0 failed 4 times, most recent failure: Lost task 92.3 in stage 37.0 (TID 683) (10.34.208.134 executor 2): java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:97)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I do not understand why
com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
...
I am using
azure-cosmos-spark_3-3_2-12-4.30.0.jar
, which matches the Spark version of my cluster, 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12).
In my Spark config:
spark.sql.catalog.cosmosCatalog com.azure.cosmos.spark.CosmosCatalog
Spark.jars.packages com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.30.0
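For reference, the cosmosCatalog entry above is what lets the connector manage containers through Spark SQL. A minimal sketch of creating the target container with /id as the partition key (the manualThroughput value here is only illustrative):

# Sketch: the catalog reads the account settings from the Spark conf.
spark.conf.set("spark.cosmos.accountEndpoint", cfg["spark.cosmos.accountEndpoint"])
spark.conf.set("spark.cosmos.accountKey", cfg["spark.cosmos.accountKey"])

spark.sql(
    f"CREATE TABLE IF NOT EXISTS cosmosCatalog.`{database_name}`.`{table_name}` "
    "USING cosmos.oltp "
    "TBLPROPERTIES (partitionKeyPath = '/id', manualThroughput = '40000')"
)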
I have tried using the Azure Cosmos libraries.
For the 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12) Databricks runtime, you need to install the
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.34.0
library on your cluster. A ClassCastException between two identically named classes usually means the class has been loaded twice by different classloaders, for example when the connector is both installed as a cluster library and pulled in again through spark.jars.packages.
Next, here is the code I tried:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample records to write to Cosmos DB
data = [
    ("1", "dilip", 29),
    ("2", "Raj", 34),
    ("3", "Narayan", 21),
]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()

config = {
    "spark.cosmos.accountEndpoint": "https://aXXX.documents.azure.com:443/",
    "spark.cosmos.accountKey": "XXXXX",
    "spark.cosmos.database": "db1",
    "spark.cosmos.container": "cont",
}

writeConfig = {
    "spark.cosmos.accountEndpoint": config["spark.cosmos.accountEndpoint"],
    "spark.cosmos.accountKey": config["spark.cosmos.accountKey"],
    "spark.cosmos.database": config["spark.cosmos.database"],
    "spark.cosmos.container": config["spark.cosmos.container"],
    "spark.cosmos.write.strategy": "ItemOverwrite",  # upsert instead of plain insert
    "spark.cosmos.write.bulk.enabled": "true",
}

df.write \
    .format("cosmos.oltp") \
    .options(**writeConfig) \
    .mode("APPEND") \
    .save()

print("Data written successfully to Cosmos DB.")
Result:
+---+-------+---+
| id| name|age|
+---+-------+---+
| 1| dilip| 29|
| 2| Raj| 34|
| 3|Narayan| 21|
+---+-------+---+
Data written successfully to Cosmos DB.
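As a quick sanity check that the items actually landed, the same connector can read them back; spark.cosmos.read.inferSchema.enabled lets it infer the item schema (a sketch reusing the config dictionary from above):

verify_df = (
    spark.read.format("cosmos.oltp")
    .options(**config)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .load()
)
verify_df.show()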