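I am trying to implement a producer-consumer operation using the `Queue` class.
The producer reads data from a database in one thread and adds items to the queue, and a consumer in a second thread consumes them.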
The problem is that whenever I try to add an item to the queue, the code gets stuck on the `self.inqueue.put(record)` line. I don't know why.
There is a `maxsize` to control the data flow. However, the problem occurs even when the queue is empty.
Here is the code for the producer-consumer operation:
```python
import sqlite3
from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread

from dare.logsetup import Logger

logger = Logger().get_logger()


class RawData:
    def __init__(self,
                 db_path: str) -> None:
        self.read_frequency = 1
        self.read_db_path = db_path
        self.inqueue = Queue(maxsize=8)

    def _setup(self):
        conn = sqlite3.connect(self.read_db_path)
        self.cursor = conn.cursor()

    def fetch_surface_data(self,
                           read_yet: int,
                           num_read: int = 1) -> List[Tuple]:
        query = f"""SELECT seq, "Time & Date", Depth
                    FROM "Table Name"
                    LIMIT {num_read}
                    OFFSET {read_yet};"""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def fetch_total_records(self) -> int:
        query = """SELECT count(*) FROM "Table Name";"""
        self.cursor.execute(query)
        return self.cursor.fetchone()[0]

    def process_raw_data(self):
        self._setup()
        read_yet = 0
        total_records = self.fetch_total_records()
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            read_yet += 1
            logger.info(f"Rows read:{read_yet}")
            # Add converted_record to buffer for cleaning
            self.inqueue.put(record)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None
        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())
                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        logger.error("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True
                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)  # Get data from the queue
                        logger.info(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                    except Empty:
                        continue  # Continue if no new data is available


if __name__ == '__main__':
    db_path = "path to sqlite db"
    raw = RawData(db_path=db_path)
    producer = Thread(target=raw.process_raw_data)
    logger.info("Starting Producer thread...")
    producer.start()
    consumer = Thread(target=raw.clean_converted_data)
    logger.info("Starting Consumer thread...")
    consumer.start()
    producer.join()
    consumer.join()
```
No error is raised, so I'm at a loss as to how to fix this. Any suggestions would be much appreciated.
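Your implementation is wrong; the problem has nothing to do with threading itself. To be able to run your script, I rewrote it to remove all dependencies on third-party libraries and the external database. It then looks like this: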
```python
from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread
import time


class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"

    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            read_yet += 1
            print(f"Read:{read_yet}")
            self.inqueue.put(record)
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None
        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())
                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        print("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True
                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)
                        print(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                        # Append to data seems to be missing??
                    except Empty:
                        continue  # Continue if no new data is available


if __name__ == '__main__':
    raw = RawData()
    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()
    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()
    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")
```
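Instead of reading from a database, I just format and pass along some strings. This script also locks up on the statement `self.inqueue.put(record)`. I think it's worth understanding why.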
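In your consumer thread there is this statement:

```python
with self.inqueue.not_empty:
```

The member object `Queue.not_empty` is a `threading.Condition`. Its context manager acquires the Condition's underlying lock, and that lock is only released when the `with` block exits. In CPython, that lock is the queue's internal `mutex`, a plain, non-reentrant `threading.Lock` that the other `Queue` methods also acquire. The very next line calls `self.inqueue.full()`, which tries to acquire that same lock again from the same thread, so the consumer blocks forever inside the `with` block and never releases the lock. `not_empty` is part of the internal implementation of `Queue` and is not documented, so I don't know how you found out it was there.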
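A quick way to see this is to inspect the lock from inside a `with q.not_empty:` block. This sketch assumes CPython's `queue` module: `Queue.mutex` is an undocumented attribute, used here only to observe the lock, never something to rely on in real code.

```python
from queue import Queue

q: Queue = Queue(maxsize=8)

before = q.mutex.locked()        # nobody holds the internal lock yet
with q.not_empty:                # what the consumer thread does
    held = q.mutex.locked()      # not_empty is a Condition built on q.mutex
    # q.mutex is a plain, non-reentrant threading.Lock: a second, non-blocking
    # acquire from this same thread fails. q.full() does a *blocking* acquire
    # of the same lock, so calling it here would hang forever.
    reacquired = q.mutex.acquire(blocking=False)
after = q.mutex.locked()         # released when the with block exits

print(before, held, reacquired, after)  # False True False False
```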
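When your producer thread adds an object to the queue, `Queue.put()` first has to acquire that same internal lock (it enters `Queue.not_full`, which shares it) before it can store the item and notify `Queue.not_empty`. It can never get the lock, because the consumer thread is holding it. This is also part of the internal implementation of `Queue`. So your program just hangs there, even when the queue is empty.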
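The effect on the producer can be reproduced in isolation. In this sketch the main thread plays the consumer's role by holding `q.not_empty`, and a hypothetical `producer` function tries to `put()` one item into an otherwise empty queue:

```python
import threading
from queue import Queue

q: Queue = Queue(maxsize=8)

def producer() -> None:
    q.put("item")  # put() must acquire the queue's internal lock first

with q.not_empty:                       # main thread holds the internal lock
    t = threading.Thread(target=producer)
    t.start()
    t.join(timeout=0.5)                 # give put() half a second to finish
    stuck = t.is_alive()                # still alive: put() is waiting for the lock

t.join()                                # lock released on exit, so put() completes
print(stuck, q.qsize())                 # True 1
```

The queue is empty and far below `maxsize`, yet `put()` cannot proceed until the lock is released, which matches the symptom in the question.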
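This is definitely not how you are supposed to use a queue. Stick to the documented functions; they have everything you need. The people who wrote the Python standard library knew what they were doing.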
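If the intent behind waiting for `full()` was to process records in batches (an assumption on my part), the documented `get()` is enough. The helper `batched`, the batch size of 4 and the module-level `SENTINEL` name below are hypothetical:

```python
from queue import Queue
from threading import Thread

SENTINEL = None  # same end-of-stream marker the producer already uses

def batched(q: Queue, batch_size: int) -> list[list[str]]:
    """Group items into batches using only the documented Queue.get()."""
    batches: list[list[str]] = []
    batch: list[str] = []
    while True:
        item = q.get()            # blocks until an item is available
        if item is SENTINEL:
            break
        batch.append(item)
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []
    if batch:                     # flush the final, partial batch
        batches.append(batch)
    return batches

q: Queue = Queue(maxsize=8)
result: list[list[str]] = []
consumer = Thread(target=lambda: result.extend(batched(q, 4)))
consumer.start()
for i in range(10):
    q.put(f"Data {i}")            # blocks only while the queue holds 8 items
q.put(SENTINEL)
consumer.join()
print([len(b) for b in result])  # [4, 4, 2]
```

Here `maxsize` still throttles the producer, but all locking stays inside `put()` and `get()`.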
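Side note: your consumer thread also contains this line:

```python
self.inqueue.not_empty.wait_for(self.inqueue.full())
```

This is wrong too, although it is not a syntax error. The first argument to `Condition.wait_for` must be a callable (a predicate), and you are passing a boolean: the return value of `full()`. If execution ever reached this call, `wait_for` would raise `TypeError: 'bool' object is not callable`. You don't see that error because, for the reason already explained, your code never gets that far.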
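For reference, here is how `Condition.wait_for` is meant to be called, shown on a `Condition` you create yourself rather than on the queue's internal one. The `items` list and the `add` function are purely illustrative:

```python
import threading

items: list[int] = []
cond = threading.Condition()

def add(n: int) -> None:
    with cond:
        items.append(n)
        cond.notify_all()          # wake up any thread blocked in wait_for

t = threading.Thread(target=add, args=(1,))
t.start()

# Correct: pass a callable; wait_for re-checks it after every wake-up
# and returns its last value (False only if the timeout expires).
with cond:
    ok = cond.wait_for(lambda: len(items) > 0, timeout=2)
t.join()

# Wrong: len(items) > 0 is evaluated once, then wait_for tries to call the bool.
try:
    with cond:
        cond.wait_for(len(items) > 0)
    error = ""
except TypeError as exc:
    error = str(exc)

print(ok, error)  # True 'bool' object is not callable
```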
The solution is to remove all the code that doesn't use the documented functions. The script is shorter, and now it works:
```python
from queue import Queue
from threading import Thread
import time


class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"

    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            print(f"Read:{read_yet}")
            read_yet += 1
            self.inqueue.put(record)  # blocks only if the queue already holds 8 items
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> list[str]:
        data: list[str] = []
        first_record = self.inqueue.get()
        if first_record is None:
            print("First record is None...")
            raise ValueError("First record is None...")
        print(f"First record {first_record}")
        data.append(first_record)
        while True:
            record = self.inqueue.get()  # blocks until the producer puts something
            print(f"Consumer thread record: {record}")
            if record is None:  # Sentinel value to stop the loop
                break
            data.append(record)
        return data


if __name__ == '__main__':
    raw = RawData()
    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()
    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()
    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")
```
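I also moved the code that handles the first item in the queue outside the `while` loop, and added a `data.append()` call inside the loop, since it seemed to be missing. I removed the timeout from `Queue.get()`, because you were suppressing the resulting `Empty` exception anyway.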
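A timeout does become useful if the producer might die before it sends the sentinel. A sketch of that case, using only the documented `get(timeout=...)`, `queue.Empty` and `Thread.is_alive()`; the `consume` and `crashing_producer` names are hypothetical:

```python
from queue import Queue, Empty
from threading import Thread

def consume(q: Queue, producer: Thread) -> list[str]:
    data: list[str] = []
    while True:
        try:
            record = q.get(timeout=0.5)
        except Empty:
            # Keep waiting while the producer may still put items, or while
            # items it put before exiting are still queued.
            if producer.is_alive() or not q.empty():
                continue
            break                        # producer is gone and never sent the sentinel
        if record is None:               # normal shutdown via the sentinel
            break
        data.append(record)
    return data

def crashing_producer(q: Queue) -> None:
    for i in range(3):
        q.put(f"Data {i}")
    # simulates a failure: returns without putting the None sentinel

q: Queue = Queue(maxsize=8)
producer = Thread(target=crashing_producer, args=(q,))
producer.start()
result = consume(q, producer)
producer.join()
print(result)  # ['Data 0', 'Data 1', 'Data 2']
```

With a sentinel that is always sent, the plain blocking `get()` in the script above is simpler and sufficient.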