我有两个
grpc
服务,一个在simple_grpc
项目中称为etcd-example
,另一个在shop-example项目中称为shop_grpc
,客户端项目在etcd-example-client
中。当我启动这两个服务并将它们注册到etcd
中时。当客户端调用服务时出现。
调用路由错误:rpc错误:
code = Unimplemented desc = unknown service kkk.Simple
etcd-示例项目:
package etcdv3
import (
"context"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
type ServiceRegister struct {
cli *clientv3.Client
leaseID clientv3.LeaseID
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string
val string
}
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 60 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: "/shop/" + serName,
val: addr,
}
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
go func() {
for {
select {
case _, ok := <-leaseRespChan:
if !ok {
fmt.Println("etcd keepalive channel closed")
} else {
goto END
}
}
END:
time.Sleep(500 * time.Millisecond)
}
}()
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
func (s *ServiceRegister) Close() error {
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
package kkk;
option go_package = "etcd-example/grpclb/proto/abc";
message SimpleRequest{
string data = 1;
}
message SimpleResponse{
int32 code = 1;
string value = 2;
}
service Simple{
rpc Route (SimpleRequest) returns (SimpleResponse){};
}
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
"etcd-example/grpclb/etcdv3"
pb "etcd-example/grpclb/proto"
)
type SimpleService struct {
pb.UnimplementedSimpleServer
}
const (
Address string = "192.168.60.246:8001"
Network string = "tcp"
SerName string = "simple_grpc"
)
var EtcdEndpoints = []string{"localhost:2379"}
func main() {
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
grpcServer := grpc.NewServer()
pb.RegisterSimpleServer(grpcServer, &SimpleService{})
ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 60)
if err != nil {
log.Fatalf("register service err: %v", err)
}
defer ser.Close()
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
log.Println("receive: " + req.Data)
res := pb.SimpleResponse{
Code: 200,
Value: "hello " + req.Data,
}
return &res, nil
}
店铺示例项目:
package etcdv3
import (
"context"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client // etcd client
leaseID clientv3.LeaseID // 租约ID
// 租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string // key
val string // value
}
// NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: "/shop/" + serName,
val: addr,
}
// 申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
// 设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
// 注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
// 设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
go func() {
for {
select {
case _, ok := <-leaseRespChan:
if !ok {
fmt.Println("etcd keepalive channel closed")
} else {
goto END
}
}
END:
time.Sleep(500 * time.Millisecond)
}
}()
// s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
// ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
// 撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
syntax = "proto3";
package shop;
option go_package = "shop-example/grpclb/proto/fff";
message SimpleRequest{
string data = 1;
}
message SimpleResponse{
int32 code = 1;
string value = 2;
}
service Simple{
rpc AAA(SimpleRequest) returns (SimpleResponse){};
}
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
"shop-example/grpclb/etcdv3"
fff "shop-example/grpclb/proto"
)
type LiveService struct {
fff.UnimplementedSimpleServer
}
const (
Address string = "192.168.60.246:9001"
Network string = "tcp"
SerName string = "shop.Simple"
)
var EtcdEndpoints = []string{"localhost:2379"}
func main() {
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
grpcServer := grpc.NewServer()
fff.RegisterSimpleServer(grpcServer, &LiveService{})
ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 1000)
if err != nil {
log.Fatalf("register service err: %v", err)
}
defer ser.Close()
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
func (s *LiveService) AAA(ctx context.Context, req *fff.SimpleRequest) (*fff.SimpleResponse, error) {
log.Println("liveService receive: " + req.Data)
res := fff.SimpleResponse{
Code: 200,
Value: "hello " + req.Data,
}
return &res, nil
}
etcd-示例-客户端
syntax = "proto3";
package shop;
option go_package = "shop-example/grpclb/proto/fff";
message SimpleRequest{
string data = 1;
}
message SimpleResponse{
int32 code = 1;
string value = 2;
}
service Simple{
rpc AAA(SimpleRequest) returns (SimpleResponse){};
}
package etcdv3
import (
"context"
"log"
"sync"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
)
const schema = "shop"
// ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client // etcd client
cc resolver.ClientConn
serverList sync.Map // 服务列表
}
// NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
}
}
// Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Println("Build")
s.cc = cc
// Call the function to get the actual endpoint string
// endpoint := target.Endpoint() // Call the function here
// prefix := "/live/" + endpoint + "/"
// a := target.URL
// a.Path
// fmt.Println(a.Path)
prefix := "/shop/"
// 根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
// 监视前缀,修改变更的server
go s.watcher(prefix)
return s, nil
}
// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOptions) {
log.Println("ResolveNow")
}
// Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
return schema
}
// Close 关闭
func (s *ServiceDiscovery) Close() {
log.Println("Close")
s.cli.Close()
}
// watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: // 新增或修改
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: // 删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
// SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.serverList.Store(key, resolver.Address{Addr: val})
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("put key :", key, "val:", val)
}
// DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.serverList.Delete(key)
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("del key:", key)
}
// GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, 10)
s.serverList.Range(func(k, v interface{}) bool {
addrs = append(addrs, v.(resolver.Address))
return true
})
return addrs
}
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"etcd-example/grpclb/etcdv3"
pb "etcd-example/grpclb/proto"
live "etcd-example/grpclb/proto/live"
)
var (
// EtcdEndpoints etcd地址
EtcdEndpoints = []string{"localhost:2379"}
// SerName 服务名称
SerName = "kkk.Simple"
grpcClient pb.SimpleClient
shopRpcClient live.SimpleClient
)
func main() {
r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
resolver.Register(r)
// 连接服务器
conn, err := grpc.Dial(
fmt.Sprintf("%s:///%s", "shop", SerName),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
grpc.WithInsecure(),
)
// conn, err := grpc.Dial(
// fmt.Sprintf("192.168.60.246:8001"),
// grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
// grpc.WithInsecure(),
// )
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
conn1, err := grpc.Dial(
fmt.Sprintf("%s:///%s", "shop", "shop.Simple"),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
grpc.WithInsecure(),
)
grpcClient = pb.NewSimpleClient(conn)
shopRpcClient = live.NewSimpleClient(conn1)
for i := 0; i < 100; i++ {
route(i)
route1(i)
time.Sleep(1 * time.Second)
}
}
func route(i int) {
req := pb.SimpleRequest{
Data: "grpc " + strconv.Itoa(i),
}
res, err := grpcClient.Route(context.Background(), &req)
if err != nil {
log.Fatalf("Call Route err: %v", err)
}
log.Println(res)
}
func route1(i int) {
req := live.SimpleRequest{
Data: "grpc1 " + strconv.Itoa(i),
}
res, err := shopRpcClient.AAA(context.Background(), &req)
if err != nil {
log.Fatalf("Call Route err: %v", err)
}
log.Println(res)
}
代码中的
grpc.Resolver
接口是由 etcdv3.ServiceDiscovery
结构实现的。解析器应该为给定的 resolver.Target
生成端点列表。然而,在您的代码中,您对 prefix := "/shop/"
进行了硬编码,而不是从目标字符串中解析它。结果,解析器为所有目标生成相同的地址集。这里似乎发生的是,conn
和conn1
发出的请求将发送到运行shop.Simple
而不是kkk.Simple
的服务器。因此,来自 conn
的请求到达了错误的服务器,并且 gRPC 找不到已注册名称为 kkk.Simple
的服务。
要解决此问题,您需要确保
etcdv3.ServiceDiscovery
为每个目标生成正确的端点列表。