Getting "psycopg2.errors.BadCopyFileFormat: missing data for column" error


When I try to load data from a csv file into a Postgres db with PostgresHook.bulk_load, the following error occurs:

[2024-08-20, 09:02:01 UTC] {postgres.py:168} INFO - Running copy expert: COPY test_nikita2 FROM STDIN, filename: /opt/airflow/dags/burmistrov/data_nikita/2024-08-19.csv
[2024-08-20, 09:02:01 UTC] {base.py:73} INFO - Using connection ID 'test_db' for task execution.
[2024-08-20, 09:02:01 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/burmistrov/test.py", line 175, in transfer_data_api
    hook.bulk_load('test_nikita2', fname)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/hooks/postgres.py", line 192, in bulk_load
    self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/hooks/postgres.py", line 176, in copy_expert
    cur.copy_expert(sql, file)
psycopg2.errors.BadCopyFileFormat: missing data for column "impressions"
CONTEXT:  COPY test_nikita2, line 1: "campaign,impressions,clicks,costs"
[2024-08-20, 09:02:01 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=test_dag, task_id=transfer_data_api, execution_date=20240820T090154, start_date=20240820T090158, end_date=20240820T090201
[2024-08-20, 09:02:01 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 70948 for task transfer_data_api (missing data for column "impressions"
CONTEXT:  COPY test_nikita2, line 1: "campaign,impressions,clicks,costs"
; 58980)
[2024-08-20, 09:02:01 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1

Sample of the data in the csv:

campaign,impressions,clicks,costs
asd.com_xzc_catvendor,52,27,3042.04
sd-fgh.com_SMEG_k50gen,31,1,15.17

How I create the table:

sql_schema_init = """
CREATE TABLE IF NOT EXISTS test_nikita2 (
    Campaign text NULL,
    Impressions integer NULL,
    Clicks integer NULL,
    Cost float NULL
);
"""

How I try to copy the data:

with open(os.path.join(path+"{}.csv".format(get_dates())), 'w+', encoding='utf-8') as f:
    f.write(resultcsv)
    fname = f.name
try:
    hook = PostgresHook(postgres_conn_id='test_db')
    print(321)
    hook.bulk_load('test_nikita2', fname)
finally:
    pass

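I tried changing the delimiter, but it works with neither ";" nor ",". I also tried changing the column types in the db to text, so that at least something would land in the table and show me where the problem is, but nothing was loaded into the table.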

python postgresql airflow
1 Answer

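PostgresHook's bulk_load function does not support CSV files with a header. As the log shows, it runs a plain COPY test_nikita2 FROM STDIN, which uses COPY's default text format: no header row and tab-separated columns. Remove the header (the first line, i.e. "campaign,impressions,clicks,costs") from the CSV file. Because the default delimiter is a tab rather than a comma, the remaining rows also need to be tab-separated, or the file has to be loaded with explicit CSV options through hook.copy_expert instead of bulk_load.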
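
A minimal sketch of two ways to get the file loaded, assuming the table name and connection ID from the question. The helper names load_csv_with_header and csv_to_copy_text are made up for illustration and are not part of Airflow. The first passes explicit CSV options to copy_expert; the second rewrites the data into the header-less, tab-separated form that bulk_load expects.

```python
import csv
import io

# COPY options telling Postgres the input is comma-separated with a header row.
COPY_CSV_SQL = "COPY test_nikita2 FROM STDIN WITH (FORMAT csv, HEADER true)"


def load_csv_with_header(fname, conn_id="test_db"):
    """Option 1: keep the CSV as it is and load it through copy_expert."""
    # Imported lazily so the helper below also works without Airflow installed.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id=conn_id)
    hook.copy_expert(COPY_CSV_SQL, fname)


def csv_to_copy_text(csv_text):
    """Option 2: drop the header and emit tab-separated rows for bulk_load.

    Simplification: assumes no field contains tabs, newlines or backslashes,
    which COPY's text format would require to be escaped.
    """
    rows = list(csv.reader(io.StringIO(csv_text)))
    # rows[0] is the header line; everything after it is data.
    return "".join("\t".join(row) + "\n" for row in rows[1:])
```

With the second option, the question's code would write csv_to_copy_text(resultcsv) to the file instead of resultcsv and keep calling hook.bulk_load('test_nikita2', fname) unchanged.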
