Token problems with Databricks and SQL Server

Question

I need your help to create a "permanent" connection from Databricks to a SQL Server database in Azure.
I have a piece of PySpark code that connects to the database using the com.microsoft.sqlserver.jdbc.spark driver and the spark-mssql-connector_2.12_3.0-1.0.0-alpha.jar JAR.
I created a class that connects to the database via a token:


import adal
import logging

logger = logging.getLogger(__name__)


class SQLSpark():
    database_name: str = ""
    sql_service_name: str = ""
    service_principal_id: str = ""
    service_principal_secret: str = ""
    tenant_id: str = ""
    authority: str = ""
    state = None
    except_error = None

    def __init__(self, database_name, service_principal_id, service_principal_secret, tenant_id,
                 authority, spark, sql_service_name=None):

        self.database_name = database_name
        self.sql_service_name = sql_service_name
        self.service_principal_id = service_principal_id
        self.service_principal_secret = service_principal_secret
        self.tenant_id = tenant_id
        self.authority = authority
        self.state = True
        self.except_error = ""
        self._spark_session = spark

        # Acquire an Azure AD access token for Azure SQL via the client-credentials flow.
        context = adal.AuthenticationContext(self.authority)
        token = context.acquire_token_with_client_credentials("https://database.windows.net", self.service_principal_id,
                                                              self.service_principal_secret)
        self._access_token = token["accessToken"]

        server_name = "jdbc:sqlserver://" + self.sql_service_name + ".database.windows.net"
        self._url = server_name + ";" + "databaseName=" + self.database_name + ";"


    def select_table(self, table, sql_query):
        try:
            logger.info(f"Reading table {table} in DB {self.database_name}")
            df = self._spark_session.read.format("com.microsoft.sqlserver.jdbc.spark") \
                    .options(
                    url=self._url,
                    databaseName=self.database_name,
                    accessToken=self._access_token,
                    hostNameInCertificate="*.database.windows.net",
                    query=sql_query) \
                    .load()

            logger.info(f"Table {table} in database {self.database_name} has been read")
            return df
        except Exception as ex:
            logger.error(f"Failed to read table {table}")
            logger.error(ex)

The problem is that I have to process a large amount of data, the processing takes more than an hour, and the database token expires in the meantime. Is there a way to refresh the token when I call the select_table method?
The error given is:
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired.

Full error:

Py4JJavaError: An error occurred while calling o9092.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 4 times, most recent failure: Lost task 0.3 in stage 59.0 (TID 2611, 10.139.64.5, executor 0): com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired. ClientConnectionId:009909b8-d779-4df2-b077-59cf4c4b3c73
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:283)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:129)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:37)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.sendLogon(SQLServerConnection.java:5173)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.logon(SQLServerConnection.java:3810)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.access$000(SQLServerConnection.java:94)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection$LogonCommand.doExecute(SQLServerConnection.java:3754)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:2562)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2216)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2067)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1204)
    at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:825)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Login failed for user '<token-identified principal>'. Token is expired. ClientConnectionId:009909b8-d779-4df2-b077-59cf4c4b3c73
    ... (repeats the executor-side stack trace shown above)
Tags: sql-server, apache-spark, azure-sql-database, azure-databricks
2 Answers
0 votes

A couple of things I can think of:

  1. Check whether there is an option to give Spark a refresh URL so that it can obtain a new token itself. Similar to this, but for SQL Server instead of ADLS. You may need a different API, such as acquire_token_with_refresh_token(), to create the token (see the sketch after this list).

  2. Some token issuers let you specify the requested validity period when creating a token. If yours does, request a token that is valid for 2, 3, or 6 hours (whatever you need) instead of letting the expiry default to one hour.
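
For option 1, a rough sketch of what the adal refresh-token call looks like (a hedged illustration, not from the original answer: the variables mirror the question's fields, and note that the client-credentials flow used in the question does not return a refresh token, so this only applies if you obtain one from a flow that does, such as the authorization-code flow):

import adal

context = adal.AuthenticationContext(authority)
# NOTE: refresh_token must come from a flow that issues one (e.g. authorization-code);
# acquire_token_with_client_credentials() does not return a refresh token.
token = context.acquire_token_with_refresh_token(
    refresh_token,
    service_principal_id,            # client_id
    "https://database.windows.net",  # resource
    service_principal_secret)        # client_secret
access_token = token["accessToken"]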


Another option, assuming the flaw is in your code: there is no good reason to create the token in __init__(). You should create the token close to where it is used, i.e.:

class SQLSpark():
    # ...

    def __init__(self, database_name, service_principal_id, service_principal_secret, tenant_id,
                 authority, spark, sql_service_name=None):

        # same as OP, except no token is created and stored in self._access_token

    def select_table(self, table, sql_query):
        # ...

        # Generate the token closer to its use.
        token = adal.AuthenticationContext(self.authority).acquire_token_with_client_credentials(
            "https://database.windows.net", self.service_principal_id, self.service_principal_secret)

        df = self._spark_session.read.format("com.microsoft.sqlserver.jdbc.spark") \
                .options(
                    # ...
                    accessToken=token["accessToken"],
                    query=sql_query) \
                .load()
        # ...
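
Building on that, here is a fuller sketch (my own filling-in, not part of the original answer: the _get_access_token helper, the five-minute safety margin, and the expiresOn parsing are assumptions) that caches the token and re-acquires it only when it is about to expire:

import adal
from datetime import datetime, timedelta

class SQLSpark():
    # __init__ as in the OP, minus the up-front token acquisition

    _token = None
    _expires_on = None

    def _get_access_token(self):
        # Re-acquire the token if we have none yet, or it expires within 5 minutes.
        if self._token is None or datetime.now() > self._expires_on - timedelta(minutes=5):
            context = adal.AuthenticationContext(self.authority)
            self._token = context.acquire_token_with_client_credentials(
                "https://database.windows.net",
                self.service_principal_id,
                self.service_principal_secret)
            # adal reports expiry as a local-time string, e.g. "2024-01-01 12:00:00.000000"
            self._expires_on = datetime.strptime(
                self._token["expiresOn"], "%Y-%m-%d %H:%M:%S.%f")
        return self._token["accessToken"]

    def select_table(self, table, sql_query):
        df = self._spark_session.read.format("com.microsoft.sqlserver.jdbc.spark") \
                .options(
                    url=self._url,
                    databaseName=self.database_name,
                    accessToken=self._get_access_token(),  # cached while valid, refreshed near expiry
                    hostNameInCertificate="*.database.windows.net",
                    query=sql_query) \
                .load()
        return df

One caveat: the token string is captured in the read options when the DataFrame is defined, so this helps across repeated select_table calls, but a single Spark job that itself runs longer than the token lifetime can still hit the same error when an executor (re)opens its JDBC connection.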

0 votes

We are facing the same issue. We tried creating a token manager class to force a refresh on each call, but no luck so far:

              connection_properties["accessToken"] = token_manager.get_access_token()  # Ensure fresh token
              connection_properties["queryTimeout"] = str(timeout)
              
              df = spark.read.jdbc(url=azure_sql_url, table=query, properties=connection_properties)
              return df

Has anyone managed to solve this? Databricks Runtime 14.3 LTS, latest MSAL.
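
For reference, a minimal sketch of such a token manager using MSAL (the TokenManager name and interface are assumptions of mine, not from the post; MSAL's acquire_token_for_client already caches the token and requests a new one once it expires):

import msal

class TokenManager:
    def __init__(self, tenant_id, client_id, client_secret):
        self._app = msal.ConfidentialClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}",
            client_credential=client_secret)

    def get_access_token(self):
        # Served from MSAL's in-memory cache while valid; renewed transparently after expiry.
        result = self._app.acquire_token_for_client(
            scopes=["https://database.windows.net/.default"])
        if "access_token" not in result:
            raise RuntimeError(f"Token acquisition failed: {result.get('error_description')}")
        return result["access_token"]

Even with a fresh token per call, the caveat from the first answer applies: spark.read.jdbc bakes the token into the connection properties when the read is defined, so a single long-running job can still outlive it, which may be why forcing a refresh here has not helped.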
