在Symfony Messenger组件中绑定exchange到exchange

问题描述 投票:0回答:1

是否可以在 Symfony 消息组件中定义交换之间的绑定? (4.4版本在这里)。

我知道可以将交换绑定到队列,如下所示:

transports:
    incoming:
        dsn: "%env(RABBITMQ_SHARED_URL)%"
        options:
            queues:
                app.pl_incoming_events:
                    binding_keys:
                        - pl.app.#
            exchange:
                name: my_app.incoming
                type: topic

然后应用程序设置它们之间的交换、队列和绑定。我希望根据路由密钥将交换绑定到另一个交换具有相同的效果。

我知道我可以使用rabbitmq-bundle,但IMO是多余的——我想保留一个组件来管理rabbitMQ。

例如,我想根据某个路由键将

other_app
交换绑定到
my_app.incoming
交换。

symfony rabbitmq symfony4 symfony-messenger
1个回答
4
投票

Messenger 不是 RabbitMQ 管理器,您甚至无法在同一传输中声明多个开箱即用的交换。 但由于它具有所有必需的组件,并且在这种情况下 symfony 对配置有点宽松,因此您可以滥用该系统并自己构建它。

由于我不知道您的要求,我会保持基本,希望它能帮助您入门。

从创建

AmqpTransportFactory
开始:

// src/Amqp/AmqpTransportFactory.php
namespace App\Amqp;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class AmqpTransportFactory implements TransportFactoryInterface
{
    public function createTransport(
        string $dsn, array $options, SerializerInterface $serializer
    ): TransportInterface
    {
        unset($options['transport_name']);
        $exchanges['name'] = $options['exchange']['name'];
        $exchanges['bindings'] = $options['exchange']['bindings'] ?? [];

        // Passing unknown options is deprecated in 5.1
        unset($options['exchange']['bindings']);

        $connection = Connection::fromDsn($dsn, $options);

        // Ensure our exchange is created first
        $connection->exchange()->declareExchange();
        $channel = $connection->channel();
    
        // This is normally done in the Connection, but is harder to override
        $this->createExchanges($channel, $exchanges);

        return new AmqpTransport($connection, $serializer);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'amqp://');
    }

    private function createExchanges(\AMQPChannel $channel, array $configuration): void
    {
        $factory = new AmqpFactory();

        foreach ($configuration['bindings'] as $exchange_name => $arguments) {
            $exchange = $factory->createExchange($channel);
            $exchange->setName($exchange_name);
            $exchange->setType($arguments['type'] ?? \AMQP_EX_TYPE_FANOUT);
            $exchange->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
            $exchange->declareExchange();

            if (!is_array($arguments['binding_keys'])) {
                $arguments['binding_keys'] = [$arguments['binding_keys']];
            }

            foreach ($arguments['binding_keys'] as $key) {
                $exchange->bind($configuration['name'], $key);
            }
        }
    }
}

注册服务:

# config/services.yaml
services:
  messenger.transport.amqp.factory:
    class: App\Amqp\AmqpTransportFactory

将新配置添加到交易所:

# config/packages/messenger.yaml
exchange:
  name: my_app.incoming
  type: topic
  bindings:
    other_app:
      type: direct
      binding_keys: ['route']

它将导致以下绑定:

+-----------------+------------------------+------------------------+
|     source      |      destination       |      routing_key       |
+-----------------+------------------------+------------------------+
| my_app.incoming | app.pl_incoming_events | pl.app.#               |
| my_app.incoming | other_app              | route                  |
+-----------------+------------------------+------------------------+
© www.soinside.com 2019 - 2024. All rights reserved.