是否可以在 Symfony 消息组件中定义交换之间的绑定? (4.4版本在这里)。
我知道可以将交换绑定到队列,如下所示:
transports:
incoming:
dsn: "%env(RABBITMQ_SHARED_URL)%"
options:
queues:
app.pl_incoming_events:
binding_keys:
- pl.app.#
exchange:
name: my_app.incoming
type: topic
然后应用程序设置它们之间的交换、队列和绑定。我希望根据路由密钥将交换绑定到另一个交换具有相同的效果。
我知道我可以使用rabbitmq-bundle,但IMO是多余的——我想保留一个组件来管理rabbitMQ。
例如,我想根据某个路由键将
other_app
交换绑定到 my_app.incoming
交换。
Messenger 不是 RabbitMQ 管理器,您甚至无法在同一传输中声明多个开箱即用的交换。 但由于它具有所有必需的组件,并且在这种情况下 symfony 对配置有点宽松,因此您可以滥用该系统并自己构建它。
由于我不知道您的要求,我会保持基本,希望它能帮助您入门。
从创建
AmqpTransportFactory
开始:
// src/Amqp/AmqpTransportFactory.php
namespace App\Amqp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class AmqpTransportFactory implements TransportFactoryInterface
{
public function createTransport(
string $dsn, array $options, SerializerInterface $serializer
): TransportInterface
{
unset($options['transport_name']);
$exchanges['name'] = $options['exchange']['name'];
$exchanges['bindings'] = $options['exchange']['bindings'] ?? [];
// Passing unknown options is deprecated in 5.1
unset($options['exchange']['bindings']);
$connection = Connection::fromDsn($dsn, $options);
// Ensure our exchange is created first
$connection->exchange()->declareExchange();
$channel = $connection->channel();
// This is normally done in the Connection, but is harder to override
$this->createExchanges($channel, $exchanges);
return new AmqpTransport($connection, $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'amqp://');
}
private function createExchanges(\AMQPChannel $channel, array $configuration): void
{
$factory = new AmqpFactory();
foreach ($configuration['bindings'] as $exchange_name => $arguments) {
$exchange = $factory->createExchange($channel);
$exchange->setName($exchange_name);
$exchange->setType($arguments['type'] ?? \AMQP_EX_TYPE_FANOUT);
$exchange->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
$exchange->declareExchange();
if (!is_array($arguments['binding_keys'])) {
$arguments['binding_keys'] = [$arguments['binding_keys']];
}
foreach ($arguments['binding_keys'] as $key) {
$exchange->bind($configuration['name'], $key);
}
}
}
}
注册服务:
# config/services.yaml
services:
messenger.transport.amqp.factory:
class: App\Amqp\AmqpTransportFactory
将新配置添加到交易所:
# config/packages/messenger.yaml
exchange:
name: my_app.incoming
type: topic
bindings:
other_app:
type: direct
binding_keys: ['route']
它将导致以下绑定:
+-----------------+------------------------+------------------------+
| source | destination | routing_key |
+-----------------+------------------------+------------------------+
| my_app.incoming | app.pl_incoming_events | pl.app.# |
| my_app.incoming | other_app | route |
+-----------------+------------------------+------------------------+