Netty源码分析-7-MessageToByteEncoder

栏目: 后端 · 发布时间: 5年前

内容简介:下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemo

前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从 Java 对象转换成二进制数据的组件,这就是MessageToByteEncoder。

Netty源码分析-7-MessageToByteEncoder

基本使用

下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中

class StringToByteEncoderextends MessageToByteEncoder<String>{

        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
            byteBuf.writeBytes(s.getBytes());
        }
}
值得注意的是MessageToByteEncoder 的I可以是ByteBuf,即可以从一种格式的二进制转换为另一种格式的二进制。下面的LengthFieldPrepender是LengthFieldBasedFrameDecoder解码器对应的编码器,负责将已经编码好的ByteBuf按照消息长度在前面追加length字段。虽然继承于MessageToMessageEncoder 但其实也是MessageToByteEncoder

,因为它的接收和输出都是ByteBuf类型。

public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{

    private final ByteOrder byteOrder;
    private final int lengthFieldLength;
    private final boolean lengthIncludesLengthFieldLength;
    private final int lengthAdjustment;

    public LengthFieldPrepender(
ByteOrder byteOrder,int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength){
        // 只支持1,2,3,4,8字节长的length字段编码
        if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
            lengthFieldLength != 3 && lengthFieldLength != 4 &&
            lengthFieldLength != 8) {
            throw new IllegalArgumentException(
                    "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
                    lengthFieldLength);
        }
        ObjectUtil.checkNotNull(byteOrder, "byteOrder");
        this.byteOrder = byteOrder;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {
        int length = msg.readableBytes() + lengthAdjustment;
        if (lengthIncludesLengthFieldLength) {
            length += lengthFieldLength;
        }

        if (length < 0) {
            throw new IllegalArgumentException(
                    "Adjusted frame length (" + length + ") is less than zero");
        }

        switch (lengthFieldLength) {
        case 1:
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
            break;
        ... 中间省略
        case 8:
            out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
            break;
        default:
            throw new Error("should not reach here");
        }
        out.add(msg.retain());
    }
}

实现分析

MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe 类则使用其获取DirectMemory否则降级为heapMemory。

public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{
    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;

    // 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

作为Encoder,需要处理的是write事件。

write逻辑为

  1. 判断是否为符合当前类型的对象
  2. 根据当前对象和是否使用DirectMemory来申请一个ByteBuf
  3. 调用需要子类override的encoder方法,将需要编码的msg对象encoder到ByteBuf中,然后release msg对象。
  4. 调用ChannelContextWrite.write方法,交给下一个ChannelHandlerContext处理这个ByteBuf
  5. 如果不符合1, 则直接交给下一个ChannelHandlerContext
    @Override
       public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {
           ByteBuf buf = null;
           try {
               // 判断对象类型
               if (acceptOutboundMessage(msg)) {
                   @SuppressWarnings("unchecked")
                   I cast = (I) msg;
                   // 申请ByteBuf
                   buf = allocateBuffer(ctx, cast, preferDirect);
                   try {
                       // 编码写入ByteBuf中
                       encode(ctx, cast, buf);
                   } finally {
                       // release msg对象
                       ReferenceCountUtil.release(cast);
                   }
                   // 如果ByteBuf不空则ctx write
                   if (buf.isReadable()) {
                       ctx.write(buf, promise);
                   } else {
                       // 否则release byteBuf然后写入一个空BUFFER
                       buf.release();
                       ctx.write(Unpooled.EMPTY_BUFFER, promise);
                   }
                   // 帮助GC回收
                   buf = null;
               } else {
                   // 其他类型的对象不处理交给后面的处理
                   ctx.write(msg, promise);
               }
           } catch (EncoderException e) {
               // encode方法的异常抛给上层处理
               throw e;
           } catch (Throwable e) {
               // 其他包装成EncoderException的异常抛给上层处理
               throw new EncoderException(e);
           } finally {
               // 防止encode方法异常导致申请的ByteBuf没有释放
               if (buf != null) {
                   buf.release();
               }
           }
       }
    
       protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                  boolean preferDirect) throws Exception {
           if (preferDirect) {
               // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory
               return ctx.alloc().ioBuffer();
           } else {
               return ctx.alloc().heapBuffer();
           }
       }
       // 留给子类去实现的具体编码方法
       protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;
    

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web前端黑客技术揭秘

Web前端黑客技术揭秘

钟晨鸣、徐少培 / 电子工业出版社 / 2013-1 / 59.00元

Web前端的黑客攻防技术是一门非常新颖且有趣的黑客技术,主要包含Web前端安全的跨站脚本(XSS)、跨站请求伪造(CSRF)、界面操作劫持这三大类,涉及的知识点涵盖信任与信任关系、Cookie安全、Flash安全、DOM渲染、字符集、跨域、原生态攻击、高级钓鱼、蠕虫思想等,这些都是研究前端安全的人必备的知识点。本书作者深入剖析了许多经典的攻防技巧,并给出了许多独到的安全见解。 本书适合前端工......一起来看看 《Web前端黑客技术揭秘》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码