When I try to load data from a csv file into a Postgres db with PostgresHook.bulk_load, I get the following error:
[2024-08-20, 09:02:01 UTC] {postgres.py:168} INFO - Running copy expert: COPY test_nikita2 FROM STDIN, filename: /opt/airflow/dags/burmistrov/data_nikita/2024-08-19.csv
[2024-08-20, 09:02:01 UTC] {base.py:73} INFO - Using connection ID 'test_db' for task execution.
[2024-08-20, 09:02:01 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/burmistrov/test.py", line 175, in transfer_data_api
    hook.bulk_load('test_nikita2', fname)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/hooks/postgres.py", line 192, in bulk_load
    self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/hooks/postgres.py", line 176, in copy_expert
    cur.copy_expert(sql, file)
psycopg2.errors.BadCopyFileFormat: missing data for column "impressions"
CONTEXT: COPY test_nikita2, line 1: "campaign,impressions,clicks,costs"
[2024-08-20, 09:02:01 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=test_dag, task_id=transfer_data_api, execution_date=20240820T090154, start_date=20240820T090158, end_date=20240820T090201
[2024-08-20, 09:02:01 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 70948 for task transfer_data_api (missing data for column "impressions"
CONTEXT: COPY test_nikita2, line 1: "campaign,impressions,clicks,costs"
; 58980)
[2024-08-20, 09:02:01 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
Sample of the data in the csv:
campaign,impressions,clicks,costs
asd.com_xzc_catvendor,52,27,3042.04
sd-fgh.com_SMEG_k50gen,31,1,15.17
How I create the table:
sql_schema_init = """
CREATE TABLE IF NOT EXISTS test_nikita2 (
    Campaign text NULL,
    Impressions integer NULL,
    Clicks integer NULL,
    Cost float NULL
);
"""
How I try to copy the data:
with open(os.path.join(path+"{}.csv".format(get_dates())), 'w+', encoding='utf-8') as f:
    f.write(resultcsv)
    fname = f.name
try:
    hook = PostgresHook(postgres_conn_id='test_db')
    print(321)
    hook.bulk_load('test_nikita2', fname)
finally:
    pass
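I tried changing the delimiter, but it works with neither ";" nor ",". I also tried changing the column types in the db to text, hoping that at least something would land in the table so I could see what the problem was, but nothing was loaded into the table.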
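PostgresHook's bulk_load function does not support CSV files with a header row. It runs a plain COPY test_nikita2 FROM STDIN (visible in the traceback), which uses PostgreSQL's default text format: tab-delimited, with no header. That is why the header line "campaign,impressions,clicks,costs" is read as a single value for the first column and the COPY fails with missing data for column "impressions". Remove the header (the first line) from the file. The columns also have to be separated by tabs rather than commas, otherwise the first data row will fail the same way.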
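If you would rather keep the file as a comma-separated CSV with its header, one possible alternative is to skip bulk_load and call the hook's copy_expert method (the same method bulk_load calls in the traceback) with explicit COPY options. This is only a minimal sketch, not the approach described above: the helper names build_copy_sql and load_csv_with_header are hypothetical, and it assumes the table name and delimiter come from trusted code, since they are interpolated directly into the SQL.

```python
def build_copy_sql(table, delimiter=","):
    """Build a COPY statement that reads CSV with a header row from STDIN.

    Hypothetical helper. `table` and `delimiter` are interpolated as-is,
    so they must not come from untrusted input.
    """
    return (
        f"COPY {table} FROM STDIN "
        f"WITH (FORMAT csv, HEADER true, DELIMITER '{delimiter}')"
    )


def load_csv_with_header(table, csv_path, conn_id="test_db"):
    """Load a CSV file that has a header row into `table` (hypothetical helper)."""
    # Imported lazily so build_copy_sql can be used without Airflow installed.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id=conn_id)
    # copy_expert(sql, filename) streams the file to the server as STDIN.
    hook.copy_expert(build_copy_sql(table), csv_path)
```

In the task from the question this would be called as load_csv_with_header('test_nikita2', fname). HEADER true only makes COPY skip the first line; the columns are still matched by position, so the CSV column order has to match the table's column order.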