例如,我有一个名为
TestQueueJob
的作业,它正在使用数据库队列驱动程序进行处理,在该作业中,我调用另一个名为 TestSyncJob
的作业,该作业以同步模式运行(TestSyncJob::dispatchSync()
)。我现在面临的问题是我需要 TestSyncJob
永远不要重叠,当我有 2 个或更多 TestQueueJob
作业同时运行时,问题就会出现。我尝试使用 WithoutOverlapping
类实现作业中间件,但在同步模式下,它只是跳过 TestSyncJob
作业,我需要它等到另一个 TestSyncJob
执行完成并继续。
TestQueueJob::dispatch(); //running using database queue driver, there are > 1 numprocs in supervisor for the queue
class TestQueueJob implements ShouldQueue
{
public function handle(): void
{
//some code
TestSyncJob::dispatchSync(); //should wait until other TestSyncJob is finished and then run
//some code
}
}
use Illuminate\Queue\Middleware\WithoutOverlapping;
class TestSyncJob implements ShouldQueue
{
public function handle(): void
{
\Log::info('Sync job started!');
sleep(10);
\Log::info('Sync job finished!');
}
public function middleware()
{
return [new WithoutOverlapping(1)];
}
}
我最终扩展了现有的
WithoutOverlapping
中间件。它现在正在等待其他作业完成并继续执行作业,而不是在“同步”队列模式下跳过它。
class CustomWithoutOverlapping extends WithoutOverlapping
{
public function handle($job, $next)
{
$lock = Container::getInstance()->make(Cache::class)->lock(
$this->getLockKey($job), $this->expiresAfter
);
if ($lock->get()) {
try {
$next($job);
} finally {
$lock->release();
}
//START of new code
} elseif ($job->connection === 'sync') {
do {
sleep(1);
} while (!$lock->get());
try {
$next($job);
} finally {
$lock->release();
}
//END of new code
} elseif (!is_null($this->releaseAfter)) {
$job->release($this->releaseAfter);
}
}
}