我正在尝试使用 python 的 multiprocessing 模块并行读取和写入 postgresql 数据库。
我有两个进程:read_database 和 write_database。
write_database 函数在循环中逐条写入条目;当出现具有特定数字(在本例中为 60)的条目时,
read_database 函数应并行读取该特定行并完成一些处理,而写入仍在同时进行。
这是我的示例代码:
import psycopg2
import multiprocessing as mp
def initialize_database(database_name):
    """Open and return a new psycopg2 connection to ``database_name``.

    Only the database name comes from the caller; the user, host and
    password are the placeholder literals hard-coded below.
    """
    connect_kwargs = {
        'database': database_name,
        'user': 'user',
        'host': 'host_name',
        'password': 'password',
    }
    return psycopg2.connect(**connect_kwargs)
def write_database(list_id, database_name, table_name, process):
    """Insert every value of ``list_id`` into ``table_name`` as an ``emp_id``.

    Each row is committed individually so that a reader working on its own
    connection can see rows while this loop is still running.

    Args:
        list_id: Iterable of values to insert into the ``emp_id`` column.
        database_name: Database to connect to via ``initialize_database``.
        table_name: Target table; concatenated into the SQL text, so it
            must come from trusted code, never from user input.
        process: Label printed after each committed row.
    """
    # Built once: the statement text does not change between iterations.
    insert_stmt = "INSERT INTO " + str(table_name) + "(emp_id) VALUES(%s)"
    conn = initialize_database(database_name)
    try:
        cur = conn.cursor()
        for i in list_id:
            cur.execute(insert_stmt, (i,))
            conn.commit()
            print(str(process) + ' writes into the database')
    finally:
        # Release the connection even if an insert fails part-way through.
        conn.close()
def read_database(num_mult, database_name, table_name, process,
                  poll_interval=0.1, timeout=30.0):
    """Wait for a row with ``emp_id == num_mult``, then set its emp_id to 1000.

    When this runs concurrently with a writer, the row may not have been
    committed yet at the time of the first query. A single SELECT then
    returns nothing and indexing the empty result raises IndexError, so
    the query is repeated until the row shows up or ``timeout`` elapses.

    Args:
        num_mult: The ``emp_id`` value to wait for.
        database_name: Database to connect to via ``initialize_database``.
        table_name: Table to query; concatenated into the SQL text, so it
            must come from trusted code, never from user input.
        process: Label printed once the update has been committed.
        poll_interval: Seconds to sleep between two queries.
        timeout: Maximum number of seconds to wait for the row.

    Raises:
        TimeoutError: If no matching row appears within ``timeout`` seconds.
    """
    # Local import: the first script in this file does not import time.
    import time

    select_stmt = "select * from " + str(table_name) + " where emp_id = %s"
    update_stmt = ("update " + str(table_name) +
                   " set emp_id = 1000 where emp_id = %s")
    conn = initialize_database(database_name)
    try:
        cur = conn.cursor()
        deadline = time.monotonic() + timeout
        while True:
            cur.execute(select_stmt, (num_mult,))
            data = cur.fetchall()
            if data:
                break
            # Close the transaction the SELECT opened so the session does
            # not sit idle inside a transaction while sleeping.
            conn.rollback()
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    'no row with emp_id = ' + str(num_mult) + ' appeared in '
                    + str(table_name) + ' within ' + str(timeout) + ' s')
            time.sleep(poll_interval)
        print(data)
        # Column 1 of the first matching row is passed as the emp_id to match.
        cur.execute(update_stmt, (data[0][1],))
        conn.commit()
        print(str(process) + ' reads from the database')
    finally:
        conn.close()
if __name__ == '__main__':
    # Recreate the test table, then run one writer and one reader process
    # side by side against it.
    database_name = 'db_name'
    table_name = 'test_table'
    conn = initialize_database(database_name)
    try:
        cur = conn.cursor()
        cur.execute("DROP TABLE " + str(table_name))
        conn.commit()
        cur.close()
    except psycopg2.errors.UndefinedTable:
        # The failed DROP leaves the transaction aborted; without a rollback
        # every later statement on this connection would be rejected.
        conn.rollback()
        print('A new table name ' + str(table_name) + ' is created')
    cur = conn.cursor()
    cur.execute(""" CREATE TABLE IF NOT EXISTS """ + str(table_name) + """(
                ID BIGSERIAL PRIMARY KEY,
                emp_id BIGINT);
                """)
    conn.commit()
    # The workers open their own connections; close this one so it is not
    # carried over into the child processes.
    conn.close()
    num_range = list(range(0, 100))
    p1 = mp.Process(target=write_database,
                    args=(num_range, database_name, table_name, 'Process 1'))
    p2 = mp.Process(target=read_database,
                    args=(60, database_name, table_name, 'Process 2'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
我遇到了以下问题:read_database 中的 data 列表索引超出范围。
这是因为 data 为空(empty),我已使用 print 函数验证了这一点。
如果并行读取有效,则 emp_id(即 60)应被更新为 1000。我是不是遗漏了什么?
正如我提到的,我认为您不需要多进程(multiprocessing),除非您想并行执行插入并且不关心插入顺序。
以下是使用 threading 实现的方法:
import psycopg2
import threading
import time
def get_connection():
    """Return a new psycopg2 connection built from the placeholder settings.

    Every value below is a placeholder to be replaced with real
    credentials before running.
    """
    settings = {
        "dbname": "your_database_name",
        "user": "your_username",
        "password": "your_password",
        "host": "your_host",
        "port": "your_port",
    }
    return psycopg2.connect(**settings)
def create_table():
    """Create ``test_table`` (serial ``id``, integer ``value``) if missing.

    Uses a short-lived connection of its own, which is closed before
    returning whether or not the statement succeeded.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS test_table (
                id SERIAL PRIMARY KEY,
                value INTEGER
            )
            """
        )
        conn.commit()
    finally:
        # Do not leak the connection if execute() or commit() raises.
        conn.close()
def insert_numbers():
    """Insert the integers 1 through 100 into ``test_table``, one at a time.

    Each value is committed on its own and followed by a 0.1 second pause.
    """
    connection = get_connection()
    cursor = connection.cursor()
    insert_sql = "INSERT INTO test_table (value) VALUES (%s)"
    for number in range(1, 101):
        cursor.execute(insert_sql, (number,))
        connection.commit()
        time.sleep(0.1)
    connection.close()
def check_table():
    """Poll ``test_table`` until a row with ``value`` 50 is found.

    Queries once per second; prints a message on the first hit and then
    closes the connection.
    """
    connection = get_connection()
    cursor = connection.cursor()
    found = False
    while not found:
        cursor.execute("SELECT * FROM test_table WHERE value = %s", (50,))
        if cursor.fetchone():
            print("Value 50 found!")
            found = True
        else:
            # Nothing yet: wait before asking again.
            time.sleep(1)
    connection.close()
def main():
    """Create the table, then run the checker and the inserter in threads.

    The checker thread is started first; both threads are joined before
    returning.
    """
    create_table()
    workers = [
        threading.Thread(target=job)
        for job in (check_table, insert_numbers)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
这将开始每 0.1 秒插入一次数字,并每 1 秒检查一次值 50。