我使用 Python Dash 构建了一个 Web 应用程序,并且已经在 Databricks 中建好了数据库和表。现在我想向 Databricks 发出 POST 请求,把 UI 中的数据保存进去。我已经能够建立与 Databricks 的连接并发出 GET 请求,但无法发出 POST 请求。我该怎么做?
我在文件 databricks_connection.py 中创建了一个类:
#%%
import pandas as pd
import os
from pyspark.sql import SparkSession
#%%
class DatabricksConnection:
    """Small wrapper around a Spark session for running SQL statements.

    Connection settings are read from environment variables when the object
    is created. They are stored on the instance but are not used by
    ``connect()`` or ``query()`` in this class.
    """

    def __init__(self):
        # Each setting is None when the corresponding variable is unset.
        self.server = os.environ.get("SERVER_NAME")
        self.http_path = os.environ.get("HTTP_PATH")
        self.client_id = os.environ.get("CLIENT_ID")
        self.client_secret = os.environ.get("CLIENT_SECRET")
        self.connection = None
        # Populated by connect(); None means no session has been attached yet.
        self.spark = None

    def connect(self):
        """Attach to the current Spark session, creating one if necessary."""
        self.spark = SparkSession.builder.getOrCreate()

    def query(self, query_string, chunk_size=None):
        """Run ``query_string`` through Spark SQL and return a pandas DataFrame.

        Args:
            query_string: SQL text handed to ``SparkSession.sql``.
            chunk_size: Accepted for interface compatibility; currently
                ignored, the whole result is always materialised.

        Returns:
            The result of the statement converted with ``toPandas()``.
        """
        # getattr() keeps this safe for instances built without __init__.
        if getattr(self, "spark", None) is None:
            self.connect()
        return self.spark.sql(query_string).toPandas()
下面是我在 data.py 中使用该连接的地方:
#%%
import sys
import os
from pathlib import Path
from databricks_connection import DatabricksConnection as conn
#%%
class data:
    """Load SQL templates from disk, fill in their placeholders and run them."""

    def __init__(self, queries_dir=None):
        """Open the Databricks connection and remember where templates live.

        Args:
            queries_dir: Directory containing the SQL template files. When
                omitted, template names are resolved relative to the current
                working directory.
        """
        self.queries_dir = os.curdir if queries_dir is None else queries_dir
        # Text of the most recently built query; None until fetch_data() runs.
        self.last_query_string = None
        # The module imports the DatabricksConnection class itself under the
        # alias `conn`, so it is instantiated directly.
        self.Databricks = conn()
        self.Databricks.connect()

    def _load_query(self, query_file):
        """Return the full text of the file at ``query_file``."""
        with open(query_file, 'r', encoding='utf-8') as file:
            return file.read()

    def _format_query(self, query_template, **kwargs):
        """Substitute ``kwargs`` into ``query_template`` using ``str.format``.

        NOTE(security): values are spliced into the SQL text verbatim. Do not
        pass unvalidated user input here; validate or escape it first, or
        switch to parameterised queries.
        """
        return query_template.format(**kwargs)

    def fetch_data(self, query_file, execute=True, **kwargs):
        """Build a query from a template file and optionally execute it.

        Args:
            query_file: Template file name, joined onto ``self.queries_dir``.
            execute: When true, run the query and return its result;
                otherwise return the formatted SQL string.
            **kwargs: Values for the template's ``{placeholders}``.

        Returns:
            Whatever ``self.Databricks.query`` returns when ``execute`` is
            true, else the formatted query string.
        """
        template_path = os.path.join(self.queries_dir, query_file)
        query_string = self._format_query(self._load_query(template_path), **kwargs)
        # Kept on the instance so the last statement can be inspected later.
        self.last_query_string = query_string
        if not execute:
            return query_string
        return self.Databricks.query(query_string)
# Shared module-level instance. This rebinds the name `data` from the class
# to the instance, so the class is no longer reachable by name afterwards.
data = data()


def test(parameter):
    """Run the 'query.sql' template with ``parameter`` filled in and return the result."""
    result = data.fetch_data('query.sql', parameter=parameter)
    return result
我认为你可以使用 REST API 来做到这一点,但据我所知,这种方式只能上传到 DBFS。我建议看一下 Databricks SDK,你可以用它把文件上传到卷(Volume),如下所示:
from databricks.sdk import WorkspaceClient
class DatabricksConnection:
    """Hold a shared Databricks ``WorkspaceClient`` and upload files through it."""

    # Stored on the class, so every instance shares it: once a client has been
    # created, credentials passed to later constructors are ignored.
    _client = None

    def __init__(self, DATABRICKS_TOKEN: str, DATABRICKS_HOST: str) -> None:
        self.init_client(DATABRICKS_TOKEN, DATABRICKS_HOST)

    def init_client(self, DATABRICKS_TOKEN: str, DATABRICKS_HOST: str):
        """Create the shared client on first use and return it.

        Raises:
            RuntimeError: If the client cannot be constructed; the original
                error is attached as ``__cause__``.
        """
        if DatabricksConnection._client is None:
            try:
                DatabricksConnection._client = WorkspaceClient(
                    token=DATABRICKS_TOKEN, host=DATABRICKS_HOST
                )
            except Exception as e:
                raise RuntimeError(f"Error while creating Databricks client: {e}") from e
        return DatabricksConnection._client

    @staticmethod
    def _as_binary_stream(content):
        """Wrap text or bytes in a binary stream; pass other objects through."""
        import io

        if isinstance(content, str):
            content = content.encode("utf-8")
        if isinstance(content, (bytes, bytearray, memoryview)):
            return io.BytesIO(content)
        # Anything else is assumed to already be a binary file-like object.
        return content

    def upload_file(self, file_path: str, content: str):
        """Upload ``content`` to ``file_path`` via the client's Files API.

        Args:
            file_path: Destination path passed to ``files.upload``.
            content: Text (encoded as UTF-8), bytes, or an already opened
                binary stream.
        """
        # files.upload is documented to take a binary stream, not a str.
        self._client.files.upload(file_path, self._as_binary_stream(content))
# Example usage. DATABRICKS_TOKEN and DATABRICKS_HOST are not defined in this
# snippet and must be supplied by the caller.
conn = DatabricksConnection(DATABRICKS_TOKEN, DATABRICKS_HOST)
# `...` is a placeholder: pass the destination path and the content to upload.
conn.upload_file(...)
确保找到要写入的表的正确路径。