我使用 Python 和 Flask 开发了一个 Cloud Run 服务,该服务接受传入的 HTTP 请求。一旦发出请求,有效负载就会发送到具有上下文管理器的类。在该类中,进入后会建立数据库连接。退出后,它会关闭,以确保没有连接泄漏。
以下是如何在类的 enter 方法中设置数据库连接:
if self.db_connection is None:
try:
self.db_connection = psycopg2.connect(
host=
dbname=
user=
password=
)
self.db_cursor = self.db_connection.cursor()
except Exception as e:
print(f'Error connecting to database: {e}')
return
我们将 Google Cloud SQL 与 PostgreSQL V15.2 实例结合使用,并使用 psycopg2 Python 库进行连接。因此,每个数据库的最大活动连接数为 100。因此,我们将活动 HTTP 请求/任务的数量限制在 80 左右,这会导致工作流程变慢。
肯定有更好的方法,但我还不是很熟悉。其中一种方法似乎是连接池,但我还不确定最好的方法。因为我们有多个活动实例并在“任务”期间连接到数据库,所以如果我们创建一个连接池,是否会创建多个池,每个池都有多个保留连接?我们如何为例如 80 个可重用的连接创建一个池?具有 80 个连接的单个池是否会占用 Cloud SQL 数据库每个数据库 max_connections 的 80%?
在这种情况下最好的方法是什么?我们可能会重新配置数据库以增加 MAX_CONNECTIONS 参数,但同时更改我们连接到它的方式可能也是一个好主意。
我们还没有尝试过不同的连接方式,但到目前为止我发现的可能的解决方案/信息是:
我绝对建议在您的用例中使用连接池,以利用重用连接并减少最多 100,000 个连接的开销。支持 psycopg2 的流行 Python 连接池库是 SQLAlchemy。
您可以建立一个类似于上面代码的全局连接池,可以在类对象之间共享。
SQLAlchemy建立psycopg2连接引擎(池)的代码如下:
import sqlalchemy
pool = None
def init_connection_pool(user, password, ip_address, dbname):
return sqlalchemy.create_engine(
# Equivalent URL:
# postgresql+psycopg2://<db_user>:<db_pass>@<db_host>:<db_port>/<db_name>
sqlalchemy.engine.url.URL.create(
drivername="postgresql+psycopg2",
username=user,
password=password,
host=ip_address,
port=5432,
database=dbname,
),
# requiring SSL is recommended for Private IP connections
connect_args={'sslmode': 'require'},
# Pool size is the maximum number of permanent connections to keep.
pool_size=10,
# Temporarily exceeds the set pool_size if no connections are available.
max_overflow=2,
# The total number of concurrent connections for your application will be
# a total of pool_size and max_overflow.
# 'pool_timeout' is the maximum number of seconds to wait when retrieving a
# new connection from the pool. After the specified amount of time, an
# exception will be thrown.
pool_timeout=30, # 30 seconds
# 'pool_recycle' is the maximum number of seconds a connection can persist.
# Connections that live longer than the specified amount of time will be
# re-established
pool_recycle=1800, # 30 minutes
)
# ... inside your endpoint code
global pool
if pool is None:
pool = init_connection_pool("YOUR-USER", "YOUR-PASsWORD", "CLOUD-SQL-PRIVATE-IP", "YOUR-DB")
# acquire connection from pool
with pool.connect() as db_conn:
# ...
我刚刚设置了一些用于配置池的默认值,可以根据需要轻松调整它们。