netty的Future异步回调难理解?手写个带回调异步框架就懂了

栏目: IT技术 · 发布时间: 4年前

内容简介:netty是一个经典的网络框架,提供了基于NIO、AIO的方式来完成少量线程支持海量用户请求连接的模型。netty里面充斥了大量的非阻塞回调模式,主要是靠Future/Promise异步模型来实现的。Future是java.util.concurrent.Future,是Java提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,后续可以通过Future的方法来获取执行结果。Jdk的Future不便之处

netty是一个经典的网络框架,提供了基于NIO、AIO的方式来完成少量线程支持海量用户请求连接的模型。netty里面充斥了大量的非阻塞回调模式,主要是靠Future/Promise异步模型来实现的。

Future是java.util.concurrent.Future,是 Java 提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,后续可以通过Future的方法来获取执行结果。

Jdk的Future不便之处

Java的Future有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。

先来看一下Java的future使用:

import java.util.concurrent.*;

/**

* @author wuweifeng wrote on 2019-12-10

* @version 1.0

*/

public class Test {

public static void main(String[] args) throws ExecutionException, InterruptedException {

//创建线程池

ExecutorService executor = Executors.newCachedThreadPool();

Future future = executor.submit(new Task());

//这一步get会阻塞当前线程

System.out.println(future.get());

executor.shutdown();

}

private static class Task implements Callable<Integer> {

@Override

public Integer call() throws Exception {

System.out.println("子线程在进行计算");

Thread.sleep(2000);

return 1;

}

}

}

代码很简单,就是将一个Runnable、Callable的实例放到一个线程池里,就会返回一个Future对象。后续通过future.get()取得执行结果,但事实上代码并没有达到异步回调的结果,而是get时阻塞了。

Netty future无法单独抽出来使用

理想状态其实是netty的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了,不必在那苦等get()方法的执行结果。

netty的Future异步回调难理解?手写个带回调异步框架就懂了

看一个netty的回调的小例子:

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group).channel(NioSocketChannel.class)

.remoteAddress(host, port)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

.addLast(new StringDecoder())

.addLast(new StringEncoder())

//10秒没消息时,就发心跳包过去

.addLast(new IdleStateHandler(10, 0, 0), new NettyClientHandler())

;

}

});

ChannelFuture channelFuture = bootstrap.connect().sync().addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture channelFuture) throws Exception {

//do your job

}

});

我们可以理解为bootstrap.connect这一步是一个耗时操作,我不想在那等待它执行完毕,而是希望它执行完毕后主动给我一个回调即可。所以,在connect后面有个addListener,当connect完成后,会回调operationComplete方法。

可以看到netty的这种回调方式比较优雅,不像java的future那样需要阻塞get。整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。

打开netty的源码,想搞明白future、promise的逻辑,一眼看去,心情是这样的:

netty的Future异步回调难理解?手写个带回调异步框架就懂了

如何实现一个简单带回调异步任务

netty是为特定的场景设计的,里面的各种逻辑也是为了服务于netty本身。 当看不懂,或难以理解它的工作逻辑时,我们可以考虑自己实现一个对任意异步线程进行回调的框架。

首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。

据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。

下面来看一下worker的定义:

/**

* @author wuweifeng wrote on 2019-12-13

* @version 1.0

*/

public interface Worker {

String action(Object object);

}

一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。

再看一下回调器的定义:

/**

* @author wuweifeng wrote on 2019-12-13

* @version 1.0

*/

public interface Listener {

void result(Object result);

}

这个listener用来做为回调,将worker的执行结果,放到result的参数里。

此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。

public class Wrapper {

private Object param;

private Worker worker;

private Listener listener;

public Object getParam() {

return param;

}

public void setParam(Object param) {

this.param = param;

}

public Worker getWorker() {

return worker;

}

public void setWorker(Worker worker) {

this.worker = worker;

}

public Listener getListener() {

return listener;

}

public void addListener(Listener listener) {

this.listener = listener;

}

}

OK,下面就是主逻辑了。

/**

* @author wuweifeng wrote on 2019-12-13

* @version 1.0

*/

public class Bootstrap {

public static void main(String[] args) {

Bootstrap bootstrap = new Bootstrap();

Worker worker = bootstrap.newWorker();

Wrapper wrapper = new Wrapper();

wrapper.setWorker(worker);

wrapper.setParam("hello");

bootstrap.doWork(wrapper).addListener(new Listener() {

@Override

public void result(Object result) {

System.out.println(Thread.currentThread().getName());

System.out.println(result);

}

});

System.out.println(Thread.currentThread().getName());

}

private Wrapper doWork(Wrapper wrapper) {

new Thread(() -> {

Worker worker = wrapper.getWorker();

String result = worker.action(wrapper.getParam());

wrapper.getListener().result(result);

}).start();

return wrapper;

}

private Worker newWorker() {

return new Worker() {

@Override

public String action(Object object) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return object + " world";

}

};

}

}

执行结果如下:

netty的Future异步回调难理解?手写个带回调异步框架就懂了

可以看到主线程没有被耗时的线程阻塞掉,耗时线程在执行完毕后,进行了回调。

这就是一个简单的设计模式——“监听器模式”,再来认识一下这种 设计模式 的三个要素:事件源(被监听的对象)、事件对象(事件完毕这个动作)、监听器(我们的Listener)。

完成了这样的小demo,立马从netty的复杂中恢复了过来,心情变成了这样:

netty的Future异步回调难理解?手写个带回调异步框架就懂了

实现一个简单带回调、超时异步任务

public class BootstrapNew {

public static void main(String[] args) {

BootstrapNew bootstrap = new BootstrapNew();

Worker worker = bootstrap.newWorker();

Wrapper wrapper = new Wrapper();

wrapper.setWorker(worker);

wrapper.setParam("hello");

//添加结果回调器

wrapper.addListener(new Listener() {

@Override

public void result(Object result) {

System.out.println(result);

}

});

CompletableFuture future = CompletableFuture.supplyAsync(() -> bootstrap.doWork(wrapper));

try {

future.get(800, TimeUnit.MILLISECONDS);

} catch (InterruptedException | TimeoutException | ExecutionException e) {

//超时了

wrapper.getListener().result("time out exception");

}

}

private Wrapper doWork(Wrapper wrapper) {

Worker worker = wrapper.getWorker();

String result = worker.action(wrapper.getParam());

wrapper.getListener().result(result);

return wrapper;

}

private Worker newWorker() {

return new Worker() {

@Override

public String action(Object object) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return object + " world";

}

};

}

}

执行结果如下:

netty的Future异步回调难理解?手写个带回调异步框架就懂了

这就是一个简单的带超时回调的小demo了。

更复杂的场景

更复杂的场景,要有超时控制,要支持N个线程并行、串行、串并行结合。

上面的demo过于简单,也不能实战于复杂的业务场景。那么需求来了,希望有这样一个并发框架:

以下的执行单元就是worker,可以理解为一个任务,一段耗时代码。

1、提供任何形式的串行、并行执行单元的组合。如a、b、c的串行,a、b的串行同时与c并行,a、b、c的并行。

所以这一组执行单元可能长如下的样子:

netty的Future异步回调难理解?手写个带回调异步框架就懂了

2、为每个执行单元提供执行成功、失败、超时、异常的回调。

方便对整个流程的执行进行控制,当有很多个执行单元时,我们会非常关注每一个执行单元的执行状况,而不仅仅是全部执行完毕后的结果汇总。

3、支持为单个执行单元设置超时和失败后的默认值。

有了默认值,可以进一步减少bug产生的概率。

4、支持下一个执行单元获取上一个执行单元的返回值(计算结果),作为自己的入参。

譬如a-b-c串行,可以在任务编排时,就让b的入参为a的执行结果,即便此时各任务都还未执行

5、支持为整个group(多个任意组合的执行单元)设置超时时间。单个执行单元失败,不影响其他单元的回调和最终结果获取。

防止整个流程无限时的执行下去,要给它设置超时的阈值。

6、执行顺序的强依赖和弱依赖。

如上图3,A和B并发执行,最后是C。

有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。

有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

那么,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

7、支持每个group独享线程池,或所有group共享线程池。

交给调用者来决定将这组任务,放到共享线程池,还是独享线程池。如果你熟悉hystrix的话,应该明白线程池隔离的重要性。

8、更少的线程数,更高的性能表现。

整体上要实现以上所有还是有点麻烦的,这里我挑一个图3为例,简单描述一下实现方式。

netty的Future异步回调难理解?手写个带回调异步框架就懂了

执行A比较简单,直接在主线程里执行它,或者新开线程执行它都可以,主要是A执行完毕后,当发现自己的nextWrappers有多个(即自己后面有多个执行单元)时,该怎么办。

还好,CompleteableFuture提供了allOf这个方法,它可以让你传入多个future,并且能够等待这多个future都完成时再统一返回。见下面代码。

CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];

for (int i = 0; i < nextWrappers.size(); i++) {

int finalI = i;

futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)

.work(poolExecutor, WorkerWrapper.this, remainTime - costTime), poolExecutor);

}

try {

CompletableFuture.allOf(futures).get();

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

其他的场景实现细节可以后续看代码或者联系作者。

框架具备了上面的功能后,既可以回调,也能同步返回结果,还能为一组任务配置超时时间。任务失败了、超时了,还会返回设定的默认值。

在业务中就可以将框架应用于如下的一些场景了:

1、客户端请求服务端接口,该接口需要调用其他N个微服务的接口。譬如 请求我的订单,那么就需要去调用用户的rpc、商品详情的rpc、库存rpc、优惠券等等好多个服务。同时,这些服务还有相互依赖关系,譬如必须先拿到用户的某个字段后,再去某rpc服务请求数据。最终全部获取完毕后,或超时了,就汇总结果,返回给客户端。

2、任务是工作流性质的,希望一次编排后,就不用管它了,让它按照规则执行,直至成功或失败。譬如,数据清洗时经常有类似场景,从多个数据源拉取数据,各种合并组合,最后清洗完毕后结束。

3、爬虫的相关场景。

目前,基于线程池和CompletableFuture已经完成了这样的一个并发框架,由于代码较多,不便于贴在文章内,有需要的可以联系 wuweifeng10@jd.com .

后续会推出框架的测试数据、性能对比、使用场景指南等,敬请留意。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Game Engine Architecture, Second Edition

Game Engine Architecture, Second Edition

Jason Gregory / A K Peters/CRC Press / 2014-8-15 / USD 69.95

A 2010 CHOICE outstanding academic title, this updated book covers the theory and practice of game engine software development. It explains practical concepts and techniques used by real game studios,......一起来看看 《Game Engine Architecture, Second Edition》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具