配置文件和客户端源代码
1. client 配置文件
registries :
"demoZk":
protocol: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
references:
"UserProvider":
# 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册
registry: "demoZk"
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3
2. 客户端使用框架源码
func init() {
config.SetConsumerService(userProvider)
hessian.RegisterPOJO(&User{})
}
func main() {
hessian.RegisterPOJO(&User{})
config.Load()
time.Sleep(3e9)
println("\n\n\nstart to test dubbo")
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
if err != nil {
panic(err)
}
println("response result: %v\n", user)
initSignal()
}
实现远程过程调用
1. 加载配置文件
// file: config/config_loader.go :Load()
// Load Dubbo Init
func Load() {
// init router
initRouter()
// init the global event dispatcher
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
// start the metadata report if config set
if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
// reference config
loadConsumerConfig()
// config/config_loader.go
func loadConsumerConfig() {
// 1 init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {}
checkApplicationName(consumerConfig.ApplicationConfig)
configCenterRefreshConsumer()
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
// 2 refer-implement-reference
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
}
// 3 wait for invoker is available, if wait over default 3s, then panic
for {}
}
检查配置文件并将配置写入内存
在 for 循环内部,依次引用(refer)并且实例化(implement)每个被调 reference
等待三秒钟所有 invoker 就绪
2. 获取远程 Service URL,实现可供调用的 invoker
1)构造注册 url
// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {
//(一)配置url参数(serviceUrl),将会作为sub
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
)
...
// (二)注册地址可以通过url格式给定,也可以通过配置格式给定
// 这一步的意义就是配置->提取信息生成URL
if c.Url != "" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceUrl, err := common.NewURL(urlStr)
...
}
} else {// 配置读入注册中心的信息
// assemble SubURL from register center's configuration mode
// 这是注册url,protocol = registry,包含了zk的用户名、密码、ip等等
c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
...
// set url to regUrls
for _, regUrl := range c.urls {
regUrl.SubURL = cfgURL// regUrl的subURl存当前配置url
}
}
//至此,无论通过什么形式,已经拿到了全部的regURL
// (三)获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL
if len(c.urls) == 1 {
// 这一步访问到registry/protocol/protocol.go registryProtocol.Refer
// 这里是registry
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
} else {
// 如果有多个注册中心,即有多个invoker,则采取集群策略
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
}
2)registryProtocol 获取到 zkRegistry 实例,进一步 Refer
// file: registry/protocol/protocol.go: Refer
// Refer provider service from registry center
// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
// 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等
var serviceUrl = registryUrl.SubURL
if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry"
protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")
registryUrl.Protocol = protocol//替换成了具体的值,比如"zookeeper"
}
// 接口对象
var reg registry.Registry
// (一)实例化接口对象,缓存策略
if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
// 缓存中不存在当前registry,新建一个reg
reg = getRegistry(®istryUrl)
// 缓存起来
proto.registries.Store(registryUrl.Key(), reg)
} else {
reg = regI.(registry.Registry)
}
// 到这里,获取到了reg实例 zookeeper的registry
//(二)根据Register的实例zkRegistry和传入的regURL新建一个directory
// 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,
// 这一步将在下面详细给出步骤
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
// (三)DoRegister 在zk上注册当前client service
err = reg.Register(*serviceUrl)
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
}
// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
// invoker保存
proto.invokers = append(proto.invokers, invoker)
return invoker
}
3)构造 directory(包含较复杂的异步操作)
extension.GetDefaultRegistryDirectory(®istryUrl, reg)
函数,本质上调用了已经注册好的 NewRegistryDirectory
函数:// file: registry/directory/directory.go: NewRegistryDirectory()
// NewRegistryDirectory will create a new RegistryDirectory
// 这个函数作为default注册在extension上面
// url为注册url,reg为zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
go dir.subscribe(url.SubURL)
return dir, nil
}
// file: registry/directory/directory.go: subscribe()
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
// 增加两个监听,
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
// subscribe调用
dir.registry.Subscribe(url, dir)
}
我认为这种传入 listener 的设计模式非常值得学习,而且很有 java 的味道。 针对等待 zk 返回订阅信息这样的异步操作,需要传入一个 Listener,这个 Listener 需要实现 Notify 方法,进而在作为参数传入内部之后,可以被异步地调用 Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。 层层的 Listener 事件链,能将传入的原始 serviceURL 通过 zkConn 发送给 zk 服务,获取到服务端注册好的 url 对应的二进制信息。 而 Notify 回调链,则将这串 byte[] 一步一步解析、加工;以事件的形式向外传递,最终落到 directory 上的时候,已经是成型的 newInvokers 了。 具体细节不再以源码形式展示,可参照上图查阅源码。
4)构造带有集群策略的 clusterinvoker
// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
123
// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()
func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failoverClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
12345
// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()
// Invoker 函数
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
...
//调用List方法拿到directory缓存的所有invokers
invokers := invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {// 检查是否可以实现调用
return &protocol.RPCResult{Err: err}
}
// 获取来自用户方向传入的
methodName := invocation.MethodName()
retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
for i := 0; i <= retries; i++ {
// 重要!这里是集群策略的体现,失败后重试!
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
// 这里是负载均衡策略的体现!选择特定ivk进行调用。
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
}
return result
}
...
}
看了很多 Invoke 函数的实现,所有类似的 Invoker 函数都包含两个方向:一个是用户方向的 invcation;一个是函数方向的底层 invokers。 而集群策略的 invoke 函数本身作为接线员,把 invocation 一步步解析,根据调用需求和集群策略,选择特定的 invoker 来执行。 proxy 函数也是这样,一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。 proxy 函数负责将 ins 转换为 invocation,调用对应 invoker 的 invoker 函数,实现连通。 而出于这样的设计,可以在一步步 Invoker 封装的过程中,每个 Invoker 只关心自己负责操作的部分,从而使整个调用栈解耦。 妙啊!!!
5)在 zookeeper 上注册当前 client
// file: config/refrence_config.go: Refer()
if len(c.urls) == 1 {
// 这一步访问到registry/protocol/protocol.go registryProtocol.Refer
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
// (一)拿到了真实的invokers
} else {
// 如果有多个注册中心,即有多个invoker,则采取集群策略
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
// (二)create proxy,为函数配置代理
if c.Async {
callback := GetCallback(c.id)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
} else {
// 这里c.invoker已经是目的addr了
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
}
3. 将调用逻辑以代理函数的形式写入 rpc-service
config/config_loader.go: loadConsumerConfig()
// file: common/proxy/proxy.go: Implement()
// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数
// 将当前rpc所有可供调用的函数注册到proxy.rpc内
func (p *Proxy) Implement(v common.RPCService) {
// makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数
// 这个被返回的函数是请求实现的载体,由他来发起调用获取结果
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
// 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value
// 这个函数具体的实现如下:
...
// 目前拿到了 methodName、所有入参的interface和value,出参数reply
// (一)根据这些生成一个 rpcinvocation
inv = invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithReply(reply.Interface()),
invocation_impl.WithCallBack(p.callBack),
invocation_impl.WithParameterValues(inVArr))
for k, value := range p.attachments {
inv.SetAttachments(k, value)
}
// add user setAttachment
atm := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment,也要写入inv
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
}
}
// 至此构造inv完毕
// (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用
result := p.invoke.Invoke(invCtx, inv)
// 如果有attachment,则加入
if len(result.Attachments()) > 0 {
invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
}
...
}
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
methodName := t.Tag.Get("dubbo")
if methodName == "" {
methodName = t.Name
}
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数
outNum := t.Type.NumOut()
// 规定函数输出只能有1/2个
if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
t.Name, t.Type.String(), outNum)
continue
}
// The latest return type of the method must be error.
// 规定最后一个返回值一定是error
if returnType := t.Type.Out(outNum - 1); returnType != typError {
logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
continue
}
// 获取到所有的出参类型,放到数组里
var funcOuts = make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}
// do method proxy here:
// (三)调用make函数,传入函数名和返回值,获得能调用远程的proxy,将这个proxy替换掉原来的函数位置
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
}
}
...
}
在代理函数中实现由参数列表生成 Invocation 的逻辑
在代理函数实现调用 Invoker 的逻辑
将代理函数替换为原始 rpc-service 对应函数
// file: client.go: main()
config.Load()
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
从 client 到 server 的 invoker 嵌套链- 小结