实现RPC框架——gsRPC
简介
使用Netty以及Zookeeper实现RPC框架。
模块设计
Client端
client端的基本功能是服务发现和服务代理。
细节实现部分:
- 服务发现:当服务调用者上线后,需要从注册中心订阅所有(可以做一个优化,只订阅自己关心的服务)的服务,以及服务提供者的信息。一旦发生变化,注册中心会及时通知服务调用者。
- 服务代理:通过动态代理将本地方法的执行转化为发送请求,让服务提供者执行后返回结果。初步考虑使用CGLIB实现代理。
- 负载均衡:多种负载均衡算法,随机路由、最少活跃优先、最短响应优先、轮询路由、一致性哈希路由。参考负载均衡 | Apache Dubbo
- 加权随机:按权重设置随机概率。权重由服务提供者给出,权重越高的服务提供者代表处理能力越强,越容易被选中。
- 加权轮询:平滑加权轮询算法。
- 最少活跃数优先:活跃数越低,越优先调用,相同活跃数的进行加权随机。
- 最短响应优先:在最近一个滑动窗口中,响应时间越短,越优先调用。
- 一致性Hash(实现较难):相同参数的请求总是发到同一提供者。
- 连接池管理:预先和服务端建立长连接,降低连接开销。
- 客户端扫描
@GsConsumer
注解标识的Service
属性,初始化Bean的时候进行动态代理。将所有客户端需要的Service
信息保存为集合。 - 服务发现和订阅,注册中心连接Zookeeper,进行按需订阅上述
Service
对应的所有服务者的信息,保存为Map。key
为Service
标识,value
为Provider
信息列表。 - 连接池和上述
Providers
建立长连接,并缓存。保存Service
和Channel
的Map。 - 应用端发起调用,执行代理方法。代理将调用信息
Invocation
发送给任务管理者。 - 任务管理者根据负载均衡策略找到一个对应的
Provider
信息,然后从连接池获取客户端和该Provider
连接的Channel
。生成Request
并发送到Channel
,提交到线程池。 - 等待服务端执行,返回结果
Channel
收到来自服务端的响应后,通知任务管理者- 任务管理者进行信息统计,响应时间等,然后将结果返回给代理。
@GsReference
用于标识消费者的服务。
- interfaceClass:接口的class
- version:版本号
Proxy类
用于代理客户端方法,隐藏调用逻辑。提供了cglib
、jdk
、javassist
三种方式。
DiscoveryCenter
抽象类,用于和远端的注册中心进行交互,提供服务发现和服务订阅功能。
属性:
- ServiceCache:存储由
@GsConsumer
标注的所有所需服务。 - Service2Invokers:存储
ServiceMeta -> Invokers
的映射 - InvokersMap:
ProviderKey -> Invoker
映射
方法:
- pullService:从注册中心拉取服务
- watchNode:
- watchNodeAndChildren:
ServiceMeta
客户端服务元信息,由@GsConsumer
注解扫描生成
属性:
- ServiceName:服务名,全类名
- Version:版本
方法: - getServiceKey:返回
ServiceName#version
Invoker
服务提供者信息。这是从注册中心拉取的。用于构建 ServiceKey->List<Invoker>
的映射。
属性:
- Ip:Ip地址
- Port:端口号
- Version:版本
- Weight:权重
- ServiceKey:标识。为
IP:Port#Version
Invocation
服务调用信息。
属性:
- Request Body:请求体
- Invoker:调用者
- StartTime:开始时间
- EndTime:结束时间
- IsSuccess:是否成功
- RequestId:请求唯一ID标识
InvocationManager
服务调用中心,执行所有客户端调用的中心,内部维护一个CachedThreadPool
LoadBalance接口
负责客户端负载均衡,从List<Invoker>
中按不同策略选出服务提供者,进行方法调用。
ConsistentHash
:一致性哈希算法RoundRobin
:基于权重的平滑轮询算法WeightedRandom
:基于权重的随机算法LeastActive
:最少活跃数优先算法ShortestResponse
:最短响应时间优先算法
方法:- Select:
List<Invoker> -> Invoker
Channel Pool
用于构建及缓存和服务提供者长连接。
属性:
- Invokers2Handler:
Invoker -> Handler
方法: - getHandlerByInvoker:获取Invoker对应的Handler。
- addChannel:添加一个新
Invoker
的channel连接 - removeChannel:移除一个
Invoker
的channel连接
Server端
Server端需要扫描提供的类接口和实现,并将服务注册到注册中心。之后开启NIO,接收客户端的连接,处理客户端发送的RPC请求。
细节实现部分:
- 服务注册:当服务提供者开启后,首先就是扫描@GsService的接口和实现类。将其注册到注册中心
- 需要和注册中心保持长连接,注册中心需要定时发送心跳(探活),来验证服务提供者是否在线;需要额外手段探测服务提供者是否还有能力处理请求。当服务提供者无法处理请求时,注册中心应该及时从服务提供者列表中移出该服务提供者。
- 健康检查机制:假死,健康,宕机。如果服务提供者和zookeeper断开连接,能不能发现?注册中心需要定期向zookeeper更新状态信息。
- 服务监控:评估自己的处理能力。
- 限流/容错:当无法接收更多的请求时,直接返回一个错误信息。
- 优雅关闭:当存在尚未处理完的请求时,可能需要延迟关闭。通常延迟十秒后,强制关闭。
- 监控客户端:服务提供者应该和客户端保持长连接,但如果若干时间内没有收到客户端发送的RPC请求时,应该主动关闭与该客户端的连接。优化点:当连接数超出最大限制时,应该首先关闭最久未请求的连接,LRU算法?(ChannelManager负责)
- 服务缓存:对服务实现类和服务Key进行缓存,用于通过反射执行方法。(Service Manager负责)
组件功能:
- Channel Manager:连接管理,管理所有与客户端的连接,连接控制,活跃检测。
- Service Manager:服务中心,保存服务Key -> 实现类的映射,用于Task Manager调用某个服务中的方法。此外,将服务信息告诉注册中心。服务熔断
- Registry Center:注册中心。将服务中心的服务信息提供给ZooKeeper,以及注销服务(熔断)
- Task Manager:任务调度中心,负责处理请求,其依靠线程池。此外维护活跃任务数量,用于优雅停机时避免有进行中的任务,限流。
@GsService
用于标识服务提供者的服务。
- interfaceClass:接口的class
- version:版本号
ServiceManager
服务提供者的服务中心,缓存了Servicekey -> ServiceImpl
的映射,用于执行实际的方法调用。初始化时,由@GsService
注解注册到本服务中心中。
属性:
- ServicesMap:缓存
Servicekey -> ServiceImpl
的映射,通过反射执行请求方法。
方法: - executeMethod:执行相应服务的相应方法,参数为request的请求信息。
- cacheService:缓存服务实现类。
RegistryCenter
服务提供者的注册中心,负责和远程注册中心(Zookeeper)建立连接,并将服务注册到注册中心。服务列表来自于ServiceManager。
方法:
- registry:注册服务
- unregistry:注销所有服务
- doClose:关闭注册中心以及和zookeeper的连接。用于服务提供者下机。
TaskManager
服务提供者的任务中心,用于处理所有来自客户端的请求。内部使用Map
映射requestId -> future
。此外还可以用来实现服务监控
属性:
- ThreadPool:内部线程池,用于并行处理任务
方法: - submitTask:提交任务Resquest到线程池,返回一个CompletableFuture< Response >对象,handler使用thenAccept方法避免阻塞。将ResponseBody写入Kindred,并写入ctx
ChannelManager
服务提供者的连接管理中心,用于维护和客户端的长连接。实现连接数控制、流量控制。
属性:
- ChannelMap:缓存所有和客户端的channel。
方法: - addChannel:添加channel
- removeChannel:移除channel
Handler
用于处理Channel事件的Handler。继承SimpleChannelInboundHandler
属性:
- TaskManager:用于提交任务到
- TaskMap:存储Request
通信协议和序列化
协议:先实现一个自定义协议Kindred(类似dubbo),分为请求头部和请求体
序列化:支持hessian,kryo,protostuff,jdk原生,fst
Request Body以及Response Body定义
RequestHandler以及ResponseHandler:协议Adapter,序列化Adapter,工厂模式
Protocol Magic:Kindred协议固定为0xCDED,可以快速过滤。
Bit Info:
- Request/Response(1bit):标识是Request还是Response
- Need return(1bit):用于标识该Request是否需要服务端返回数据
- Event(1bit):是否是事件,心跳事件
- Serialize Type(5bit):序列化方式
Status:请求状态(仅对于Response有效,对于Request固定为0x00) - 20:OK:成功
- 31:Client_timeout:客户端超时,客户端无法在合理的时间内收到服务端的响应。默认10s
- 32:Server_timeout:服务端超时,服务端处理请求超时。客户端由状态码判断。默认5s
- 40:Server_error:无法解析请求等
- 50:Server_limit_rate:服务端限流,请稍后再试 ok
- 60:Service_not_found:服务未找到 ok
- 70:Method_not_found:方法未找到 ok
- 80:Service_error:服务执行时发生错误,异常 ok
Request ID:请求的唯一标识,雪花算法
Data Length:请求体长度,用于序列化
Data:存放请求体(这些是需要被序列化的)
对于Request:Request Body
- Service name:服务名String
- Service version:服务版本String
- Service method name:方法名String
- Method parameter types:方法参数类型列表
Class<?>[]
- Method arguments:方法参数值列表
Object[]
- Extend fields:拓展字段,Map<String,Object>类型
对于Response:Response Body
- Value type:返回的类型,null/value/exception
- Value data:返回的数据,Object
编解码过程:
解码器/编码器需要知道使用的协议类型,以及序列化方式才能进行解码。
字节流 -> 解码器 -> 协议(请求头,请求体)
协议(请求头,请求体) -> 编码器 -> 字节流
类预注册:
部分序列化方式可以通过预先注册类Schema优化序列化性能。
序列化
Fst
Hessian
Protostuff
Kryo
JDK
性能
配置类
ApplicationProperties
前缀:gsrpc.application
描述:用于应用的配置信息
- name:应用名
- proxy:代理方式jdk、javassist、cglib
ConsumerProperties
前缀:gsrpc.consumer
描述:服务调用者配置信息
- timeout:客户端超时时间
- retries:重试次数
- loadbalance:负载均衡策略
- protocol:通信协议,默认Kindred(当前仅支持)
- port:端口号netty,默认-1,表示随机
- serialization:序列化方式,默认kryo。支持kryo、fst、hessian、Protostuff、jdk
ProviderProperties
前缀:gsrpc.provider
描述:服务提供者配置信息
- timeout:服务端超时时间
- retries:重试次数
- protocol:通信协议,默认Kindred(当前仅支持)
- port:端口号netty,默认20688
- serialization:序列化方式,默认kryo。支持kryo、fst、hessian、Protostuff、jdk
- weight:权重,默认5,范围1-10。
- accepts:最大客户端连接数,默认1024
待拓展 - eviction:客户端连接淘汰策略
RegistryProperties
前缀:gsrpc.registry
描述:服务注册中心的相关配置
- ip:地址
- port:端口号
- type:注册中心类型,默认zookeeper
- timeout:连接超时时间
- session:会话超时时间
Todo
- registryCenter的配置注入优化一下
- com.leggasai.rpc.client.netty.handler.KindredClientChannelHandler#invoke 第一次writeAndFlush方法耗时异常,尝试一下
- 序列化预注册支持用户自定义SerializationOptimizer
- com.leggasai.rpc.client.proxy.invoke.RpcMethodInvoke#invoke是否应该抛出异常?
- 统计中心
- 优化一下客户端服务端线程池
- 服务降级和熔断,服务端限流算法
- http协议,json/gson序列化方式
- 日志治理,统一一下框架,添加日志配置。一些调用信息不要输出到控制台
- 测试多提供者,在虚拟机测试完成
- 修复注册中心配置,应该对于提供者和服务者不一样
- 重大BUG,先启动发现中心,后启动注册中心,导致首次调用必失败,因为还没注册服务上去。