我有这个 Python 类,希望您对这个
cls._instance._cache = {}
对于 tornado
是否是线程安全的有意见?如果不是,我该如何处理这个缓存以确保线程安全?
import logging
import aiohttp
import time
# Constants
DEFAULT_TIMEOUT = 20
MAX_ERRORS = 3
HTTP_READ_TIMEOUT = 1
class HTTPRequestCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
# TODO: check whether its tread safe with tornado event loop
cls._instance._cache = {}
cls._instance._time_out = DEFAULT_TIMEOUT
cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
cls._instance._loop = None
return cls._instance
async def _fetch_update(self, url):
try:
async with aiohttp.ClientSession() as session:
logging.info(f"Fetching {url}")
async with session.get(url, timeout=self._http_read_timeout) as resp:
resp.raise_for_status()
resp_data = await resp.json()
cached_at = time.time()
self._cache[url] = {
"cached_at": cached_at,
"config": resp_data,
"errors": 0
}
logging.info(f"Updated cache for {url}")
except aiohttp.ClientError as e:
logging.error(f"Error occurred while updating cache for {url}: {e}")
async def get(self, url):
if url not in self._cache or self._cache[url]["cached_at"] < time.time() - self._time_out:
await self._fetch_update(url)
return self._cache.get(url, {}).get("config")
由于臭名昭著的“全局解释器锁”(GIL),Python 线程安全性是相当“给定的”,它阻止多个线程并行执行 Python 代码(但线程在网络相关代码中具有巨大的优势,例如在您的示例中) )。正如您正在使用的那样,异步代码自然不受竞争条件的影响 - 我们可以忽略“线程”级别的并发性:异步并发性是在任务中处理的。尽管可以有多个线程,但每个线程都有一个单独的异步循环:在这种情况下,我们必须针对两种类型的并发保护代码。
因此,重要的部分,即缓存(字典结构)更新,在 Python 中以原子方式发生,并且不会出现混合或损坏的结果。
那里只有一个竞争条件的窗口,在最坏的情况下,这可能会同时请求相同的 URL - 不会出现程序故障:最后解决的请求将是“获胜者”,并且数据将是在缓存中。
然而,向此代码添加锁并不简单 - 一个简单的类锁会阻止启动不同 URL 的新请求,从而阻止程序。此外,锁要么与async
相关,要么与线程相关 - 如果您确实在并发线程中使用不同的 Asyncio 循环运行它(我更喜欢认为它是单个线程,而您只是混淆了术语),则必须每个线程仔细组合使用 threading.Lock 和 asyncio.Lock,使用
threading.local()
或
contextvar
来跟踪它们:这需要大量的小心和测试才能正确完成。相反,我们可以添加一些其他信号来指示当前正在获取 URL,以防止重复获取。
此外,作为旁注,最好使用
time.monotonic()
而不是
time.time()
进行间隔检查,如此处使用的:此函数将始终返回自程序启动以来不断增加的秒数 - 而
time.time()
可能会出现问题夏令时过渡或其他操作系统时间更新系统时钟。我已经修改了您的代码,以使用锁来防止并行获取相同的条目,以及额外提到的更改:
import asyncio
import logging
import aiohttp
import time
# Constants
DEFAULT_TIMEOUT = 20
MAX_ERRORS = 3
HTTP_READ_TIMEOUT = 1
class HTTPRequestCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
# TODO: check whether its tread safe with tornado event loop
cls._instance._cache = {}
cls._instance._time_out = DEFAULT_TIMEOUT
cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
cls._instance._fetching_now = {}
cls._instance.lock = asyncio.Lock()
cls._instance._loop = None
return cls._instance
async def _fetch_update(self, url):
with self.lock:
if event:=self._feching_now.get(url):
await event.wait()
if url in self._cache:
return
# url was being fetched, but it failed, so it is being retried now:
self._fetching_now[url] = asyncio.Event()
try:
async with aiohttp.ClientSession() as session:
logging.info(f"Fetching {url}")
async with session.get(url, timeout=self._http_read_timeout) as resp:
resp.raise_for_status()
resp_data = await resp.json()
cached_at = time.monotonic()
self._cache[url] = {
"cached_at": cached_at,
"config": resp_data,
"errors": 0
}
logging.info(f"Updated cache for {url}")
except aiohttp.ClientError as e:
logging.error(f"Error occurred while updating cache for {url}: {e}")
finally:
self._fetching_now[url].set()
del self._fetching_now[url]
async def get(self, url):
if url not in self._cache or self._cache[url]["cached_at"] < time.monotonic() - self._time_out:
await self._fetch_update(url)
return self._cache.get(url, {}).get("config")