从源码全面解析 dubbo 服务端服务调用的来龙去脉

2023-06-29 09:44:28
  • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

在这里插入图片描述

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务端调用流程

对于整个服务端调用来说,主要分为两部分:

  • 服务端启动时:封装的 HandlerFilter
  • 服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法

1、启动封装

我们启动的时候,会经过 Exporter<?> exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制

1.1 过滤器的封装

首先是我们的过滤器的封装:ProtocolFilterWrapper

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    // buildInvokerChain:
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
    // 获取所有的过滤器
    filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
    // 如果当前的过滤器不为空
    if (!CollectionUtils.isEmpty(filters)) {
        // 将所有的过滤器封装成链表(last)的形式
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
        }
        // 将过滤器链表封装成CallbackRegistrationInvoker类型
        return new CallbackRegistrationInvoker<>(last, filters);
    } 
}

到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式

我们直接跳到 DubboProtocol.export 中:

public <T> Exporter<T> export(Invoker<T> invoker){
    // 得到当前注册Zookeeper的URL
    URL url = invoker.getUrl();
    // 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883
    // 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)
    String key = serviceKey(url);
    // 
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
}

public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
    // 将当前的过滤器放至exporterMap中,方便我们后面的获取
    exporterMap.put(key, this);
    
    // 打开服务
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊

1.2 Hander的封装

在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务

这里的Handler 一共封装成下面的流程:

NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler  -> HeaderExchangeHandler -> ExchangeHandlerAdapter

最终会走到 NettyServerdoOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码

// 典型的Netty启动的流程
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = createBossGroup();
    workerGroup = createWorkerGroup();

    final NettyServerHandler nettyServerHandler = createNettyServerHandler();
    channels = nettyServerHandler.getChannels();

    // 初始化我们的服务端启动器
    initServerBootstrap(nettyServerHandler);

    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
    boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
                }
                // 这里添加Netty的Handler责任链
                ch.pipeline()
                    // 解码器
                    .addLast("decoder", adapter.getDecoder())
                    // 编码器
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                    // 把上面封装的Handler放到Netty中,便于我们的调用执行
                    .addLast("handler", nettyServerHandler);
            }
        });
}

这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

2、服务调用

我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉

这篇我们来看下服务端是如何进行服务调用的

2.1 Handler调用

我们直接跳到 NettyServerHandlerchannelRead 的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    // 主要看这个Handler做的事情
 
    handler.received(channel, msg);
    ctx.fireChannelRead(msg);
}

image-20230627221354346

我们从上图看到,重点是这五个 Handler

  • MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”
  • HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常
  • AllChannelHandler:管理所有通道的打开和关闭
  • DecodeHandler:将二进制数据解码为消息对象。
  • HeaderExchangeHandler:负责协议头部的交换和处理

我们挨个去看看他们实际的作用

2.1 MultiMessageHandler
  • 如果当前的请求是多个的话,需要进行切割成单个请求往下传递
  • 如果是单个请求的话,直接向下传递即可
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            try {
                handler.received(channel, obj);
            }
        }
    } else {
        handler.received(channel, message);
    }
}
2.2 HeartbeatHandler
  • 服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求
  • 消费端:判断当前的请求是不是心跳请求,处理心跳请求
public void received(Channel channel, Object message) throws RemotingException {
    // 记录最近读取的时间
    setReadTimestamp(channel);
    // 判断当前的请求是不是心跳请求
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            // 如果当前是同一个心跳检测则返回同一个响应
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isDebugEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            }
        }
        return;
    }
    // 消费端使用:处理心跳响应
    if (isHeartbeatResponse(message)) {
        return;
    }
    handler.received(channel, message);
}
2.3 AllChannelHandler
  • 为每一个服务端的请求从线程池中分配一个线程执行
public void received(Channel channel, Object message) throws RemotingException {
    // 获取线程
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // 将当前的Handler丢进线程池里面执行
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
}

image-20230627224241967

2.4 DecodeHandler
  • 根据当前的响应请求进行解码操作
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }

    if (message instanceof Request) {
        decode(((Request) message).getData());
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }

    handler.received(channel, message);
}
2.5 HeaderExchangeHandler
  • 调用我们的过滤器并等待数据的返回
  • 将数据通过 Channel 返回至客户端
public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    // 服务端:当前的消息是请求
    if (message instanceof Request) {
        Request request = (Request) message;
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            // 进行请求的解析
            if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
        }
    } else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
    } else {
        handler.received(exchangeChannel, message);
    }
}

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]
    Object msg = req.getData();
    // 执行过滤器的操作
    CompletionStage<Object> future = handler.reply(channel, msg);
    // 等待数据的返回
    future.whenComplete((appResult, t) -> {
        try {
            if (t == null) {
                res.setStatus(Response.OK);
                res.setResult(appResult);
            } else {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            // 没有问题的话,将我们的数据返回
            channel.send(res);
        } 
    });
}

服务端解码之后的消息:

image-20230627225422895

2.2 过滤器调用

我们直接跳到 DubboProtocolreply 方法

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message){
    Invocation inv = (Invocation) message;
    // 得到过滤器的invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    // 执行过滤器
    Result result = invoker.invoke(inv);
    // 返回结果
    return result.thenApply(Function.identity());
}

Invoker<?> getInvoker(Channel channel, Invocation inv){
    // 根据组+版本号+接口确定唯一的key
    String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));
    // 得到过滤器(我们在上面进行过对应的添加)
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    // 返回过滤器
    return exporter.getInvoker();
}

这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可

  • ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件
  • ProfilerServerFilter:负责性能分析和监视
  • EchoFilter:将请求消息作为响应消息返回
  • ClassLoaderFilter:负责加载和管理类加载器
  • GenericFilter:提供一种通用的请求处理机制
  • ExceptionFilter:负责处理异常并生成错误响应消息
  • MonitorFilter:负责监视请求处理过程中的状态和信息
  • TimeoutFilter:负责处理请求处理超时
  • TraceFilter:跟踪请求处理流程中的各个阶段和信息
  • ClassLoaderCallbackFilter:提供了回调函数的机制

2.3 方法调用

最终我们会到 AbstractProxyInvokerinvoke 方法

public Result invoke(Invocation invocation){
    // 执行我们动态代理的方法
     Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
    // 等待返回的结果
     CompletableFuture<Object> future = wrapWithFuture(value, invocation);
    // 封装请求
    CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
        AppResponse result = new AppResponse(invocation);
        if (t != null) {
            if (t instanceof CompletionException) {
                result.setException(t.getCause());
            } else {
                result.setException(t);
            }
        } else {
            result.setValue(obj);
        }
        // 返回数据
        // AppResponse [value=User(id=2, name=天涯, age=12), exception=null]
        return result;
    });
    return new AsyncRpcResult(appResponseFuture, invocation);
}

这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    try {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            // proxy:实现类
            // methodName:getUserById
            // parameterTypes:class java.lang.Long
            // arguments:2
            // 执行相对应的方法即可
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

三、流程图

高清图片可私信博主

在这里插入图片描述

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

我们下期再见。

我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

往期文章推荐:

更多推荐

想了解RPA看这篇就够了(全面又清晰)

想了解RPA看这篇就够了(全面又清晰)本文目录如下:(1)什么是RPA(2)RPA技术发展路线(3)RPA应用场景(4)RPA对比人工的优势(5)RPA的核心能力(1)什么是RPARPA(RoboticProcessAutomation,机器人流程自动化)是一种通过软件机器人模拟和执行规则化、重复性的任务来提高工作效率

大模型重构行业,百度网盘再度抢跑?

随着ChatGPT爆火出圈,AI大模型逐渐渗透到各个行业和领域,诸多大厂也纷纷发布了自己的大模型产品。而在国内AI大模型的市场上,百度的文心大模型无疑是其中较为有名的一个。于是,在国内大厂都在探索如何在大模型时代取得更多优势时,百度大模型也不再局限于某个特定领域,而是将目光放在了网盘应用上。自百度网盘经历“百盘大战”进

Vue3中watch用法

在Vue3中的组合式API中,watch的作用和Vue2中的watch作用是一样的,他们都是用来监听响应式状态发生变化的,当响应式状态发生变化时,都会触发一个回调函数。当需要在数据变化时执行异步或开销较大的操作时,computed是无法操作异步数据的,所以需要使用watch进行侦听。侦听器watch作用是侦听一个或多个

斗地主案例及一些实现规则

4.斗地主发牌4.1案例介绍按照斗地主的规则,完成洗牌发牌的动作。具体规则:使用54张牌打乱顺序,三个玩家参与游戏,三人交替摸牌,每人17张牌,最后三张留作底牌。4.2案例分析准备牌:牌可以设计为一个ArrayList<String>,每个字符串为一张牌。每张牌由花色数字两部分组成,我们可以使用花色集合与数字集合嵌套迭

Visual Studio复制、拷贝C++项目与第三方库配置信息到新的项目中

本文介绍在VisualStudio软件中,复制一个已有的、配置过多种第三方库的C++项目,将其拷贝为一个新的项目,同时使得新项目可以直接使用原有项目中配置好的各类**C++**配置、第三方库等的方法。在撰写C++代码时,如果需要用到他人撰写的第三方库,那么每次新建一个项目时都需要重新配置一次环境,相对比较麻烦;而如果我

c语言每日一练(15)

前言:每日一练系列,每一期都包含5道选择题,2道编程题,博主会尽可能详细地进行讲解,令初学者也能听的清晰。每日一练系列会持续更新,上学期间将看学业情况更新。五道选择题:1、程序运行的结果为()#include<stdio.h>intmain(){intpad=0;intpAd=0;intsum=5;pad=5;pAd=

SpringMVC之JSR303和拦截器

一.什么是JSR303二.JSR303常用注解作用使用导入pom.xml在实体类相对应的属性中增加注解用来指定校验在hpjyController里面新加以下代码修改eidt.jsp测试结果​编辑二.拦截器什么是拦截器拦截器与过滤器的区别应用场景日志记录:拦截器可以用于记录请求的相关信息,如请求的URL、请求参数、请求的

Springmvc之JSR303和拦截器

JSR303拦截器1.JSR303什么是JSR303JSR是JavaSpecificationRequests的缩写,意思是Java规范提案。是指向JCP(JavaCommunityProcess)提出新增一个标准化技术规范的正式请求。任何人都可以提交JSR,以向Java平台增添新的API和服务。JSR已成为Java界

第九章(1):循环神经网络与pytorch示例(RNN实现股价预测)

第九章(1):循环神经网络与pytorch示例(RNN实现股价预测)作者:安静到无声个人主页作者简介:人工智能和硬件设计博士生、CSDN与阿里云开发者博客专家,多项比赛获奖者,发表SCI论文多篇。Thanks♪(・ω・)ノ如果觉得文章不错或能帮助到你学习,可以点赞👍收藏📁评论📒+关注哦!o( ̄▽ ̄)d欢迎大家来到

人工智能如何提高转录效率

人工转录已经以某种形式存在了数百年,甚至数千年。近年来,在人工智能(AI)技术推动下,转录取得长足发展。转录文稿本身是音频内容的文本形式;借此,读者无需再听一遍录音便可了解一段时间内所讲述的内容或所发生的情况。转录对于记录保存、知识共享和改善可访问性至关重要。过去几年,随着AI的发展,人们越来越依赖于一种称为自动语音识

详解Nacos和Eureka的区别

文章目录Eureka是什么Nacos是什么Nacos的实现原理Nacos和Eureka的区别CAP理论连接方式服务异常剔除操作实例方式自我保护机制Eureka是什么Eureka是SpringCloud微服务框架默认的也是推荐的服务注册中心,由Netflix公司与2012将其开源出来,Eureka基于REST服务开发,主

热文推荐