Here are the core steps and the logic of my script:

1. A PostgreSQLDB class object queries the view vw_valid_case_from_db1 to get the list of case_id values that should be included.
2. df_db1_case_table is a pandas DataFrame holding the rows pulled from the db1_case table in db1.
3. db2 has a table named db2_case_table, into which I want to bulk-insert the rows of the filtered DataFrame (named df_db1_case_table_filtered). The column names in df_db1_case_table_filtered match those in db2_case_table.

When the insert runs, I get this error:
Exception has occurred: DatatypeMismatch
  File "some_path_to.py", line 170, in <module>
    cursor.executemany(insert_query, data_to_insert)
psycopg2.errors.DatatypeMismatch: column "occurence_timestamp" is of type timestamp without time zone but expression is of type bigint
LINE 1: ...7.0, 2259027.0, NULL, 'CA23307772', NULL, '1441', 1689711600...
HINT:  You will need to rewrite or cast the expression.
The "for col in timestamp_columns:" block below is my attempt to solve this: if a (whatever)_timestamp column arrives as int64 or float64, it is coerced to datetime. However, it does not work, and the same error occurs. If I exclude the timestamp columns ['occurence_timestamp', 'reported_timestamp', 'created_timestamp', 'modified_timestamp', 'agency_extract_timestamp', 'city_extract_timestamp', 'pdf_extract_timestamp'] from the insert, the script works fine. The script is as follows:
import pandas as pd
import os
import logging
from datetime import datetime
from helper_db_operation import PostgreSQLDB
# Set up logging configuration
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# Some database credential code
postgres_db = PostgreSQLDB(postgres_user, postgres_password, postgres_host, postgres_db_name)
# Query to fetch valid case IDs from db1 view
sql_query_get_valid_case_case_from_db1 = """
SELECT case_id
FROM vw_valid_case_from_db1
"""
# Execute query and load results into a Pandas DataFrame
try:
    df_db1_valid_cases = pd.read_sql(sql_query_get_valid_case_case_from_db1, postgres_db.conn)
except Exception as e:
    logging.error(f"Error while fetching data from db1 view: {e}")
    raise
# 1.2) connect db1_case, then apply the filter from 1.1) to exclude invalid case
# Query to fetch all case from db1 table
sql_query_get_case_table_from_db1 = """
SELECT *
FROM public.db1_case
"""
# Execute query and load results into a Pandas DataFrame
df_db1_case_table = pd.read_sql(sql_query_get_case_table_from_db1, postgres_db.conn)
# Apply the filter to include only those case that are in the valid list from df_db1_valid_cases
valid_case_ids = df_db1_valid_cases['case_id'].tolist()
# Filter df_db1_case_table to include only rows where the ID is in the valid case IDs
# .copy() so the later timestamp assignments modify a real frame, not a slice view
df_db1_case_table_filtered = df_db1_case_table[df_db1_case_table['id'].isin(valid_case_ids)].copy()
# Check the result of the filtering
logging.debug(f"Filtered {len(df_db1_case_table_filtered)} valid case.")
target_table = 'db2_case_table'
# Fetch the target table schema to determine the column names dynamically
try:
    query_table_schema = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{target_table}';
    """
    df_table_schema = pd.read_sql(query_table_schema, postgres_db.conn)
    target_columns = df_table_schema['column_name'].tolist()
except Exception as e:
    logging.error(f"Error fetching schema for table {target_table}: {e}")
    raise
# Convert epoch timestamps to datetime for `occurence_timestamp` and other timestamp columns
timestamp_columns = [
    'occurence_timestamp', 'reported_timestamp', 'created_timestamp', 'modified_timestamp',
    'agency_extract_timestamp', 'city_extract_timestamp', 'pdf_extract_timestamp'
]
for col in timestamp_columns:
    if col in df_db1_case_table_filtered.columns:
        if df_db1_case_table_filtered[col].dtype in ['int64', 'float64']:
            df_db1_case_table_filtered[col] = pd.to_datetime(
                df_db1_case_table_filtered[col], unit='s', errors='coerce'
            )
        elif pd.api.types.is_datetime64_any_dtype(df_db1_case_table_filtered[col]):
            df_db1_case_table_filtered[col] = df_db1_case_table_filtered[col].dt.tz_localize(None)
# Ensure the correct columns are included in `df_for_insertion`
df_for_insertion = df_db1_case_table_filtered[
    [col for col in df_db1_case_table_filtered.columns if col in target_columns]
]
logging.info(df_for_insertion.dtypes)
logging.info(df_for_insertion[['occurence_timestamp']].head())
# Insert the filtered and dynamically mapped data into PostgreSQL
try:
    # Convert the DataFrame into a list of tuples
    data_to_insert = df_for_insertion.to_records(index=False).tolist()
    logging.info(data_to_insert[:5])
    # Generate the INSERT query dynamically based on the DataFrame columns
    columns = ', '.join(df_for_insertion.columns)
    placeholders = ', '.join(['%s'] * len(df_for_insertion.columns))
    insert_query = f"INSERT INTO {target_table} ({columns}) VALUES ({placeholders})"
    # Execute the batch insert
    with postgres_db.conn.cursor() as cursor:
        cursor.executemany(insert_query, data_to_insert)
    postgres_db.conn.commit()
except Exception as e:
    logging.error(f"Error while inserting data into table {target_table}: {e}")
    raise
Additional information: here is the printed result of the dtype check on the DataFrame columns:
occurence_timestamp datetime64[ns]
reported_timestamp datetime64[ns]
And here is a sample of the occurence_timestamp values:
occurence_timestamp
0 2023-07-18 20:20:00
1 2023-09-21 17:00:00
2 2023-09-21 15:48:00
3 2023-09-21 21:30:00
4 2023-09-11 08:45:00
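As a sanity check that the coercion step behaves as expected in isolation: 1689711600 in epoch seconds corresponds exactly to the first sample row above (it also matches the leading digits of the truncated value in the error output). A minimal sketch, runnable without any database:

```python
import pandas as pd

# 1689711600 epoch-seconds corresponds to the first sample row above
ts = pd.to_datetime(1689711600, unit='s')
print(ts)         # 2023-07-18 20:20:00
print(ts.tzinfo)  # None -- tz-naive, matching "timestamp without time zone"
```

So the loop's pd.to_datetime(..., unit='s') call does produce the datetime64[ns] values shown in the dtype check; the conversion itself is not where things go wrong.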
I also considered converting the datetimes to strings first:

df['occurence_timestamp'] = df['occurence_timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

Unfortunately, I cannot test this, because I do not have Postgres locally.
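One thing that can be checked without a database: if I am reading the pandas behavior correctly, DataFrame.to_records(index=False).tolist() converts datetime64[ns] values back into plain Python integers (nanoseconds since the epoch), which would explain why Postgres receives a bigint even though the dtype check above shows datetime64[ns]. A minimal sketch, assuming that is the mechanism (itertuples is shown only as a possible alternative, since it yields pandas Timestamp objects, a datetime.datetime subclass that psycopg2 adapts as a timestamp):

```python
import datetime
import pandas as pd

df = pd.DataFrame({'occurence_timestamp': pd.to_datetime([1689711600], unit='s')})

# to_records() keeps the datetime64[ns] dtype, but .tolist() turns each
# nanosecond-precision value into a plain int -- this is what reaches executemany()
rec = df.to_records(index=False).tolist()
print(type(rec[0][0]))  # <class 'int'>

# itertuples() yields pandas Timestamp objects instead, which psycopg2
# can adapt to timestamp values
tup = list(df.itertuples(index=False, name=None))
print(isinstance(tup[0][0], datetime.datetime))  # True
```

If this holds, replacing the to_records(...).tolist() line with list(df_for_insertion.itertuples(index=False, name=None)) should make the parameters arrive as datetimes rather than bigints.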