根据文档,您可以使用 SAS 令牌访问 Azure Data Lake Storage Gen2,方法是实现
org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider
接口。
但是,只提供了一个 mock 实现,它使用直接访问主帐户密钥来生成 SAS 令牌:
try {
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, accountName);
accountKey = Base64.decode(abfsConfig.getStorageAccountKey());
} catch (Exception ex) {
throw new IOException(ex);
}
generator = new ServiceSASGenerator(accountKey);
当代码在有权访问主帐户密钥的上下文中运行时,通过提供 SAS 令牌来限制访问是完全没有意义的,显然这是一个仅用于测试目的的模拟实现。
所以,这个类的正确实现是:
这样的实现怎么写?
这是解决方案的最基本和最简单的版本,证明这 是可能做到的:
%scala
package com.foo
class CustomTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
def getSASToken(accountName: String,fileSystem: String,path: String,operation: String): String = {
return "sp=...etc etc"
}
def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = {
}
}
...
spark.conf.set("fs.azure.account.auth.type.STORAGE_ACC.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.STORAGE_ACC.dfs.core.windows.net", "com.foo.CustomTokenProvider")
dbutils.fs.ls("abfss://sandbox@STORAGE_ACC.dfs.core.windows.net/")
> [FileInfo(path='abfss://sandbox@STORAGE_ACC.dfs.core.windows.net/t.py', name='t.py', size=112)]
dbutils.fs.ls("abfss://restricted@STORAGE_ACC.dfs.core.windows.net/")
> Operation failed: "Server failed to authenticate the request.
但是,这个实现如何更改为从秘密存储中提取密钥而不是硬编码?
注意特别是:
这是指 abfss 驱动程序,not 遗留的 wasb 驱动程序。
问题是关于从秘密存储中获取 SAS 令牌的,这可以在组/用户级别进行限制;在集群级别(例如 env 变量)分配令牌不是有效的解决方案,因为这意味着所有集群用户都被授予相同的权限。
...是的,我知道,databricks 有它自己的自定义身份提供者系统 (Unity),但这只适用于 hive metastore 中的数据集。这个问题专门针对较低级别的存储帐户访问权限。
%scala
package com.foo
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams
import scala.util.Try
import scala.language.implicitConversions
import scala.language.reflectiveCalls
trait DBUtilsApi {
type SecretUtils
type SecretMetadata
type SecretScope
val secrets: SecretUtils
}
object ReflectiveDBUtils extends DBUtilsApi {
private lazy val dbutils: DBUtils =
Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get().asInstanceOf[DBUtils]
override lazy val secrets: SecretUtils = dbutils.secrets
type DBUtils = AnyRef {
val secrets: SecretUtils
}
type SecretUtils = AnyRef {
def get(scope: String, key: String): String
def getBytes(scope: String, key: String): Array[Byte]
def list(scope: String): Seq[SecretMetadata]
def listScopes(): Seq[SecretScope]
}
type SecretMetadata = DefinedByConstructorParams { val key: String }
type SecretScope = DefinedByConstructorParams { val name: String }
}
class VaultTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
def getSASToken(accountName: String,fileSystem: String,path: String,operation: String): String = {
return ReflectiveDBUtils.secrets.get("scope", "SECRET")
}
def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = {
}
}
...
spark.conf.set("fs.azure.account.auth.type.bidbtests.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.bidbtests.dfs.core.windows.net", "com.foo.VaultTokenProvider")
这只是基本示例;真正的解决方案将根据帐户和文件系统选择正确的秘密。
您可以使用以下代码将 Azure ADLS(Azure Data Lake Storage Gen2) 访问到 Azure Databricks,您只需要添加存储帐户名称和 SAS 令牌密钥,SAS 令牌将从存储帐户中获取
spark.conf.set("fs.azure.account.auth.type.
spark.conf.set(“fs.azure.sas.token.provider.type.
spark.conf.set(“fs.azure.sas.fixed.token.