探究netty的观察者设计模式

栏目: IT技术 · 发布时间: 5年前 · 75

观察者的核心思想就是,在适当的时机回调观察者的指定动作函数

我们知道,在使用netty创建channel时,一般都是把这个channel设置成非阻塞的模式,这意味着什么呢? 意味着所有io操作一经调用,即刻返回

这让netty对io的吞吐量有了飞跃性的提升,但是异步编程相对于传统的串行化的编程模式来说,控制起来可太麻烦了

jdk提供了原生的Futrue接口,意为在未来任务,其实就是把任务封装起来交给新的线程执行,在这个线程执行任务的期间,我们的主线程可以腾出时间去做别的事情

下面的netty给出的实例代码,我们可以看到,任务线程有返回一个Futrue对象,这个对象中封装着任务执行的情况

  1. Copy * * void showSearch(final String target)
  2. * * throws InterruptedException {
  3. * * Future future
  4. * * = executor.submit(new Callable() {
  5. * * public String call() {
  6. * * return searcher.search(target);
  7. * * }});
  8. * * displayOtherThings(); // do other things while searching
  9. * * try {
  10. * * displayText(future.get()); // use future
  11. * * } catch (ExecutionException ex) { cleanup(); return; }
  12. * * }
  13. *

虽然jdk原生Futrue可以实现异步提交任务,并且返回了任务执行信息的Futrue,但是有一个致命的缺点,从futrue获取任务执行情况方法,是阻塞的,这是不被允许的,因为在netty中,一条channel可能关系着上千的客户端的链接,其中一个客户端的阻塞导致几千的客户端不可用是不被允许的,netty的Future设计成,继承jdk原生的future,而且进行扩展如下

  1. Copy// todo 这个接口继承了 java 并发包总的Futrue , 并在其基础上增加了很多方法
  2. // todo Future 表示对未来任务的封装
  3. public interface Future extends java.util.concurrent.Future {
  4. // todo 判断IO是否成功返回
  5. boolean isSuccess();
  6. // todo 判断是否是 cancel()方法取消
  7. boolean isCancellable();
  8. // todo 返回IO 操作失败的原因
  9. Throwable cause();
  10. /**
  11. * todo 使用了观察者设计模式, 给这个future添加监听器, 一旦Future 完成, listenner 立即被通知
  12. */
  13. Future addListener(GenericFutureListener> listener);
  14. // todo 添加多个listenner
  15. Future addListeners(GenericFutureListener>... listeners);
  16. Future removeListener(GenericFutureListener> listener);
  17. // todo 移除多个 listenner
  18. Future removeListeners(GenericFutureListener>... listeners);
  19. // todo sync(同步) 等待着 future 的完成, 并且,一旦future失败了,就会抛出 future 失败的原因
  20. // todo bind()是个异步操作,我们需要同步等待他执行成功
  21. Future sync() throws InterruptedException;
  22. // todo 不会被中断的 sync等待
  23. Future syncUninterruptibly();
  24. // todo 等待
  25. Future await() throws InterruptedException;
  26. Future awaitUninterruptibly();
  27. // todo 无阻塞的返回Future对象, 如果没有,返回null
  28. // todo 有时 future成功执行后返回值为null, 这是null就是成功的标识, 如 Runable就没有返回值, 因此文档建议还要 通过isDone() 判断一下真的完成了吗
  29. V getNow();
  30. @Override
  31. boolean cancel(boolean mayInterruptIfRunning);
  32. ...

netty的观察者模式

最常用的关于异步执行的方法writeAndFlush()就是典型的观察者的实现, 在netty中,当一个IO操作刚开始的时候,一个ChannelFutrue对象就会创建出来,此时,这个futrue对象既不是成功的,也不是失败的,更不是被取消的,因为这个IO操作还没有结束

如果我们想在IO操作结束后立刻执行其他的操作时,netty推荐我们使用addListenner()添加监听者的方法而不是使用await()阻塞式等待,使用监听者,我们就不用关系具体什么时候IO操作结束,只需要提供回调方法就可以,当IO操作结束后,方法会自动被回调

在netty中,一个IO操作是状态分为如下几种

  1. Copy * +---------------------------+
  2. * | Completed successfully |
  3. * +---------------------------+
  4. * +----> isDone() = true |
  5. * +--------- -----------------+ | | isSuccess() = true |
  6. * | Uncompleted | | +===========================+
  7. * +--------------------------+ | | Completed with failure |
  8. * | isDone() = false | | +---------------------------+
  9. * | isSuccess() = false |----+----> isDone() = true |
  10. * | isCancelled() = false | | | cause() = non-null 非空|
  11. * | cause() = null | | +===========================+
  12. * +--------------------------+ | | Completed by cancellation |
  13. * | +---------------------------+
  14. * +----> isDone() = true |
  15. * | isCancelled() = true |
  16. * +---------------------------+

源码追踪

对writeAndFlush的使用

  1. CopyChannelFuture channelFuture = ctx.writeAndFlush("from client : " + UUID.randomUUID());
  2. channelFuture.addListener(future->{
  3. if(future.isSuccess()){
  4. todo
  5. }else{
  6. todo
  7. }
  8. });

注意点: 我们使用writeAndFlush() 程序立即返回,随后我们使用返回的对象添加监听者,添加回调,这个时writeAndFlush()有可能已经完成了,也有可能没有完成,这是不确定的事

首先我们知道,writeAndFlush()是出站的动作,属于channelOutboundHandler,而且他是从pipeline的尾部开始传播的,源码如下:

  1. Copy@Override
  2. public final ChannelFuture writeAndFlush(Object msg) {
  3. return tail.writeAndFlush(msg);
  4. }

尾节点数据AbstractChannelHandlerContext类, 继续跟进查看源码如下:

  1. Copy@Override
  2. public final ChannelFuture writeAndFlush(Object msg) {
  3. return tail.writeAndFlush(msg);
  4. }
  5. @Override
  6. public ChannelPromise newPromise() {
  7. return new DefaultChannelPromise(channel(), executor());
  8. }

悄无声息的做了一个很重要的事情,创建了Promise,这个DefaultChannelPromise就是被观察者,过一会由它完成方法的回调

继续跟进writeAndFlush() ,源码如下, 我们可以看到promise被返回了, DefaultChannelPromiseChannelPromise的实现类,而ChannelPromise又继承了ChannelFuture,这也是为什么明明每次使用writeAndFlush()返回的都是ChannelFuture而我们这里却返回了DafaultChannelPromise

  1. Copy// todo 调用本类的 write(msg, true, promise)
  2. @Override
  3. public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
  4. if (msg == null) {
  5. throw new NullPointerException("msg");
  6. }
  7. if (isNotValidPromise(promise, true)) {
  8. ReferenceCountUtil.release(msg);
  9. return promise;
  10. }
  11. write(msg, true, promise);
  12. return promise;

在去目标地之前,先看一下addListenner()干了什么,我们进入到DefaultChannelPromise 源码如下:

  1. Copy@Override
  2. public ChannelPromise addListener(GenericFutureListener> listener) {
  3. super.addListener(listener);
  4. return this;
  5. }
  6. 随机进入它的父类 DefaultChannelPromise
  7. @Override
  8. public Promise addListener(GenericFutureListener> listener) {
  9. checkNotNull(listener, "listener");
  10. synchronized (this) {
  11. addListener0(listener);
  12. }
  13. if (isDone()) {
  14. notifyListeners();
  15. }
  16. return this;
  17. }

这个函数分两步进行

第一步: 为什么添加监听事件的方法需要同步?

在这种多线程并发执行的情况下,这个addListener0(listener);任意一个线程都能使用,存在同步添加的情况 这个动作不像将channel和EventLoop做的唯一绑定一样,没有任何必须使用inEventloop()去判断在哪个线程中,直接使用同步

接着进入addListener0(listener)

  1. Copyprivate void addListener0(GenericFutureListener> listener) {
  2. if (listeners == null) {
  3. listeners = listener; // todo 第一次添加直接在这里赋值
  4. } else if (listeners instanceof DefaultFutureListeners) {
  5. // todo 第三次添加调用这里
  6. ((DefaultFutureListeners) listeners).add(listener);
  7. } else {
  8. // todo 第二次添加来这里复制, 由这个 DefaultFutureListeners 存放观察者
  9. listeners = new DefaultFutureListeners((GenericFutureListener) listeners, listener);
  10. }
  11. }

第二步: 为什么接着判断isDone()

writeAndFlush()是异步执行的,而且在我们添加监听者的操作之前已经开始执行了,所以在添加完监听者之后,立即验证一把,有没有成功

思考一波:#

回顾writeAndFlush()的调用顺序,从tail开始传播两波事件,第一波write,紧接着第二波flush,一直传播到header,进入unsafe类中,由他完成把据写入jdk原生ByteBuffer的操作, 所以按理说,我们添加是listenner的回调就是在header的unsafe中完成的,这是我们的目标地

任何方法的回调都是提前设计好了的,就像pipeline中的handler中的方法的回调,就是通过遍历pipeline内部的链表实现的,这里的通知观察者,其实也是调用观察者的方法,而且他使用的一定是观察的父类及以上的引用实现的方法回调

回到我们的writeAndFlush()这个方法,在第二波事务传递完成,将数据真正写入jdk原生的ByteBuffer之前,只有进行的所有回调都是设置失败的状态,直到把数据安全发出后才可能是 回调成功的操作

此外,想要进行回调的操作,就得有被观察的对象的引用,所以一会我就回看到,Promise 一路被传递下去

我们进入的unsafe的write()就可以看到与回调相关的操作safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);,源码如下

  1. Copy@Override
  2. public final void write(Object msg, ChannelPromise promise) {
  3. assertEventLoop();
  4. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
  5. if (outboundBuffer == null) { // todo 缓存 写进来的 buffer
  6. safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
  7. ReferenceCountUtil.release(msg);
  8. return;
  9. }

我们继续跟进本类方法safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);, 源码如下:

  1. Copyprotected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
  2. if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
  3. logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
  4. }
  5. }

其中重要的方法,就是回调 被观察者的 tryFailure(cause), 这个被观察者的类型是ChannelPromise, 我们去看它的实现,源码如下

  1. Copy@Override
  2. public boolean tryFailure(Throwable cause) {
  3. if (setFailure0(cause)) {
  4. notifyListeners();
  5. return true;
  6. }
  7. return false;
  8. }

调用本类方法notifyListeners()

继续跟进本类方法notifyListenersNow();

接着跟进本类方法notifyListener0(this, (GenericFutureListener) listeners);

继续l.operationComplete(future); 终于看到了调用了监听者的完成操作,实际上就是回调用户的方法,虽然是完成的,但是失败了


下面我们去flush()中去查看通知成功的回调过程, 方法的调用顺序如下

  1. Copyflush();
  2. flush0();
  3. doWrite(outboundBuffer);

在doWrite()方法中,就会使用自旋的方式往尝试把数据写出去, 数据被写出去后,有一个标识 done=true, 证明是成功写出了, 紧接着就是把当前的盛放ByteBuf的entry从链表上移除,源码出下

  1. Copyif (done) {
  2. // todo 跟进去
  3. in.remove();
  4. } else {

我们继续跟进remove(), 终于我们找到了成功回调的标志,在remove()的底端safeSuccess(promise);, 下一步就是用回调用户添加的监听者操作完成了,并且完成的状态是Success成功的

  1. Copypublic boolean remove() {
  2. // todo 获取当前的 Entry
  3. Entry e = flushedEntry;
  4. if (e == null) {
  5. clearNioBuffers();
  6. return false;
  7. }
  8. Object msg = e.msg;
  9. ChannelPromise promise = e.promise;
  10. int size = e.pendingSize;
  11. // todo 将当前的Entry进行移除
  12. removeEntry(e);
  13. if (!e.cancelled) {
  14. // only release message, notify and decrement if it was not canceled before.
  15. ReferenceCountUtil.safeRelease(msg);
  16. safeSuccess(promise);
  17. decrementPendingOutboundBytes(size, false, true);
  18. }
猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Linux内核设计与实现(原书第3版)

Linux内核设计与实现(原书第3版)

Robert Love / 陈莉君、康华 / 机械工业出版社华章公司 / 2011-4-30 / 69.00元

《Linux内核设计与实现(原书第3版)》详细描述了Linux内核的设计与实现。内核代码的编写者、开发者以及程序开发人员都可以通过阅读本书受益,他们可以更好理解操作系统原理,并将其应用在自己的编码中以提高效率和生产率。 《Linux内核设计与实现(原书第3版)》详细描述了Linux内核的主要子系统和特点,包括Linux内核的设计、实现和接口。从理论到实践涵盖了Linux内核的方方面面,可以满......一起来看看 《Linux内核设计与实现(原书第3版)》 这本书的介绍吧!

URL 编码/解码

URL 编码/解码

URL 编码/解码

Markdown 在线编辑器

Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具

RGB CMYK 转换工具

RGB CMYK 互转工具