我有一个用 Twisted 编写的 HTTP 客户端,它从延迟发送请求到某个站点的 API。它是这样的(有点简化):
from json import loads
from core import output
from twisted.python.log import msg
from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool, _HTTP11ClientFactory, readBody
from twisted.web.http_headers import Headers
from twisted.internet.ssl import ClientContextFactory
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
class QuietHTTP11ClientFactory(_HTTP11ClientFactory):
# To shut up the garbage in the log
noisy = False
class Output(output.Output):
def start(self):
myQuietPool = HTTPConnectionPool(reactor)
myQuietPool._factory = QuietHTTP11ClientFactory
self.agent = Agent(
reactor,
contextFactory=WebClientContextFactory(),
pool=myQuietPool
)
def stop(self):
pass
def write(self, event):
messg = 'Whatever'
self.send_message(messg)
def send_message(self, message):
headers = Headers({
b'User-Agent': [b'MyApp']
})
url = 'https://api.somesite.com/{}'.format(message)
d = self.agent.request(b'GET', url.encode('utf-8'), headers, None)
def cbBody(body):
return processResult(body)
def cbPartial(failure):
failure.printTraceback()
return processResult(failure.value)
def cbResponse(response):
if response.code in [200, 201]:
return
else:
msg('Site response: {} {}'.format(response.code, response.phrase))
d = readBody(response)
d.addCallback(cbBody)
d.addErrback(cbPartial)
return d
def cbError(failure):
failure.printTraceback()
def processResult(result):
j = loads(result)
msg('Site response: {}'.format(j))
d.addCallback(cbResponse)
d.addErrback(cbError)
return d
这工作正常,但网站对请求进行速率限制,如果请求到达速度太快,就会开始丢弃它们。因此,我也需要对客户端进行速率限制,并确保它发送请求的速度不会太快 - 但它们不会丢失,因此需要某种缓冲/排队。我不需要精确的速率限制,例如“每秒不超过 X 个请求”;每次请求后只要有一些合理的延迟(比如 1 秒)就可以了。
不幸的是,我无法使用延迟中的
sleep()
,因此需要其他方法。
从四处逛逛来看,基本的想法似乎是做类似的事情
self.transport.pauseProducing()
delay = 1 # seconds
self.reactor.callLater(delay, self.transport.resumeProducing)
至少根据这个答案。但那里的代码不能“按原样”工作 -
SlowDownloader
预计会采用一个参数(反应器),因此 SlowDownloader()
会导致错误。
我还发现了这个答案,它使用了使用工厂作为存储的有趣想法,因此您不需要实现自己的队列和东西 - 但它处理服务器端的速率限制,而我需要对客户端进行速率限制。
我觉得我非常接近解决方案,但我仍然无法弄清楚如何准确地结合这两个答案中的信息,以生成工作代码,因此我们将不胜感激。
如果
cbError
表示您想要在延迟后执行重试的条件,那么您可以重写该函数,如下所示:
async def cbError(failure):
failure.printTraceback()
await deferLater(reactor, 3.0, lambda: None)
return self.send_message(message)