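I have one process that only writes to a file and several processes that only read it. How can N processes read at the same time?

Question (votes: 0, answers: 2)

I am developing a multi-agent system in which one Writer and several Readers interact with a file. The Writer updates the file and the Readers read from it. I am using Python's multiprocessing module. To speed things up, the Readers may read concurrently while the Writer is busy with something else; when the Writer wants to write, it has priority and is the only one allowed to access the file at that moment.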

import multiprocessing
import time

class SharedData:
    def __init__(self):
        self.value = 0
        self.condition = multiprocessing.Condition()

    def modify_value(self, agent_id):
        with self.condition:
            print(f"Agent {agent_id} is modifying the value.")
            self.value += 1
            # Notify waiting processes that a modification has occurred
            self.condition.notify_all()

    def read_value(self, agent_id):
        with self.condition:
            # code....
            self.condition.wait()

            # Non-critical section for reading
            print(f"Agent {agent_id} is reading the value: {self.value}")


if __name__ == "__main__":
    # Create shared data
    shared_data = SharedData()

    # Create agent processes
    agents = [multiprocessing.Process(target=shared_data.read_value, args=(i,)) for i in range(4)]
    agents.append(multiprocessing.Process(target=shared_data.modify_value, args=(4,)))
    # Start the agents
    for agent in agents:
        agent.start()

    # Wait for all agents to finish
    for agent in agents:
        agent.join()

    # Access the shared data after all agents have finished
    with shared_data.condition:
        final_value = shared_data.value
        print("Final Value:", final_value)



python multiprocessing
2 Answers

0 votes

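You say, "... when the Writer wants to write, it has priority and is the only one allowed to access the file at that moment." You certainly want the writer to have exclusive control while it is writing or updating the data; otherwise, readers could read inconsistent data. Conversely, once readers hold the shared data, the writer should be locked out and unable to change it. Letting the writer write whenever it likes seems somewhat at odds with this. The writer needs some way to make the readers stop reading and release their hold on the data immediately. But I think this has to be cooperative, because we cannot halt a reader's processing in some indeterminate state. That seems to call for some kind of sharable "stop" variable that the writer can set so that the readers end their reading as soon as possible.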
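The cooperative "stop" variable can be sketched as follows. This is a minimal illustration with threads, assuming a threading.Event as the flag (multiprocessing.Event exposes the same set()/is_set() interface for processes). The function name read_in_chunks and the chunked workload are hypothetical stand-ins for a real read.

```python
import threading
import time

def read_in_chunks(stop_requested, chunks, processed):
    """Process chunks one at a time, checking the stop flag between chunks."""
    for chunk in chunks:
        if stop_requested.is_set():
            return False          # stopped early, but only at a chunk boundary
        time.sleep(0.01)          # stand-in for real work on one chunk
        processed.append(chunk)
    return True                   # finished every chunk

stop_requested = threading.Event()
processed = []
reader = threading.Thread(
    target=read_in_chunks, args=(stop_requested, range(1000), processed)
)
reader.start()
time.sleep(0.05)                  # let the reader get through a few chunks
stop_requested.set()              # the writer asks the reader to wrap up
reader.join()
print(f"reader stopped after {len(processed)} of 1000 chunks")
```

The reader is never interrupted in the middle of a chunk; it only gives up at a point it chooses, which is what keeps its state consistent.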

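I created the class RWLock (read/write lock). It is instantiated with the number of reader threads/processes. It works as follows:

  1. Each reader task (thread or process) is assigned its own multiprocessing.JoinableQueue queue.
  2. When a reader task wants sharable read access to whatever data is being shared, it makes a blocking get call on its queue. The queues are initially empty, so all reader tasks block at first.
  3. When the writer task wants exclusive access to the sharable data, it "joins" all of the queues, waiting until any item it previously put on each queue has been processed. Since the queues are initially empty, the join requests succeed immediately. Once the writer has written the data, it must wait for all reader tasks to read and process that data before it can write new data. So the writer puts an item on each queue, which wakes the blocked readers. Now when the writer issues join calls on the queues, those calls block until the reader tasks have called task_done on their respective queues.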

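So we have the readers wait (by issuing a blocking get call on their respective queues) until the writer signals that there is new data (by putting an item on each reader's queue), and then the writer waits until all readers have signaled (by calling task_done) that they have processed the data. The writer, which has issued a blocking join call on each queue, then continues on to its next write.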
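The get / task_done / join handshake described above can be seen in miniature in the sketch below. It is only an illustration: it assumes threads and queue.Queue (which has the same get, put, task_done and join methods as multiprocessing.JoinableQueue), and the names NUM_READERS, ROUNDS, shared and seen are made up for the example.

```python
import queue
import threading

NUM_READERS = 3
ROUNDS = 2
queues = [queue.Queue(1) for _ in range(NUM_READERS)]  # one queue per reader
shared = {"value": 0}
seen = [[] for _ in range(NUM_READERS)]                # what each reader observed

def reader(index):
    for _ in range(ROUNDS):
        queues[index].get()                  # block until the writer signals new data
        seen[index].append(shared["value"])  # "process" the shared data
        queues[index].task_done()            # tell the writer this reader is finished

def writer():
    for _ in range(ROUNDS):
        for q in queues:
            q.join()                         # wait until every reader finished the last item
        shared["value"] += 1                 # exclusive access: no reader is active now
        for q in queues:
            q.put(None)                      # wake every reader

threads = [threading.Thread(target=reader, args=(i,)) for i in range(NUM_READERS)]
threads.append(threading.Thread(target=writer))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)  # [[1, 2], [1, 2], [1, 2]]
```

Every reader sees every written value exactly once, because the writer cannot pass join until each reader has called task_done for the previous item.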

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

class RWLock:
    _local_storage = local()

    def __init__(self, num_readers: int):
        """Create a lock that supports a single writer
        and multiple readers, the number of which is specified by
        the num_readers argument."""

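        # Check the type first so a non-int argument raises ValueError
        # rather than a TypeError from the comparison:
        if not isinstance(num_readers, int) or num_readers < 1:
            raise ValueError('num_readers must be a positive integer.')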
        self._num_readers = num_readers
        self._queue_count = Value('i', 0)
        self._stop = Value('i', 0)
        self._lock = Lock()
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)]

    def acquire_for_reading(self) -> None:
        """The reader is asking for shared read access to the data."""

        # Assign ourselves one of the queues if we haven't already done so:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue
        queue.get()  # wait for writer to have written something

    def release_for_reading(self) -> None:
        """The reader is through with shared read access to the data."""
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        """Acquire exclusive access to the data. If argument immediate
        is True, the readers are asked to give up access to the data
        as soon as possible."""

        if immediate:
            self._stop.value = 1

        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        """Give up exclusive write access."""

        self._stop.value = 0  # Reset

        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        """A reader calls this function periodically to see if the
        writer wants immediate exclusive control of the shared resource."""

        return bool(self._stop.value)

### Example usage ####

def reader(rw_lock, id, shared_data):
    while True:
        rw_lock.acquire_for_reading()
        # Simulate doing a long reading task.
        # Now that we have access for shared reading, we should
        # periodically check if the writer wants exclusive control:
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                break
        print('reader', id, 'done processing', shared_data.value, flush=True)
        rw_lock.release_for_reading()

def writer(rw_lock, shared_data):
    while True:
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value += 1
        print('wrote', shared_data.value, 'at', time.time(), flush=True)
        rw_lock.release_for_writing()

def main():
    rw_lock = RWLock(3)
    # Shared data:
    shared_data = Value('i', 0, lock=False)
    for id in range(1, 4):
        Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()
    Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()
    input('Hit enter to terminate:\n')

if __name__ == '__main__':
    main()

Prints:

Hit enter to terminate:
wrote 1 at 1704820185.6386113
reader 1 done processing 1
reader 2 done processing 1
reader 3 done processing 1
wrote 2 at 1704820188.7424514
reader 1 done processing 2
reader 2 done processing 2
reader 3 done processing 2
wrote 3 at 1704820191.8461268
reader 1 done processing 3
reader 2 done processing 3
reader 3 done processing 3
wrote 4 at 1704820192.1564832
reader 1 done processing 4
reader 2 done processing 4
reader 3 done processing 4
wrote 5 at 1704820195.2668517
reader 1 done processing 5
reader 2 done processing 5
reader 3 done processing 5
wrote 6 at 1704820198.3816946
reader 1 done processing 6
... etc.

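Note that because the third reader task sleeps for a total of 3 seconds while simulating its read processing, the writer normally has to wait about 3 seconds between writes. However, once the third record has been processed, the writer next calls acquire_for_writing with the immediate argument set to True and then writes its fourth record almost immediately (how quickly depends on how often the reader tasks check whether the "stop" flag has been set).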

Programming Notes

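The RWLock class above was written to handle either multiprocessing or multithreading tasks. If you know you will be using multithreading, you can make it more efficient by replacing the queues and the lock with the ones provided by the queue and threading modules. You can also replace the attributes of the class that are implemented as sharable (across processes) multiprocessing.Value instances with plain old int variables: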

from threading import Thread, Lock, local
from queue import Queue
import time

class RWLockMultiThreading:
    _local_storage = local()

    def __init__(self, num_readers: int):
        """Create a lock that supports a single writer
        and multiple readers, the number of which is specified by
        the num_readers argument."""

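        # Check the type first so a non-int argument raises ValueError
        # rather than a TypeError from the comparison:
        if not isinstance(num_readers, int) or num_readers < 1:
            raise ValueError('num_readers must be a positive integer.')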
        self._num_readers = num_readers
        self._queue_count = 0
        self._stop = 0
        self._lock = Lock()
        self._queues = [Queue(1) for _ in range(self._num_readers)]

    def acquire_for_reading(self) -> None:
        """The reader is asking for shared read access to the data."""

        # Assign ourselves one of the queues if we haven't already done so:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count]
                self._queue_count += 1
            self._local_storage.queue = queue
        queue.get()  # wait for writer to have written something

    def release_for_reading(self) -> None:
        """The reader is through with shared read access to the data."""
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        """Acquire exclusive access to the data. If argument immediate
        is True, the readers are asked to give up access to the data
        as soon as possible."""

        if immediate:
            self._stop = 1

        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        """Give up exclusive write access."""

        self._stop = 0  # Reset

        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        """A reader calls this function periodically to see if the
        writer wants immediate exclusive control of the shared resource."""

        return bool(self._stop)

### Example usage ####

class SharedValue:
    def __init__(self):
        self.value = 0

def reader(rw_lock, id, shared_data):
    while True:
        rw_lock.acquire_for_reading()
        # Simulate doing a long reading task.
        # Now that we have access for shared reading, we should
        # periodically check if the writer wants exclusive control:
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                break
        print('reader', id, 'done processing', shared_data.value, flush=True)
        rw_lock.release_for_reading()

def writer(rw_lock, shared_data):
    while True:
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value += 1
        print('wrote', shared_data.value, 'at', time.time(), flush=True)
        rw_lock.release_for_writing()

def main():
    rw_lock = RWLockMultiThreading(3)
    # Shared data:
    shared_data = SharedValue()
    for id in range(1, 4):
        Thread(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()
    Thread(target=writer, args=(rw_lock, shared_data), daemon=True).start()
    input('Hit enter to terminate:\n')

if __name__ == '__main__':
    main()

-1 votes

The question is about concurrency, which Python supports in three ways, two of which suit I/O tasks. Reading and writing data are usually I/O tasks.

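So you can use:

  1. threading - multithreading
  2. asyncio - similar to threading but with more control

The third way is multiprocessing, which suits processor-intensive tasks; unfortunately, the example in the question uses this approach.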

Here is an asyncio example:

import time
import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Here is a practical example using threading:

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"  # avoid shadowing the built-in format()
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,), daemon=False )
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

In general, threading is sufficient and avoids the need to structure the code around async. But asyncio gives the programmer more control.
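Neither example above covers the one-writer, many-readers case from the question. As a rough sketch only, assuming threads rather than processes, a lock that lets readers share access while giving a waiting writer priority could be built on threading.Condition. The class name WriterPreferringLock and its reading() and writing() methods are hypothetical; they are not part of the standard library.

```python
import threading
from contextlib import contextmanager

class WriterPreferringLock:
    """Many concurrent readers, one exclusive writer; waiting writers go first."""

    def __init__(self):
        self._cond = threading.Condition()
        self._active_readers = 0
        self._waiting_writers = 0
        self._writer_active = False

    @contextmanager
    def reading(self):
        with self._cond:
            # New readers hold back while a writer is active or waiting.
            self._cond.wait_for(
                lambda: not self._writer_active and self._waiting_writers == 0
            )
            self._active_readers += 1
        try:
            yield
        finally:
            with self._cond:
                self._active_readers -= 1
                self._cond.notify_all()

    @contextmanager
    def writing(self):
        with self._cond:
            self._waiting_writers += 1
            try:
                # The writer waits for current readers to finish.
                self._cond.wait_for(
                    lambda: not self._writer_active and self._active_readers == 0
                )
            finally:
                self._waiting_writers -= 1
            self._writer_active = True
        try:
            yield
        finally:
            with self._cond:
                self._writer_active = False
                self._cond.notify_all()

lock = WriterPreferringLock()
data = {"value": 0}
observed = []

def write_once():
    with lock.writing():
        data["value"] += 1

def read_once():
    with lock.reading():
        observed.append(data["value"])

writer_thread = threading.Thread(target=write_once)
writer_thread.start()
writer_thread.join()                      # write first so the reads are predictable
reader_threads = [threading.Thread(target=read_once) for _ in range(4)]
for t in reader_threads:
    t.start()
for t in reader_threads:
    t.join()
print(observed)  # [1, 1, 1, 1]
```

Unlike a plain threading.Lock, several readers can be inside reading() at the same time; only writing() is exclusive.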

Here is the documentation: https://docs.python.org/3/library/concurrency.html
