我正在尝试将数据从 SQL Server 传输到 BigQuery。
我使用 pyspark 将数据提取到 GCS 中的 Parquet 文件中。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Initialize SparkSession
spark = SparkSession.builder \
.appName("SQLServerToGCS") \
.config("spark.jars", r"path\to\mssql-jdbc-12.8.1.jre8.jar, path\to\gcs-connector-hadoop3-latest.jar") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", r"path\to\sa.json") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") \
.config("spark.driver.memory", "32g") \
.getOrCreate()
jdbc_url = f"jdbc_url" # SQL Server
table_name = "table_name" # Table name to read from SQL Server
gcs_bucket = "gs://bucket/prefix" # GCS bucket and destination path
# Read data from SQL Server using JDBC
try:
df = spark.read \
.format("jdbc") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("partitionColumn", "UpdateDate") \
.option("lowerBound", "2014-11-11") \
.option("upperBound", "2025-01-13") \
.option("numPartitions", "100") \
.load()
print("Data loaded successfully from SQL Server.")
df.write.mode("overwrite").parquet(gcs_bucket) # Save df to Parquet in GCS
print(f"Data written successfully to GCS bucket at {gcs_bucket}.")
except Exception as e:
print(f"Error occurred: {e}")
finally:
# Stop the SparkSession
spark.stop()
print("SparkSession stopped.")
镶木地板:
然后我使用 loadJob 将 Parquet 文件加载到 BQ 表中:
from google.cloud import bigquery
from google.cloud import storage
# GCS bucket and prefix
bucket_name = "bucket_name"
file_prefix = "file_prefix" # Folder path or common prefix in GCS
# BigQuery table information
project_id = "bq_project_id"
dataset_id = "dataset"
table_id = "table"
bq_client = bigquery.Client() # Initialize BigQuery client
storage_client = storage.Client() # Initialize Storage client
bucket = storage_client.bucket(bucket_name)
blobs = bucket.list_blobs(prefix=file_prefix)
table_ref = f"{project_id}.{dataset_id}.{table_id}"
files = sorted([blob.name for blob in blobs if blob.name.endswith('.parquet')]) # Sort files in ascending order by name
for file_name in files:
file_path = f"gs://bucket_name/{file_name}"
print(f"Processing file: {file_path}")
# Configure the load job
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
ignore_unknown_values=True # Ignore extra columns in source file
)
load_job = bq_client.load_table_from_uri(file_path, table_ref, job_config=job_config) # Start the load job
load_job.result()
print(f"Loaded file {file_name} into BigQuery table {table_ref}.")
尝试插入日期时间列时加载作业失败。
错误信息:
400 Provided Schema does not match Table project_id:dataset.table_name. Field DateAdded has changed type from DATETIME to TIMESTAMP
在寻找解决方案时,我似乎必须手动编辑所有日期时间列的格式才能解决此问题。
我想知道是否可以在提取或加载步骤中设置任何配置来正确处理日期时间列。
解决方案是将outputTimestampType配置添加到我的SparkSession中。
.config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") \
默认情况下,日期时间列被转换为 TIMESTAMP 并保存为 INT96。 将 outputTimestampType 设置为 TIMESTAMP_MICROS 意味着日期时间值保存为 INT64,因此与 BigQuery 的日期时间列兼容。
话虽如此 - 我事先知道在我的例子中来自 SQL Server 的源数据没有 TIMESTAMP 类型。
如果有并且我仍然使用此配置 - 时间戳可能会被截断为微秒,这将导致源和目标中的数据不一致。所以请谨慎使用。