php index.php JobProcessing/process_progress_rabbitmq
当单个用户上传文件时,此功能正常。但是,当有多个用户(例如,一次一次使用50个用户)时,每个用户必须等待上一个用户的任务完成。我想实现并行处理,以便用户不必等待转弯。 我考虑的一种解决方案是创建多个工人,但是我担心可以创建工人数量的限制。例如,如果有500名用户同时上传,我可以安全创建多少名工人?是否有建议处理此情况的方法,尤其是考虑系统资源和兔子限制?
defined('BASEPATH') or exit('No direct script access allowed');
require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Spatie\Async\Pool;
class Rabbit_mq
protected $ci, $connection, $channel, $pool;
public function __construct()
try {
$this->ci = &get_instance();
$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
} catch (Exception $e) {
echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n";
function addToQueue($data)
$this->channel->queue_declare('file_processing_new', false, true, false, false);
$msg = new AMQPMessage(json_encode($data));
$this->channel->basic_publish($msg, '', 'file_processing_new');
public function processQueue()
$this->channel->queue_declare('file_processing_new', false, true, false, false);
// Set prefetch count to allow multiple messages to be handled concurrently
$this->channel->basic_qos(null, 15, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
$fileId = $data['file_id'];
$filePath = $data['file_path'];
try {
echo "Received message for file ID: $fileId\n";
// Initialize the async pool
$this->pool = Pool::create();
// Add task to the pool for parallel processing
$this->pool->add(function () use ($fileId, $filePath) {
try {
echo "Processing file ID: $fileId\n";
$data = array(
'progress_status' => 'Processing',
'progress_percentage' => 50,
$this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
echo "File ID: $fileId marked as Processing.\n";
// Simulate file processing (OCR or other logic here)
// Update status to 'Completed' after processing
$data = array(
'progress_status' => 'Completed',
'progress_percentage' => 100,
$this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
echo "File ID: $fileId processing completed.\n";
} catch (Exception $e) {
echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
// Wait for all tasks to finish
$this->pool->wait(); // This will wait until all tasks are done
// Acknowledge the message after all tasks are processed
echo "Acknowledging message for file ID: $fileId\n";
} catch (Exception $e) {
echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
// Multiple consumers (workers) consuming the messages
for ($i = 0; $i < 25; $i++) {
$this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback);
echo "Waiting for messages. To exit press CTRL+C\n";
// Consume messages concurrently by multiple workers
while ($this->channel->callbacks) {
// Close the channel and connection when done
然后您制定要求必须始终提供多少用户。假设500.然后您启动了该类型的10个工人实例,然后根据您的要求始终保持它们的运行(10 x 50 = 500)。等等,依此类推。 lways以足够的备用能力,然后不断添加和破坏机器,以便您习惯它,并且您将始终拥有足够的工作能力。