消息中间件—RocketMQ 的 RPC 通信(一)

star2017 1年前 ⋅ 8291 阅读

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键

一、RocketMQ 中 Remoting 通信模块概览

RocketMQ 消息队列的整体部署架构如下图所示:先来说下 RocketMQ 消息队列集群中的几个角色:

(1)NameServer:在 MQ 集群中做的是做命名服务,更新和路由发现 broker 服务;

(2)Broker-Master:broker 消息主机服务器;

(3)Broker-Slave:broker 消息从机服务器;

(4)Producer:消息生产者;

(5)Consumer:消息消费者;

其中,RocketMQ 集群的一部分通信如下:
(1)Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定期向 NameServer 上报 Topic 路由信息;
(2)消息生产者 Producer 作为客户端发送消息时候,需要根据 Msg 的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取;
(3)消息生产者 Producer 根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者收消息并落盘存储;
从上面(1)~(3)中可以看出在消息生产者, Broker 和 NameServer 之间都会发生通信(这里只说了 MQ 的部分通信),因此如何设计一个良好的网络通信模块在 MQ 中至关重要,它将决定 RocketMQ 集群整体的消息传输能力与最终的性能。
rocketmq-remoting 模块是 RocketMQ 消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如 rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ 消息队列自定义了通信协议并在 Netty 的基础之上扩展了通信模块。**ps:鉴于 RocketMQ 的通信模块是建立在 Netty 基础之上的,因此在阅读 RocketMQ 的源码之前,读者最好先对 Netty 的多线程模型、Java NIO 模型均有一定的了解,这样子理解 RocketMQ 源码会较为快一些。**作者阅读的 RocketMQ 版本是 4.2.0, 依赖的 Netty 版本是 4.0.42.Final. RocketMQ 的代码结构图如下:

源码部分主要可以分为 rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv 和 rocketmq-remoting 等模块,通信框架就封装在 rocketmq-remoting 模块中。
本文主要从 RocketMQ 的协议格式,消息编解码,通信方式(同步/异步/单向)和具体的发送/接收消息的通信流程来进行阐述等。

二、RocketMQ 中 Remoting 通信模块的具体实现

1、Remoting 通信模块的类结构图

从类层次结构来看:

(1)RemotingService:为最上层的接口,提供了三个方法:

1.  `void start();`

2.  `void shutdown();`

3.  `void registerRPCHook(RPCHook rpcHook);`

(2)RemotingClient/RemotingSever:两个接口继承了最上层接口—RemotingService,分别各自为 Client 和 Server 提供所必需的方法,下面所列的是 RemotingServer 的方法:

     void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

(3)NettyRemotingAbstract:Netty 通信处理的抽象类,定义并封装了 Netty 处理的公共处理方法;

(4)NettyRemotingClient 以及****NettyRemotingServer:分别实现了 RemotingClient 和 RemotingServer, 都继承了 NettyRemotingAbstract 抽象类。RocketMQ 中其他的组件(如 client、nameServer、broker 在进行消息的发送和接收时均使用这两个组件)



2、消息的协议设计与编码解码

在 Client 和 Server 之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义 RocketMQ 的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在 RocketMQ 中,RemotingCommand 这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

RemotingCommand 类的部分成员变量如下:

Header 字段 类型 Request 说明 Response 说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0 表示成功,非 0 则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于 reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通 RPC 还是 onewayRPC 得标志 区分是普通 RPC 还是 onewayRPC 得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息

这里展示下 Broker 向 NameServer 发送一次心跳注册的报文:

[
code=103,//这里的103对应的code就是broker向nameserver注册自己的消息
language=JAVA,
version=137,
opaque=58,//这个就是requestId
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON

可见传输内容主要可以分为以下 4 部分:
(1)消息长度:总长度,四个字节存储,占用一个 int 类型;
(2)序列化类型&消息头长度:同样占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3)消息头数据:经过序列化后的消息头数据;
(4)消息主体数据:消息主体的二进制字节数据内容;
消息的编码和解码分别在 RemotingCommand 类的 encode 和 decode 方法中完成,下面是消息编码 encode 方法的具体实现:

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息总长度

    // 2> header data length
    //将消息头编码成byte[]
    byte[] headerData = this.headerEncode(); 
    //计算头部长度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主体长度
        length += body.length;                
    }
    //分配ByteBuffer, 这边加了4, 
    //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //将消息总长度放入ByteBuffer
    result.putInt(length);   

    // header length
    //将消息头长度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //将消息头数据放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //将消息主体放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

    /**
     * markProtocolType方法是将RPC类型和headerData长度编码放到一个byte[4]数组中
     *
     * @param source
     * @param type
     * @return
     */
    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        //右移16位后再和255与->“16-24位”
        result[1] = (byte) ((source >> 16) & 0xFF);
        //右移8位后再和255与->“8-16位”
        result[2] = (byte) ((source >> 8) & 0xFF);
        //右移0位后再和255与->“8-0位”
        result[3] = (byte) (source & 0xFF);
        return result;
    }

消息解码 decode 方法是编码的逆向过程,其具体实现如下:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
      //获取byteBuffer的总长度
      int length = byteBuffer.limit();

      //获取前4个字节,组装int类型,该长度为总长度
      int oriHeaderLen = byteBuffer.getInt();

      //获取消息头的长度,这里和0xFFFFFF做与运算,编码时候的长度即为24位
      int headerLength = getHeaderLength(oriHeaderLen);

      byte[] headerData = new byte[headerLength];
      byteBuffer.get(headerData);

      RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

      int bodyLength = length - 4 - headerLength;
      byte[] bodyData = null;
      if (bodyLength > 0) {
          bodyData = new byte[bodyLength];
          byteBuffer.get(bodyData);
      }
      cmd.body = bodyData;

      return cmd;
  }

3、消息的通信方式和通信流程

在 RocketMQ 消息队列中支持通信的方式主要有以下三种:
(1)同步(sync)
(2)异步(async)
(3)单向(oneway)
其中“同步”通信模式相对简单,一般用在发送心跳包场景下,无需关注其 Response。本文将主要介绍 RocketMQ 的异步通信流程(限于篇幅,读者可以按照同样的模式进行分析同步通信流程)。
下面先给出了 RocketMQ 异步通信的整体流程图:

null

RocketMQ 异步通信的整体时序图.png

下面两小节内容主要介绍了 Client 端发送请求消息和 Server 端接收消息的具体实现,其中对于 Client 端的回调可以参考 RocketMQ 的源码来分析这里就不做详细介绍。

3.1、Client 发送请求消息的具体实现

当客户端调用异步通信接口—invokeAsync 时候,先由 RemotingClient 的实现类—NettyRemotingClient 根据 addr 获取相应的 channel(如果本地缓存中没有则创建),随后调用 invokeAsyncImpl 方法,将数据流转给抽象类 NettyRemotingAbstract 处理(真正做完发送请求动作的是在 NettyRemotingAbstract 抽象类的 invokeAsyncImpl 方法里面)。具体发送请求消息的源代码如下所示:

/**
     * invokeAsync(异步调用)
     * 
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1

        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //根据request ID构建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //将ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                //使用Netty的channel发送请求数据
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    //消息发送后执行
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            //如果发送消息成功给Server,那么这里直接Set后return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            //执行回调
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            //释放信号量
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                //异常处理
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

在 Client 端发送请求消息时有个比较重要的数据结构需要注意下:
(1)responseTable—保存请求码与响应关联映射

protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable 

opaque 表示请求发起方在同个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方式。无论是哪种通信方式,都会保存请求操作码至 ResponseFuture 的 Map 映射—responseTable 中。
(2)ResponseFuture—保存返回响应(包括回调执行方法和信号量)

public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }

对于同步通信来说,第三、四个参数为 null;而对于异步通信来说,invokeCallback 是在收到消息响应的时候能够根据 responseTable 找到请求码对应的回调执行方法,semaphore 参数用作流控,当多个线程同时往一个连接写数据时可以通过信号量控制 permit 同时写许可的数量。
(3)异常发送流程处理—定时扫描 responseTable 本地缓存
在发送消息时候,如果遇到异常情况(比如服务端没有 response 返回给客户端或者 response 因网络而丢失),上面所述的 responseTable 的本地缓存 Map 将会出现堆积情况。这个时候需要一个定时任务来专门做 responseTable 的清理回收。在 RocketMQ 的客户端/服务端启动时候会产生一个频率为 1s 调用一次来的定时任务检查所有的 responseTable 缓存中的 responseFuture 变量,判断是否已经得到返回, 并进行相应的处理。

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

3.2、Server 端接收消息并进行处理的具体实现

Server 端接收消息的处理入口在 NettyServerHandler 类的 channelRead0 方法中,其中调用了 processMessageReceived 方法(这里省略了 Netty 服务端消息流转的大部分流程和逻辑)。其中服务端最为重要的处理请求方法实现如下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根据RemotingCommand中的code获取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor处理请求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封装requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想线程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //构建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

上面的请求处理方法中根据 RemotingCommand 的请求业务码来匹配到相应的业务处理器;然后生成一个新的线程提交至对应的业务线程池进行异步处理。
(1)processorTable—请求业务码与业务处理、业务线程池的映射变量

    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

我想 RocketMQ 这种做法是为了给不同类型的请求业务码指定不同的处理器 Processor 处理,同时消息实际的处理并不是在当前线程,而是被封装成 task 放到业务处理器 Processor 对应的线程池中完成异步执行。(在 RocketMQ 中能看到很多地方都是这样的处理,这样的设计能够最大程度的保证异步,保证每个线程都专注处理自己负责的东西

三、总结

刚开始看 RocketMQ 源码—RPC 通信模块可能觉得略微有点复杂,但是只要能够抓住 Client 端发送请求消息、Server 端接收消息并处理的流程以及回调过程来分析和梳理,那么整体来说并不复杂。RPC 通信部分也是 RocketMQ 源码中最重要的部分之一,想要对其中的全过程和细节有更为深刻的理解,还需要多在本地环境 Debug 和分析对应的日志。同时,鉴于篇幅所限,本篇还没有来得及对 RocketMQ 的 Netty 多线程模型进行介绍,将在消息中间件—RocketMQ 的 RPC 通信(二)篇中来做详细地介绍。
在此顺便为自己打个 Call,有兴趣的朋友可以关注下我的个人公众号:“匠心独运的博客”,对于 Java 并发、Spring、数据库和消息队列的一些细节、问题的文章将会在这个公众号上发布,欢迎交流与讨论。


本文地址:https://www.6aiq.com/article/1563128272857
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: