基于 etcd 实现的服务发现,按照非规范化的 etcd key 实现,详细见代码注释。
package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/v3""google.golang.org/grpc/resolver""strings""time"
)// gRPC 的服务一般会使用 protobuf 作为数据传输的介质
// gRPC 服务定义在 proto 的文件中,例如:service RoutingService {}
// protoc 将 proto 后缀文件转为 go 文件,文件内自动生成了 gRPC 服务的描述信息、服务注册的函数、客户端声明的函数等内容
// 如下,它们的格式是固定的,注意函数的参数
// 服务描述信息:RoutingService_ServiceDesc,格式:服务名_ServiceDesc
// 服务注册函数:RegisterRoutingServiceServer,格式:Register你服务名Server
// 客户端声明函数:NewRoutingServiceClient,格式:New服务名Client
// 其中客户端声明函数的参数是 gRPC 连接,返回值是 gRPC 服务的客户端接口,这样就可以调用客户端接口定义的 rpc 方法了
// gRPC 连接不会与某个 gRPC 服务绑定,它只是一个连接。
// 获取 gRPC 连接的方式如下两种,第一个参数就是 gRPC 服务的地址,可以写死 ip + port,也可以使用服务发现来获取 gRPC 服务的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(废弃)
// 服务发现是实现 Builder 和 Resolver 接口,Builder 用于创建 Resolver 实例,Resolver 用于解析服务地址。
// Builder 的 Scheme 方法返回值是 与 grpc.NewClient 中的 scheme 对应
// Builder 的 Build 第一个参数 target.Endpoint() 得到的结果是 grpc.NewClient 中的 serviceName,Build 方法的触发分情况:
// grpc.NewClient 声明不会触发 Build 方法,首次调用 rpc 方法时触发 Build
// grpc.Dial 声明会触发 Build 方法,但已经废弃了
// Resolver 的 ResolveNow 方法是 gRPC 主动调用的,我们可以使用它动态去 etcd 中获取服务地址,也可以不实现它,自定义服务发现的逻辑// 服务发现的实现方式:
// 假如我们有三个应用,user-center、device-center、网关,user-center 和 device-center 暴露了很多 gRPC 服务,网关需要调用它们的服务
// 假如我们使用 etcd 作为注册中心,同时规范化 etcd 的 key ,例如:grpc/services/{serviceName}/实例ID
// grpc/services/user-center/实例1
// grpc/services/user-center/实例2
// grpc/services/device-center/实例1
// grpc/services/device-center/实例2
// 网关中分别实现 Builder 和 Resolver,并将 Builder 的实例注册在 grpc 的地址解析中,resolver.Register(Builder实现的实例)
// 获取 user-center 和 device-center 的 grpc 连接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 当 gRPC 连接建立时,gRPC 会调用 Builder 的 Build 方法,我们获取 target.Endpoint() 就是 serviceName
// 这样 fmt.Sprintf("grpc/services/%s", serviceName) 获取 serviceName 的 etcd 的 key 前缀
// 如:grpc/services/user-center/
// Build 方法中按前缀匹配查询 etcd 的数据,这样就获取到了 user-center 的所有实例的地址,再同步到 Resolver 中
// 如上就实现了规范化 etcd 的 key 前缀的服务发现,不管有多少个应用,代码中只需要一个服务发现的实例// 如果没有规范化 etcd 的 key 前缀,那么我们需要为各个服务声明不同的 scheme,每个 scheme 对应一个服务发现的实例
// Builder 的实现必须包含 etcd 的 key 前缀 ,不能利用 target.Endpoint() 去实现服务发现
// 如:ServiceDiscovery 实现了 Builder
// type ServiceDiscovery struct {
//		serverKey string
// }
// grpc/services/user-center/ 固定写死赋值给 serverKey
// 声明 ServiceDiscovery { serverKey },注册 resolver.Register(ServiceDiscovery实例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))// 普通 rpc 调用时,服务端挂掉:
// 服务发现找不到数据时:rpc error: code = Unavailable desc = no children to pick from
// 服务挂掉但etcd/服务发现还有数据:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服务重启后客户端连接可以恢复
// 当某个服务节点不可用时,可以自动连接到可用的服务节点// 流式 rpc 调用,服务端挂掉:
// 客户端发送方:EOF
// 客户端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服务重启后客户端连接不可恢复,必须重新建立连接// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 来使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {scheme     string // 自定义的 grpc 服务的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)serviceKey string // etcd 中服务的 key 前缀,例如:grpc/maxwell-ai/GatewayInfoService/1.0/etcdClient *clientv3.Client
}// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一个 scheme 对应一个 ServiceResolver,当 grpc 建立连接时触发 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
type ServiceResolver struct {scheme     stringserviceKey stringtarget  resolver.Targetclient  *clientv3.Clientcc      resolver.ClientConnaddrMap map[string]resolver.Addressclosed  chan struct{}
}func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {return &ServiceDiscovery{scheme:     scheme,serviceKey: serviceKey,etcdClient: etcdClient,}
}// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {// 创建服务解析器serviceResolver := &ServiceResolver{target:     target,cc:         cc,scheme:     s.scheme,serviceKey: s.serviceKey,client:     s.etcdClient,closed:     make(chan struct{}),addrMap:    make(map[string]resolver.Address),}// 首次拉取所有数据if err := serviceResolver.rePull(); err != nil {return nil, err}// 开启 watcher 监听 etcd 中的服务地址变化go serviceResolver.watcher()return serviceResolver, nil
}// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {return s.scheme
}// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主动调用去解析服务地址,这里可以实现从 etcd 获取服务地址的逻辑
// 但是不在这里实现,因为这里实现有同步和异步从 etcd 中查询数据
// 同步会阻塞
// 异步会开启很多 goroutine,可能会导致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {}func (s *ServiceResolver) Close() {close(s.closed)
}func (s *ServiceResolver) rePull() error {ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)defer cancelFunc()resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())if err != nil {return err}s.addrMap = make(map[string]resolver.Address)for _, ev := range resp.Kvs {key := strings.TrimPrefix(string(ev.Key), s.serviceKey)s.addServer(key, ev.Value)}s.syncToGrpc()return nil
}func (s *ServiceResolver) addServer(key string, value []byte) {var si ServiceInfoif err := json.Unmarshal(value, &si); err != nil {return}s.addrMap[key] = resolver.Address{Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),}
}func (s *ServiceResolver) delServer(key string) {if _, ok := s.addrMap[key]; ok {delete(s.addrMap, key)}
}func (s *ServiceResolver) syncToGrpc() {addrSlice := make([]resolver.Address, 0, 10)for _, v := range s.addrMap {addrSlice = append(addrSlice, v)}err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})if err != nil {return}
}func (s *ServiceResolver) watcher() {rePull := falsefor {select {case <-s.closed:returndefault:}if rePull {if err := s.rePull(); err != nil {time.Sleep(5 * time.Second)continue}}rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())loop:for {select {case <-s.closed:returncase resp, ok := <-rch:if !ok {rePull = truebreak loop}for _, ev := range resp.Events {key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)switch ev.Type {case mvccpb.PUT:s.addServer(key, ev.Kv.Value)case mvccpb.DELETE:s.delServer(key)}}s.syncToGrpc()}}}
}