在twisted高负载下运行带有延迟线程的http服务器时出现问题

问题描述 投票:0回答:1

我遇到了一些与twisted http 框架相关的问题。具体来说,我正在尝试通过回调实现

threads.deferToThread
的使用,以实现更好的并发性,在尝试立即访问服务器的流量负载较高之前,这种方法可以正常工作。在这种情况下,回调通常根本无法执行,并且没有任何内容写入传输。

简而言之,对于每个请求,服务器将在延迟线程内处理客户端发送的数据包。然后,服务器将响应数据包写入队列,然后

on_request_done
回调将其出队并作为响应返回。如前所述,这对于 1 或 2 个客户端来说效果非常好。

这是我正在使用的代码的简化版本:

class HttpPlayer(Player):
    def __init__(self, address: str, port: int) -> None:
        super().__init__(address, port)
        self.queue = Queue()
        self.token = ""

    def enqueue(self, data: bytes):
        self.queue.put(data)

    def dequeue(self, max: int = 4096) -> bytes:
        data = b""

        while not self.queue.empty():
            data += self.queue.get()

        return data

class HttpBanchoProtocol(Resource):
    isLeaf = True

    def __init__(self) -> None:
        self.player: HttpPlayer | None = None
        self.children = {}

    def handle_login_request(self, request: Request) -> bytes:
        username, password, client = (
            request.content.read().decode().splitlines()
        )

        deferred = threads.deferToThread(
            self.player.login_received,
            username,
            password,
            client
        )

        deferred.addCallbacks(
            lambda _: self.on_request_done(request),
            lambda f: self.on_request_error(request, f)
        )

        return NOT_DONE_YET

    def handle_request(self, request: Request) -> bytes:
        deferred = threads.deferToThread(
            self.process_packets,
            request.content.read()
        )

        deferred.addCallbacks(
            lambda _: self.on_request_done(request),
            lambda f: self.on_request_error(request, f)
        )

        return NOT_DONE_YET

    def login_received(self, username: str, password: str, client: str) -> None:
        # Processing login here
        ...

    def process_packets(self, request: bytes):
        # Processing packets here
        ...

    def on_request_done(self, request: Request) -> None:
        if request._disconnected:
            self.player.logger.warning('Client disconnected before response')
            return

        if request.finished:
            self.player.logger.warning('Request finished before response')
            return

        request.write(self.player.dequeue())
        request.finish()

    def on_request_error(
        self,
        request: Request,
        failiure: Failure
    ) -> None:
        request.setResponseCode(500)
        self.player.send_error()
        self.player.logger.error(
            f'Failed to process request: {failiure.getErrorMessage()}',
            exc_info=failiure.value
        )
        self.on_login_done(request)

    def render_POST(self, request: Request) -> bytes:
        request.setResponseCode(200)

        if not (token := request.getHeader('token')):
            return self.handle_login_request(request)

        if not (player := app.session.players.by_token(token)):
            request.setResponseCode(401)
            return b""

        self.player = player
        return self.handle_request(request)

这是我正在编写的代码,如果需要的话可以作为更多参考:https://github.com/osuTitanic/anchor/blob/044a9b75dee8d36816bbb5d7af67dc8902f2d34e/app/http.py#L99

python twisted twisted.web twisted.internet
1个回答
0
投票

此代码不完整或不可运行(仅对于初学者来说,

login_received
未在
HttpPlayer
上定义,但似乎您这里的主要问题是您从线程调用Twisted API。Twisted自己的API非常故意不是线程安全,因此您无法从传递到
deferToThread
的任何内容写入传输。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.