如何使异步 aiohttp 的类安全?

问题描述 投票:0回答:1

我有这个 Python 类,希望您对这个

cls._instance._cache = {}
对于
tornado
是否是线程安全的有意见?如果不是,我该如何处理这个缓存以确保线程安全?

import logging
import aiohttp
import time

# Constants
DEFAULT_TIMEOUT = 20
MAX_ERRORS = 3
HTTP_READ_TIMEOUT = 1

class HTTPRequestCache:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # TODO: check whether its tread safe with tornado event loop
            cls._instance._cache = {}
            cls._instance._time_out = DEFAULT_TIMEOUT
            cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
            cls._instance._loop = None
        return cls._instance

    async def _fetch_update(self, url):
        try:
            async with aiohttp.ClientSession() as session:
                logging.info(f"Fetching {url}")
                async with session.get(url, timeout=self._http_read_timeout) as resp:
                    resp.raise_for_status()
                    resp_data = await resp.json()

                    cached_at = time.time()
                    self._cache[url] = {
                        "cached_at": cached_at,
                        "config": resp_data,
                        "errors": 0
                    }
                    logging.info(f"Updated cache for {url}")
        except aiohttp.ClientError as e:
            logging.error(f"Error occurred while updating cache for {url}: {e}")

    async def get(self, url):
        if url not in self._cache or self._cache[url]["cached_at"] < time.time() - self._time_out:
            await self._fetch_update(url)
        return self._cache.get(url, {}).get("config")
python tornado aiohttp
1个回答
0
投票

由于臭名昭著的“全局解释器锁”(GIL),Python 线程安全性是相当“给定的”,它阻止多个线程并行执行 Python 代码(但线程在网络相关代码中具有巨大的优势,例如在您的示例中) )。正如您正在使用的那样,异步代码自然不受竞争条件的影响 - 我们可以忽略“线程”级别的并发性:异步并发性是在任务中处理的。尽管可以有多个线程,但每个线程都有一个单独的异步循环:在这种情况下,我们必须针对两种类型的并发保护代码。

因此,重要的部分,即缓存(字典结构)更新,在 Python 中以原子方式发生,并且不会出现混合或损坏的结果。

那里只有一个竞争条件的窗口,在最坏的情况下,这可能会同时请求相同的 URL - 不会出现程序故障:最后解决的请求将是“获胜者”,并且数据将是在缓存中。

然而,向此代码添加锁并不简单 - 一个简单的类锁会阻止启动不同 URL 的新请求,从而阻止程序。此外,锁要么与

async

 相关,要么与线程相关 - 如果您确实在并发线程中使用不同的 Asyncio 循环运行它(我更喜欢认为它是单个线程,而您只是混淆了术语),则必须每个线程仔细组合使用 threading.Lock 和 asyncio.Lock,使用 
threading.local()
contextvar
 来跟踪它们:这需要大量的小心和测试才能正确完成。

相反,我们可以添加一些其他信号来指示当前正在获取 URL,以防止重复获取。

此外,作为旁注,最好使用

time.monotonic()

 而不是 
time.time()
 进行间隔检查,如此处使用的:此函数将始终返回自程序启动以来不断增加的秒数 - 而 
time.time()
 可能会出现问题夏令时过渡或其他操作系统时间更新系统时钟。

我已经修改了您的代码,以使用锁来防止并行获取相同的条目,以及额外提到的更改:

import asyncio import logging import aiohttp import time # Constants DEFAULT_TIMEOUT = 20 MAX_ERRORS = 3 HTTP_READ_TIMEOUT = 1 class HTTPRequestCache: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) # TODO: check whether its tread safe with tornado event loop cls._instance._cache = {} cls._instance._time_out = DEFAULT_TIMEOUT cls._instance._http_read_timeout = HTTP_READ_TIMEOUT cls._instance._fetching_now = {} cls._instance.lock = asyncio.Lock() cls._instance._loop = None return cls._instance async def _fetch_update(self, url): with self.lock: if event:=self._feching_now.get(url): await event.wait() if url in self._cache: return # url was being fetched, but it failed, so it is being retried now: self._fetching_now[url] = asyncio.Event() try: async with aiohttp.ClientSession() as session: logging.info(f"Fetching {url}") async with session.get(url, timeout=self._http_read_timeout) as resp: resp.raise_for_status() resp_data = await resp.json() cached_at = time.monotonic() self._cache[url] = { "cached_at": cached_at, "config": resp_data, "errors": 0 } logging.info(f"Updated cache for {url}") except aiohttp.ClientError as e: logging.error(f"Error occurred while updating cache for {url}: {e}") finally: self._fetching_now[url].set() del self._fetching_now[url] async def get(self, url): if url not in self._cache or self._cache[url]["cached_at"] < time.monotonic() - self._time_out: await self._fetch_update(url) return self._cache.get(url, {}).get("config")
    
© www.soinside.com 2019 - 2024. All rights reserved.