Psycopg2“尝试在数据流多线程中放置未加密的连接”

问题描述 投票:0回答:1

我已尝试此处描述的解决方案,但不起作用。

我的数据流管道将读取消息并连接到 Postgre SQL 进行一些检查。所以我以单例模式创建了一个psycopg2线程连接池。使用单例模式是因为通过我的自动缩放数据流,我希望控制连接总数,以便每个数据流线程不会创建自己的连接池,并且随着工作线程总数的增加,连接总数连接不会爆炸。

向 Dataflow 发送大量数据时,我不断收到此 “尝试放置未加密的连接” 错误。

通过查看 Psycopg2 源代码,我了解到当在 self._rused 字典中找不到连接的密钥时,会返回此错误。代码如下:

key = self._rused.get(id(conn))
if key is None:
   raise PoolError("trying to put unkeyed connection")

并且密钥是在调用_getconn方法时生成的,Psycopg2代码如下:

    def _getkey(self):
        """Return a new unique key."""
        self._keys += 1
        return self._keys

    def _getconn(self, key=None):
        """Get a free connection and assign it to 'key' if not None."""
        if self.closed:
            raise PoolError("connection pool is closed")
        if key is None:
            key = self._getkey()

        if key in self._used:
            return self._used[key]

        if self._pool:
            self._used[key] = conn = self._pool.pop()
            self._rused[id(conn)] = key
            return conn
        else:
            if len(self._used) == self.maxconn:
                raise PoolError("connection pool exhausted")
            return self._connect(key)

我的怀疑是两个线程同时调用 _getkey 方法,并返回相同的密钥,当第一个线程完成其进程时,它会从 self._rused 中删除密钥和连接字典,因此当第二个线程完成其进程并尝试在 self._rused 字典中查找相同的键和连接时,它会失败,从而出现错误。

但是后来我发现调用_getconn方法时存在锁,所以我之前的怀疑是无效的。

现在我完全无能为力,有人可以对此提出一些想法吗?谢谢

multithreading google-cloud-dataflow psycopg2
1个回答
0
投票

您看到此错误是因为您将 None 传递给 putconn() 函数。来源可见:https://github.com/psycopg/psycopg2/blob/master/lib/pool.py

© www.soinside.com 2019 - 2024. All rights reserved.