我已尝试此处描述的解决方案,但不起作用。
我的数据流管道将读取消息并连接到 Postgre SQL 进行一些检查。所以我以单例模式创建了一个psycopg2线程连接池。使用单例模式是因为通过我的自动缩放数据流,我希望控制连接总数,以便每个数据流线程不会创建自己的连接池,并且随着工作线程总数的增加,连接总数连接不会爆炸。
向 Dataflow 发送大量数据时,我不断收到此 “尝试放置未加密的连接” 错误。
通过查看 Psycopg2 源代码,我了解到当在 self._rused 字典中找不到连接的密钥时,会返回此错误。代码如下:
key = self._rused.get(id(conn))
if key is None:
raise PoolError("trying to put unkeyed connection")
并且密钥是在调用_getconn方法时生成的,Psycopg2代码如下:
def _getkey(self):
"""Return a new unique key."""
self._keys += 1
return self._keys
def _getconn(self, key=None):
"""Get a free connection and assign it to 'key' if not None."""
if self.closed:
raise PoolError("connection pool is closed")
if key is None:
key = self._getkey()
if key in self._used:
return self._used[key]
if self._pool:
self._used[key] = conn = self._pool.pop()
self._rused[id(conn)] = key
return conn
else:
if len(self._used) == self.maxconn:
raise PoolError("connection pool exhausted")
return self._connect(key)
我的怀疑是两个线程同时调用 _getkey 方法,并返回相同的密钥,当第一个线程完成其进程时,它会从 self._rused 中删除密钥和连接字典,因此当第二个线程完成其进程并尝试在 self._rused 字典中查找相同的键和连接时,它会失败,从而出现错误。
但是后来我发现调用_getconn方法时存在锁,所以我之前的怀疑是无效的。
现在我完全无能为力,有人可以对此提出一些想法吗?谢谢
您看到此错误是因为您将 None 传递给 putconn() 函数。来源可见:https://github.com/psycopg/psycopg2/blob/master/lib/pool.py