我们使用 databricks 集群,在 30 分钟不活动后关闭(13.3 LTS(包括 Apache Spark 3.4.1、Scala 2.12))。 我的目标是读取红移表并将其写入雪花,我使用以下代码:
df = spark.read \
.format("redshift") \
.option("url", jdbc_url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "schem_name.table_name") \
.option("partitionColumn", "date_col1")\
.option("lowerBound", "2023-11-05")\
.option("upperBound", "2024-03-23")\
.option("numPartitions", "500")\
.load()\
.filter("date_col1>dateadd(month ,-6,current_date)")\
.filter(col("country_col").isin('India', 'China', 'Japan', 'Germany', 'United Kingdom', 'Brazil', 'United States', 'Canada'))
df1 = df.repartition(900)#Data is skedwed for that partition column, so repartitioning to 1* num cores in cluster for even dist
df1.write.format("snowflake") \
.option("host", host_sfl) \
.option("user", user_sfl) \
.option('role', role_sfl) \
.option("password", password_sfl) \
.option("database", database_sfl) \
.option("sfWarehouse", warehouse_sfl) \
.option("schema",'schema_name')\
.option("dbtable",'target_table_name')\
.mode('Overwrite') \
.save()
It throws the following error, despite not having used query option in my code:
IllegalArgumentException: requirement failed:
Options 'query' and 'partitionColumn' can not be specified together.
Please define the query using `dbtable` option instead and make sure to qualify
the partition columns using the supplied subquery alias to resolve any ambiguity.
Example :
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
当我注释掉重新分区并写入雪花代码并进行计数时,它会给出正确的计数。
这是另一个观察结果: 如果我在执行计数后将上面的代码更改为 JDBC 而不是 .format("redshift") 中的 redshift,则代码可以正常工作。
我不知道这里发生了什么。第一次重新启动集群时,作业一直失败,我必须先手动进行计数,然后将其更改为 JDBC 才能工作,如果我遗漏了一些明显的东西,请告诉我。我查了很多文档,但找不到我需要的东西。
这就是我最终所做的,有效的。
count_df = spark.read \
.format("com.databricks.spark.redshift") \
.option("dbtable", tbl1) \
.option("url", url) \
.option("user", user) \
.option("password", pwd) \
.load()
.limit(1)
count_df.count()```
And then the code in the question started working, a dummy count action with a different driver parameter(com.databricks.spark.redshift) before running the code in the question.