目前我正在使用 pytest、django、rabbitmq-server 以及 celery 来执行后台任务。然而我注意到,当我在本地计算机上没有启动服务器的情况下运行 pytest 时,会产生一段很长的异常回溯,其中特别显示:
E: Kombu 连接被拒绝 [111]
我正在运行 save() 模型方法的测试,该方法进而触发后台 celery 任务
我想问是否有一种方法可以在 pytest 中启动rabbitmq-server,而无需在本地计算机上实际启动它,这样我的测试用例就会通过
这也会影响我的 GitHub CI/CD,因为我不知道如何在 github-actions.yml 文件中运行 rabbitmq 镜像服务。
感谢您的帮助
如果您愿意,可以仔细阅读下面的日志
self = <promise: 0x7fd0819d9d80>
def __call__(self):
try:
> return self.__value__
E AttributeError: 'ChannelPromise' object has no attribute '__value__'
../../../.local/lib/python3.10/site-packages/kombu/utils/functional.py:30: AttributeError
During handling of the above exception, another exception occurred:
self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
ConnectionError = <class 'kombu.exceptions.OperationalError'>
ChannelError = <class 'kombu.exceptions.OperationalError'>
@contextmanager
def _reraise_as_library_errors(
self,
ConnectionError=exceptions.OperationalError,
ChannelError=exceptions.OperationalError):
try:
> yield
../../../.local/lib/python3.10/site-packages/kombu/connection.py:446:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
errback = None, max_retries = None, interval_start = 2, interval_step = 2
interval_max = 30, callback = None, reraise_as_library_errors = True
timeout = 4
def _ensure_connection(
self, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30,
callback=None, reraise_as_library_errors=True,
timeout=None
):
"""Ensure we have a connection to the server.
If not retry establishing the connection with the settings
specified.
Arguments:
errback (Callable): Optional callback called each time the
connection can't be established. Arguments provided are
the exception raised and the interval that will be
slept ``(exc, interval)``.
max_retries (int): Maximum number of times to retry.
If this limit is exceeded the connection error
will be re-raised.
interval_start (float): The number of seconds we start
sleeping for.
interval_step (float): How many seconds added to the interval
for each retry.
interval_max (float): Maximum number of seconds to sleep between
each retry.
callback (Callable): Optional callback that is called for every
internal iteration (1 s).
timeout (int): Maximum amount of time in seconds to spend
waiting for connection
"""
if self.connected:
return self._connection
def on_error(exc, intervals, retries, interval=0):
round = self.completes_cycle(retries)
if round:
interval = next(intervals)
if errback:
errback(exc, interval)
self.maybe_switch_next() # select next host
return interval if round else 0
ctx = self._reraise_as_library_errors
if not reraise_as_library_errors:
ctx = self._dummy_context
with ctx():
> return retry_over_time(
self._connection_factory, self.recoverable_connection_errors,
(), {}, on_error, max_retries,
interval_start, interval_step, interval_max,
callback, timeout=timeout
)
../../../.local/lib/python3.10/site-packages/kombu/connection.py:433:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fun = <bound method Connection._connection_factory of <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>>
catch = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
args = [], kwargs = {}
errback = <function Connection._ensure_connection.<locals>.on_error at 0x7fd081f0c790>
max_retries = None, interval_start = 2, interval_step = 2, interval_max = 30
callback = None, timeout = 4
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None, timeout=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
is increased for every retry until the max seconds is reached.
Arguments:
fun (Callable): The function to try
catch (Tuple[BaseException]): Exceptions to catch, can be either
tuple or a single exception class.
Keyword Arguments:
args (Tuple): Positional arguments passed on to the function.
kwargs (Dict): Keyword arguments passed on to the function.
errback (Callable): Callback for when an exception in ``catch``
is raised. The callback must take three arguments:
``exc``, ``interval_range`` and ``retries``, where ``exc``
is the exception instance, ``interval_range`` is an iterator
which return the time in seconds to sleep next, and ``retries``
is the number of previous retries.
max_retries (int): Maximum number of retries before we give up.
If neither of this and timeout is set, we will retry forever.
If one of this and timeout is reached, stop.
interval_start (float): How long (in seconds) we start sleeping
between retries.
interval_step (float): By how much the interval is increased for
each retry.
interval_max (float): Maximum number of seconds to sleep
between retries.
timeout (int): Maximum seconds waiting before we give up.
"""
kwargs = {} if not kwargs else kwargs
args = [] if not args else args
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
end = time() + timeout if timeout else None
for retries in count():
try:
> return fun(*args, **kwargs)
../../../.local/lib/python3.10/site-packages/kombu/utils/functional.py:312:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
def _connection_factory(self):
self.declared_entities.clear()
self._default_channel = None
> self._connection = self._establish_connection()
../../../.local/lib/python3.10/site-packages/kombu/connection.py:877:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
def _establish_connection(self):
self._debug('establishing connection...')
> conn = self.transport.establish_connection()
../../../.local/lib/python3.10/site-packages/kombu/connection.py:812:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kombu.transport.pyamqp.Transport object at 0x7fd081b67490>
def establish_connection(self):
"""Establish connection to the AMQP broker."""
conninfo = self.client
for name, default_value in self.default_connection_params.items():
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.hostname == 'localhost':
conninfo.hostname = '127.0.0.1'
# when server_hostname is None, use hostname from URI.
if isinstance(conninfo.ssl, dict) and \
'server_hostname' in conninfo.ssl and \
conninfo.ssl['server_hostname'] is None:
conninfo.ssl['server_hostname'] = conninfo.hostname
opts = dict({
'host': conninfo.host,
'userid': conninfo.userid,
'password': conninfo.password,
'login_method': conninfo.login_method,
'virtual_host': conninfo.virtual_host,
'insist': conninfo.insist,
'ssl': conninfo.ssl,
'connect_timeout': conninfo.connect_timeout,
'heartbeat': conninfo.heartbeat,
}, **conninfo.transport_options or {})
conn = self.Connection(**opts)
conn.client = self.client
> conn.connect()
../../../.local/lib/python3.10/site-packages/kombu/transport/pyamqp.py:201:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AMQP Connection: 127.0.0.1:5672// (disconnected) at 0x7fd081b2ace0>
callback = None
def connect(self, callback=None):
# Let the transport.py module setup the actual
# socket connection to the broker.
#
if self.connected:
return callback() if callback else None
try:
self.transport = self.Transport(
self.host, self.connect_timeout, self.ssl,
self.read_timeout, self.write_timeout,
socket_settings=self.socket_settings,
)
> self.transport.connect()
../../../.local/lib/python3.10/site-packages/amqp/connection.py:323:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <TCPTransport: (disconnected) at 0x7fd081c8fac0>
def connect(self):
try:
# are we already connected?
if self.connected:
return
> self._connect(self.host, self.port, self.connect_timeout)
../../../.local/lib/python3.10/site-packages/amqp/transport.py:129:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <TCPTransport: (disconnected) at 0x7fd081c8fac0>, host = '127.0.0.1'
port = 5672, timeout = 4
def _connect(self, host, port, timeout):
entries = socket.getaddrinfo(
host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, SOL_TCP,
)
for i, res in enumerate(entries):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
> self.sock.connect(sa)
E ConnectionRefusedError: [Errno 111] Connection refused
../../../.local/lib/python3.10/site-packages/amqp/transport.py:184: ConnectionRefusedError
The above exception was the direct cause of the following exception:
api_client = <utils.tenancy_api_client.TenantAPIClient object at 0x7fd081f1fdc0>
stock_json = {'collect_on': '2022-03-07', 'owner': {'address': 'Los Angeles', 'email': '[email protected]', 'first_name': 'Lukas', 'i...mZQknaVNvcXKiEtmlWxxTWGjpBzZbrCSDkkKhkhlIoLZeputqKZNSdfbKlQTJzWBAgfZIoHiOTTusdXAQoCYHVXiQIqrhUpIvMNFhYvHUWRcOxJff'}]}]}
celery_config = {'broker_url': 'amqp://'}, celery_worker_parameters = {}
def test_stock_created_with_valid_data(api_client, stock_json, celery_config, celery_worker_parameters):
""" tests stock is created with valid json """
> res = api_client.post(reverse_lazy("core_api:stocks-list"), stock_json, format="json")
好吧,我找到了解决办法:rabbitmq-server 是一个服务器,它必须处于运行状态,就像基于服务器的数据库 postgresql 必须在 pytest 启动之前运行一样。
对于 GitHub Actions 的 CI 方面,问题在于我没有为 rabbitmq 镜像使用动态端口,所以我做了如下修改(从前者改为后者):
runs-on: ubuntu-latest
services:
  rabbitmq:
    image: rabbitmq:3.8
    env:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - 5672
到
runs-on: ubuntu-latest
services:
  rabbitmq:
    image: rabbitmq:3.8
    env:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports: ["5672:5672"]
现在在 github 上完美通过,不再有 Kombu 连接被拒绝:)