我正在开发一个使用 SocketIO 进行实时通信的 Django 应用程序,我遇到了一个问题:RDS(关系数据库服务)重新启动后 SocketIO 连接失败,而我的 Django HTTP API 继续正常工作。
问题描述 我的 Django 应用程序与 SocketIO 集成以实现实时功能。 RDS实例重启后,HTTP API正常,但SocketIO连接出现问题,无法访问数据库模型。具体来说,在尝试处理 SocketIO 连接时,我收到与数据库连接相关的错误。
代码片段 以下是我配置 ASGI 应用程序和处理 SocketIO 连接的方法:
ASGI 配置(asgi.py):
import os
from django.core.asgi import get_asgi_application
import socketio
from backend.socketio import socketio_server
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings.dev')
django_asgi_app = get_asgi_application()
application = socketio.ASGIApp(socketio_server=socketio_server.socketio_server, other_asgi_app=django_asgi_app)
SocketIO 连接处理程序:
async def on_connect(self, sid: str, environ: dict):
try:
query_string = environ['asgi.scope']['query_string']
token, chat_id = get_token_chat_id_from_query(query_string=query_string)
if not token or not chat_id:
raise ConnectionRefusedError("Invalid connection parameters.")
user = await get_user_from_token(token=token)
chat_obj = await get_chat_from_id(chat_id=chat_id, user=user)
await update_all_chat_redis(chat_obj=chat_obj)
async with self.session(sid=sid, namespace=self.namespace) as session:
session['user_id'] = user.id
session['chat_id'] = chat_obj.machine_translation_request_id
await self.enter_room(sid=sid, room=chat_id)
except ConnectionRefusedError as e:
logger.error(f"Connection refused: {e}")
raise
except UserErrors as exc:
logger.error(f"User error: {exc.message}")
raise ConnectionRefusedError(exc.message)
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise ConnectionRefusedError("An unexpected error occurred. Please try again later.")
数据库访问函数(@sync_to_async):
import logging
from django.utils import timezone
from django.db import OperationalError
from yourapp.exceptions import UserErrors
logger = logging.getLogger(__name__)
@sync_to_async
def get_user_from_token(token: str):
retries = 3
for attempt in range(retries):
try:
token_obj = AccessToken.objects.filter(token=token, expires__gt=timezone.now()).last()
if not token_obj:
raise UserErrors("Invalid Token!")
return token_obj.user
except OperationalError as e:
logger.error(f"Database operational error on attempt {attempt + 1}: {e}")
await asyncio.sleep(2) # Wait before retrying
except Exception as e:
logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
if attempt == retries - 1:
raise
await asyncio.sleep(2) # Wait before retrying
我尝试过的事情
你解决了吗?我面临着类似的问题。 我在 process#1 上有 Django 和 FastAPI,它们在同步和异步函数中工作没有问题。我正在 process#2 中使用
multiprocessing.
Processusing
AsyncServerand
ASGIApp. The problem is that Django works with Sync functions like
get()or
create(), but if we use
aget()or
acreate()`进程运行 SocketIO 应用程序消失又消失。该行的其余部分永远不会没有错误地运行。
self.sio = socketio.AsyncServer(
async_mode="aiohttp",
cors_allowed_origins=self.socket_config.cors_allowed_origins,
always_connect=self.socket_config.always_connect,
logger=self.logger if self.socket_config.logger else False,
engineio_logger=self.logger if self.socket_config.engineio_logger else False,
)
self.socket_application = socketio.ASGIApp(self.sio, socketio_path=self.socket_config.socketio_path)
并使用
uvicorn
with pro 运行它
multiprocessing.Process(
target=uvicorn.run,
kwargs={
"app": "0.0.0.0",
"host": 8002,
"port": int(service_config.SERVICE_PORT),
},
daemon=True
).start()
我尝试将
get_asgi_application()
添加到other_asgi_app
的socketio.ASGIApp
中,但没有任何改变。
我认为问题不在于具有异步权限的 Django 设置,而在于 ASGIApp 和 Django 之间。
当它从 self.socket_application
记录 ASGIApp
时,出现了一些有趣的东西,...DjangoDBProcessRemove object ...
。
我期待任何帮助。