Trying to insert array and map (dict/JSON) data into an Azure Databricks table using SQLAlchemy and the databricks-sql-connector. Each approach fails with a different error.
from sqlalchemy import create_engine, Column, String, text, Integer, JSON, BigInteger, Identity, ARRAY
from sqlalchemy.orm import sessionmaker, declarative_base
payload = {
    "schema_name": "catalog_schema",
    "table_name": "policy11",
    "column_names": ["CustomerId", "FirstName", "LastName", "Email", "DOB", "Gender", "AnnualIncome", "StreetAddress", "City", "State", "Country", "Zip", "CreatedDate", "UpdatedDate"],
    "choice": {"AnomalyDetection": "1", "BusinessContext": "1", "DQRules": "1", "Standardization": "0"},
    "standardization": []
}
Base = declarative_base()
class DataQuality(Base):
    __tablename__ = 'data_quality'
    id = Column(BigInteger, Identity(start=1, increment=1), primary_key=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    column_names = Column(ARRAY(String))
    choice = Column(JSON())
    standardization = Column(ARRAY(String))
# server_hostname, http_path, access_token, catalog and schema are defined elsewhere
engine = create_engine(
    url=f"databricks://token:{access_token}@{server_hostname}?" +
        f"http_path={http_path}&catalog={catalog}&schema={schema}"
)
Session = sessionmaker(bind=engine)
session = Session()
def insert_data(session, data):
    try:
        result = {}
        result["schema_name"] = data["schema_name"]
        result["table_name"] = data["table_name"]
        result["column_names"] = data["column_names"]
        result["choice"] = str(data["choice"])
        result["standardization"] = data.get("standardization", [])
        # Add and commit the new record
        dq_config = DataQuality(**result)
        session.add(dq_config)
        session.commit()
    except Exception as e:
        session.rollback()
        print(e)
    finally:
        session.close()
insert_data(session, payload)
This fails with: (builtins.AttributeError) 'DatabricksDialect' object has no attribute '_json_serializer'

So I tried inserting directly with the databricks-sql-connector instead:
from databricks import sql

connection = sql.connect(server_hostname=server_hostname,
                         http_path=http_path,
                         access_token=access_token)
cursor = connection.cursor()
cursor.execute("INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', ['CustomerId', 'FirstName'], {'AnomalyDetection': '1'})")
cursor.close()
connection.close()
This fails with: databricks.sql.exc.ServerOperationError: [PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601 (line 1, pos 159)
For the Databricks SQL connector, the error above is caused by incorrect syntax. When inserting array and map data into a Databricks SQL table, the values have to be built with the Array() and Map() constructors (round brackets), not with [...] and {...} literals. So the query looks like this:

INSERT INTO tablename (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', Array('CustomerId', 'FirstName'), Map('AnomalyDetection', '1'))
The query now executes successfully.
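For reference, here is a minimal sketch of the same fix applied to the payload from the question. It reuses the server_hostname, http_path and access_token variables and the three-part table name shown above; the quote_literal() helper is purely illustrative (naive escaping for trusted values), not part of the connector API.

from databricks import sql

def quote_literal(value: str) -> str:
    # Naive single-quote escaping, for illustration only
    return "'" + str(value).replace("'", "''") + "'"

# Build the Array(...) and Map(...) constructors from the payload dict
column_names_sql = "Array(" + ", ".join(quote_literal(c) for c in payload["column_names"]) + ")"
choice_sql = "Map(" + ", ".join(
    quote_literal(k) + ", " + quote_literal(v) for k, v in payload["choice"].items()
) + ")"

insert_sql = (
    "INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality "
    "(schema_name, table_name, column_names, choice) VALUES ("
    + quote_literal(payload["schema_name"]) + ", "
    + quote_literal(payload["table_name"]) + ", "
    + column_names_sql + ", "
    + choice_sql + ")"
)

with sql.connect(server_hostname=server_hostname,
                 http_path=http_path,
                 access_token=access_token) as connection:
    with connection.cursor() as cursor:
        cursor.execute(insert_sql)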