我正在将 WebSocket 与 Python 中的 Celery 和 FastAPI 集成,但面临 AttributeError 和 EncodeError 等问题。需要帮助解决这些错误以实现无缝集成。感谢任何建议或指导。
@user_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connection = await connect_robust("amqp://guest:guest@localhost//")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue('websocket_queue', durable=True)
async for message in queue:
data = message.body.decode('utf-8')
print("Received message:", data)
await websocket.send_text(data)
await message.ack()
def sync_send_websocket_message(message):
try:
print("Sending message:", message)
connection = asyncio.get_event_loop().run_until_complete(connect_robust("amqp://guest:guest@localhost//"))
channel = asyncio.get_event_loop().run_until_complete(connection.channel())
asyncio.get_event_loop().run_until_complete(channel.default_exchange.publish(
message=message,
routing_key='websocket_queue'
))
except Exception as e:
print("Error sending WebSocket message:", e)
raise e
@celery.task
def send_websocket_message(message):
sync_send_websocket_message(message)
@user_router.post("/testing_websocket")
def websocket_testing_route(input: dict):
try:
task.send_websocket_message.delay("Hello from Celery!")
return {"message": input}
except Exception as e:
logger.error("Error occurred in websocket_testing_route: %s", e)
raise HTTPException(status_code=500, detail="Internal Server Error")
def sync_send_websocket_message(message):
try:
print("Sending message:", message)
connection = asyncio.get_event_loop().run_until_complete(connect_robust("amqp://guest:guest@localhost//"))
channel = asyncio.get_event_loop().run_until_complete(connection.channel())
message_body = json.dumps({"message": message}) # Convert message to JSON string
asyncio.get_event_loop().run_until_complete(channel.default_exchange.publish(
Message(body=message_body.encode()), # Ensure message has a body attribute
routing_key='websocket_queue'
))
except Exception as e:
print("Error sending WebSocket message:", e)
raise e
@celery.task
def send_websocket_message(message):
sync_send_websocket_message(message)
@user_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connection = await connect_robust("amqp://guest:guest@localhost//")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue('websocket_queue', durable=True)
async for message in queue:
data = message.body.decode('utf-8')
print("Received message:", data)
await websocket.send_text(data)
await message.ack()
@user_router.post("/testing_websocket")
def websocket_testing_route(input: dict):
try:
task.send_websocket_message.delay("Hello from Celery!")
return {"message": input}
except Exception as e:
logger.error("Error occurred in websocket_testing_route: %s", e)
raise HTTPException(status_code=500, detail="Internal Server Error")
补充说明:
通过遵循这些准则并进行必要的调整,您可以在 Python 应用程序中将 WebSocket 通信与 Celery 和 FastAPI 无缝集成。