我正在使用 Symfony Messenger 组件在我的应用程序中实现 CloudEvents。
我有两个队列,两个队列都应该处理 CloudEvents。
这是我当前的配置:
routing:
'CloudEvents\V1\CloudEvent':
- incoming
- outgoing
当我想将 CloudEvent 添加到队列中时,我想准确指定事件应添加到队列中的位置。
$this->messageBus->dispatch(
new Envelope(
$cloudEvent,
[
new TransportMessageIdStamp('outgoing')
]
)
);
或
$this->messageBus->dispatch(
$cloudEvent,
[
new TransportMessageIdStamp('outgoing')
]
);
我都尝试了,但事件仍然进入“传出”通道。我怎样才能防止这种情况发生?
我用中间件解决了这个问题,但解决方案的一部分我不满意。如果添加更多邮票,中间件将破坏该过程。也许有人有更优雅的方式来处理这个问题。
这是我的中间件类:
<?php
declare(strict_types=1);
namespace App\Messenger\Middleware;
use CloudEvents\V1\CloudEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
readonly class CloudEventRoutingMiddleware implements MiddlewareInterface
{
public function __construct(
private SendersLocatorInterface $sendersLocator
) {
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$stamps = $envelope->all();
// We want to prevent the middleware from being used during event consumption.
// This approach isn't ideal; we should find a more elegant solution.
if (count($stamps) > 1) {
return $stack->next()->handle($envelope, $stack);
}
if ($message instanceof CloudEvent) {
$transportName = $this->determineTransportName($message);
$senders = $this->sendersLocator->getSenders($envelope);
foreach ($senders as $name => $sender) {
if ($name === $transportName) {
/** @var SenderInterface $sender */
$sender->send($envelope);
return $envelope;
}
}
}
return $stack->next()->handle($envelope, $stack);
}
private function determineTransportName(CloudEvent $event): string
{
return match ($event->getType()) {
'task.processing' => 'incoming',
'task.completion' => 'outgoing',
default => throw new \RuntimeException(
sprintf('No transport found for event type: %s', $event->getType())
),
};
}
}