gRPC服务发现

基于 etcd 实现的服务发现,按照非规范化的 etcd key 实现,详细见代码注释。

package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/v3""google.golang.org/grpc/resolver""strings""time"
)// gRPC 的服务一般会使用 protobuf 作为数据传输的介质
// gRPC 服务定义在 proto 的文件中,例如:service RoutingService {}
// protoc 将 proto 后缀文件转为 go 文件,文件内自动生成了 gRPC 服务的描述信息、服务注册的函数、客户端声明的函数等内容
// 如下,它们的格式是固定的,注意函数的参数
// 服务描述信息:RoutingService_ServiceDesc,格式:服务名_ServiceDesc
// 服务注册函数:RegisterRoutingServiceServer,格式:Register你服务名Server
// 客户端声明函数:NewRoutingServiceClient,格式:New服务名Client
// 其中客户端声明函数的参数是 gRPC 连接,返回值是 gRPC 服务的客户端接口,这样就可以调用客户端接口定义的 rpc 方法了
// gRPC 连接不会与某个 gRPC 服务绑定,它只是一个连接。
// 获取 gRPC 连接的方式如下两种,第一个参数就是 gRPC 服务的地址,可以写死 ip + port,也可以使用服务发现来获取 gRPC 服务的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(废弃)
// 服务发现是实现 Builder 和 Resolver 接口,Builder 用于创建 Resolver 实例,Resolver 用于解析服务地址。
// Builder 的 Scheme 方法返回值是 与 grpc.NewClient 中的 scheme 对应
// Builder 的 Build 第一个参数 target.Endpoint() 得到的结果是 grpc.NewClient 中的 serviceName,Build 方法的触发分情况:
// grpc.NewClient 声明不会触发 Build 方法,首次调用 rpc 方法时触发 Build
// grpc.Dial 声明会触发 Build 方法,但已经废弃了
// Resolver 的 ResolveNow 方法是 gRPC 主动调用的,我们可以使用它动态去 etcd 中获取服务地址,也可以不实现它,自定义服务发现的逻辑// 服务发现的实现方式:
// 假如我们有三个应用,user-center、device-center、网关,user-center 和 device-center 暴露了很多 gRPC 服务,网关需要调用它们的服务
// 假如我们使用 etcd 作为注册中心,同时规范化 etcd 的 key ,例如:grpc/services/{serviceName}/实例ID
// grpc/services/user-center/实例1
// grpc/services/user-center/实例2
// grpc/services/device-center/实例1
// grpc/services/device-center/实例2
// 网关中分别实现 Builder 和 Resolver,并将 Builder 的实例注册在 grpc 的地址解析中,resolver.Register(Builder实现的实例)
// 获取 user-center 和 device-center 的 grpc 连接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 当 gRPC 连接建立时,gRPC 会调用 Builder 的 Build 方法,我们获取 target.Endpoint() 就是 serviceName
// 这样 fmt.Sprintf("grpc/services/%s", serviceName) 获取 serviceName 的 etcd 的 key 前缀
// 如:grpc/services/user-center/
// Build 方法中按前缀匹配查询 etcd 的数据,这样就获取到了 user-center 的所有实例的地址,再同步到 Resolver 中
// 如上就实现了规范化 etcd 的 key 前缀的服务发现,不管有多少个应用,代码中只需要一个服务发现的实例// 如果没有规范化 etcd 的 key 前缀,那么我们需要为各个服务声明不同的 scheme,每个 scheme 对应一个服务发现的实例
// Builder 的实现必须包含 etcd 的 key 前缀 ,不能利用 target.Endpoint() 去实现服务发现
// 如:ServiceDiscovery 实现了 Builder
// type ServiceDiscovery struct {
//		serverKey string
// }
// grpc/services/user-center/ 固定写死赋值给 serverKey
// 声明 ServiceDiscovery { serverKey },注册 resolver.Register(ServiceDiscovery实例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))// 普通 rpc 调用时,服务端挂掉:
// 服务发现找不到数据时:rpc error: code = Unavailable desc = no children to pick from
// 服务挂掉但etcd/服务发现还有数据:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服务重启后客户端连接可以恢复
// 当某个服务节点不可用时,可以自动连接到可用的服务节点// 流式 rpc 调用,服务端挂掉:
// 客户端发送方:EOF
// 客户端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服务重启后客户端连接不可恢复,必须重新建立连接// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 来使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {scheme     string // 自定义的 grpc 服务的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)serviceKey string // etcd 中服务的 key 前缀,例如:grpc/maxwell-ai/GatewayInfoService/1.0/etcdClient *clientv3.Client
}// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一个 scheme 对应一个 ServiceResolver,当 grpc 建立连接时触发 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
type ServiceResolver struct {scheme     stringserviceKey stringtarget  resolver.Targetclient  *clientv3.Clientcc      resolver.ClientConnaddrMap map[string]resolver.Addressclosed  chan struct{}
}func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {return &ServiceDiscovery{scheme:     scheme,serviceKey: serviceKey,etcdClient: etcdClient,}
}// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {// 创建服务解析器serviceResolver := &ServiceResolver{target:     target,cc:         cc,scheme:     s.scheme,serviceKey: s.serviceKey,client:     s.etcdClient,closed:     make(chan struct{}),addrMap:    make(map[string]resolver.Address),}// 首次拉取所有数据if err := serviceResolver.rePull(); err != nil {return nil, err}// 开启 watcher 监听 etcd 中的服务地址变化go serviceResolver.watcher()return serviceResolver, nil
}// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {return s.scheme
}// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主动调用去解析服务地址,这里可以实现从 etcd 获取服务地址的逻辑
// 但是不在这里实现,因为这里实现有同步和异步从 etcd 中查询数据
// 同步会阻塞
// 异步会开启很多 goroutine,可能会导致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {}func (s *ServiceResolver) Close() {close(s.closed)
}func (s *ServiceResolver) rePull() error {ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)defer cancelFunc()resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())if err != nil {return err}s.addrMap = make(map[string]resolver.Address)for _, ev := range resp.Kvs {key := strings.TrimPrefix(string(ev.Key), s.serviceKey)s.addServer(key, ev.Value)}s.syncToGrpc()return nil
}func (s *ServiceResolver) addServer(key string, value []byte) {var si ServiceInfoif err := json.Unmarshal(value, &si); err != nil {return}s.addrMap[key] = resolver.Address{Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),}
}func (s *ServiceResolver) delServer(key string) {if _, ok := s.addrMap[key]; ok {delete(s.addrMap, key)}
}func (s *ServiceResolver) syncToGrpc() {addrSlice := make([]resolver.Address, 0, 10)for _, v := range s.addrMap {addrSlice = append(addrSlice, v)}err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})if err != nil {return}
}func (s *ServiceResolver) watcher() {rePull := falsefor {select {case <-s.closed:returndefault:}if rePull {if err := s.rePull(); err != nil {time.Sleep(5 * time.Second)continue}}rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())loop:for {select {case <-s.closed:returncase resp, ok := <-rch:if !ok {rePull = truebreak loop}for _, ev := range resp.Events {key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)switch ev.Type {case mvccpb.PUT:s.addServer(key, ev.Kv.Value)case mvccpb.DELETE:s.delServer(key)}}s.syncToGrpc()}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/web/87429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Linux的Spark本地模式环境搭建实验指南

一、实验目的 掌握Spark本地模式的安装与配置方法验证Spark本地环境是否搭建成功了解Spark基本操作和运行原理 二、实验环境准备 操作系统&#xff1a;Linux&#xff08;推荐ubuntu&#xff09;Java环境&#xff1a;JDK 1.8或以上版本内存&#xff1a;至少4GB&#xff08;推…

数学建模_时间序列

什么是时间序列时间序列预测方法/模型条件&#xff1a;非白噪音平稳平稳性评估不平稳变成平稳然后用ARIMA模型确定p,qAR模型(ARMA特例)MA模型(ARMA特例)ARMA模型(普适)灰色模型神经网络/LSTM组合预测模型向量数据预测结果和为1的情况什么是时间序列 省略具体图形例子 时间序列…

linux用rpm包升级sudo包为sudo-1.9.17-2版本

rpm下载地址&#xff1a; https://www.sudo.ws/dist/packages/1.9.17p1/ 备注&#xff1a;其他压缩包下载地址&#xff1a;https://www.sudo.ws/download.html sudo-1.9.17-2.el7.x86_64.rpm 检查一下&#xff0c;本地sudo版本&#xff0c;执行&#xff1a;sudo -V 或者sudo -…

【开源项目】一款真正可修改视频MD5工具视频质量不损失

文章目录 视频MD5修改工具 🎬📋 目录✨ 功能特点💻 系统要求🏗️ 设计架构🔬 技术原理💻 核心代码1. 视频MD5修改核心逻辑2. 前端异步处理代码3. 错误处理与日志记录📥 安装方法方法一:直接下载方法二:使用本地服务器📚 使用教程基本使用步骤高级使用技巧📁…

Day05: Python 中的并发和并行(1)

理解 Python 中的线程和进程 理解线程和进程是实现在 Python 中并发和并行的基础。这种知识使你能够编写能够看似同时执行多个任务的程序&#xff0c;从而提高性能和响应能力。本课程将深入探讨线程和进程的核心概念、它们的区别&#xff0c;以及它们如何为更高级的并发技术奠…

Spring Boot 集成 MinIO 实现分布式文件存储与管理

Spring Boot 集成 MinIO 实现分布式文件存储与管理 一、MinIO 简介 MinIO 是一个高性能的分布式对象存储服务器&#xff0c;兼容 Amazon S3 API。它具有以下特点&#xff1a; 轻量级且易于部署高性能&#xff08;读写速度可达每秒数GB&#xff09;支持数据加密和访问控制提供…

从小白入门,基于Cursor开发一个前端小程序之Cursor 编程实践与案例分析

Cursor 编程实践与案例分析 Cursor 编程实践与案例分析 1. 什么是 Cursor&#xff1f; Cursor 是一款面向开发者的 AI 编程助手&#xff0c;集成于本地 IDE&#xff0c;支持自然语言与代码的无缝协作。它不仅能自动补全、重构、查找代码&#xff0c;还能理解业务上下文&#…

一、如何用MATLAB画一个三角形 代码

一、如何用MATLAB画一个三角形 代码在MATLAB中绘制三角形可以通过指定三个顶点的坐标并使用 fill 或 patch 函数实现。以下是详细代码示例&#xff1a;方法1&#xff1a;使用 fill 函数&#xff08;简单填充&#xff09;% 定义三角形的三个顶点坐标 (x, y) x [0, 1, 0.5]; % …

Postman自动化测试提取相应body体中的参数

文章目录Postman自动化测试提取相应body体中的参数1. 示例响应 Body 参数2. 提取响应 Body 参数Postman自动化测试提取相应body体中的参数 上一篇的文中介绍了使用postman自动化测试时从响应的header中提取token参数&#xff0c;很多同学私信问如何从响应体body中提取参数。 有…

vue-39(为复杂 Vue 组件编写单元测试)

实际练习:为复杂 Vue 组件编写单元测试 单元测试对于确保复杂 Vue 组件的可靠性和可维护性至关重要。通过隔离和测试代码的各个单元,您可以在开发过程的早期发现并修复错误,从而构建更健壮和可预测的应用程序。本课程重点介绍为复杂 Vue 组件编写单元测试的实用方面,建立在…

c语言中的函数IV

函数的先后关系 直接把函数放在程序上方是可以的 在实际开发中&#xff0c;我们更希望把main函数放在前面 这样子直接把自己定义的函数放在main函数下方&#xff0c;编译会出现warning和error正确的解决方案是&#xff1a;把函数的头放到main函数上方&#xff0c;这样就能正常…

大模型Decoder-Only深入解析

Decoder-Only整体结构 我们以模型Llama-3.1-8B-Instruct为例&#xff0c;打印其结构如下&#xff08;后面会慢慢解析每一部分&#xff0c;莫慌&#xff09;&#xff1a; LlamaForCausalLM((model): LlamaModel((embed_tokens): VocabParallelEmbedding(num_embeddings128256,…

web网页,在线%电商,茶叶,商城,网上商城系统%分析系统demo,于vscode,vue,java,jdk,springboot,mysql数据库

经验心得 这也是帮之前一客户加了几个功能&#xff0c;需要掌握crud&#xff0c;前后端开发&#xff0c;前后端怎么对接&#xff0c;前后端通讯是以那种格式&#xff0c;把这些掌握后咱们就可以进行网站开发了。后端记好一定要分层开发&#xff0c;不要像老早一起所有代码写到一…

MybatisPlus-05.核心功能-条件构造器

一.条件构造器 我们前面使用的MP功能主要是根据id进行操作的&#xff0c;并未涉及到复杂查询。而根据id所进行的增删改查操作在MP中都有直接的封装。但是遇到复杂的查询条件时&#xff0c;如何使用MP进行操作是我们要考虑的问题。因此MP为我们提供了条件构造器。 在BaseMapper…

ES6从入门到精通:常用知识点

变量声明ES6引入了let和const替代var。let用于声明可变的块级作用域变量&#xff0c;const用于声明不可变的常量。块级作用域有效避免了变量提升和污染全局的问题。let name Alice; const PI 3.1415;箭头函数箭头函数简化了函数写法&#xff0c;且自动绑定当前上下文的this值…

51单片机教程(十一)- 单片机定时器

11、单片机定时器 项目目标 通过定时器/计数器实现流水灯控制。知识要点 定时器的结构。TMOD和TCON;定时/计数器工作方式;定时/计数器编程步骤;1、项目分析 前面的流水灯的时间控制通过空循环语句来实现,定时不是很精确。本章通过用定时器来控制流水灯任务可以实现精确的时…

基于opencv的疲劳驾驶监测系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业多年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的毕业设计程序开发&#xff0c;开发过上千套毕业设计程序&#xff0c;没有什么华丽的语言&#xff0…

Vue 2 和 Vue 3 区别

1. 响应式系统原理 Vue 2&#xff1a;利用Object.defineProperty()实现属性拦截。存在局限性&#xff0c;无法自动监测对象属性增减&#xff0c;需用Vue.set/delete&#xff1b;数组变异方法要重写&#xff1b;深层对象递归转换性能差。Vue 3&#xff1a;采用 ES6 Proxy代理对…

mv重命名报错:-bash:syntax error near unexpected token ‘(‘

文章目录 一、报错背景二、解决方法2.1、方法一&#xff1a;文件名加引号2.2、方法二&#xff1a;特殊字符前加\进行转义 一、报错背景 在linux上对一文件执行重命名时报错。原因是该文件名包含空格与括号。 文件名如下&#xff1a; aa &#xff08;1).txt执行命令及报错如下…

AWS 开源 Strands Agents SDK,简化 AI 代理开发流程

最近&#xff0c;亚马逊网络服务&#xff08;AWS&#xff09;宣布推出 Strands Agents(https://github.com/strands-agents/sdk-python)&#xff0c;这一开源软件开发工具包&#xff08;SDK&#xff09;采用模型驱动的方法&#xff0c;助力开发者仅用数行代码即可构建并运行人工…