具有 CloudEvents 和多个队列的 Symfony Messenger

问题描述 投票:0回答:1

我正在使用 Symfony Messenger 组件在我的应用程序中实现 CloudEvents。

我有两个队列,两个队列都应该处理 CloudEvents。

这是我当前的配置:

routing:
    'CloudEvents\V1\CloudEvent':
      - incoming
      - outgoing

当我想将 CloudEvent 添加到队列中时,我想准确指定事件应添加到队列中的位置。

$this->messageBus->dispatch(
    new Envelope(
        $cloudEvent,
        [
            new TransportMessageIdStamp('outgoing')
        ]
    )
);

$this->messageBus->dispatch(
        $cloudEvent,
        [
            new TransportMessageIdStamp('outgoing')
        ]
);

我都尝试了,但事件仍然进入“传出”通道。我怎样才能防止这种情况发生?

symfony symfony-messenger message-bus cloudevents
1个回答
0
投票

我用中间件解决了这个问题,但解决方案的一部分我不满意。如果添加更多邮票,中间件将破坏该过程。也许有人有更优雅的方式来处理这个问题。

这是我的中间件类:

<?php

declare(strict_types=1);

namespace App\Messenger\Middleware;

use CloudEvents\V1\CloudEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

readonly class CloudEventRoutingMiddleware implements MiddlewareInterface
{
    public function __construct(
        private SendersLocatorInterface $sendersLocator
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();
        $stamps = $envelope->all();

        // We want to prevent the middleware from being used during event consumption.
        // This approach isn't ideal; we should find a more elegant solution.
        if (count($stamps) > 1) {
            return $stack->next()->handle($envelope, $stack);
        }

        if ($message instanceof CloudEvent) {
            $transportName = $this->determineTransportName($message);
            $senders = $this->sendersLocator->getSenders($envelope);

            foreach ($senders as $name => $sender) {
                if ($name === $transportName) {
                    /** @var SenderInterface $sender */
                    $sender->send($envelope);

                    return $envelope;
                }
            }
        }

        return $stack->next()->handle($envelope, $stack);
    }

    private function determineTransportName(CloudEvent $event): string
    {
        return match ($event->getType()) {
            'task.processing' => 'incoming',
            'task.completion' => 'outgoing',
            default => throw new \RuntimeException(
                sprintf('No transport found for event type: %s', $event->getType())
            ),
        };
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.