我的目标是使用 Databricks 运行 Spark 作业,我的挑战是我无法将文件存储在本地文件系统中,因为文件保存在驱动程序中,但是当我的执行程序尝试访问该文件时,它没有存在是因为它位于驱动程序文件系统中。
我想使用工作区来存储我的文件; DBFS 不适合我。问题是,在我的笔记本中,当我尝试使用 Python 代码存储文件时,它运行良好,并且我可以使用 Databricks UI 访问该文件。
以下 Python 代码有效:
import os
from pathlib import Path
# Define the path where you want to create the directory and file
directory_path = Path("/Workspace/Shared/credentials/test")
file_path = directory_path / "abc.txt"
# Create the directory if it doesn't exist
os.makedirs(directory_path, exist_ok=True)
# Create and write to the file
with open(file_path, 'w') as file:
file.write("This is a string stored in abc.txt")
我需要在 Scala 中执行此操作,但我尝试了以下代码并遇到了问题:
%scala
import java.nio.file.{Files, Paths, StandardOpenOption}
val directoryPath = "Workspace/Shared/credentials/test2"
val filePath = Paths.get(directoryPath, "abc.txt")
// Create the directory if it doesn't exist
val directory = Paths.get(directoryPath)
if (!Files.exists(directory)) {
Files.createDirectories(directory)
}
println(Files.exists(directory))
// Write the content to the file
val content = "This is a string stored in abc.txt"
Files.write(filePath, content.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
println(s"Directory created at: $directoryPath")
println(s"File created at: $filePath with content: '$content'")
但是,我收到以下错误:
FileSystemException: /Workspace/Shared/credentials: Operation not permitted
最终,我想将 Spark 与 Kafka 配置结合使用并指定 JKS 文件的位置:
spark.read
.format("kafka")
.option("includeHeaders", IncludeHeaders)
.option("kafka.bootstrap.servers", topic.innerSource.bootstrapServers.get)
.option("subscribe", parsedTopicName)
.option("kafka.security.protocol", jobConfig.kafka.get.securityProtocol)
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.keystore.location", MyLocationToTheWorkspace)
您对如何使用 Scala 将我的文件存储在工作区或所有执行器都可以访问我的 JKS 文件的其他位置有什么建议吗?