你好,欢迎进入江苏优软数字科技有限公司官网!

诚信、勤奋、创新、卓越

友好定价、专业客服支持、正版软件一站式服务提供

13262879759

工作日:9:00-22:00

codejock 162 深入解析Codejock 162:Java Dubbo源码核心调用流程与负载均衡机制

发布时间:2025-12-08

浏览次数:0

一、引言

对于身为Java开发者的人来讲,说到dubbo此物,按通常情况,我们是像对待黑盒那样去运用它的,并不需要把这个黑盒给打开。

但是,随着当下程序员这个行业的不断发展,我们是存在着一定必要去打开这个呈现为黑色的盒子codejock 162,进而对其中所蕴含的让人费尽思虑去探究的微妙之处进行探索 。

本期,关于 dubbo 源码解析这一系列的文章,是会带你去领略、感受 dubbo 源码所蕴含的奥秘的,句号。

这期源码文章,吸取了先前 、Kakfa、JUC 源码文章的经验教训,这文章不会逐一为大伙分析源码,会把一些不太重要的部分当作黑盒来处置,从而能让我们较为快速、高效地去阅读源码 。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务调用流程1、消费端

上篇的短文呀,论述了经由我们所限定的关于消费端怎样去预订我们服务端登记至 的服务的接口,这是从源码层面向全面剖析的把 dubbo 服务预订的所有经历叙述清楚 。

消费端都已经知晓了我们服务的相关信息,至此,下一个步骤便是要着手开启正规的调用操作了 。

我们先从消费端聊聊服务调用的流程

1.1 动态代理的回调

我们聊到消费端订阅服务时,最终创建的代码如下:

java复制代码public  T getProxy(Invoker invoker, Class[] interfaces) {
返回,(类型为T),通过代理获取代理对象,该代理对象基于接口,使用新实例化的,调用处理程序,此处理程序基于调用者,来创建新InstanceOf实例,(该实例是使用代理获取的基于接口的代理对象)。
}

应该清楚,那些看过关于动态代理的小伙伴,当我们去调用代理的接口之时,实际上所走的是dler这个类的方法。

Java复制代码,public修饰的Object,调用方法invoke,其参数为Object类型的proxy,Method类型的method,以及Object数组类型的args ,方法主体是左大括号之后的内容,具体为左大括号内,有一个花括号包裹的代码块,代码块内有一系列语句,但这里仅列出了方法签名 。
    // 获取方法名=getUserById
String,methodName,等于,method,调用,getName,方法,所返回,的值,。
    // 获取参数
    Class存放参数类型的数组,被赋值为,方法的,获取参数类型的,那个方法所返回的结果 。
    
    // 组装成 RpcInvocation 进行调用
就有了RpcInvocation这个实例呢,具体是这样得到的,那就要new一个RpcInvocation,它的参数依次是serviceModel,还有那method的名称叫做getName,invoker所对应的接口名称是getName,protocolServiceKey这个值,method的参数类型是getParameterTypes函数获取的结果对应的类型,还有args这个参数,这样就构建出了这个RpcInvocation实例 。
    
    // 执行调用方法
返回到,InvocationUtil所进行的,调用invoker的操作,以及rpcInvocation的操作 。
}

这里我们重点介绍下 的几个参数:

我们继续往下看 . 做了什么

将针对Java的复制代码,其中有一个公共静态的对象调用方法,该方法名为invoke,其参数是Invoker 。调用者,远程过程调用.invoke rpcInvocation) 抛出可抛出物。
    URL url = invoker.getUrl();
获取服务密钥,从字符串中,通过网址,利用其方法,来取得密钥,此密钥被存储在变量中。  变量名字叫服务密钥,它是字符串类型的。  网址。
将rpcInvocation的目标服务唯一名称设置为serviceKey ,。
    
}
// 判断当前的是应用注册还是接口注册
公开的结果调用,通过调用Invocation来实现,会抛出RpcException异常。
倘若,当下可利用的调用者并不为空值,那么,情况便是如此 。
倘若,步骤,等同于,应用程序初次之处 ,那么 ,(则在此处继续执行后续代码) , 若步骤在这段条件判断代码中,等于应用程序初次。
            if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
将调用者调用的结果当作返回值,这个调用是针对那个调用请求进行的,把调用的结果返回 。
            }
返回,决定调用者的函数,调用,该调用所对应的调用操作,有个括号,括号里面是调用时传入的参数,句号。
        }
返回,当前可使用的调用器,调用那个调用,那个调用的叫调用请求,的那个调用请求,的这项操作,的那个操作结果 。
    }
}

我们继续往下追源码

1.2 过滤器

java复制代码// 过滤器责任链模式
// 依次遍历,执行顺序:
公共的接口,名为FilterChainBuilder ,对吧 ? 。
公共的结果调用,通过调用这个调用,会抛出远程过程调用异常,以这种方式进行 ,标点符号: , 。逗号用于分开每个小部分,句号表示句子结束 。
        Result asyncResult;
InvocationProfilerUtils进入详细分析器,通过传入本次调用,还有一段代码,这段代码是返回一个字符串,该字符串内容是有“Filter”加上过滤器类的名字,加上“invoke.” 。
你提供的内容似乎是一段代码片段,不太明确你具体的改写要求。请你明确一下改写的具体方向或规则,以便我能更准确地按照要求进行改写。 如果仅从拆分角度: “asyncResult”,“等于”,“。
    }
}

这里会依次遍历所有的 :

究竟具体而言,每个过滤器是怎样去实现的呢,在此处就不会展开来进行讲述了,往后要是有机会的话,会单独去出一章 。

1.3 路由逻辑

当我们的责任链完成之后,下一步会经过我们的 路由 逻辑

Java,复制代码,公开的结果(Result),调用(invoke),最后的调用应用(Invocation),抛出(throws),远程过程调用异常(RpcException) 。
    // 
    List变量名“invokers”,将其设定为对“invocation” 执行“list”变换后的结果所构成的列表,。
InvocationProfilerUtils把详细剖析工具释放,针对的是调用。,,,。
LoadBalance loadbalance被初始化,其初始化借助的是invokers以及invocation,初始化之后的结果赋值给了loadbalance。
若为异步情况,RpcUtils.attachInvocationId于获取的Url处,针对此次调用进行操作 。
那么,返回值为:执行调用操作,其中调用操作涉及调用请求、调用者集合以及负载均衡策略,也就是返回执行该调用所需操作后的结果 。
}

其中,List>等于list(),这里便是我们的路由逻辑,。

java复制代码List> invokers = list(invocation);
public List有一个方法,其参数为Invocation类型的invocation ,该方法会抛出RpcException异常 ,并且这个方法返回类型是List 。
    List可以这样改写(但可能会比较难读):路由结果等于去执行列表操作作用于可用调用者以及调用请求 。
}
public List> doList(BitList调用者,调用,调用行为 (这里原内容似乎不完整且格式有些怪异,如果这非你本意,可补充完整正确内容以便我更好改写)。
    // 这里就是我们的路由策略!!!
    List把获取消费者网址的地址拿来,结合调用者们,再加上调用操作,然后通过路由器链进行路由,之后把数据结果给到变量result 。
要是返回的结果等于空值,那就返回由比特列表构成的空列表,否则就返回那个结果,。
}

这里的路由策略比较多,我举两个比较经典的:

而对于整体路由的流程:

到达此处,我们的路由会将符合相应要求的服务端挑选出来,紧接着便会进入我们的负载均衡阶段了,。

1.4 重试次数

这里我们设置 为 5

JAVA,复制代码,@DubboReference,协议为“dubbo”,超时时间100,重试次数5 。
用私有修饰的,名为iUserService的,属于IUserService类型的变量 。

我们来查看一下,在源码当中存在着几次调用,按照源码的情况而言,我们将会出现5加上1次的调用。

整数类型的len,被设定为调用calculateInvokeTimes这个方法,该方法以methodName作为参数,如此这般,得出相应结果 。
for (int i = 0; i < len; i++) {}
private int calculateInvokeTimes(String methodName) {
    // 获取当前的重试次数+1
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    RpcContext rpcContext = RpcContext.getClientAttachment();
    Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
    if (retry instanceof Number) {
        len = ((Number) retry).intValue() + 1;
        rpcContext.removeAttachment(RETRIES_KEY);
    }
    if (len <= 0) {
        len = 1;
    }
    return len;
}

我们直接 Debug 一下看看:

1.5 负载均衡

这一行,通过特定方式,也就是 (, ) 这种形式,从而得到我们的负载均衡策略,在默认情况下,呈现如下状态 。

我们可以看到,默认情况下是 随机负载。

我们继续往下追源码:

这段内容似乎不完整呀,请补充完整以便我能准确按照要求改写,仅这部分“java复制代码 public Result doInvoke(Invocation invocation, final List”无法进行有效改写。  。请你提供完整的句子以便我进行准确改写,仅这部分“invokers, LoadBalance loadbalance)”无法明确具体改写要求 。
    
    List> invoked = new ArrayList所调用者的数量(;复制调用者的数量();) // 被调用的那些用来进行调用操作的事物 。
    Set providers = new HashSet(len);
    for (int i = 0; i < len; i++) {
        // 如果是重新调用的,要去更新下Invoker,防止服务端发生了变化
        if (i > 0) {
            checkWhetherDestroyed();
将调用的内容放入列表中,把它赋值给用于复制调用者的变量,这一过程如此这般,借助调用函数,得到的结果是这个列表成为复制调用者的变量所引用的。
            // 再次校验
        }
        // 负载均衡逻辑!!!
        Invoker
        invoked.add(invoker);
RpcContext所获取的服务上下文,被设置为调用者列表,此列表即被调用的那个列表 。
        boolean success = false;
        try {
将.invokeWithContext(调用者, 调用)的结果赋值给结果变量, 该变量名为result, 句号。
            success = true;
            return result;
        } 
    }
}

这里我简单将下负载均衡的逻辑:

java复制代码Invokerinvoke者等于进行选择操作,该操作基于负载均衡,针对调用,通过调用者以及已选择的进行,最终得出结果 。
private Invoker执行选择操作,负载均衡器为LoadBalance,调用为Invocation,列表为List ,。 是不是相关内容没给全呀,提供完整准确句子以获更合适改写 。> invokers, List> selected){
    // 如果只有一个服务端,那还负载均衡个屁
    // 直接校验下OK不OK直接返回就好
    if (invokers.size() == 1) {
        Invoker tInvoker = invokers.get(0);
检查是否应当使调用者无效,对这个(t具体是什么不明确对应在这里语境下暂称)调用者进行检查 。

        return tInvoker;
    }
    // 如果多个服务端,需要执行负载均衡算法
    Invokerloadbalance.select(invokers, getUrl(), invocation)的结果被赋值给invoker 。
    return invoker;
}

Dubbo 里面的负载均衡算法如下:

这里也就不一介绍了,正常情况下,我们采用的都是 负载均衡

当然这里博主介绍另外一个写法,也是我们业务中使用的

1.4.1 自定义负载均衡

之上我们瞧见了,借由 = (, ) ,这样子我们能够获得一个负载均衡的达成类别,。

在咱们的生产场景里,不同集群有着不一样的合作方,我们得依据那些合作方来进行不同集群对外的发送调用 。

在这个特定的时候,我们能够去重新撰写我们所拥有的那个,于其中再次撰写我们自身的逻辑,而处于此处的集群A,实际上也就是被我们称作的group 。

1.6 调用服务

在我们达成如下流程之际:先是过滤器,接着是路由,然后是重试,最后是负载均衡,此时便来到了下述的这行:

将那Java代码进行复制,所得到的结果是,依靠调用上下文,借助那个调用者,去执行那个调用行为而产生的结果为resultresult等于使用这个调用上下文,调用那个调用者,执行那个调用行为所得到的结果 。

我们继续往下追:

(Java中),复制代码,(其中),public Result(这一部分),invoke(这个方法),(Invocation invocation)(这个参数),(会被用于),throws RpcException(抛出Rpc异常) ,(从而实现)特定行为操作 。
    try {
        // 加读写锁
        lock.readLock().lock();
进行返回操作,由调用者去调用那个调用请求,从而实现相应的调用行为 。
    } finally {
        lock.readLock().unlock();
    }
}

我们直接追到 的 方法

Java,复制代码,public Result invoke,Invocation inv,throws RpcException ,标点算吗。
RpcInvocation这个对象,其被用了这样,一种方式来赋值,也就是以这种取值动作,把inv转变成RpcInvocation类型,赋值给invocation , 。
    // 配置RPCinvocation
对调用进行准备之事,将调用予以准备,准备那个调用,(此处)准备调用事项 。
    // 调用RPC同时同步返回结果
将调用并返回的操作执行,得到的结果,赋值给异步RPC结果,即 AsyncRpcResult asyncResult,是通过 doInvokeAndReturn(invocation) 来达成的 ,。
    // 等待返回结果
在同步结果为真实情形时等待结果,此即为异步结果,针对调用操作而言 。
    return asyncResult;
}

我们可以看到,对于调用服务来说,一共分为一下三步:

1.6.1 配置

这里主要将 转变成

Java,复制代码,私有 void 方法,用于准备调用,该方法接收 RpcInvocation 类型的 inv 参数 !
将RpcInvocation的Invoker属性设置了,指出这个调用是由哪一个Invoker发起的,没错吧 。
    inv.setInvoker(this);
    
	// 当前线程的一些状态信息
你所提供的内容并不是一个完整的可用于改写的合适文本呀,它看起来像是一段代码片段,不太符合按照要求进行改写的条件呢。
    // 同步调用、异步调用
反序列化设置调用模式,在远程过程调用工具类利用统一资源定位符和反序列化设置调用模式获取调用模式的调用上应用此调用模式,句号。
    // 异步调用生成一个唯一的调用 ID
RpcUtils.attachInvocationIdIfAsync,去获取那个Url,把inv与之关联。 ,。
    // 选择序列化的类型
假如,序列化标识符并非为空,那么,就会出现一种情况,即只要,序列化标识符不等于空值,就会有,这样的一种状况存在,。
将序列化ID,放入到名为inv的对象里,对应的键是SERIALIZATION_ID_KEY 。
    }
}

1.6.2 调用 RPC 同步返回结果

java复制代码,私有,异步远程过程调用结果,执行调用并返回,针对rpc调用请求 ,这种操作结果的方法是,doInvokeAndReturn是什么呢 而且呢,它的参数是RpcInvocation类型的invocation 。
异步结果等于,将调用执行结果,转换为异步远程过程调用结果,放置进异步结果变量中 。
}
保持受保护状态的结果,去执行调用,该最终调用为每次调用,其方式为这样做 ,做出来为那个结果去进行 ,在结果成为那个的受严格保护状态下 ,这么做 ,做出来为最终的。
    // 获取超时时间
int timeout,等于RlpcUtils.calculateTimeout去计算超时时间,靠的是getUrl这个方法,还有invocation,以及methodName,并且使用DEFAULT_TIMEOUT这个默认超时时间值 。
   
    // 设置超时时间
invocation,将TIMEOUT_KEY这点设定为依附点,用String类型来表示timeout,把它转化成String.valueOf(timeout)这种形式 。
    
    // 从dubbo线程池中拿出一个线程
有一个名为executor的ExecutorService实例,它是通过调用getCallbackExecutor方法来获取的,而该方法的参数是getUrl方法的返回值以及inv这个对象,句号。
    // request:进行调用
	CompletableFuture向当前客户端发出请求,传入调用信息、超时时间以及执行器,之后对返回结果进行处理,通过将其转换为应用响应的方式,得到应用响应未来对象,此时应用响应未来对象就等于当前客户端发出那个有耗时操作和未来可预期结果的请求,在按指定时间等待完成后根据执行结果进行相应处理所形成一个具备延后属性的承接应用请求正确处理结果的未来对象,也就是应用响应未来对象,它是以当前客户端调用请求。
提取出“进行设置与调用(Set/get calls)这一操作”。在执行过“使用与设置未来应用应用响应未来,并进行设置与调用这一批次的操作”之后,“此操作应用(Apps)的未来上下文(Future Context)应用此批次操作中的未来应用应用响应未来”。 ,。
一个异步远程过程调用结果对象被创建,它是通过一个应用响应未来对象以及一个调用被构建而成,这个异步远程过程调用结果对象被赋值给一个名为result的变量 。
    result.setExecutor(executor);
    return result;
}

这里的 . 进行请求的发送:

爪哇,复制代码,公开的,可完成的未来 ,可完成完成的未来 ,可完成的未来的 ,可完成的未来啥啥啥 ,啥啥。request ,有 Object 类型的 request ,还有 int 类型的 timeout ,再有 ExecutorService 类型的 executor  ,这是一个方法 。
首先返回,然后由客户端进行请求,该请求应用给定的超时时间,并且通过一个执行器来发出这个求操作,最后完成啦。
}
public CompletableFuture request(Object request, int timeout, ExecutorService executor){
    Request req = new Request();
将版本设置为,获取到的协议版本所对应的版本 ,通过req ,借助Version 。
    req.setTwoWay(true);
    req.setData(request);
由DefaultFuture创建新的Future对象,该对象关联特定的信道,承载特定请求,设置了特定超时时间,并由特定执行器执行,此一过程所产生的对象被命名为future 。
    channel.send(req);
    return future;
}

这个地方的.send(req)呦,那可是dubbo自行进行包装的呢,咱们去瞅一瞅它的实现情况吧。

当然啦,要是我们这儿有人看过博主 Netty 源码文章,实际上是能够猜到的,必定是对 Netty 进行了封装的 。

对于Java而言,复制代码当中有一个公共的无返回值方法,其名为send ,该方法带有两个参数,一个是Object类型的message,另一个是布尔类型的sent , 这一方法还会抛出RemotingException异常 。
        // 校验当前的Channel是否关闭
        super.send(message, sent);
        boolean success = true;
        int timeout = 0;
        try {
            // channel 写入并刷新
此为通道,属于 io.netty.channel 这个范畴里的 Channel  。
获取通道未来对象,该对象是通过通道将消息写入并刷新后得到的,通道未来对象被赋予固定标签,此标签为“future”,消息被称作“message” ,通道被叫做“channel” 。
            if (sent) {
                // 等待超时的时间
                // 超过时间会报错
timeout的值,是通过调用getUrl()方法,获取其中的正参数,该正参数的关键词为TIMEOUT_KEY,若获取不到则使用默认 timeout:DEFAULT_TIMEOUT 。
成功等于未来等待超时时间,等于未来等待,等于超时时间,等于等待,此处的这个等于,是指成功的结果由未来等待超时时间来决定,这个等待是基于未来的时间概念,而超时时间。
            }
            // 这里如果报错了,就会走重试的逻辑
存在一个Throwable类型的cause,它的值是由future所导致的结果 ,。
    }
}

1.6.3 等待返回结果

对于使用Java进行代码复制的情况,若为同步操作,需等待结果,针对异步结果,要执行相应的调用操作,。
若为同步情况,要等待结果时实施此操作 将异步RPC结果和RPC调用作为参数传入 此操作是私有的 void类型 其名为waitForResultIfSync。
    // 判断当前的调用是不是同步调用
    // 异步调用直接返回即可
要是,(通过.invoke(getInvokeMode()),所获取到的调用模式),与(预设定的调用模式InvokeMode.SYNC),不一样的话,那会是个什么状况呢 ?
        return;
    }
    
    // 获取超时时间 
对象,超时键,等于,调用,获取对象附件,不转换,超时键,(这里“TIMEOUT_KEY”应改为具体的中文含义,假设是“超时键”) 。
长时间的超时时间,被设置为通过RpcUtils工具类按照特定规则将超时键转换为数字,此数字为整数类型的最大值 。
    // 等待timeout时间
    // 获取失败-直接抛出异常
异步结果获取,获取时设置超时时间,超时时间的单位为毫秒 。另外,获取操作通过调用get方法,该方法接受超时时间以及时间单位这两个参数 。
}
获取,以长时间,时间单位,为参数,返回结果 ,这样的一个方法 ,其中长的那个时间参数 ,是长整型 ,这个时间 ,有单位 ,是时间单位中的一种。
    // 获取响应返回的数据-等待timeout时间
返回,响应未来对象,获取,超时时间,时间单位,组成的结果 。
}

如果没有异常codejock 162,如下图所示:

到这里我们的消费端调用服务的整个流程源码剖析就完毕了~

三、流程

高清图片可私聊博主

四、总结

鲁迅先曾经讲过:独自走是困难的,众人一起走是容易的,与志向相同、兴趣相投的人共同取得进步。你们以全部真心绝不保留地去分享各自的经验,这才是去对抗互联网寒冷冬天最为合适的选择。

实际上,好多时候,并非是我们不尽力努力,极有可能恰是自身努力探寻的方向出现偏差,要是存在一个人能够略微给予你一些指明引导,你确实极有可能能减少好些年里所走的曲折弯路。

如有侵权请联系删除!

13262879759

微信二维码