在多个文件上载的Codeigniter中使用RabbitMQ进行平行处理

问题描述 投票:0回答:1
php index.php JobProcessing/process_progress_rabbitmq

当单个用户上传文件时,此功能正常。但是,当有多个用户(例如,一次一次使用50个用户)时,每个用户必须等待上一个用户的任务完成。我想实现并行处理,以便用户不必等待转弯。 我考虑的一种解决方案是创建多个工人,但是我担心可以创建工人数量的限制。例如,如果有500名用户同时上传,我可以安全创建多少名工人?是否有建议处理此情况的方法,尤其是考虑系统资源和兔子限制?

任何见解或建议将不胜感激!

below是我的代码:

<?php defined('BASEPATH') or exit('No direct script access allowed'); require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use Spatie\Async\Pool; class Rabbit_mq { protected $ci, $connection, $channel, $pool; public function __construct() { try { $this->ci = &get_instance(); $this->ci->load->model('invoice_scan_model'); $this->ci->load->library('scan_invoice_lib'); $this->ci->load->library('quick_books'); $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); } catch (Exception $e) { echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n"; } } function addToQueue($data) { $this->channel->queue_declare('file_processing_new', false, true, false, false); $msg = new AMQPMessage(json_encode($data)); $this->channel->basic_publish($msg, '', 'file_processing_new'); } public function processQueue() { $this->channel->queue_declare('file_processing_new', false, true, false, false); // Set prefetch count to allow multiple messages to be handled concurrently $this->channel->basic_qos(null, 15, null); $callback = function ($msg) { $data = json_decode($msg->body, true); $fileId = $data['file_id']; $filePath = $data['file_path']; try { echo "Received message for file ID: $fileId\n"; // Initialize the async pool $this->pool = Pool::create(); // Add task to the pool for parallel processing $this->pool->add(function () use ($fileId, $filePath) { try { echo "Processing file ID: $fileId\n"; $data = array( 'progress_status' => 'Processing', 'progress_percentage' => 50, ); $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data); echo "File ID: $fileId marked as Processing.\n"; // Simulate file processing (OCR or other logic here) $this->process_invoice($fileId); // Update status to 'Completed' after processing $data = array( 'progress_status' => 'Completed', 'progress_percentage' => 100, ); $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data); echo "File ID: $fileId processing completed.\n"; } catch (Exception $e) { echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n"; } }); // Wait for all tasks to finish $this->pool->wait(); // This will wait until all tasks are done // Acknowledge the message after all tasks are processed $this->channel->basic_ack($msg->delivery_info['delivery_tag']); echo "Acknowledging message for file ID: $fileId\n"; } catch (Exception $e) { echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n"; $this->channel->basic_nack($msg->delivery_info['delivery_tag']); } }; // Multiple consumers (workers) consuming the messages for ($i = 0; $i < 25; $i++) { $this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback); } echo "Waiting for messages. To exit press CTRL+C\n"; // Consume messages concurrently by multiple workers while ($this->channel->callbacks) { $this->channel->wait(); } // Close the channel and connection when done $this->channel->close(); $this->connection->close(); } }

首先,您必须找出所有可以安全创建多少工人。那是一个负载测试。您可以在相同的系统上这样做,以免杀死生产框。

LLES说您发现一个系统可以一次处理50个用户。

然后您制定要求必须始终提供多少用户。假设500.
然后您启动了该类型的10个工人实例,然后根据您的要求始终保持它们的运行(10 x 50 = 500)。
php codeigniter rabbitmq
1个回答
0
投票
然后,如果您现在有100个用户,则可以用3台机器(12 x 50 = 600)减少。

等等,依此类推。 lways以足够的备用能力,然后不断添加和破坏机器,以便您习惯它,并且您将始终拥有足够的工作能力。

该计算不是完全正确的,但是您可以在前几周运行并收集您需要提前计划的必要指标。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.