Exception: DatatypeMismatch: column "occurence_timestamp" is of type timestamp without time zone, but the expression is of type bigint

Problem description

Here are the core steps and logic of my script:

  1. Create and instantiate a PostgreSQLDB class object that performs the database operations.
  2. Use the view vw_valid_case_from_db1 to get the list of case_id values to include.
  3. df_db1_case_table is a pandas DataFrame that pulls information from a table named db1.
  4. Filter df_db1_case_table to include only the rows for the two kinds of valid cases.
  5. I created a table named db2_case_table, into which I want to bulk-insert the information from the filtered df_db1_case_table (named df_db1_case_table_filtered). The column names in df_db1_case_table_filtered match the columns in db2_case_table.
  6. The problem is that I get this error:

         Exception has occurred: DatatypeMismatch
         column "occurence_timestamp" is of type timestamp without time zone but expression is of type bigint
         LINE 1: ...7.0, 2259027.0, NULL, 'CA23307772', NULL, '1441', 1689711600...
         HINT:  You will need to rewrite or cast the expression.
           File "some_path_to.py", line 170, in <module>
             cursor.executemany(insert_query, data_to_insert)
         psycopg2.errors.DatatypeMismatch: column "occurence_timestamp" is of type timestamp without time zone but expression is of type bigint
         LINE 1: ...7.0, 2259027.0, NULL, 'CA23307772', NULL, '1441', 1689711600...
         HINT:  You will need to rewrite or cast the expression.

  7. The "for col in timestamp_columns:" section of the code below is my attempt to solve this: if a (whatever)_timestamp column has dtype 'int64' or 'float64', it is coerced to datetime format. However, it does not work and the same error occurs.
  8. Additional information: if I remove all of the (whatever)_timestamp fields, i.e. ['occurence_timestamp', 'reported_timestamp', 'created_timestamp', 'modified_timestamp', 'agency_extract_timestamp', 'city_extract_timestamp', 'pdf_extract_timestamp'], the script works fine.
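A side note that may explain where the bigint comes from (my reading, not confirmed in the question): DataFrame.to_records(index=False).tolist(), used later in the script, converts datetime64[ns] values into plain Python integers (nanoseconds since the epoch), because datetime.datetime cannot carry nanosecond precision. psycopg2 would then send those integers, which Postgres sees as bigint. A minimal sketch reproducing this without a database:

```python
import pandas as pd

# One datetime64[ns] column, mimicking occurence_timestamp after
# pd.to_datetime() has run.
df = pd.DataFrame({"ts": pd.to_datetime(["2023-07-18 20:20:00"])})

# to_records() keeps the numpy datetime64[ns] dtype; .tolist() then turns
# each value into a plain Python int (nanoseconds since the epoch)
# rather than a datetime.datetime object.
rows = df.to_records(index=False).tolist()
print(type(rows[0][0]))
```

Note how the truncated value in the error message, 1689711600..., is consistent with a nanosecond epoch for 2023-07-18 20:20:00.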

The script is as follows:

import pandas as pd
import os
import logging
from datetime import datetime
from helper_db_operation import PostgreSQLDB

# Set up logging configuration
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Some database credential code
postgres_db = PostgreSQLDB(postgres_user, postgres_password, postgres_host, postgres_db_name)

# Query to fetch valid case IDs from db1 view
sql_query_get_valid_case_case_from_db1 = """
    SELECT case_id
    FROM vw_valid_case_from_db1
"""

# Execute query and load results into a Pandas DataFrame
try:
    df_db1_valid_cases = pd.read_sql(sql_query_get_valid_case_case_from_db1, postgres_db.conn)
except Exception as e:
    logging.error(f"Error while fetching data from db1 view: {e}")
    raise

# 1.2) connect db1_case, then apply the filter from 1.1) to exclude invalid case
# Query to fetch all case from db1 table
sql_query_get_case_table_from_db1 = """
    SELECT *
    FROM public.db1_case
"""

# Execute query and load results into a Pandas DataFrame
df_db1_case_table = pd.read_sql(sql_query_get_case_table_from_db1, postgres_db.conn)

# Apply the filter to include only those case that are in the valid list from df_db1_valid_cases
valid_case_ids = df_db1_valid_cases['case_id'].tolist()

# Filter df_db1_case_table to include only rows where the ID is in the valid case IDs
df_db1_case_table_filtered = df_db1_case_table[df_db1_case_table['id'].isin(valid_case_ids)].copy()  # .copy() so the column assignments below modify a real frame, not a view

# Check the result of the filtering
logging.debug(f"Filtered {len(df_db1_case_table_filtered)} valid case.")

target_table = 'db2_case_table'

# Fetch the target table schema to determine the column names dynamically
try:
    query_table_schema = f"""
        SELECT column_name
        FROM information_schema.columns
        WHERE table_name = '{target_table}';
    """
    df_table_schema = pd.read_sql(query_table_schema, postgres_db.conn)
    target_columns = df_table_schema['column_name'].tolist()
except Exception as e:
    logging.error(f"Error fetching schema for table {target_table}: {e}")
    raise

# Convert epoch timestamps to datetime for `occurence_timestamp` and other timestamp columns
timestamp_columns = [
    'occurence_timestamp', 'reported_timestamp', 'created_timestamp', 'modified_timestamp', 
    'agency_extract_timestamp', 'city_extract_timestamp', 'pdf_extract_timestamp'
]

for col in timestamp_columns:
    if col in df_db1_case_table_filtered.columns:
        if df_db1_case_table_filtered[col].dtype in ['int64', 'float64']:
            df_db1_case_table_filtered[col] = pd.to_datetime(
                df_db1_case_table_filtered[col], unit='s', errors='coerce'
            )
        elif pd.api.types.is_datetime64_any_dtype(df_db1_case_table_filtered[col]):
            df_db1_case_table_filtered[col] = df_db1_case_table_filtered[col].dt.tz_localize(None)

# Ensure the correct columns are included in `df_for_insertion`
df_for_insertion = df_db1_case_table_filtered[
    [col for col in df_db1_case_table_filtered.columns if col in target_columns]
]

logging.info(df_for_insertion.dtypes)
logging.info(df_for_insertion[['occurence_timestamp']].head())

# Insert the filtered and dynamically mapped data into PostgreSQL
try:
    # Convert the DataFrame into a list of tuples
    data_to_insert = df_for_insertion.to_records(index=False).tolist()
    logging.info(data_to_insert[:5])

    # Generate the INSERT query dynamically based on the DataFrame columns
    columns = ', '.join(df_for_insertion.columns)
    placeholders = ', '.join(['%s'] * len(df_for_insertion.columns))
    insert_query = f"INSERT INTO {target_table} ({columns}) VALUES ({placeholders})"

    # Execute the batch insert
    with postgres_db.conn.cursor() as cursor:
        cursor.executemany(insert_query, data_to_insert)
        postgres_db.conn.commit()
    
except Exception as e:
    logging.error(f"Error while inserting data into table {target_table}: {e}")
    raise

Additional information: here is the printed output of the column-type check on the DataFrame

occurence_timestamp             datetime64[ns]
reported_timestamp              datetime64[ns]

And here is a sample of the occurence_timestamp values

occurence_timestamp
0 2023-07-18 20:20:00
1 2023-09-21 17:00:00
2 2023-09-21 15:48:00
3 2023-09-21 21:30:00
4 2023-09-11 08:45:00
Tags: python, sql, python-3.x, pandas, postgresql
1 Answer

Try converting the datetime to a string format:

df['occurence_timestamp'] = df['occurence_timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

Unfortunately, I cannot test this, as I don't have Postgres locally.
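An alternative worth trying (an untested sketch using the question's variable names): keep the values as real timestamps by building the tuples with itertuples instead of to_records, and swap NaT for None so NULLs insert cleanly. pandas.Timestamp is a subclass of datetime.datetime, which psycopg2 adapts as a timestamp rather than a bigint.

```python
import pandas as pd

# Stand-in for the question's df_for_insertion, including one NaT row.
df_for_insertion = pd.DataFrame(
    {"occurence_timestamp": pd.to_datetime(["2023-07-18 20:20:00", None])}
)

# Cast to object so missing values can be replaced with None (psycopg2
# sends None as SQL NULL), then build plain tuples; itertuples preserves
# pandas.Timestamp objects instead of collapsing them to integers.
df_obj = df_for_insertion.astype(object).where(df_for_insertion.notna(), None)
data_to_insert = list(df_obj.itertuples(index=False, name=None))
```

With tuples built this way, cursor.executemany(insert_query, data_to_insert) should receive timestamp objects rather than integer epochs.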

© www.soinside.com 2019 - 2024. All rights reserved.