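Every test I run with Symfony (7.0.4) Messenger hits the same problem when handling messages. The message (stored in AWS SQS) carries an entity ID, and the handler loads the entity from the database (Postgres on a different server). After handling, I see the correct result in the database, but the handler throws an exception to the log.
Messenger configuration: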
framework:
    messenger:
        failure_transport: failed
        transports:
            async:
                dsn: 'https://sqs.%env(AWS_REGION)%.amazonaws.com/test-messenger.fifo?access_key=%env(AWS_ACCESS_KEY)%&secret_key=%env(AWS_URLENCODED_SECRET_KEY)%'
                retry_strategy:
                    max_retries: 0
                    multiplier: 2
            failed: 'https://sqs.%env(AWS_REGION)%.amazonaws.com/test-messenger-failed.fifo?access_key=%env(AWS_ACCESS_KEY)%&secret_key=%env(AWS_URLENCODED_SECRET_KEY)%'
        routing:
            App\Messenger\Message\NewValidCertificateForCustomDomainMessage: async
Worker configuration:
[program:messenger-consume]
command=php /var/www/bin/console messenger:consume async --memory-limit=128M -vv --time-limit=600
user=root
numprocs=4
startsecs=0
autostart=true
autorestart=true
startretries=10
stopwaitsecs=5
process_name=%(program_name)s_%(process_num)02d
Message:
<?php
declare(strict_types=1);

namespace App\Messenger\Message;

class NewValidCertificateForCustomDomainMessage
{
    public function __construct(
        private string $accessTokenId,
    ) {
    }

    public function getAccessTokenId(): string
    {
        return $this->accessTokenId;
    }
}
Handler:
<?php
declare(strict_types=1);

namespace App\Messenger\Handler;

use App\Entity\AccessToken;
use App\Messenger\Message\NewValidCertificateForCustomDomainMessage;
use App\Service\CustomDomainService;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Uid\Uuid;

#[AsMessageHandler]
class NewValidCertificateForCustomDomainHandler
{
    private const CONTEXT = 'NewValidCertificateForCustomDomainHandler';

    public function __construct(
        private CustomDomainService $customDomainService,
        private EntityManagerInterface $entityManager,
        private LoggerInterface $logger,
    ) {
    }

    public function __invoke(NewValidCertificateForCustomDomainMessage $newValidCertificateForCustomDomainMessage): void
    {
        if ($this->entityManager->getConnection()->isConnected() === false) {
            $this->entityManager->getConnection()
                ->close();
        }

        $accessToken = $this->entityManager->getRepository(AccessToken::class)->find(
            Uuid::fromString($newValidCertificateForCustomDomainMessage->getAccessTokenId())
        );

        if (!$accessToken instanceof AccessToken) {
            $this->logger->error(
                'AccessToken not found!',
                [
                    'context' => self::CONTEXT,
                    'accessTokenId' => $newValidCertificateForCustomDomainMessage->getAccessTokenId(),
                ]
            );

            throw new \RuntimeException('AccessToken not found!');
        }

        if (!$this->customDomainService->addCustomDomainToDistribution($accessToken)) {
            throw new \RuntimeException('Failed to add custom domain to CloudFront distribution!');
        }
    }
}
After handling I see the correct result, but the log contains this exception:
{
    "message": "Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: \"{error}\"",
    "context": {
        "class": "App\\Messenger\\Message\\NewValidCertificateForCustomDomainMessage",
        "retryCount": 0,
        "error": "Handling \"App\\Messenger\\Message\\NewValidCertificateForCustomDomainMessage\" failed: An exception occurred while executing a query: SQLSTATE[HY000]: General error: 7 no connection to the server",
        "exception": {
            "class": "Symfony\\Component\\Messenger\\Exception\\HandlerFailedException",
            "message": "Handling \"App\\Messenger\\Message\\NewValidCertificateForCustomDomainMessage\" failed: An exception occurred while executing a query: SQLSTATE[HY000]: General error: 7 no connection to the server",
            "code": 7,
            "file": "/var/www/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php:124",
            "previous": {
                "class": "Doctrine\\DBAL\\Exception\\DriverException",
                "message": "An exception occurred while executing a query: SQLSTATE[HY000]: General error: 7 no connection to the server",
                "code": 7,
                "file": "/var/www/vendor/doctrine/dbal/src/Driver/API/PostgreSQL/ExceptionConverter.php:80",
                "previous": {
                    "class": "Doctrine\\DBAL\\Driver\\PDO\\Exception",
                    "message": "SQLSTATE[HY000]: General error: 7 no connection to the server",
                    "code": 7,
                    "file": "/var/www/vendor/doctrine/dbal/src/Driver/PDO/Exception.php:28",
                    "previous": {
                        "class": "PDOException",
                        "message": "SQLSTATE[HY000]: General error: 7 no connection to the server",
                        "code": 0,
                        "file": "/var/www/vendor/doctrine/dbal/src/Driver/PDO/Statement.php:55"
                    }
                }
            }
        }
    },
    "level": 500,
    "level_name": "CRITICAL",
    "channel": "messenger",
    "datetime": "2024-03-25T00:05:02.362953+00:00",
    "extra": {}
}
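...and the message is then sent to the failed queue.
When I move the logic into a plain Symfony command and run it via cron, everything works. I tried resetting the connection in the handler (you can see it in the code), but it had no effect.
I'd appreciate any suggestions. Thank you.
What I tried: resetting the connection (did not work); reducing the worker time limit to 600 seconds (did not work); running the same logic in a cron command (works).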
As mentioned in the comments on the OP, the solution is to use the Doctrine middleware on the bus.
https://symfony.com/doc/current/messenger.html#middleware-for-doctrine
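Beyond that, make sure you specify the correct bus name in the bus configuration. For example, if you have changed the default bus, you need to update the middleware configuration to match.
Here is an example of how to configure it in messenger.yaml: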
framework:
    messenger:
        default_bus: the.magic.bus
        buses:
            the.magic.bus: # this must match the bus you are using
                middleware:
                    - doctrine_ping_connection
                    - doctrine_close_connection
                    - doctrine_open_transaction_logger
                    - doctrine_transaction
Use debug:messenger to list the available buses:
bin/console debug:messenger
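On why the manual reset in the question's handler had no effect: as far as I can tell, `isConnected()` only reports whether DBAL currently holds a driver connection object, not whether the server end is still alive. A stale connection in a long-running worker therefore still reports `true`, and the `close()` branch never runs. Below is a minimal sketch of the ping-then-close idea, similar in spirit to what `doctrine_ping_connection` is meant to do. The function name `pingOrCloseConnection` is made up for illustration, and the sketch assumes a DBAL version in which a closed connection is reopened lazily on the next query.

```php
<?php
declare(strict_types=1);

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception as DBALException;

/**
 * Hypothetical helper (not part of Symfony or Doctrine): sends a trivial
 * query to find out whether the server still accepts the connection.
 * If the query fails, the local connection is closed so that DBAL opens
 * a fresh one the next time a query is executed.
 */
function pingOrCloseConnection(Connection $connection): void
{
    try {
        // The platform's dummy select is typically something like "SELECT 1".
        $connection->executeQuery(
            $connection->getDatabasePlatform()->getDummySelectSQL()
        );
    } catch (DBALException) {
        // The server side is gone: drop the stale handle.
        $connection->close();
    }
}
```

It would be called at the top of `__invoke()` with `$this->entityManager->getConnection()`, although registering the middleware is the simpler route because it covers every handler on the bus.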