golang etcd grpc 调用路由错误:rpc 错误:代码 = 未实现的 desc = 未知服务 kkk.Simple

问题描述 投票:0回答:1

我有两个

grpc
服务,一个在
simple_grpc
项目中称为
etcd-example
,另一个在shop-example项目中称为
shop_grpc
,客户端项目在
etcd-example-client
中。当我启动这两个服务并将它们注册到
etcd
中时。当客户端调用服务时出现。

调用路由错误:rpc错误:

 code = Unimplemented desc = unknown service kkk.Simple

etcd-示例项目:

package etcdv3

import (
    "context"
    "fmt"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

type ServiceRegister struct {
    cli           *clientv3.Client
    leaseID       clientv3.LeaseID
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
    key           string
    val           string
}

func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 60 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    ser := &ServiceRegister{
        cli: cli,
        key: "/shop/" + serName,
        val: addr,
    }

    if err := ser.putKeyWithLease(lease); err != nil {
        return nil, err
    }

    return ser, nil
}

func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID

    go func() {
        for {
            select {
            case _, ok := <-leaseRespChan:
                if !ok {
                    fmt.Println("etcd keepalive channel closed")
                } else {
                    goto END
                }
            }
        END:
            time.Sleep(500 * time.Millisecond)
        }
    }()
    log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
    return nil
}

func (s *ServiceRegister) Close() error {
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    log.Println("撤销租约")
    return s.cli.Close()
}

package kkk;
option go_package = "etcd-example/grpclb/proto/abc";
message SimpleRequest{
    string data = 1;
}

message SimpleResponse{
    int32 code = 1;
    string value = 2;
}

service Simple{
    rpc Route (SimpleRequest) returns (SimpleResponse){};
}
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    "etcd-example/grpclb/etcdv3"
    pb "etcd-example/grpclb/proto"
)


type SimpleService struct {
    pb.UnimplementedSimpleServer
}

const (
    Address string = "192.168.60.246:8001"
    Network string = "tcp"
    SerName string = "simple_grpc"
)

var EtcdEndpoints = []string{"localhost:2379"}

func main() {
    listener, err := net.Listen(Network, Address)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    log.Println(Address + " net.Listing...")
    grpcServer := grpc.NewServer()
    pb.RegisterSimpleServer(grpcServer, &SimpleService{})
    ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 60)
    if err != nil {
        log.Fatalf("register service err: %v", err)
    }
    defer ser.Close()
    err = grpcServer.Serve(listener)
    if err != nil {
        log.Fatalf("grpcServer.Serve err: %v", err)
    }
}

func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
    log.Println("receive: " + req.Data)
    res := pb.SimpleResponse{
        Code:  200,
        Value: "hello " + req.Data,
    }
    return &res, nil
}

店铺示例项目:

package etcdv3

import (
    "context"
    "fmt"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
    cli     *clientv3.Client // etcd client
    leaseID clientv3.LeaseID // 租约ID
    // 租约keepalieve相应chan
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
    key           string // key
    val           string // value
}

// NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    ser := &ServiceRegister{
        cli: cli,
        key: "/shop/" + serName,
        val: addr,
    }

    // 申请租约设置时间keepalive
    if err := ser.putKeyWithLease(lease); err != nil {
        return nil, err
    }

    return ser, nil
}

// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    // 设置租约时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    // 注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    // 设置续租 定期发送需求请求
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID

    go func() {
        for {
            select {
            case _, ok := <-leaseRespChan:
                if !ok {
                    fmt.Println("etcd keepalive channel closed")
                } else {
                    goto END
                }
            }
        END:
            time.Sleep(500 * time.Millisecond)
        }
    }()
    // s.keepAliveChan = leaseRespChan
    log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
    return nil
}

// ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
    for leaseKeepResp := range s.keepAliveChan {
        log.Println("续约成功", leaseKeepResp)
    }
    log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
    // 撤销租约
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    log.Println("撤销租约")
    return s.cli.Close()
}

syntax = "proto3";

package shop;
option go_package = "shop-example/grpclb/proto/fff";
message SimpleRequest{
    string data = 1;
}

message SimpleResponse{
    int32 code = 1;
    string value = 2;
}

service Simple{
    rpc AAA(SimpleRequest) returns (SimpleResponse){};
}
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    "shop-example/grpclb/etcdv3"
    fff "shop-example/grpclb/proto"
)

type LiveService struct {
    fff.UnimplementedSimpleServer
}

const (
    Address string = "192.168.60.246:9001"
    Network string = "tcp"
    SerName string = "shop.Simple"
)

var EtcdEndpoints = []string{"localhost:2379"}

func main() {
    listener, err := net.Listen(Network, Address)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    log.Println(Address + " net.Listing...")
    grpcServer := grpc.NewServer()
    fff.RegisterSimpleServer(grpcServer, &LiveService{})
    ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 1000)
    if err != nil {
        log.Fatalf("register service err: %v", err)
    }
    defer ser.Close()
    err = grpcServer.Serve(listener)
    if err != nil {
        log.Fatalf("grpcServer.Serve err: %v", err)
    }
}

func (s *LiveService) AAA(ctx context.Context, req *fff.SimpleRequest) (*fff.SimpleResponse, error) {
    log.Println("liveService receive: " + req.Data)
    res := fff.SimpleResponse{
        Code:  200,
        Value: "hello " + req.Data,
    }
    return &res, nil
}


etcd-示例-客户端

syntax = "proto3";

package shop;
option go_package = "shop-example/grpclb/proto/fff";
message SimpleRequest{
    string data = 1;
}

message SimpleResponse{
    int32 code = 1;
    string value = 2;
}

service Simple{
    rpc AAA(SimpleRequest) returns (SimpleResponse){};
}
package etcdv3

import (
    "context"
    "log"
    "sync"
    "time"

    "go.etcd.io/etcd/api/v3/mvccpb"
    clientv3 "go.etcd.io/etcd/client/v3"
    "google.golang.org/grpc/resolver"
)

const schema = "shop"

// ServiceDiscovery 服务发现
type ServiceDiscovery struct {
    cli        *clientv3.Client // etcd client
    cc         resolver.ClientConn
    serverList sync.Map // 服务列表
}

// NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    return &ServiceDiscovery{
        cli: cli,
    }
}

// Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    log.Println("Build")
    s.cc = cc
    // Call the function to get the actual endpoint string
    // endpoint := target.Endpoint() // Call the function here
    // prefix := "/live/" + endpoint + "/"
    // a := target.URL

    // a.Path
    // fmt.Println(a.Path)
    prefix := "/shop/"
    // 根据前缀获取现有的key
    resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    for _, ev := range resp.Kvs {
        s.SetServiceList(string(ev.Key), string(ev.Value))
    }
    s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
    // 监视前缀,修改变更的server
    go s.watcher(prefix)
    return s, nil
}

// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOptions) {
    log.Println("ResolveNow")
}

// Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
    return schema
}

// Close 关闭
func (s *ServiceDiscovery) Close() {
    log.Println("Close")
    s.cli.Close()
}

// watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
    rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
    log.Printf("watching prefix:%s now...", prefix)
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT: // 新增或修改
                s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
            case mvccpb.DELETE: // 删除
                s.DelServiceList(string(ev.Kv.Key))
            }
        }
    }
}

// SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
    s.serverList.Store(key, resolver.Address{Addr: val})
    s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
    log.Println("put key :", key, "val:", val)
}

// DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
    s.serverList.Delete(key)
    s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
    log.Println("del key:", key)
}

// GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
    addrs := make([]resolver.Address, 0, 10)
    s.serverList.Range(func(k, v interface{}) bool {
        addrs = append(addrs, v.(resolver.Address))
        return true
    })
    return addrs
}

package main

import (
    "context"
    "fmt"
    "log"
    "strconv"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"

    "etcd-example/grpclb/etcdv3"
    pb "etcd-example/grpclb/proto"

    live "etcd-example/grpclb/proto/live"
)

var (
    // EtcdEndpoints etcd地址
    EtcdEndpoints = []string{"localhost:2379"}
    // SerName 服务名称
    SerName       = "kkk.Simple"
    grpcClient    pb.SimpleClient
    shopRpcClient live.SimpleClient
)

func main() {
    r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
    resolver.Register(r)
    // 连接服务器
    conn, err := grpc.Dial(
        fmt.Sprintf("%s:///%s", "shop", SerName),
        grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
        grpc.WithInsecure(),
    )
    // conn, err := grpc.Dial(
    //  fmt.Sprintf("192.168.60.246:8001"),
    //  grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
    //  grpc.WithInsecure(),
    // )
    if err != nil {
        log.Fatalf("net.Connect err: %v", err)
    }

    defer conn.Close()

    conn1, err := grpc.Dial(
        fmt.Sprintf("%s:///%s", "shop", "shop.Simple"),
        grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
        grpc.WithInsecure(),
    )

    grpcClient = pb.NewSimpleClient(conn)
    shopRpcClient = live.NewSimpleClient(conn1)
    for i := 0; i < 100; i++ {
        route(i)
        route1(i)
        time.Sleep(1 * time.Second)
    }

}

func route(i int) {
    req := pb.SimpleRequest{
        Data: "grpc " + strconv.Itoa(i),
    }

    res, err := grpcClient.Route(context.Background(), &req)
    if err != nil {
        log.Fatalf("Call Route err: %v", err)
    }
    log.Println(res)
}

func route1(i int) {
    req := live.SimpleRequest{
        Data: "grpc1 " + strconv.Itoa(i),
    }

    res, err := shopRpcClient.AAA(context.Background(), &req)
    if err != nil {
        log.Fatalf("Call Route err: %v", err)
    }
    log.Println(res)
}

go grpc etcd grpc-go
1个回答
0
投票

代码中的

grpc.Resolver
接口是由
etcdv3.ServiceDiscovery
结构实现的。解析器应该为给定的
resolver.Target
生成端点列表。然而,在您的代码中,您对
prefix := "/shop/"
进行了硬编码,而不是从目标字符串中解析它。结果,解析器为所有目标生成相同的地址集。这里似乎发生的是,
conn
conn1
发出的请求将发送到运行
shop.Simple
而不是
kkk.Simple
的服务器。因此,来自
conn
的请求到达了错误的服务器,并且 gRPC 找不到已注册名称为
kkk.Simple
的服务。

要解决此问题,您需要确保

etcdv3.ServiceDiscovery
为每个目标生成正确的端点列表。

© www.soinside.com 2019 - 2024. All rights reserved.