无法在Python中以编程方式插入数组并将数据映射到databricks表中

问题描述 投票:0回答:1

尝试使用 SQLAlchemy 和 databricks-sql-connector 将数组和映射(字典/json)数据插入到 Azure databricks 表中。

出现不同的网络错误

from sqlalchemy import create_engine, Column, String, text, Integer, JSON, BigInteger, Identity, ARRAY
from sqlalchemy.orm import sessionmaker, declarative_base

payload = {
    "schema_name": "catalog_schema",
    "table_name": "policy11",
    "column_names": ["CustomerId", "FirstName", "LastName", "Email", "DOB", "Gender", "AnnualIncome", "StreetAddress", "City", "State", "Country", "Zip", "CreatedDate", "UpdatedDate"],
    "choice": {"AnomalyDetection": "1", "BusinessContext": "1", "DQRules": "1", "Standardization": "0"},
    "standardization": []
}

Base = declarative_base()

class DataQuality(Base):
    __tablename__ = 'data_quality'

    id = Column(BigInteger, Identity(start=1, increment=1), primary_key=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    column_names = Column(ARRAY(String))
    choice = Column(JSON())
    standardization = Column(ARRAY(String))

engine = create_engine(
  url = f"databricks://token:{access_token}@{server_hostname}?" +
        f"http_path={http_path}&catalog={catalog}&schema={schema}"
)

Session = sessionmaker(bind=engine)
session = Session()

def insert_data(session, data):
    try:
        
        result = {}
        result["schema_name"]=data["schema_name"]
        result["table_name"]=data["table_name"]
        result["column_names"]=data["column_names"]
        result["choice"]=str(data["choice"])
        result["standardization"]=data.get("standardization", [])

        # Add and commit the new record
        dq_config = DataQuality(**result)
        session.add(dq_config)
        session.commit()

    except Exception as e:
        session.rollback()
        print(e)
        
    finally:
        session.close()

insert_data(session, payload)

出现以下错误:(builtins.AttributeError)“DatabricksDialect”对象没有属性“_json_serializer”

还尝试直接使用databricks-sql-connector

connection = sql.connect(server_hostname = server_hostname,
                 http_path       = http_path,
                 access_token    = access_token)
cursor = connection.cursor()


cursor.execute("INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', ['CustomerId', 'FirstName'], {'AnomalyDetection': '1'})")

cursor.close()
connection.close()

出现以下错误:databricks.sql.exc.ServerOperationError:[PARSE_SYNTAX_ERROR]“[”处或附近的语法错误。 SQLSTATE: 42601(第 1 行,第 159 位)`

arrays python-3.x dictionary sqlalchemy azure-databricks
1个回答
0
投票

错误:databricks.sql.exc.ServerOperationError:[PARSE_SYNTAX_ERROR]“[”处或附近的语法错误。 SQLSTATE:42601(第 1 行,第 159 位)

对于 databricks sql 连接器,由于语法不正确而导致上述错误。

将数组和映射数据插入到 databricks sql 表时,它应该位于圆括号内。因此,查询如下所示:

INSERT INTO tablename (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', Array('CustomerId', 'FirstName'), Map('AnomalyDetection', '1'))"

查询执行成功:

enter image description here

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.