Ratchet 消息未发送到套接字客户端

问题描述 投票:0回答:1

我正在开发一个供个人使用的

terminal on web
应用程序,并使用
Ratchet
作为套接字服务器。它可以很好地处理
ls
等简单命令。 但是,当我运行像
ping
这样的命令时,它不会将
stdout
和
stderr
的内容发送到套接字客户端。不过,即使运行这类命令,我也能在终端中看到该命令的实时输出。这是我的实现:

<?php

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;

// ini_set('display_errors', 1);
// ini_set('display_startup_errors', 1);
// error_reporting(E_ALL);

require 'vendor/autoload.php'; // Ensure you have Ratchet installed

/**
 * WebSocket component that executes every received text message as a shell
 * command and relays the command's stdout/stderr back to the sender.
 *
 * Each frame sent to the client is a JSON object {"data": string, "opts": array};
 * the last frame of a command carries opts.exit = true.
 *
 * NOTE(review): command_exec() polls the child process in a blocking loop, so
 * the message handler does not return until the command has ended. Frames for
 * long-running commands (e.g. ping) therefore appear to reach the client only
 * after the command finishes; streaming them live needs an event-loop driven,
 * non-blocking reader instead of this loop — confirm against the Ratchet docs.
 */
class TerminalServer implements MessageComponentInterface {

    /** Delay between two polls of the child process, in microseconds (100ms). */
    private const POLL_INTERVAL_US = 100000;

    /** @var \SplObjectStorage Currently connected clients. */
    protected $clients;

    /** @var array<int, resource> Process handles that were opened and not closed yet. */
    private array $open_processes;

    /**
     * Exact command strings that onMessage() accepts, e.g. ['ls -l', 'pwd', 'whoami'].
     * null disables the filter, i.e. every command is executed.
     *
     * @var list<string>|null
     */
    private ?array $allowed_commands = null;

    public function __construct() {
        $this->open_processes = [];
        $this->clients = new \SplObjectStorage;
    }

    /** Registers a newly connected client. */
    public function onOpen(ConnectionInterface $conn) {
        echo "New connection\n";
        $this->clients->attach($conn);
    }

    /**
     * Builds the JSON frame sent to the client.
     *
     * @param string $message Text to show in the client terminal.
     * @param array  $opts    Extra flags for the client, e.g. ['exit' => true].
     * @return string|false JSON text, or false if encoding fails.
     */
    public static function get_output_json( string $message , array $opts = []){
        // Command output is not guaranteed to be valid UTF-8; substitute invalid
        // bytes instead of letting json_encode() fail and return false.
        return json_encode(
            [ 'data' => $message , 'opts' => $opts ],
            JSON_INVALID_UTF8_SUBSTITUTE
        );
    }

    /**
     * Runs $command, relays its output to $from and finally sends a frame
     * flagged with opts.exit = true.
     *
     * @param ConnectionInterface $from    Client that requested the command.
     * @param string              $command Shell command line to execute.
     */
    private function command_exec(ConnectionInterface $from , string $command){

        // proc_open() rejects an empty command with a ValueError on PHP 8.
        if ($command === '') {
            $from->send(self::get_output_json("Command failed to execute", ['exit' => true]));
            return;
        }

        $descriptorspec = [
            0 => ["pipe", "r"],  // stdin
            1 => ["pipe", "w"],  // stdout
            2 => ["pipe", "w"],  // stderr
        ];

        $process = proc_open($command, $descriptorspec, $pipes);

        if (!is_resource($process)) {
            $from->send(self::get_output_json("Command failed to execute", ['exit' => true]));
            return;
        }

        // Track the handle only once it is known to be valid, and remember its
        // key so it can be forgotten again as soon as the process is closed.
        $this->open_processes[] = $process;
        $process_key = array_key_last($this->open_processes);

        // Close the input pipe, since we're not sending any input
        fclose($pipes[0]);

        // Non-blocking reads return whatever is available right now
        stream_set_blocking($pipes[1], false);
        stream_set_blocking($pipes[2], false);

        $exit_code = null;

        while (true) {

            $this->relay_pending_output($from, $pipes);
            flush();

            $status = proc_get_status($process);
            if (!$status['running']) {
                // Keep the exit code reported here: later status/close calls
                // may only yield -1 once it has been collected.
                $exit_code = $status['exitcode'];
                // Pick up anything written between the read above and the exit.
                $this->relay_pending_output($from, $pipes);
                break;
            }

            // Small delay to prevent busy-waiting
            usleep(self::POLL_INTERVAL_US);
        }

        // Close pipes and process
        fclose($pipes[1]);
        fclose($pipes[2]);
        $return_value = self::command_end($process, $exit_code);
        unset($this->open_processes[$process_key]);

        $from->send(self::get_output_json("Command ended with code $return_value\n", ['exit' => true]));
    }

    /**
     * Reads whatever is currently available on the stdout (1) and stderr (2)
     * pipes and sends every non-empty chunk to the client.
     *
     * @param ConnectionInterface    $from  Client receiving the output.
     * @param array<int, resource>   $pipes Pipes as filled in by proc_open().
     */
    private function relay_pending_output(ConnectionInterface $from, array $pipes): void {
        foreach ([1 => 'Out', 2 => 'Err'] as $index => $label) {
            $chunk = stream_get_contents($pipes[$index]);

            // Compare explicitly: a chunk consisting of "0" is valid output.
            if ($chunk === false || $chunk === '') {
                continue;
            }

            $string_to_send = self::get_output_json($chunk);
            echo "$label: $string_to_send\n";
            $from->send($string_to_send);
        }
    }

    /**
     * Ends or terminates a command and returns its exit code.
     *
     * A process that is still running is sent SIGINT and the signal number is
     * returned; the handle is left open in that case. A finished process is
     * closed and its exit code is returned.
     *
     * @param resource|mixed $proc            Handle returned by proc_open().
     * @param int|null       $known_exit_code Exit code already obtained from
     *                                        proc_get_status(), used when
     *                                        proc_close() can only report -1.
     * @return int|false Exit code or signal number; false if $proc is not an
     *                   open resource.
     */
    public static function command_end( $proc , ?int $known_exit_code = null ) :int|false {

        if( !is_resource($proc) ){
            return false;
        }

        $status = proc_get_status( $proc );

        if( $status['running'] === true ) {
            // SIGINT is only defined when the pcntl extension is loaded.
            $signal = defined('SIGINT') ? SIGINT : 2;
            proc_terminate($proc , $signal);
            return $signal;
        }

        $exit_code = $known_exit_code ?? $status['exitcode'];
        $close_result = proc_close($proc);

        // proc_close() yields -1 when the exit code was already collected by an
        // earlier proc_get_status() call; fall back to the remembered value.
        return $close_result === -1 ? $exit_code : $close_result;
    }

    /**
     * Treats the incoming message as a command line and executes it.
     *
     * SECURITY: with $allowed_commands left at null, any connected client can
     * run arbitrary shell commands with the privileges of this PHP process.
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        $command = trim((string) $msg);

        $is_allowed = $this->allowed_commands === null
            || in_array($command, $this->allowed_commands, true);

        if ($is_allowed) {
            $this->command_exec($from , $command);
        } else {
            $from->send(self::get_output_json("Command not allowed.\r\n"));
        }
    }

    /** Cleans up processes that are still open and forgets the client. */
    public function onClose(ConnectionInterface $conn) {

        echo "Connection closed\n";

        foreach ($this->open_processes as $p) {
            // Interrupt the process if it is still running, then make sure the
            // handle is closed; handles that are already closed are skipped.
            self::command_end($p);
            if (is_resource($p)) {
                proc_close($p);
            }
        }
        $this->open_processes = [];
        $this->clients->detach($conn);

    }

    /** Logs the failure and drops the connection. */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "Connection closed because error\n";
        echo "Error: {$e->getMessage()}\n";
        $conn->close();
    }
}

// Bootstrap: expose the terminal component at /terminal on localhost:2222.
// NOTE(review): ['*'] is presumably the allowed-origins list (any origin) — confirm.
$terminal_app = new Ratchet\App('localhost', 2222);
$terminal_component = new TerminalServer();
$terminal_app->route('/terminal', $terminal_component, ['*']);

echo "No errors so far\n";

// Blocks here and serves connections until the process is stopped.
$terminal_app->run();

我尝试修改

command_exec
函数,使命令在读取 10 行后退出;结果是,它在命令退出之后才一次性发送所有输出:

/**
 * Experimental variant of command_exec(): runs $command, relays its output to
 * $from and stops the command once 10 lines of stdout have been relayed.
 *
 * A frame flagged with opts.exit = true is always sent last.
 *
 * @param ConnectionInterface $from    Client that requested the command.
 * @param string              $command Shell command line to execute.
 */
private function command_exec(ConnectionInterface &$from , string $command){

    // Number of stdout lines after which the command is stopped.
    $max_lines = 10;

    // proc_open() rejects an empty command with a ValueError on PHP 8.
    if ($command === '') {
        $from->send($this->get_output_json("Command failed to execute", ['exit' => true]));
        return;
    }

    $descriptorspec = [
        0 => ["pipe", "r"],  // stdin
        1 => ["pipe", "w"],  // stdout
        2 => ["pipe", "w"],  // stderr
    ];

    $process = proc_open($command, $descriptorspec, $pipes);

    if (!is_resource($process)) {
        $from->send($this->get_output_json("Command failed to execute", ['exit' => true]));
        return;
    }

    // Track the handle only once it is known to be valid.
    $this->open_processes[] = $process;
    $process_key = array_key_last($this->open_processes);

    // Close the input pipe, since we're not sending any input
    fclose($pipes[0]);

    // Non-blocking reads return whatever is available right now
    stream_set_blocking($pipes[1], false);
    stream_set_blocking($pipes[2], false);

    // Sends pending stdout/stderr chunks and returns the number of stdout
    // lines contained in what was sent.
    $relay_pending = function () use ($from, $pipes): int {
        $output = stream_get_contents($pipes[1]);
        $error = stream_get_contents($pipes[2]);
        $lines = 0;

        // Compare explicitly: a chunk consisting of "0" is valid output.
        if ($output !== false && $output !== '') {
            $string_to_send = $this->get_output_json($output);
            echo "Out: $string_to_send\n";
            $from->send($string_to_send);
            // One read may deliver several lines at once, so count newlines
            // instead of counting reads.
            $lines = substr_count($output, "\n");
        }

        if ($error !== false && $error !== '') {
            $string_to_send = $this->get_output_json($error);
            echo "Err: $string_to_send\n";
            $from->send($string_to_send);
        }

        return $lines;
    };

    $lines_read = 0;
    $exit_code = null;

    while (true) {

        $lines_read += $relay_pending();
        flush();

        // Checked after stderr has been relayed so that no chunk read in this
        // pass is dropped by the early exit.
        if ($lines_read >= $max_lines) {
            break;
        }

        $status = proc_get_status($process);
        if (!$status['running']) {
            // Keep the exit code reported here: later status/close calls may
            // only yield -1 once it has been collected.
            $exit_code = $status['exitcode'];
            // Pick up anything written between the read above and the exit.
            $relay_pending();
            break;
        }

        // Small delay to prevent busy-waiting
        usleep(100000); // 100ms
    }

    // Close pipes and process
    fclose($pipes[1]);
    fclose($pipes[2]);
    $return_value = $this->command_end($process);

    if ($return_value === -1 && $exit_code !== null) {
        $return_value = $exit_code;
    }

    // Forget the handle once it has really been closed; a process that was only
    // signalled stays registered so that it can still be closed later.
    if (!is_resource($process)) {
        unset($this->open_processes[$process_key]);
    }

    $from->send($this->get_output_json("Command ended with code $return_value\n", ['exit' => true]));
}
php ratchet phpwebsocket proc-open
1个回答
0
投票

我在这里找到了问题所在。

显然,

$from->send($data)
方法要等到 ping 命令执行完毕之后才会真正把数据发送出去,因此命令的执行必须以非阻塞方式进行,如同一页面中所示。

所以我改变了

command_exec

如下

/**
 * Event-driven variant of command_exec(): starts $command and relays its
 * stdout/stderr to $from from stream 'data' events instead of a polling loop.
 *
 * The command is ended (and its exit code reported) once both output streams
 * have emitted 'end'.
 *
 * NOTE(review): Deferred, Stream, println(), send_ln(), add_to_open_processes()
 * and add_to_open_streams() are defined outside this snippet — presumably
 * Deferred/Stream come from ReactPHP; confirm the imports and the Stream
 * constructor signature of the installed version.
 *
 * @param ConnectionInterface $from    Client that requested the command.
 * @param string              $command Shell command line to execute.
 */
private function command_exec(ConnectionInterface &$from, string $command){

    $descriptorspec = [
        0 => ["pipe", "r"], // stdin
        1 => ["pipe", "w"], // stdout
        2 => ["pipe", "w"], // stderr
    ];

    $process = proc_open($command, $descriptorspec, $pipes);

    if (!is_resource($process)) {
        // Tell the client instead of failing silently.
        $this->send_ln($from, "Command failed to execute");
        return;
    }

    // Log and register the process only once the handle is known to be valid.
    println("Created process");
    $this->add_to_open_processes($from, $process);

    // No input is sent to the command.
    fclose($pipes[0]);

    $stdout_done = new Deferred();
    $stderr_done = new Deferred();

    // Runs once both output streams have ended: closes the pipes that are
    // still open, ends the process and reports its exit code.
    $command_end_callback = function () use ($from, $pipes, $process) {
        println("all promise resolved");

        foreach ([$pipes[1], $pipes[2]] as $pipe) {
            if (is_resource($pipe)) {
                fclose($pipe);
            }
        }

        $return_value = $this->command_end($process);
        $this->send_ln($from, "Command ended with code $return_value");
    };

    $main_promise = \React\Promise\all([$stdout_done->promise(), $stderr_done->promise()]);
    $main_promise->then($command_end_callback);

    // stdout and stderr are wired identically; only the log labels differ.
    $channels = [
        ['pipe' => $pipes[1], 'done' => $stdout_done, 'name' => 'stdout', 'tag' => 'Out'],
        ['pipe' => $pipes[2], 'done' => $stderr_done, 'name' => 'stderr', 'tag' => 'Err'],
    ];

    foreach ($channels as $channel) {
        $pipe = $channel['pipe'];
        $done = $channel['done'];
        $tag = $channel['tag'];

        if (!is_resource($pipe) || 'stream' !== get_resource_type($pipe)) {
            // Nothing to read from: count this channel as finished right away.
            $done->resolve(true);
            continue;
        }

        println("{$channel['name']} is stream");

        $stream = new Stream($pipe);
        $this->add_to_open_streams($from, $stream);

        $stream->on('data', function ($data) use ($from, $tag) {
            $string_to_send = $this->get_output_json($data);
            println("$tag: $string_to_send");
            $from->send($string_to_send);
        });

        $stream->on('end', function () use ($done) {
            $done->resolve(true);
        });
    }
}
我使用

\React\Promise\all
,在

STDOUT
和
STDERR
都结束之后再结束命令进程。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.