CosmosClientMetadataCachesSnapshots error when writing to Cosmos DB from Databricks 12.2 LTS

Question (0 votes, 1 answer)

I had been using Databricks Runtime 10.4 LTS and writing to Azure Cosmos DB without any problems. I had to upgrade to the newer Runtime 12.2 LTS, and since then I get random errors when writing to Cosmos DB.

What I do is very simple: I load CSV files into Spark, rename one column to id so it can serve as the partition key, and then write to Cosmos DB. Before the write I raise the throughput (40K RU/s), and afterwards I lower it back down (10K RU/s).

# Target account, database and container for the write
cfg = {
    "spark.cosmos.accountEndpoint": f"https://{cosmosdb_name}.documents.azure.com:443/",
    "spark.cosmos.accountKey": cosmosdb.primary_master_key,
    "spark.cosmos.database": database_name,
    "spark.cosmos.container": table_name,
}
# Throughput control: cap this job at 95% of the container's provisioned RU/s
spark.conf.set(
    "spark.cosmos.throughputControl.globalControl.database", database_name
)
spark.conf.set("spark.cosmos.throughputControl.enabled", "true")
spark.conf.set(
    "spark.cosmos.throughputControl.name", "SourceContainerThroughputControl"
)
spark.conf.set("spark.cosmos.throughputControl.targetThroughputThreshold", "0.95")

data = (
    (
        spark.read.format("csv")
        .options(header="True", inferSchema="True", delimiter=";")
        .load(spark_file_path)
    )
    .withColumnRenamed("my_col", "id")
    .na.drop(subset=["id"])
)
data = data.withColumn("id", data["id"].cast("string"))

data = data.dropDuplicates(["id"])

data.write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()
An error occurred while calling o76.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 37.0 failed 4 times, most recent failure: Lost task 92.3 in stage 37.0 (TID 683) (10.34.208.134 executor 2): java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
    at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
    at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
    at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:97)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

I do not understand why

com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
...

I am using azure-cosmos-spark_3-3_2-12-4.30.0.jar, which matches the Spark version of my cluster: 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12).
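
As a sanity check, the connector artifact name can be compared against the versions the cluster actually reports. Below is a minimal sketch to run in a notebook cell; the Databricks cluster-tag config key is an assumption and may differ on your workspace:

# Minimal sketch: confirm the Spark version the cluster runs, to match it
# against the azure-cosmos-spark_<spark-line>_<scala-line> artifact name.
print(spark.version)  # e.g. "3.3.2" -> a _3-3_ connector build

# The Databricks runtime string also encodes the Scala line; this config key
# is an assumption about the cluster usage tags, hence the fallback default.
print(spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "n/a"))  # e.g. "12.2.x-scala2.12"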

In my Spark config:

spark.sql.catalog.cosmosCatalog com.azure.cosmos.spark.CosmosCatalog
spark.jars.packages com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.30.0

pyspark azure-cosmosdb azure-databricks
1 Answer (0 votes)

I tried this with the Azure Cosmos DB Spark connector library.

For the 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12) Databricks runtime, you need to install the com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.34.0 library on the cluster.
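
The library can be attached through the cluster's Libraries UI (Install new > Maven), or programmatically through the Databricks Libraries REST API. The snippet below is only a sketch of that route; the workspace URL, token and cluster ID are placeholders you would substitute:

import requests

# Placeholders: substitute your own workspace URL, personal access token and cluster ID.
DATABRICKS_HOST = "https://adb-XXXX.azuredatabricks.net"
TOKEN = "dapiXXXX"
CLUSTER_ID = "0101-000000-abcd123"

# Install the Cosmos DB Spark connector on the cluster as a Maven library.
resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "cluster_id": CLUSTER_ID,
        "libraries": [
            {"maven": {"coordinates": "com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.34.0"}}
        ],
    },
)
resp.raise_for_status()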


Next, here is the code I tried:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# On Databricks `spark` already exists; getOrCreate() simply reuses that session.
spark = SparkSession.builder.getOrCreate()

# Sample rows; every Cosmos DB document needs a string "id" field.
data = [
    ("1", "dilip", 29),
    ("2", "Raj", 34),
    ("3", "Narayan", 21),
]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()  # preview the sample rows (see the table under "Result" below)

# Account-level connection settings.
config = {
    "spark.cosmos.accountEndpoint": "https://aXXX.documents.azure.com:443/",
    "spark.cosmos.accountKey": "XXXXX",
    "spark.cosmos.database": "db1",
    "spark.cosmos.container": "cont",
}

# Write settings: upsert semantics (ItemOverwrite) with bulk ingestion enabled.
writeConfig = {
    "spark.cosmos.accountEndpoint": config["spark.cosmos.accountEndpoint"],
    "spark.cosmos.accountKey": config["spark.cosmos.accountKey"],
    "spark.cosmos.database": config["spark.cosmos.database"],
    "spark.cosmos.container": config["spark.cosmos.container"],
    "spark.cosmos.write.strategy": "ItemOverwrite",
    "spark.cosmos.write.bulk.enabled": "true",
}

df.write \
    .format("cosmos.oltp") \
    .options(**writeConfig) \
    .mode("APPEND") \
    .save()
print("Data written successfully to Cosmos DB.")

Result:

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  dilip| 29|
|  2|    Raj| 34|
|  3|Narayan| 21|
+---+-------+---+

Data written successfully to Cosmos DB.
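
As an extra check beyond the original answer, the rows can be read back from the container with the same connector to confirm the write landed. A minimal sketch, assuming the same endpoint, key, database and container as in the config above:

# Read the container back and display the rows that were just written.
readConfig = {
    "spark.cosmos.accountEndpoint": config["spark.cosmos.accountEndpoint"],
    "spark.cosmos.accountKey": config["spark.cosmos.accountKey"],
    "spark.cosmos.database": config["spark.cosmos.database"],
    "spark.cosmos.container": config["spark.cosmos.container"],
    "spark.cosmos.read.inferSchema.enabled": "true",
}
df_check = spark.read.format("cosmos.oltp").options(**readConfig).load()
df_check.select("id", "name", "age").show()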

