栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

Netty进阶 协议设计与解析 HttpServerCodec、自定义协议Codec,源码分析

Java 更新时间:发布时间: 百科书网 趣学号
概述

我们的客户端和服务器进行通信的时候肯定需要遵守一定的协议,这些协议有可能是已经提前设计好的如Http协议,也可以是我们自定义的。

上一篇我们讲解了一些Netty给我们提供的一些编解码器,这是本篇文章的基础,否则可能看不懂。

这一篇主要是讲解Netty中的Http协议和我们自定义的协议。

1.HttpServerCodec

服务器的编解码器,遵从HTTP协议,先看如下类

首先看看HttpServerCodec的类结构,其中继承的CombinedChannelDuplexHandler其实就是合并了它的两个泛型的任务,就不多说了

关键的两个解码和编码类继承如下: 

 一般以Codec结尾的既可以做解码也可以编码,Decoder意为解码,Encoder意为编码。其中解码是ChannelInboundHandlerAdapter的子类(即入站handler),专门用于监听客户端发来的数据给他先进行一遍解码工作,Encoder为ChannelOutboundHandlerAdapter的子类(即出站handler),即需要发送的数据先进行一遍编码操作,再发去对应的客户端。

我们只需在服务器加上如下代码即可使用(完整代码略过)。

 //HTTP协议的编解码器
ch.pipeline().addLast(new HttpServerCodec());

演示解码:

大家可能会好奇,Codec到底会把客户端发来的信息解码为什么类型呢?是以前的ByteBuf、String?具体的细节下面分析

服务器加上如下代码,启动服务器,打开浏览器输入localhost:xxxx(绑定的端口),查看输出

结果如下(还有一些浏览器发的请求头请求行什么的太多了就不展示了)

 可以看到我在浏览器只发送了一次请求,却打印了两次。其实是HttpServerCodec把我们的请求解析成了两部分,第一部分为HttpRequest它包含请求头和请求行,第二部分为HttpContent代表请求体(即使是get请求,也会有请求体,顶多没有内容而已)。

按照这样的逻辑我们可能以后就需要在解码后的channelRead里区分一下请求头和请求体了:

 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       //打印出msg类信息
//    System.out.println(msg.getClass());
        if(msg instanceof HttpRequest){  //DefaultHttpRequest的父接口为HttpRequest
           //执行请求头的代码逻辑
        }else if (msg instanceof HttpContent){//同样比较父类即可,更通用
           //执行请求体的代码逻辑
      }
 }

但是这样似乎过于麻烦,如果我现在只关心其中的一种,不想做很多的if...else判断,那我们可以换另一个方式进行简化,即SimpleChannelInboundHandler

SimpleChannelInboundHandler的泛型就是他关心的消息类型,如果不是指定的消息类型则会跳过该handler。可以看到重写的channelRead0里的msg类型就是指定的泛型

ch.pipeline().addLast(new SimpleChannelInboundHandler() {//我们这里只关心请求头
   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest msg) throws Exception {
        //业务逻辑
    }
}

响应客户端:

我们服务器收到客户端的请求后当然需要回复客户端啦,根据上面的说明我们知道我们需要响应一个同样符合Http协议的响应对象给客户端,同样的写回数据时会被HttpServerCodec进行编码动作,先参考下面几个类

DefaultFullHttpResponse:Netty提供的给客户端响应的类如下

 HTTP协议版本类参考如下:

 状态码类参考(非常多):

 演示:

bootstrap.childHandler(new ChannelInitializer() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler());
        //HTTP协议的编解码器,
        ch.pipeline().addLast(new HttpServerCodec());
        //SimpleChannelInboundHandler的泛型就是他关心的消息格式,如果不是指定的格式则会跳过该handler
        ch.pipeline().addLast(new SimpleChannelInboundHandler() {//我们这里只关心请求头
            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest msg) throws Exception {
                log.debug("请求方法为{}",msg.method());//请求方法为GET
                log.debug("请求路径为{}",msg.uri());//请求资源路径为/
                log.debug("请求头为{}",msg.headers());//请求头为DefaultHttpHeaders[Host: localhost:8081, Connection: keep-alive……]

                //向客户端返回响应
                //netty提供的响应对象,这里指定响应版本和请求时候的版本相同,响应码为200
                DefaultFullHttpResponse response=new DefaultFullHttpResponse(msg.protocolVersion(),HttpResponseStatus.OK);
                //content方法是给上面的response写返回内容,返回值为ByteBuf,具体可以看源码
                //也可以构造该对象时就把消息体的ByteBuf放在构造方法里
                response.content().writeBytes("I am Netty".getBytes());
                //写回响应
                channelHandlerContext.writeAndFlush(response);
            }
        });
    }
});

结果可以看到,netty已经把该响应进行了编码并写回了对应的channel中,客户端也收到了该响应

 

但是还有个问题,就是浏览器这边一直在转圈加载,这是因为服务器并没有告诉它我这边已经响应完了,浏览器就以为还有数据,所以会一直加载,等待接收。

 

解决:在响应头里加一个字段content-length 为我们要发送的响应的长度,改动代码为

 各种响应头里的字段名字信息:

 浏览器正确的接收到响应信息。

另外,浏览器可能会自动给服务器发送图标请求

今后我们需要根据不同的请求uri,返回该客户端不同的资源。

HttpClientCodec是客户端的Http协议编解码器

2.自定义协议

前面我们讲解了常用的HTTP协议,如果想设计一套适合自己业务的协议来增强效率和减小浪费。

2.1自定义协议要素

自定义协议,就需要考虑如下的几个地方:

  • 魔数:用来在第—时间判定是否是无效数据包,了解jvm的小伙伴可能知道,class文件的魔数为CAFE BABY,如果你使用java命令执行文件,如果class文件前4个字节(魔数)不是约定的CAFE BABY,jvm就不会执行该calss文件。
  • 版本号:可以支持协议的升级,比如你的协议升级后,增加了几个字段,如果你想用旧的协议就必须按照协议的版本号进行区分识别。
  • 序列化算法:消息的正文采用哪种序列化和反序列化方式,如json,jdk等
  • 指令类型:是登录、注册、单聊、群聊...?跟业务相关
  • 请求序号:为了双工通信,提供异步能力,标志每一个不同的请求消息
  • 正文长度
  • 消息正文:消息正文就像是后台传给前端的各种复杂的数据,我们需要用序列化算法来解析该数据,否则的话数据接收的时候会乱,所以一般前后端交互采用json格式进行编解码

有兴趣的小伙伴可以查看HttpMessage类,看看Netty对Http协议是如何设计的(有点复杂)。

我们自定义协议应该对比HttpServerCodec学习,这样就算是哪天忘记代码了,也可以在官方的解决方案里找到答案,也可以学习HttpServerCodec更深入。

正文开始

1.首先建立一个Message抽象类和它的各种实现类,它是我们自定义协议的承载体,我们以Message为单位发送消息,这里写的比较简单,很多东西到后续扩充。

public abstract class Message implements Serializable {//实现序列化接口
    private int sequenceId;
    private int messageType;

    //可能的指令类型,用数字表示
    public static final int LoginRequestMessage=0;//登录请求的消息
    public static final int LoginResponseMessage=1;//登录回复的消息
    public static final int ChatRequestMessage=2;//聊天请求的消息
    public static final int ChatResponseMessage=3;//聊天答复的消息
    //由具体的子类Message实现,每一条子类对应上面的一个码子
    public abstract int getMessageType();
    //省略get set等方法
}

实现类只列举其中一种,以后会慢慢扩充:

public class LoginRequestMessage extends Message{
    private String username;
    private String password;
    private String nickname;
    @Override
    public int getMessageType() {
        return LoginRequestMessage;
    }
    public LoginRequestMessage(String username, String password, String nickname) {
        this.username = username;
        this.password = password;
        this.nickname = nickname;
    }
    //toString...
}

2.自定义编解码功能的实现 ,继承ByteToMessageCodec,该类和HttpServerCodec的CombinedChannelDuplexHandler一样融合HttpRequestDecoder、HttpResponseEncoder做编解码功能

public class MessageCodec extends ByteToMessageCodec {
    //重写编码和解码功能

    @Override
    //编码,看方法的参数,out就是需要传出去的ByteBuf,
    //我们需要把msg按我们的规定的协议格式放到out里传出去
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        //1).魔数,我们这里写个 BBQ! 为魔数吧,共占用4字节
        out.writeBytes(new byte[]{'B', 'B', 'Q', '!'});
        //2).版本号,一字节就够了,这里就写死写成1吧
        out.writeByte(1);
        //3).序列化算法,使用一字节表示序列化的方式,如jdk->0;json->1
        out.writeByte(0);
        //4).一字节的指令类型,Message各子类都封装了本消息是什么类型,直接取即可
        out.writeByte(msg.getMessageType());
        //5).请求序号,4字节
        out.writeInt(msg.getSequenceId());
        //6).正文,将对象转为二进制的字节数组,这里采用JDK的序列化
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        byte[] bytes = baos.toByteArray();
        //7).长度写进去,占4字节
        out.writeInt(bytes.length);
        //填充1个字节,使得头信息为16字节,即2的整数次幂
        out.writeByte(0xff);
        //8).写入内容
        out.writeBytes(bytes);
    }

    @Override
    //解码,怎么编的码就怎么解码  ,out就是解析出来的数据集合,可能解析出多条数据
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serialType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        int length = in.readInt();
        in.readByte();//填充字段
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        Message message=null;
        //反序列化数据
        if (serialType == 0) {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
             message= (Message) ois.readObject();
        }else if (serialType==1){
            //其他反序列方式
        }

        System.out.println("数据解码为:n  魔数:"+magicNum+"  版本号:"+version+"  序列化算法:"+serialType+"  指令类型:"+
                messageType+"  请求序号:"+sequenceId+"  长度:"+length);
        System.out.println("正文:"+message);

        //存入out,把消息给下一个Handler用
        out.add(message);
    }
} 

 3.测试工作

编码测试:

在我们服务器的初始化Handler中加入日志和我们新创建的编解码器,随后随便发送一条message(不要杠我为啥在服务器发送登录请求,因为我暂时只有这一个Message子类)

bootstrap.childHandler(new ChannelInitializer() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler());
        //加入我们的自定义的编解码器
        ch.pipeline().addLast(new MessageCodec());
        //
        ch.writeAndFlush(new LoginRequestMessage("java","netty","jvm"));
    }
});

当有客户端连接过来后,服务器即执行里面的代码逻辑,发送Message,日志如下:

 可以看到这条数据是我们编码成了想要的协议格式发送过去的,客户端那边只需要用我们写好的解码器解码即可。

解码测试:

在客户端加上我们的自定义编解码器,并且监听读事件,接收到消息后打印该消息,并顺便打印一下它的类信息,看看到底是List类型还是Message类型

handler(new ChannelInitializer() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler())
                .addLast(new MessageCodec())
                .addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println("我接收到了消息"+msg);
                        System.out.println("该消息类型为"+msg.getClass());
                    }
                });
    }
});

结果如下:

客户端完美的收到消息,并解码成功。且验证了解析后的类型为LoginRequestMessage,不是List,应该是解析后把List集合里的数据迭代一条一条发送过来的。

问题分析:

但是代码还存在一个隐患就是上一篇我们学的黏包与半包问题,这里的自定义协议解析,只能解决粘包。半包解决不了,因为代码里写死了要读多少字节。如果字节数不够,那么再转为pojo的时候或者取数据的时候就会出错。所以要先通过lenghtfieldbased确保获取到一个完整消息。

如下在我们演示一下半包问题,在接收端设置限制一下接收数据的缓存大小

 重新测试的结果:

 第一次只取到60字节,下面还报了错即在我们读取ButeBuf时越了界,没有那么多字节给我们读

也就是我服务器现在发了一个很长的Message过去,全部序列化后一并发去客户端,客户端的滑动窗口可能不够,可能只接收到一半,此时客户端的解码器拿来解析了,就会出错。

我们采用上一篇的LengthFieldbasedframeDecoder解码器来解决问题,当该解码器发现实际收到的消息长度少于帧中的消息长度,那他就不会立马交给下一个Handler,他会继续接收,等收到的消息到达指定的长度后,才算接收完一个完整消息,发给下一个handler,也就会把完整消息发给了我们的自定义解码器,此时就不会出错了

 它的构造方法再重述一遍(按顺序):最大帧长度(根据我们的需求和长度字段能表示的最大值为准)、长度字段在我们帧中的偏移量(我们前面设计的协议长度字段是从第11位开始)、长度字段占用的字节数(4字节)、长度字段之后再调整1个字节的数据才是数据内容(因为后面我们有一字节的填充)、截取前0字节(因为我们不需要截取消息,完整的数据需要发送给我们自定义的解析器)。

 结果如下,可以看到日志打印的时候还是分了两次接收,但是LengthFieldbasedframeDecoder却给我们的两个数据进行了合并,之后再解码就不会出问题了

再来一个小问题分析:

我们创建Handler时,来了个channel建立连接都是new一系列新的编解码器,那我们Handler能否共享使用呢?

其实共享使用是有点问题的,就比如LengthFieldbasedframeDecoder用来解决黏包半包问题,如果把他的实例对象放在外面,给多个channel一起使用,那么有一个线程在里面放了数据,还没等到把数据拼凑起来,下一个线程又在里面存了数据肯定是有多线程并发问题的。但是像LoggingHandler这样的Handler就不会有问题,因为它只是做一个打印工作,没有记录状态等操作,它是线程安全的。

 如果你看到某个Handler有这个注解,他就是没有线程安全问题的,可以使用一个对象共享使用。

我现在想给我自定义的编解码器加上该注解并共享一个该对象,我就需要考虑我这个有没有线程安全问题的。按理来说我觉得我的编解码器没线程安全问题,因为该类没有一些共享的数据,只存在一些局部的数据。

我来加一个@Sharable玩一玩,并且把用到MessageCodec的地方改为如下的共享对象

 启动,并发现成功的报了错,它说不允许在该类加Shared注解。

 仔细看上面的错误,它发生在父类的方法,即构造方法上。父类的构造方法如下

 总之我们如果在该子类上加了Sharable注解一定会报错,Netty的设计还挺精妙

现在我们想加这个注解的话只能绕过这个ByteToMessageCodec父类了,来换个爹。

public class MessageCodec2 extends MessageToMessageCodec {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List out) throws Exception {
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
    }
} 

MessageToMessageCodec的意思就是一开始我就拿到的是一个完整的Message,并不存在黏包和半包问题,也就是对应我们前面使用LengthFieldbasedframeDecoder解决了这种问题的情况,所以可以安全的使用,不存在状态的问题。

它的两个泛型就是我们把LengthFieldbasedframeDecoder传过来的ButeBuf的类型要解码为Message,所以就是这两个类型。其他区别就是参数的小区别,我么需要自己创建ButeBuf,然后把它放进参数中的List集合即可,原理一样。

我们就可以愉快的使用@Sharable注解了

请一定要配合LengthFieldbasedframeDecoder使用。

 结果没问题,可以共享使用,节约了一点点内存^_^。

转载请注明:文章转载自 www.051e.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号