我有一项任务,需要将数据从一个 SQLite 表复制到几乎相同的 PostgreSQL 表。我写了一段代码,但它不起作用,我不知道问题出在哪里。它打印出连接已成功建立,也没有显示任何错误,但是当我在终端中使用命令
SELECT * FROM content.film_work;
检查 content.film_work 表时,我看不到数据。它显示:
movies_database=# SELECT * FROM content.film_work;
created | modified | id | title | description | creation_date | rating | type | file_path
---------+----------+----+-------+-------------+---------------+--------+------+-----------
(0 rows)
以下是一些步骤:
SQLite 表(film_work):
0|id|TEXT|0||1
1|title|TEXT|1||0
2|description|TEXT|0||0
3|creation_date|DATE|0||0
4|file_path|TEXT|0||0
5|rating|FLOAT|0||0
6|type|TEXT|1||0
7|created_at|timestamp with time zone|0||0
8|updated_at|timestamp with time zone|0||0
PostgreSQL 表(content.film_work):
created | modified | id | title | description | creation_date | rating | type | file_path
代码片段:
# Register psycopg2's adapter for Python uuid.UUID values.
psycopg2.extras.register_uuid()
# Path of the SQLite database file opened by copy_from_sqlite().
db_path = 'db.sqlite'
@contextmanager
def conn_context(db_path: str):
    """Open the SQLite database at ``db_path`` and yield the connection.

    Rows fetched through the yielded connection are ``sqlite3.Row`` objects,
    so columns can be read by name. The connection is closed when the
    ``with`` block is left, whether or not the block raised.
    """
    sqlite_conn = sqlite3.connect(db_path)
    try:
        sqlite_conn.row_factory = sqlite3.Row
        yield sqlite_conn
    finally:
        sqlite_conn.close()
@dataclass
class FilmWork:
    """One ``film_work`` record read from SQLite.

    The fields are declared in the same order as the columns of
    ``content.film_work`` (created, modified, id, title, description,
    creation_date, rating, type, file_path), because the record is later
    flattened with ``astuple`` and inserted positionally.
    """

    created_at: date = None
    updated_at: date = None
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: date = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''

    def __post_init__(self):
        """Replace ``None`` values with defaults.

        Missing date/timestamp fields are filled with the current local time
        rendered as a string (not a ``date`` object, despite the annotation).
        A missing description or rating gets a fixed placeholder value.
        """
        timestamp_format = '%Y-%m-%d %H:%M:%S.%f'
        for attr in ('creation_date', 'created_at', 'updated_at'):
            if getattr(self, attr) is None:
                setattr(self, attr, datetime.now().strftime(timestamp_format))
        if self.description is None:
            self.description = 'Нет описания'
        if self.rating is None:
            self.rating = 0.0
def copy_from_sqlite():
    """Read every row of the SQLite ``film_work`` table and hand them on.

    Each row is turned into a ``FilmWork`` by passing its columns as keyword
    arguments, so the SQLite column names must match the dataclass field
    names. The resulting list is given to ``save_film_work_to_postgres``.
    """
    with conn_context(db_path) as sqlite_conn:
        rows = sqlite_conn.execute("SELECT * FROM film_work;").fetchall()
        films = []
        for row in rows:
            films.append(FilmWork(**dict(row)))
        save_film_work_to_postgres(films)
def save_film_work_to_postgres(films: list):
    """Render ``films`` into one multi-row INSERT and execute it.

    Connects to ``movies_database`` with ``search_path=content``, reads the
    column names of ``film_work`` from ``information_schema`` and uses
    ``cursor.mogrify`` to render each film (flattened with ``astuple``) into
    the VALUES list of a single ``INSERT INTO content.film_work`` statement.

    ``psycopg2.Error`` exceptions are printed instead of being re-raised.

    NOTE(review): ``commit()`` is never called here; the connection is only
    closed in ``finally``, and psycopg2 discards uncommitted work on close.
    """
    psycopg2.extras.register_uuid()
    dsn = dict(
        dbname='movies_database',
        user='app',
        password='123qwe',
        host='localhost',
        port=5432,
        options='-c search_path=content',
    )
    try:
        pg_conn = psycopg2.connect(**dsn)
        print("Successfull connection!")
        with pg_conn.cursor() as cursor:
            cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name = 'film_work' ORDER BY ordinal_position;")
            columns = [record[0] for record in cursor.fetchall()]
            joined_columns = ','.join(columns)
            placeholders = ', '.join('%s' for _ in columns)
            # Render every film as a fully quoted "(v1, v2, ...)" SQL fragment.
            rendered_rows = []
            for film in films:
                fragment = cursor.mogrify(f"({placeholders})", astuple(film))
                rendered_rows.append(fragment.decode('utf-8'))
            all_values = ','.join(rendered_rows)
            cursor.execute(f"""INSERT INTO content.film_work ({joined_columns}) VALUES {all_values} """)
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if pg_conn is not None:
            pg_conn.close()
# Module-level call: the copy runs as soon as this file is executed.
copy_from_sqlite()
请检查下面的代码是否能正确执行。我做了一些更改:调整了数据格式和字段初始化,修复了列名称不匹配的问题,并在 INSERT 之后调用 conn.commit() 提交事务(原代码缺少这一步,所以表中看不到数据):
import sqlite3
import uuid
from contextlib import contextmanager
from dataclasses import astuple, dataclass, field
from datetime import datetime, timezone
from uuid import UUID

import psycopg2
from psycopg2 import extras
# Register psycopg2's adapter for Python uuid.UUID values.
psycopg2.extras.register_uuid()
# Path of the SQLite database file opened by copy_from_sqlite().
db_path = 'db.sqlite'
@contextmanager
def conn_context(db_path: str):
    """Yield a SQLite connection to ``db_path`` and close it afterwards.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so that
    fetched rows can be converted to dictionaries keyed by column name.
    Closing happens in ``finally``, so it also runs when the ``with`` body
    raises.
    """
    sqlite_conn = sqlite3.connect(db_path)
    try:
        sqlite_conn.row_factory = sqlite3.Row
        yield sqlite_conn
    finally:
        sqlite_conn.close()
@dataclass
class FilmWork:
    """One ``film_work`` record, shaped for insertion into PostgreSQL.

    The fields are declared in the same order as the columns of
    ``content.film_work`` (created, modified, id, title, description,
    creation_date, rating, type, file_path), so a record's values can be
    passed to an INSERT positionally.

    Values read from SQLite are stored as given; only ``None`` values are
    replaced (see ``__post_init__``).
    """

    created_at: datetime = None
    updated_at: datetime = None
    # Needs the ``uuid`` module itself, not just the ``UUID`` class.
    id: UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: datetime = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''

    def __post_init__(self):
        """Replace ``None`` values with defaults.

        Missing timestamps all receive the same timezone-aware "now", so a
        record created without dates has ``created_at == updated_at``.
        A missing description or rating gets a fixed placeholder value.
        """
        # One shared UTC timestamp keeps the defaulted fields consistent.
        now = datetime.now(timezone.utc)
        # NOTE(review): defaulting a missing creation_date to "now" invents a
        # date for the film; confirm that the target column is NOT NULL.
        if self.creation_date is None:
            self.creation_date = now
        if self.created_at is None:
            self.created_at = now
        if self.updated_at is None:
            self.updated_at = now
        if self.description is None:
            self.description = 'Нет описания'
        if self.rating is None:
            self.rating = 0.0
def copy_from_sqlite():
    """Read every row of the SQLite ``film_work`` table and hand them on.

    Each row is turned into a ``FilmWork`` by passing its columns as keyword
    arguments, so the SQLite column names must match the dataclass field
    names. The resulting list is given to ``save_film_work_to_postgres``.
    """
    with conn_context(db_path) as sqlite_conn:
        rows = sqlite_conn.execute("SELECT * FROM film_work;").fetchall()
        films = []
        for row in rows:
            films.append(FilmWork(**dict(row)))
        save_film_work_to_postgres(films)
def save_film_work_to_postgres(films: list):
    """Insert ``films`` into ``content.film_work`` and commit the transaction.

    Args:
        films: dataclass instances (``FilmWork``) whose field order matches
            the column order of ``content.film_work``. Each one is flattened
            with ``dataclasses.astuple`` and inserted positionally.

    Returns:
        None. With an empty ``films`` list nothing is done.

    ``psycopg2.Error`` exceptions are printed instead of being re-raised.
    On such an error no commit happens, and closing the connection discards
    the pending changes.
    """
    if not films:
        # An INSERT with an empty VALUES list is a syntax error; nothing to do.
        return

    # NOTE(review): credentials are hard-coded; consider loading them from
    # the environment or a config file before using this outside a sandbox.
    dsn = {
        'dbname': 'movies_database',
        'user': 'app',
        'password': '123qwe',
        'host': 'localhost',
        'port': 5432,
        'options': '-c search_path=content',
    }
    # Bound before ``try`` so that ``finally`` is safe if connect() fails.
    conn = None
    try:
        conn = psycopg2.connect(**dsn)
        print("Successful connection!")
        with conn.cursor() as cursor:
            # Restrict the lookup to the ``content`` schema: a film_work
            # table in another schema would otherwise add extra columns.
            cursor.execute(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema = 'content' AND table_name = 'film_work' "
                "ORDER BY ordinal_position;"
            )
            column_names_str = ','.join(row[0] for row in cursor.fetchall())
            # execute_values expands the single %s into properly quoted
            # "(...), (...)" row tuples; it needs sequences, hence astuple().
            psycopg2.extras.execute_values(
                cursor,
                f"INSERT INTO content.film_work ({column_names_str}) VALUES %s",
                [astuple(film) for film in films],
            )
        # Without an explicit commit the rows vanish when the connection closes.
        conn.commit()
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if conn is not None:
            conn.close()
# Module-level call: the copy runs as soon as this file is executed.
copy_from_sqlite()