I need your help creating a "permanent" connection from Databricks to a SQL Server database in Azure.
I have a piece of PySpark code that connects to the database using the driver "com.microsoft.sqlserver.jdbc.spark" and the JAR Spark_mssql_connector_2_12_3_0_1_0_0_alpha.jar.
I created a class that connects to the database via a token:
import logging

import adal

logger = logging.getLogger(__name__)


class SQLSpark:
    database_name: str = ""
    sql_service_name: str = ""
    service_principal_id: str = ""
    service_principal_secret: str = ""
    tenant_id: str = ""
    authority: str = ""
    state = None
    except_error = None

    def __init__(self, database_name, service_principal_id, service_principal_secret, tenant_id,
                 authority, spark, sql_service_name=None):
        self.database_name = database_name
        self.sql_service_name = sql_service_name
        self.service_principal_id = service_principal_id
        self.service_principal_secret = service_principal_secret
        self.tenant_id = tenant_id
        self.authority = authority
        self.state = True
        self.except_error = ""
        self._spark_session = spark

        # The AAD access token is acquired once, at construction time.
        context = adal.AuthenticationContext(self.authority)
        token = context.acquire_token_with_client_credentials(
            "https://database.windows.net", self.service_principal_id, self.service_principal_secret)
        self._access_token = token["accessToken"]

        server_name = "jdbc:sqlserver://" + self.sql_service_name + ".database.windows.net"
        self._url = server_name + ";" + "databaseName=" + self.database_name + ";"
    def select_table(self, table, sql_query):
        try:
            logger.info(f"Reading table {table} in DB {self.database_name}")
            df = self._spark_session.read.format("com.microsoft.sqlserver.jdbc.spark") \
                .options(
                    url=self._url,
                    databaseName=self.database_name,
                    accessToken=self._access_token,
                    hostNameInCertificate="*.database.windows.net",
                    query=sql_query) \
                .load()
            logger.info(f"Table {table} in database {self.database_name} has been read")
            return df
        except Exception as ex:
            logger.error(f"Failed to read table {table}")
            logger.error(ex)
            raise  # re-raise so callers don't silently receive None
The problem is that I have to process a large amount of data, the processing takes more than an hour, and the database token expires in the meantime. Is there a way to refresh the token when the
select_table
method is called? Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired.
Full error:
Py4JJavaError: An error occurred while calling o9092.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 4 times, most recent failure: Lost task 0.3 in stage 59.0 (TID 2611, 10.139.64.5, executor 0): com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired. ClientConnectionId:009909b8-d779-4df2-b077-59cf4c4b3c73
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:283)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:129)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:37)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.sendLogon(SQLServerConnection.java:5173)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.logon(SQLServerConnection.java:3810)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.access$000(SQLServerConnection.java:94)
at com.microsoft.sqlserver.jdbc.SQLServerConnection$LogonCommand.doExecute(SQLServerConnection.java:3754)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:2562)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2216)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2067)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1204)
at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:825)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired. ClientConnectionId:009909b8-d779-4df2-b077-59cf4c4b3c73
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:283)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:129)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:37)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.sendLogon(SQLServerConnection.java:5173)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.logon(SQLServerConnection.java:3810)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.access$000(SQLServerConnection.java:94)
at com.microsoft.sqlserver.jdbc.SQLServerConnection$LogonCommand.doExecute(SQLServerConnection.java:3754)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:2562)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2216)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2067)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1204)
at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:825)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
A few things come to mind.
Check whether there is an option to give Spark a refresh URL so that it can obtain a new token itself, similar to this but for SQL Server rather than ADLS. You may need to use a different API, such as acquire_token_with_refresh_token(), to create the token.
I know some token issuers let you specify the requested validity period when asking for a new token. If yours does, request a token valid for 2, 3, or 6 hours (whatever you need) instead of letting the expiry default to one hour.
Another option, assuming the problem is in your code: there is no good reason to create the token in
__init__()
. You should create the token close to where it is used, i.e.:
class SQLSpark:
    # ...
    def __init__(self, database_name, service_principal_id, service_principal_secret, tenant_id,
                 authority, spark, sql_service_name=None):
        # Same as the OP's version, except no token is created and stored on self.
        ...

    def select_table(self, table, sql_query):
        # ...
        # Acquire the token close to its use.
        token = adal.AuthenticationContext(self.authority).acquire_token_with_client_credentials(
            "https://database.windows.net", self.service_principal_id, self.service_principal_secret)
        df = self._spark_session.read.format("com.microsoft.sqlserver.jdbc.spark") \
            .options(
                # ...
                accessToken=token["accessToken"],
                query=sql_query) \
            .load()
        # ...
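One caveat with this approach: Spark reads are lazy, so the token string captured in the read options is only presented to SQL Server when the executors actually open their JDBC connections (and again on task retries, as in the "failed 4 times" line of your trace). Acquiring a fresh token per select_table call fixes long gaps between reads, but a single read whose tasks run or are retried past the token lifetime can still hit the same error.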
We are facing the same issue. We tried creating a token manager class to force a refresh, but no luck so far:
connection_properties["accessToken"] = token_manager.get_access_token()  # ensure a fresh token
connection_properties["queryTimeout"] = str(timeout)
df = spark.read.jdbc(url=azure_sql_url, table=query, properties=connection_properties)
return df
Has anyone managed to solve this? Databricks Runtime 14.3 LTS, latest MSAL.
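In case it helps, here is a minimal sketch of what such a token manager could look like with MSAL (the class name SqlTokenManager and its wiring are mine, not from this thread). Recent MSAL Python versions serve acquire_token_for_client from an in-memory cache and transparently request a new token once the cached one nears expiry, so calling get_access_token() immediately before every read should always hand back a valid token:

import msal


class SqlTokenManager:
    """Hypothetical helper that returns a currently valid AAD token for Azure SQL."""

    # App-only (client credentials) scope for the Azure SQL resource.
    SCOPES = ["https://database.windows.net/.default"]

    def __init__(self, tenant_id, client_id, client_secret):
        self._app = msal.ConfidentialClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}",
            client_credential=client_secret,
        )

    def get_access_token(self):
        # MSAL serves the cached token while it is valid and silently
        # acquires a new one when it is close to expiring.
        result = self._app.acquire_token_for_client(scopes=self.SCOPES)
        if "access_token" not in result:
            raise RuntimeError(f"Token acquisition failed: {result.get('error_description')}")
        return result["access_token"]

Even with this, the caveat above applies: the token is baked into the connection properties at read time, so one long-running read can still outlive it. If a single query runs past the token lifetime, the remaining options I'm aware of are splitting it into smaller reads or extending the access-token lifetime via an Azure AD configurable token lifetime policy.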