我正在尝试在我的一项服务中实现一种服务架构,但 Cloud Run 失败并出现奇怪的错误。
我有一个基于后台的门户网站,支持业务用户自动执行某些操作。例如,假设我们需要在系统中创建 6000 名学生,用户将填写表单中的一些字段,附加包含所有信息的 csv 文件并提交。
单击提交后,请求将发送到 Cloud Run 服务,Cloud run 验证 csv 并确保数据正常,将文件数据划分为块(以保持在云任务任务大小限制内)并将每个块排队到云任务它充当队列。
接收一大块文件的队列云任务将为每个对象排队一个任务(在我们的例子中要创建的学生)。
所以在数字上,假设我们要创建 6000 名学生,这 6000 名学生被分成 100 个块,每个块有 60 个学生,每个块将转发到一个云任务(60 个任务)
每个带有块的任务都会将该块分解为一组任务。因此 60 个云任务中的每个任务将创建 100 个云任务。
我在下面分享一个模仿这种情况的简单代码。我使用 python 3.10、goblet-gcp 包进行部署。
from goblet import Goblet, goblet_entrypoint
app = Goblet(
function_name="test-google-support",
backend="cloudrun",
routes_type="cloudrun",
)
goblet_entrypoint(app)
cloud_task_queue = app.cloudtaskqueue(
"test-service",
config={
"rateLimits": {
"maxDispatchesPerSecond": 10,
"maxBurstSize": 10,
"maxConcurrentDispatches": 1,
},
"retryConfig": {
"maxAttempts": 1,
"minBackoff": "0.100s",
"maxBackoff": "3600s",
"maxDoublings": 16,
},
},
)
@app.route("/test-gcp-support", methods=["POST"])
def submit_bulk_campaigns_management():
"""Validation method for bulk campaigns creation/update. Dev portal calls are routed here"""
json_data = app.current_request.json
i = 0
for chunk in json_data["chunks"]:
i += 1
app.log.info(f"queueing chunk {i}")
cloud_task_queue.enqueue(
target="test-queuer",
payload={
"chunk": chunk,
"batch_id": i
},
)
app.log.info(f"queued chunk {i}")
app.log.info(f"queued {i} chunks ...")
return "Successfully submitted request.", 200, {}
@app.cloudtasktarget(name="test-queuer")
def queue_campaign_creation_tasks(request):
json_data = request.json
chunk = json_data.get("chunk")
batch_id = json_data.get("batch_id")
i = 0
for object in chunk:
i += 1
app.log.info(f"queuing task {i} of batch {batch_id}")
cloud_task_queue.enqueue(
target="single-object-management",
payload={
"data": object,
"batch_id": batch_id,
"task_id": i,
},
)
app.log.info(f"queued task {i} of batch {batch_id}")
app.log.info(f"queued {i} tasks for chunk {batch_id}")
return "queued all tasks", 200, {}
@app.cloudtasktarget(name="single-object-management")
def single_campaign_management(request):
json_data = request.json
app.log.info(f"received json_data: {json_data}")
return "Success", 200, {}
Docker 文件(通过 goblet-gcp 自动执行)是
# https://hub.docker.com/_/python
FROM python:3.10-slim
# setup environment
ENV APP_HOME /app
WORKDIR $APP_HOME
# install keyring backend to handle artifact registry authentication
# RUN pip install keyrings.google-artifactregistry-auth==1.1.1
# Install dependencies.
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy local code to the container image.
COPY . .
需求.txt
goblet-gcp==0.10.5
pydantic==1.10.11
请求正文是
{
"chunks":[
[
{
"task":"1.1"
},
{
"task":"1.2"
}
],
[
{
"task":"2.1"
},
{
"task":"2.2"
}
],
[
{
"task":"2.1"
},
{
"task":"2.2"
}
],
[
{
"task":"2.1"
},
{
"task":"2.2"
}
]
]
}
我使用以下云运行配置:并发设置为 80,超时 300,无启动提升,CPU 并非始终分配,RAM 为 512MB,CPU 设置为 1,有无服务器 VPC 连接器。
我们有不同的场景
1- 场景 1:在将第二个块从主路由排队到队列任务时,我们收到如下 SSL 错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 2213, in __call__
return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 2193, in wsgi_app
response = self.handle_exception(e)
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 2190, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 1486, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 1484, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.10/site-packages/flask/app.py", line 1469, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/usr/local/lib/python3.10/site-packages/functions_framework/__init__.py", line 130, in view_func
return function(request._get_current_object())
File "/usr/local/lib/python3.10/site-packages/goblet/app.py", line 160, in goblet_entrypoint_wrapper
return app(request, context)
File "/usr/local/lib/python3.10/site-packages/goblet/decorators.py", line 427, in __call__
response = self.handlers["route"](request)
File "/usr/local/lib/python3.10/site-packages/goblet/handlers/routes.py", line 93, in __call__
return entry(request)
File "/usr/local/lib/python3.10/site-packages/goblet/handlers/routes.py", line 413, in __call__
resp = self.route_function(**args)
File "/app/main.py", line 36, in submit_bulk_campaigns_management
cloud_task_queue.enqueue(
File "/usr/local/lib/python3.10/site-packages/goblet/infrastructures/cloudtask.py", line 72, in enqueue
task = self.build_task(target, payload, in_seconds, task_name, deadline)
File "/usr/local/lib/python3.10/site-packages/goblet/infrastructures/cloudtask.py", line 28, in build_task
"url": self.backend.http_endpoint,
File "/usr/local/lib/python3.10/site-packages/goblet/backends/cloudrun.py", line 221, in http_endpoint
return get_cloudrun_url(self.client, self.name)
File "/usr/local/lib/python3.10/site-packages/goblet/common_cloud_actions.py", line 216, in get_cloudrun_url
resp = client.execute(
File "/usr/local/lib/python3.10/site-packages/goblet_gcp_client/client.py", line 195, in execute
return getattr(api_chain, api)(**_params).execute()
File "/usr/local/lib/python3.10/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/googleapiclient/http.py", line 923, in execute
resp, content = _retry_request(
File "/usr/local/lib/python3.10/site-packages/googleapiclient/http.py", line 222, in _retry_request
raise exception
File "/usr/local/lib/python3.10/site-packages/googleapiclient/http.py", line 191, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google_auth_httplib2.py", line 218, in request
response, content = self.http.request(
File "/usr/local/lib/python3.10/site-packages/httplib2/__init__.py", line 1724, in request
(response, content) = self._request(
File "/usr/local/lib/python3.10/site-packages/httplib2/__init__.py", line 1444, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
File "/usr/local/lib/python3.10/site-packages/httplib2/__init__.py", line 1396, in _conn_request
response = conn.getresponse()
File "/usr/local/lib/python3.10/http/client.py", line 1375, in getresponse
response.begin()
File "/usr/local/lib/python3.10/http/client.py", line 318, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.10/http/client.py", line 279, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.10/socket.py", line 705, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.10/ssl.py", line 1307, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.10/ssl.py", line 1163, in read
return self._sslobj.read(len, buffer)
ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:2578)"
2- 场景 2:在将第二个块从主路由排队到队列任务时,服务因未捕获的信号错误(信号 11)而关闭,如下所示
severity: "ERROR"
textPayload: "Uncaught signal: 11, pid=2, tid=61, fault_addr=0."
3- 场景 3:有时服务会成功完成请求。
对于上面分享的简单服务,我将最小实例增加到 2,并且工作正常。我仍然不知道上述错误如何与实例数相关,这会阻止服务的正确调整大小活动。
对上述错误有什么想法吗?
面对几乎完全相同的 3 个场景,使用 Pub/Sub 而不是 Cloud Tasks。你是如何解决这个OP的?