理解grpc的异步服务器代码流程

问题描述 投票:0回答:1

我无法理解 grpc 异步代码流的逻辑,如本页所述:https://grpc.io/docs/languages/cpp/async/

它从这个例程开始,该例程在服务器服务代码初始化时被调用

void HandleRpcs() {
  // Spawn a new CallData instance to serve new clients.
  new CallData(&service_, cq_.get());
  void* tag;  // uniquely identifies a request.
  bool ok;
  while (true) {
    // Block waiting to read the next event from the completion queue. The
    // event is uniquely identified by its tag, which in this case is the
    // memory address of a CallData instance.
    cq_->Next(&tag, &ok);
    GPR_ASSERT(ok);
    static_cast<CallData*>(tag)->Proceed();
  }
}

这里在堆上创建了一个

CallData
对象,因此它的构造函数立即被调用

class CallData {
public:
  // Take in the "service" instance (in this case representing an asynchronous
  // server) and the completion queue "cq" used for asynchronous communication
  // with the gRPC runtime.
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
    // Invoke the serving logic right away.
    Proceed();
  }

  void Proceed() {
    if (status_ == CREATE) {
      // As part of the initial CREATE state, we *request* that the system
      // start processing SayHello requests. In this request, "this" acts are
      // the tag uniquely identifying the request (so that different CallData
      // instances can serve different requests concurrently), in this case
      // the memory address of this CallData instance.
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                this);
      // Make this instance progress to the PROCESS state.
      status_ = PROCESS;
    } else if (status_ == PROCESS) {
      // Spawn a new CallData instance to serve new clients while we process
      // the one for this CallData. The instance will deallocate itself as
      // part of its FINISH state.
      new CallData(service_, cq_);

      // The actual processing.
      std::string prefix("Hello ");
      reply_.set_message(prefix + request_.name());

      // And we are done! Let the gRPC runtime know we've finished, using the
      // memory address of this instance as the uniquely identifying tag for
      // the event.
      responder_.Finish(reply_, Status::OK, this);
      status_ = FINISH;
    } else {
      GPR_ASSERT(status_ == FINISH);
      // Once in the FINISH state, deallocate ourselves (CallData).
      delete this;
    }
  }
}

尽管有评论,我不明白为什么立即调用

Proceed()
,这意味着立即调用
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
(因此立即调用请求处理代码......?)。另外,在创建堆对象之后有一个
while(true)
循环,因此它会快速连续两次进入
Proceed()
并立即调用
else if (status_ == PROCESS)
..?

我不理解控制流或这段代码背后的基本原理。

c++ grpc
1个回答
0
投票

尽管有评论,我不明白为什么立即调用 Proceed() ,这意味着 service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);立即调用(因此立即调用请求处理代码...?)。

RequestSayHello
告诉 grpc 当请求到来时“回电”。它不会同步执行请求处理代码。 grpc“回电”的方式是在队列中推送一个项目
cq_
。但这直到稍后收到请求时才会发生。

另外,在创建堆对象之后有一个 while(true) 循环,因此它会快速连续两次进入 Proceed() ,并且立即调用 else if (status_ == PROCESS) ..?

Next
的呼叫被阻塞。一旦队列中有条目(或者队列正在关闭),它将返回。因此,它不会被快速连续调用。

我不理解控制流或这段代码背后的基本原理。

CallData
是正在处理的单个传入请求的状态机。在构造时(
CREATE
),它告诉 grpc 它的存在。当它出现在完成队列中时,grpc 已将请求与其匹配。然后我们再次调用
Proceed
(
PROCESS
) 并响应请求。响应也是异步的,并由相同的
CallData
标签标识。当响应完成后,我们将在队列中获得另一个条目。当我们调用
Proceed
时,我们将处理
CallData

所以...

CallData
是请求-响应周期的简单状态机,其中等待请求和响应都是异步的。

© www.soinside.com 2019 - 2024. All rights reserved.