使用 ReactPHP 进行多个 API 请求

问题描述 投票:0回答:1

我的目标

我对 ReactPHP、异步编程和软件开发完全陌生。我目前正在开发一个我想要异步的 API。 API 必须向外部 API 发出大量 API 请求。我的想法是在后台启动 API 请求,这样它就不会阻止进一步的客户端请求,但我无法使其正常工作。

在整个 API 中,我们必须向多个分发者发出请求,这些分发者都有不同的处理 API 请求的方式,因此我们必须单独实现每个分发者。

在 API 中我使用的是 ReactPHP EventLoop。

我正在做什么

我正在使用 React\Promise ll()。因此我创建了 5 个测试 Promise 并将它们提供给 all():

信息:我使用 Guzzle 来执行请求,因为我必须将标头放入该特定分发者的curl 选项中。

    private function requestProduct($id, $access_token, $request_url, $client_id)
    {
        $request_url = str_replace(' ','',$request_url);

        $data = [
            'currency' => 'EUR',
            'limit' => 1,
            'search' => $id
        ];

        $request_url .= "search?" . http_build_query($data);

        $this->setRequestUrl($request_url);

        $header = [
            "content-type: application/x-www-form-urlencode",
            "accept: application/json",
            "authorization: Bearer $access_token",
            "client_id: $client_id"
        ];

        // $this->setHeader($header);

        $this->client = new Client();

        for ($i = 0; $i < 5; $i++) {
            $promises[] = new Promise(function ($resolve, $reject) use ($request_url, $header, $i) {
                try {
                    echo "Starting async request {$i} to URL: {$request_url}" . PHP_EOL;
        
                    $this->client->getAsync($request_url, [
                        'curl' => [
                            CURLOPT_HTTPHEADER => $header,
                            CURLOPT_SSL_VERIFYPEER => false,
                            CURLOPT_SSL_VERIFYHOST => false,
                        ]
                    ])
                    ->then(
                        function (ResponseInterface $response) use ($resolve, $i) {
                            $content = $response->getBody()->getContents();
                            $decodedResponse = json_decode($content, true);
        
                            if (json_last_error() !== JSON_ERROR_NONE) {
                                throw new Exception('Invalid JSON response');
                            }
        
                            echo "Request {$i} completed successfully." . PHP_EOL;
                            $resolve($decodedResponse);
                        },
                        function (Exception $e) use ($reject, $i) {
                            echo "Error during async GET request {$i}: {$e->getMessage()}" . PHP_EOL;
                            $reject($e);
                        }
                    );
                } catch (Exception $e) {
                    echo "Unexpected error in request {$i}: {$e->getMessage()}" . PHP_EOL;
                    $reject($e);
                }
            });
        }
        
        echo "Waiting for all promises to resolve" . PHP_EOL;
        // Wait for all promises to resolve
        return all($promises)
            ->then(function($responses) {
                $result = [];
                echo "All Promises resolved" . PHP_EOL;
                echo gettype($responses) . PHP_EOL;
                foreach ($responses as $i => $response) {
                    // print_r($response);
                    $result[$i] = $response['status']; 
                    echo "Result {$i} decoded" . PHP_EOL;
                }

                print_r($result);
                return $result;
            })
            ->catch(function($error) {
                echo "An error occurred: {$error->getMessage()}" . PHP_EOL;
                throw new Exception("Error in all()", 500);
            });
    }

该函数在名为 getProductByMpn 的函数中被调用。
从 all() 返回的 Promise 应该在控制器中解决。

    public function getProductByMpn($request)
    {
        try {
            $body = $request->getParsedBody();
            $distributor = DistributorHelper::getDistributorNameFromPath($request);
            $distributorID = $body['linr'];
            $mpn = $body['mpn'] ?? null;

            if (!$distributor || !$mpn || !$distributorID) {
                throw new Exception('Missing distributor, distributor ID or mpn', 400);
            }

            return $this->distributorService
                        ->getProductByMpn($distributor, $mpn, $distributorID)
                        ->then(function ($response) {
                            echo "Response successful". PHP_EOL;
                            return JsonResponse::ok($response);
                        })
                        ->catch(function ($error) {
                            echo "Error in getProductByMpn: {$error->getMessage()}" . PHP_EOL;
                            throw new Exception("Error in getProductByMpn: {$error->getMessage()}", 500);
                        });
        } catch (Exception $e) {
            return JsonResponse::individual($e->getCode(), ['error' => $e->getMessage()]);
        }
    }

我的问题

现在的问题是它没有解决。相反,它似乎卡在某个地方了。我认为我在解决 Promise 时做错了什么,但我找不到错误。

我还尝试了一个 API 请求,它工作得很好:

    private function requestProduct($id, $access_token, $request_url, $client_id)
    {
        $request_url = str_replace(' ','',$request_url);

        $data = [
            'currency' => 'EUR',
            'limit' => 1,
            'search' => $id
        ];

        $request_url .= "search?" . http_build_query($data);

        $this->setRequestUrl($request_url);

        $header = [
            "content-type: application/x-www-form-urlencode",
            "accept: application/json",
            "authorization: Bearer $access_token",
            "client_id: $client_id"
        ];

        // $this->setHeader($header);

        $this->client = new Client();

        return new Promise(function ($resolve, $reject) use ($request_url, $header) {
            try {
                echo "Starting async request to URL: {$request_url}" . PHP_EOL;

                $this->client->getAsync($request_url, [
                    'curl' => [
                        CURLOPT_HTTPHEADER => $header,
                        CURLOPT_SSL_VERIFYPEER => false,
                        CURLOPT_SSL_VERIFYHOST => false,
                    ]
                    ])
                    ->then(
                        function (ResponseInterface $response) use ($resolve) {
                            $content = $response->getBody()->getContents();
                            // echo "Response received: $content" . PHP_EOL;

                            $decodedResponse = json_decode($content, true);

                            if (json_last_error() !== JSON_ERROR_NONE) {
                                throw new Exception('Invalid JSON response');
                            }

                            $resolve($decodedResponse);
                        },
                        function (Exception $e) use ($reject) {
                            echo "Error during async GET request: {$e->getMessage()}" . PHP_EOL;
                            $reject($e);
                        }
                    )
                    ->wait(); // has to stand here to resolve correctly?
            } catch (Exception $e) {
                echo "Unexpected error: {$e->getMessage()}" . PHP_EOL;
                $reject($e);
            }
        });
    }

有趣的是,当我将它们混合在一起时,all() 函数工作正常并打印出结果。

这就是混合函数。我认为这与单个 Promise 中的 wait() 有关。当 Promise 没有 wait() 时,它也不起作用。

    private function requestProduct($id, $access_token, $request_url, $client_id)
    {
        $request_url = str_replace(' ','',$request_url);

        $data = [
            'currency' => 'EUR',
            'limit' => 1,
            'search' => $id
        ];

        $request_url .= "search?" . http_build_query($data);

        $this->setRequestUrl($request_url);

        $header = [
            "content-type: application/x-www-form-urlencode",
            "accept: application/json",
            "authorization: Bearer $access_token",
            "client_id: $client_id"
        ];

        // $this->setHeader($header);

        $this->client = new Client();

        for ($i = 0; $i < 5; $i++) {
            $promises[] = new Promise(function ($resolve, $reject) use ($request_url, $header, $i) {
                try {
                    echo "Starting async request {$i} to URL: {$request_url}" . PHP_EOL;
        
                    $this->client->getAsync($request_url, [
                        'curl' => [
                            CURLOPT_HTTPHEADER => $header,
                            CURLOPT_SSL_VERIFYPEER => false,
                            CURLOPT_SSL_VERIFYHOST => false,
                        ]
                    ])
                    ->then(
                        function (ResponseInterface $response) use ($resolve, $i) {
                            $content = $response->getBody()->getContents();
                            $decodedResponse = json_decode($content, true);
        
                            if (json_last_error() !== JSON_ERROR_NONE) {
                                throw new Exception('Invalid JSON response');
                            }
        
                            echo "Request {$i} completed successfully." . PHP_EOL;
                            $resolve($decodedResponse);
                        },
                        function (Exception $e) use ($reject, $i) {
                            echo "Error during async GET request {$i}: {$e->getMessage()}" . PHP_EOL;
                            $reject($e);
                        }
                    );
                } catch (Exception $e) {
                    echo "Unexpected error in request {$i}: {$e->getMessage()}" . PHP_EOL;
                    $reject($e);
                }
            });
        }
        
        echo "Waiting for all promises to resolve" . PHP_EOL;
        // Wait for all promises to resolve
        all($promises)
            ->then(function($responses) {
                $result = [];
                echo "All Promises resolved" . PHP_EOL;
                echo gettype($responses) . PHP_EOL;
                foreach ($responses as $i => $response) {
                    // print_r($response);
                    $result[$i] = $response['status']; 
                    echo "Result {$i} decoded" . PHP_EOL;
                }

                print_r($result);
                return $result;
            })
            ->catch(function($error) {
                echo "An error occurred: {$error->getMessage()}" . PHP_EOL;
                throw new Exception("Error in all()", 500);
            });

        return new Promise(function ($resolve, $reject) use ($request_url, $header) {
            try {
                echo "Starting async request to URL: {$request_url}" . PHP_EOL;

                $this->client->getAsync($request_url, [
                    'curl' => [
                        CURLOPT_HTTPHEADER => $header,
                        CURLOPT_SSL_VERIFYPEER => false,
                        CURLOPT_SSL_VERIFYHOST => false,
                    ]
                    ])
                    ->then(
                        function (ResponseInterface $response) use ($resolve) {
                            $content = $response->getBody()->getContents();
                            // echo "Response received: $content" . PHP_EOL;

                            $decodedResponse = json_decode($content, true);

                            if (json_last_error() !== JSON_ERROR_NONE) {
                                throw new Exception('Invalid JSON response');
                            }

                            $resolve($decodedResponse);
                        },
                        function (Exception $e) use ($reject) {
                            echo "Error during async GET request: {$e->getMessage()}" . PHP_EOL;
                            $reject($e);
                        }
                    )
                    ->wait(); // has to stand here to resolve correctly?
            } catch (Exception $e) {
                echo "Unexpected error: {$e->getMessage()}" . PHP_EOL;
                $reject($e);
            }
        });
    }

all() 的结果符合预期。它打印出每个请求的结果。

是否有另一种方法可以在不使用 wait() 的情况下解决 Promise 以及如何使用 all() 正确解决 Promise?

php asynchronous reactphp
1个回答
0
投票

Guzzle 默认使用

cURL
,但您可以将其配置为使用与 React 兼容的 HTTP 处理程序 (https://docs.guzzlephp.org/en/stable/faq.html):

Guzzle 需要 cURL 吗?

不。 Guzzle 可以使用任何 HTTP 处理程序来发送请求。这意味着 Guzzle 可以与 cURL、PHP 的流包装器、套接字和非阻塞库(如 React)一起使用。您只需配置 HTTP 处理程序即可使用不同的发送请求的方法。

您的示例可能不起作用,因为您正在混合基于

cURL
的 Guzzle 客户端,这意味着这些请求不会添加到 ReactPHP 的事件循环中,因此它们将永远等待。

您的最后一个示例之所以有效,是因为您直接在

Guzzle 的 
承诺上调用 wait,这会阻塞直到请求完成。

一种解决方案是使用将 Guzzle 与 ReactPHP 的事件循环集成的库,例如https://github.com/WyriHaximus/react-guzzle-psr7/.

或者,您可以使用 ReactPHP 的 HTTP 客户端而不是 Guzzle。它还支持设置标头(https://reactphp.org/http/#withheader):

withHeader()

withHeader(string $header, string $value): Browser
方法可用于为所有后续请求添加请求标头。

$browser = $browser->withHeader('User-Agent', 'ACME');

$browser->get($url)->then(…);
© www.soinside.com 2019 - 2024. All rights reserved.