我在 Kubernetes 集群中使用 pika 消费队列中的消息,每条消息会触发在新线程中启动一个函数。然而与 RabbitMQ 的连接似乎会崩溃,以下是我目前找到的最有用的日志:
2020-12-23 10:39:10,906] WARNING - WRITE indicated on fd=9, but writer callback is None; events=0b100 {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/selector_ioloop_adapter.py:393}
(repeats to a total of n=38 times)
2020-12-23 10:39:10,908] ERROR - _AsyncBaseTransport._produce() failed, aborting connection: error=IndexError('pop from an empty deque'); sock=<socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)>; Caller's stack:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
self._produce()
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 822, in _produce
chunk = self._tx_buffers.popleft()
IndexError: pop from an empty deque
{/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:1103}
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
self._produce()
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 822, in _produce
chunk = self._tx_buffers.popleft()
IndexError: pop from an empty deque
2020-12-23 10:39:10,908] INFO - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=IndexError('pop from an empty deque'); <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:904}
2020-12-23 10:39:10,908] INFO - Deactivating transport: state=1; <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:869}
2020-12-23 10:39:10,909] ERROR - connection_lost: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/base_connection.py:428}
2020-12-23 10:39:10,909] INFO - AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",); pending-error=None {/usr/local/lib/python3.9/site-packages/pika/connection.py:1996}
2020-12-23 10:39:10,909] INFO - Stack terminated due to StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/connection.py:2065}
2020-12-23 10:39:10,909] INFO - Closing transport socket and unlinking: state=2; <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:882}
2020-12-23 10:39:10,909] ERROR - Unexpected connection close detected: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:520}
2020-12-23 10:39:31,416] INFO - Pika version 1.1.0 connecting to ('192.168.101.201', 5672) {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:179}
2020-12-23 10:39:31,417] INFO - Socket connected: <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 47142), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:345}
2020-12-23 10:39:31,418] INFO - Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>>). {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:428}
2020-12-23 10:39:31,421] INFO - AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:293}
2020-12-23 10:39:31,421] INFO - AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:725}
2020-12-23 10:39:31,421] INFO - Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:452}
2020-12-23 10:39:31,422] INFO - Created channel=1 {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:1247}
我的消费者有以下定义:
def publish_message(channel, message):
    """Publish ``message`` to ``my_queue`` through the default exchange.

    Args:
        channel: Object exposing ``basic_publish`` (the pika channel in
            this script).
        message: Body of the message to send.
    """
    # An empty exchange name selects the default exchange, so the routing
    # key is the destination queue name.
    publish_kwargs = {
        'exchange': '',
        'routing_key': 'my_queue',
        'body': message,
    }
    channel.basic_publish(**publish_kwargs)
def connect_to_mq():
    """Open a blocking pika connection and declare ``my_queue``.

    Reads ``rabbit_user``, ``rabbit_password``, ``rabbit_host`` and
    ``rabbit_port`` from the enclosing scope (defined outside this snippet).

    Returns:
        A ``(connection, channel)`` tuple.
    """
    parameters = pika.ConnectionParameters(
        rabbit_host,
        rabbit_port,
        '/',
        pika.PlainCredentials(rabbit_user, rabbit_password),
    )
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='my_queue')
    return connection, channel
def on_message(channel, method_frame, header_frame, body):
    """Consumer callback: start a worker thread when ``do_work`` arrives.

    Args:
        channel: Channel the message was delivered on.
        method_frame: Delivery metadata (unused).
        header_frame: Message properties (unused).
        body: Raw message payload as bytes.
    """
    if body.decode('utf-8') != 'do_work':
        return

    # NOTE(review): the consumer's own channel is handed to the worker
    # thread here, so two threads end up publishing on the same channel.
    worker = threading.Thread(target=start_processing, args=(channel,))
    worker.start()
    publish_message(channel, 'initiated thread')
def start_processing(channel):
    """Worker-thread body: announce start, sleep, then announce completion.

    Args:
        channel: Channel used for both status messages; it is the one
            passed in by the caller, not a channel opened by this thread.
    """
    work_seconds = 240  # stand-in for the long-running workload

    publish_message(channel, 'starting...')
    time.sleep(work_seconds)
    publish_message(channel, 'processing complete!')
def main():
    """Connect to RabbitMQ and consume ``my_queue`` until interrupted."""
    _connection, channel = connect_to_mq()
    channel.basic_consume(
        queue='my_queue',
        on_message_callback=on_message,
        auto_ack=True,
    )
    # Blocks the calling thread, dispatching deliveries to on_message.
    channel.start_consuming()
我在单独线程中处理消息和工作负载的实现和策略是否存在本质上的错误,导致发生这种情况?
Pika 默认情况下不是线程安全的。理想情况下,您应该为每个线程保留一个连接。
Pika 不是线程安全的。并且您无法在线程之间共享一个连接...
如果您想要线程安全,请使用amqpstorm。
这是在多线程应用程序中使用 pika 和 amqpstorm 的简单示例:
import amqpstorm
import time
import threading
import multiprocessing
def simple_consumer(conn: amqpstorm.Connection):
    """Poll the ``fruits`` queue forever, printing and acking each message.

    Args:
        conn: Connection on which this consumer opens its own channel.
    """
    with conn.channel() as channel:
        # The loop has no exit condition, so this function does not return
        # on its own.
        while True:
            msg = channel.basic.get('fruits')
            if msg is not None:
                print(msg.body)
                msg.ack()
            else:
                # Nothing fetched: wait a second before polling again.
                time.sleep(1)
def producer_task(conn: amqpstorm.Connection, counter: int):
    """Publish ``counter`` messages to the ``fruits`` queue, one per second.

    Args:
        conn: Connection on which this task opens its own channel.
        counter: Number of messages to publish; counted down to zero.
    """
    with conn.channel() as channel:
        # Declaring the queue is loop-invariant: do it once up front rather
        # than paying a broker round trip before every single publish.
        channel.queue.declare('fruits')
        while counter > 0:
            print(f'Thread {counter}')
            message = amqpstorm.Message.create(
                channel,
                body=f'Hello RabbitMQ! {counter}',
                properties={
                    'content_type': 'text/plain',
                    'expiration': '5000',
                },
            )
            message.publish('fruits')
            counter -= 1
            time.sleep(1)
    print(f'end {counter}')
def main():
    """Run three producers and one consumer as threads on one connection.

    All four threads receive the same ``amqpstorm.Connection`` object.
    """
    conn = amqpstorm.Connection('localhost', 'guest', 'guest')

    workers = []
    # Producers are started first, in the order 10, 20, 30 messages.
    for message_count in (10, 20, 30):
        producer = threading.Thread(
            target=producer_task,
            kwargs={'conn': conn, 'counter': message_count},
        )
        producer.start()
        workers.append(producer)

    consumer_thread = threading.Thread(
        target=simple_consumer,
        kwargs={'conn': conn},
    )
    consumer_thread.start()
    workers.append(consumer_thread)

    # Join in start order: producers first, then the consumer.
    for worker in workers:
        worker.join()
    print('end main')


if __name__ == '__main__':
    main()
作为脚注:请注意,在 amqpstorm 中您不能在进程之间共享同一个连接,因为每个进程都有自己独立的内存空间。同样的应用程序如果改用 pika 来写,则无法正常工作:
import pika
import time
import threading
def on_message(message, method=None, properties=None, body=None):
    """Print the payload of a delivered message.

    pika invokes ``on_message_callback`` with four positional arguments
    (channel, method, properties, body). A one-argument callback would
    raise ``TypeError`` on the first delivery, so the extra parameters are
    accepted here. The original single-argument call form still works.

    Args:
        message: When called with one argument, an object exposing
            ``.body``. When called by pika, this slot receives the channel.
        method: Delivery metadata supplied by pika (unused).
        properties: Message properties supplied by pika (unused).
        body: Message payload supplied by pika; takes precedence over
            ``message.body`` when given.
    """
    payload = message.body if body is None else body
    print("Message:", payload)
def consumer(conn: pika.BlockingConnection):
    """Consume the ``fruits`` queue on a channel opened from ``conn``.

    Args:
        conn: Blocking connection supplied by the caller; this function
            opens its own channel on it.
    """
    queue_name = 'fruits'
    with conn.channel() as channel:
        channel.queue_declare(queue=queue_name)
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=on_message,
            auto_ack=True,
        )
        try:
            # Blocks, dispatching deliveries to on_message.
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.close()
def producer_task(conn: pika.BlockingConnection, counter: int):
    """Publish ``counter`` messages to the ``fruits`` queue, one per second.

    Args:
        conn: Blocking connection supplied by the caller; this task opens
            its own channel on it.
        counter: Number of messages to publish; counted down to zero.
    """
    with conn.channel() as channel:
        # Declaring the queue is loop-invariant: do it once up front rather
        # than paying a broker round trip before every single publish.
        channel.queue_declare('fruits')
        while counter > 0:
            print(f'Thread {counter}')
            channel.basic_publish(
                exchange='',
                routing_key='fruits',
                body=f'Hello RabbitMQ! {counter}',
                properties=pika.BasicProperties(expiration='5000'),
            )
            counter -= 1
            time.sleep(1)
    print(f'end {counter}')
def main():
    """Run three producers and one consumer as threads on one connection.

    All four threads receive the same ``pika.BlockingConnection`` object.
    """
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    workers = []
    # Producers are started first, in the order 10, 20, 30 messages.
    for message_count in (10, 20, 30):
        producer = threading.Thread(
            target=producer_task,
            kwargs={'conn': conn, 'counter': message_count},
        )
        producer.start()
        workers.append(producer)

    consumer_thread = threading.Thread(
        target=consumer,
        kwargs={'conn': conn},
    )
    consumer_thread.start()
    workers.append(consumer_thread)

    # Join in start order: producers first, then the consumer.
    for worker in workers:
        worker.join()
    print('end main')


if __name__ == '__main__':
    main()