Python Queue 模块在线程中使用时会卡住

问题描述 投票:0回答:1

我正在尝试使用

Queue
功能实现
producer-consumer
操作。 生产者在一个线程中从数据库读取数据,并将项目添加到队列中,该队列又被消费者在第二个线程中消费。

问题是每当我尝试将项目添加到队列中时,操作代码都会卡在

inqueue.put(raw_record)
行中。我不知道为什么。

有一个

maxsize
用于控制数据流。但是,即使队列为空,也会出现问题。

生产者操作的代码如下:

import sqlite3

from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread

from dare.logsetup import Logger

logger = Logger().get_logger()

class RawData:
    def __init__(self, 
                 db_path: str) -> None:
        self.read_frequency = 1
        self.read_db_path = db_path
        self.inqueue = Queue(maxsize=8)

    def _setup(self):
        conn = sqlite3.connect(self.read_db_path)
        self.cursor = conn.cursor()

    def fetch_surface_data(self,  
                           read_yet: int, 
                           num_read: int = 1) -> List[Tuple]:

        query = f"""SELECT seq, "Time & Date", Depth
        FROM "Table Name" 
        LIMIT {num_read} 
        OFFSET {read_yet};"""

        self.cursor.execute(query)
        return self.cursor.fetchall()
    
    def fetch_total_records(self) -> int:
        query = """SELECT count(*) FROM "Table Name";"""
        self.cursor.execute(query)
        return self.cursor.fetchone()[0]

    def process_raw_data(self):
        self._setup()
        read_yet = 0
        total_records = self.fetch_total_records()
        
        while read_yet < total_records:
            
            record = self.fetch_surface_data(read_yet)
            read_yet += 1

            logger.info(f"Rows read:{read_yet}")
            # Add converted_record to buffer for cleaning
            self.inqueue.put(record)
        self.inqueue.put(None) # sentinal value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None

        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())

                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        logger.error("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True

                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)  # Get data from the queue
                        logger.info(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                    except Empty:
                        continue  # Continue if no new data is available



if __name__ == '__main__':
    db_path = "path to sqlite db"
    raw = RawData(db_path=db_path)
    
    producer = Thread(target=raw.process_raw_data)
    logger.info("Starting Producer thread...")
    producer.start()
    
    consumer = Thread(target=raw.clean_converted_data)
    logger.info("Starting Consumer thread...")
    consumer.start()
    
    producer.join()
    consumer.join()

由于没有抛出错误,我只是对如何解决该问题感到困惑。您的建议将非常有价值。

python queue
1个回答
0
投票

您的实现是错误的,它与线程本身无关。 为了能够运行您的脚本,我重新编写了它以消除对第三方库或外部数据库的所有依赖。 然后它看起来像这样:

from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread
import time

class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"
    
    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            read_yet += 1
            print(f"Read:{read_yet}")
            self.inqueue.put(record)
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None) # sentinal value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None

        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())

                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        print("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True

                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)
                        print(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                        # Append to data seems to be missing??
                    except Empty:
                        continue  # Continue if no new data is available

if __name__ == '__main__':
    raw = RawData()
    
    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()
    
    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()
    
    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")

我只是格式化并传递一些字符串,而不是从数据库中读取。 该脚本还锁定了语句

self.inqueue.put(record)
。 我认为值得理解为什么。

在您的消费者线程中有这样的声明:

with self.inqueue.not_empty:

成员对象Queue.not_empty是一个threading.Condition,它的上下文管理器将获取Condition的锁并且不会释放它。 这个对象是 Queue 内部实现的一部分,没有记录,所以我不知道你是如何知道它在那里的。

当你的生产者线程向队列添加一个对象时,它会尝试通知Queue.not_empty。 为此,它必须获取锁,但它永远无法做到这一点,因为消费者线程正在持有它。 这也是Queue内部实现的一部分。 所以你的程序就挂在那里。

这绝对不是您应该使用队列的方式。 坚持记录的功能;他们有你需要的一切。 编写 Python 标准库的人知道他们在做什么。

旁注:您的消费者线程还包含此行:

self.inqueue.not_empty.wait_for(self.inqueue.full())

这是一个语法错误。 Condition.wait_for 的第一个参数应该是 Callable,并且您正在传递一个布尔值 - full() 的返回值。 您不会收到错误消息,因为您的代码永远不会达到那么远,原因已经解释过。

解决方案是删除所有不使用已记录函数的代码。 脚本更短,现在可以运行了。

from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread
import time

class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"
    
    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            print(f"Read:{read_yet}")
            read_yet += 1
            self.inqueue.put(record)
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None) # sentinal value

    def clean_converted_data(self) -> list[str]:
        data: list[str] = []

        first_record = self.inqueue.get()
        if first_record is None:
            print("First record is None...")
            raise ValueError("First record is None...")
        print(f"First record {first_record}")
        data.append(first_record)
        while True:
            record = self.inqueue.get()
            print(f"Consumer thread record: {record}")
            if record is None:  # Sentinel value to stop the loop
                break
            data.append(record)
        return data

if __name__ == '__main__':
    raw = RawData()
    
    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()
    
    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()
    
    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")

我还将处理队列中第一项的代码移到了 while 循环之外,并在 while 循环内添加了 data.append() 语句,因为它似乎丢失了。 我删除了 Queue.get() 函数的超时,因为您无论如何都抑制了异常。

© www.soinside.com 2019 - 2024. All rights reserved.