Skip to content

反应式编程概览(中文版)

Zhongyang Ma edited this page Nov 8, 2020 · 53 revisions

Reactive Programming 概览

作 者: 马 仲 阳


新版本工具之间关系的梳理

Reactive Programming 在过去早已有之,并不是什么新鲜事物。但是在最近几年,它似乎有着越来越流行的趋势。近期,Java 技术圈围绕着 Reactive Programming 这一主题,推出了许许多多的新版本工具,让人感到眼花缭乱。本文首先选择其中的几项重点更新内容,梳理一下它们之间的关系。

Spring Boot 2.0

Spring Boot 2.0 are now offering first-class support for developing reactive applications, via auto-configuration and starter-POMs[1]. 一如既往体现着 Spring Boot 配置简便的特点,只需几处简单配置就可以开发 reactive 应用了。

围绕 reactive 主题,主要的更新点有:

  • 基于 Spring Framework 5(包括新模块:WebFlux)构建
  • 集成 Netty 作为默认的 web 服务器,支持 reactive 应用
  • WebFlux 默认运行在 Netty 上

Spring Framework 5

New spring-webflux module, an alternative to spring-webmvc built on a reactive foundation, intended for use in an event-loop execution model[2]. 最重要的更新是新增了 WebFlux 模块,支持基于事件循环的执行模型。

主要的更新点有:

  • 依赖:最低 Java 8,支持 Java 9
  • 提供许多支持 reactive 的基础设施
  • 提供面向 Netty 等运行时环境的适配器
  • 新增 WebFlux 模块(集成的是Reactor 3.x)

Java 9 Reactive Stream

在 Java 8 时代,Reactive Stream API 就已经存在,只不过那时它是单独的一个jar包,可用maven引入:

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
</dependency>

而在 Java 9 时代,Reactive Stream 被正式集成到了 Java 的 API 中。In Java 9, Reactive Streams is officially part of the Java API[3].

主要的更新点有:

  • 提供 Reactive Stream API(java.util.concurrent.Flow)[4]

Java 9 的 Reactive Stream API 只是一套接口,约定了 Reactive 编程的一套规范,并没有具体的实现。而实现了这个接口的产品有:RxJava、Reactor、akka 等,而 Spring WebFlux 中集成的是 Reactor 3.x。

所以目前 Spring Framework 5.x 提供了两大开发栈:

  1. Spring WebFlux:基于 Reactive Stream API,需要运行在 servlet 3.1+ 容器(Tomcat 8)或 Netty 上,这些容器支持 NIO、Reactor 模式
  2. Spring MVC:基于传统的 Servlet API,运行在传统的 Servlet 容器上,one-request-per-thread,同步阻塞 IO

下图为两大开发栈的对比:

注意:目前 JDBC 仍然不支持 Reactive 异步调用模式,如果在代码里调用 MySQL 数据库,线程仍会阻塞等待数据返回。

什么是 Reactive Programming

想要搞清楚 Reactive 这种编程范式到底是什么,我们得先回顾 IO 的历史,需要了解一下过去发生了什么,基于什么原因产生了这种编程范式。我们首先从几个基本的 IO 模型说起。

Linux 五种 IO 模型

(1) 阻塞 IO (Blocking IO)
(2) 非阻塞 IO (Non-blocking IO)
(3) 多路复用 IO (IO multiplexing)
(4) 信号驱动 IO (signal driven I/O (SIGIO))
(5) 异步 IO (asynchronous I/O)
前四种都是同步的,只有最后一种才是异步 IO。

阻塞 IO (Blocking IO)

如图所示,IO 过程分为 (1)内核数据准备 和 (2)数据从内核空间拷贝到用户空间 这两个阶段。在用户线程发起 IO 调用后,用户线程在这两个阶段全程等待,直到有结果返回。这种 IO 就是阻塞 IO 模型。

非阻塞 IO (Non-blocking IO)

非阻塞 IO 模型下,在用户线程发起 IO 调用后,在内核数据还未准备好时, IO 函数可立即返回。用户线程通过多次调用 IO 函数来检查内核数据是否准备好(轮询),当检查到数据准备好后,执行第二阶段,将数据从内核空间拷贝到用户空间,这一阶段用户线程是阻塞的。

多路复用 IO (IO multiplexing)

用户线程通过 select、poll、epoll 等函数 阻塞式地监听多路 IO 事件,直到有一个或多个 IO 操作处于就绪状态(有数据可读或可写)时返回,开始执行 I/O 操作。多路复用模型的特点是可以通过一个线程处理多路 IO 请求。

信号驱动 IO (signal driven I/O (SIGIO))

用户线程发起 IO 操作后,可立即返回不阻塞。等到内核数据准备完成后,通过信号函数通知用户线程,执行第二阶段数据拷贝操作。

异步 IO (asynchronous I/O)

异步 IO 的特点是用户线程发起 IO 请求后立即返回,在 IO 操作的两个阶段全程都不阻塞,不需要用户线程参与,直到最后数据拷贝完成后,内核发送信号通知用户线程处理完毕。

五种 IO 模型的比较

五种 IO 模型的比较如图所示[5]。

Java 中的四种 IO 模型

Unix 中的五种 IO 模型,除信号驱动模型外,Java 对其它四种 IO 模型都有支持[6][7]。
(1). 其中 Java 最早提供的 blocking IO 即是阻塞 IO
(2). 而 NIO 即是非阻塞 IO
(3). 同时 NIO 中的 Reactor 模式即是 IO 多路复用模型的实现
(4). 通过 AIO 实现的 Proactor 模式即是异步 IO 模型的实现

Java 中传统的 IO 都是阻塞 IO,比如通过 socket 来读数据,调用 read 方法之后,如果数据没有就绪,当前线程就会一直阻塞在 read 方法调用那里,直到有数据才返回。因此在传统的网络服务设计模式中,比较经典的模式是多线程或线程池。当一条线程正在处理一个 client 的请求时,它是阻塞的状态,这时如果又来了一个 client 的请求,只好再启动一个新的线程去服务它,一个线程的阻塞不会影响其他线程工作。这种服务方式虽然看起来简便,但服务器为每个 client 都启动一个线程,资源消耗非常大。线程池的方式使得线程可以重复使用,在一定程度上减少了创建和销毁线程的系统开销。

多线程(或线程池)模式适用于大部分的连接为短连接的情景。如果大部分的连接都为长连接,而同一时刻只有少数的连接中存在 IO 操作,那么为每一个连接安排一个线程的服务方式是非常浪费的[8][9]。因此便出现了如下的高性能 IO 模型——采用 Reactor 模式的 Java NIO 模型,这也是 Reactive Programming 中最为重要的概念。

Java NIO 模型

先来了解一下 Java NIO 中的几个重要概念:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。

Channel(通道)

Channel 和传统 IO 中的 Stream 流 很相似。只不过 Stream 是单向的,要么是 InputStream 只能进行读,要么是 OutputStream 只能进行写。而 Channel 是双向的。

以下是常用的几种通道:

  • FileChannel - 可以向文件读写数据
  • SocketChanel - 以 TCP 来向网络连接的两端读写数据
  • ServerSocketChanel - 能够监听客户端发起的 TCP 连接,并为每个 TCP 连接创建一个新的 SocketChannel
  • DatagramChannel - 以 UDP 协议来向网络连接的两端读写数据

Buffer(缓冲区)

Buffer,故名思意,缓冲区,实际上是一个容器,是一个连续数组。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。

上图描述了客户端向服务端发送数据,然后服务端接收数据的全部过程。客户端发送数据时,必须先将数据存入 Buffer 中,然后将 Buffer 中的内容写入通道。服务端这边接收数据必须通过 Channel 将数据读入到 Buffer 中,然后再从 Buffer 中取出数据来处理。

Selector(选择器)

Selector 类是 NIO 的核心类,Selector 能够检测多个注册的通道 Channel 上是否有事件发生,如果有,便获取事件然后针对每个事件进行相应的处理。这样一来,只用一个单线程就可以管理多个通道,也就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写。这种方式不必为每个连接都创建一个线程,不用去维护多个线程,大大地减少了系统开销。

与 Selector 有关的一个关键类是 SelectionKey,一个 SelectionKey 表示一个到达的事件,这2个类构成了服务端处理业务的关键逻辑[10]。

下图表示单线程运行的 Selector 同时管理多个通道 (连接)。

Selector 的创建和可以设置监听的事件如下[11]:

Selector selector = Selector.open();  // 创建 Selector
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);   // 将通道注册到 Selector 上
register 的第二个参数指明监听哪些事件,可选的值有:
Connect - 连接就绪
Accept - 接收就绪
Read - 读就绪
Write - 写就绪
与之对应的检测 Channel 中什么事件或操作已经就绪的函数为:
selectionKey.isConnectable();  // 是否连接就绪
selectionKey.isAcceptable();    // 是否接收就绪
selectionKey.isReadable();      // 是否读就绪
selectionKey.isWritable();        // 是否写就绪

Java NIO 代码示例

下面结合示例代码,进一步理解 Java NIO 中的各项概念[12][13]。

// 创建 Selector
Selector selector = Selector.open();
// 创建 ServerSocketChannel 并绑定到指定端口
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", 1234));
// 设置 ServerSocketChannel 为 non-blocking
server.configureBlocking(false);
// 将 server channel 注册到 selector 并设置监听 OP_ACCEPT 事件
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    // selector 被 select() 阻塞
    // select() 会把注册的事件添加到 SelectionKeys (只增不减)
    if (selector.select() == 0) {
        continue;
    }
    // 获得 SelectionKey 集合, 每一个 Selectionkey 对应着一个已注册的 Channel
    Set<SelectionKey> keys = selector.selectedKeys();
    for (SelectionKey key : keys) {
        if (key.isAcceptable()) {
	    // 获取与 client 相连的 SocketChannel
	    SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
	    // 同样设置为 non-blocking
	    channel.configureBlocking(false);
            // 这里可以向 client 发送信息
            channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息!").getBytes()));  
            // 将此 channel 的 OP_READ 事件注册到 selector
            channel.register(this.selector, SelectionKey.OP_READ);  
	}
	// 如果 channel 可读
	if (key.isReadable()) {
	    SocketChannel channel = (SocketChannel) key.channel();
	    // 创建读缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);  
            channel.read(buffer);
	}
	// 由于 select() 对 SelectionKey 集合只增不减 这里需手动移除 key
	keys.remove(key);
    }
}

Reactor 模式

上面的代码展示的其实就是最简易的 Reactor 模式。下面我们看看 Doug Lea 大爷对 Reactor 模式的抽象、概括和总结。他把 Reactor 模式分为三种类型,分别是:Basic version, Multi-threaded version 和 Multi-reactors version[14][15]。

Basic version (Single threaded)

如图所示为 Reactor 模式的单线程版本,所有的 I/O 操作都在同一个 NIO 线程上面完成。
NIO 线程即是 Reactor、Acceptor 负责监听多路 socket 并 Accept 新连接,又负责分派、处理请求。对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适。实际当中基本不会采用单线程模型。

Multi-threaded version

图为 Reactor 模式的多线程版本。
多线程模型中有一个专门的线程负责监听和处理所有的客户端连接,网络 IO 操作由一个 NIO 线程池负责,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。1个 NIO 线程可以同时处理 N 条链路,但是1个链路只对应1个 NIO 线程,防止发生并发操作问题。
在绝大多数场景下,Reactor 多线程模型都可以满足性能需求。但是在更高并发连接的场景(如百万客户端并发连接),它的性能似乎不能满足需求,于是便出现了下面的多 Reactor(主从 Reactor)模型。

Multi-reactors version

如图所示为主从 Reactor 模型。
这种模型是将 Reactor 分成两部分,mainReactor 负责监听 server socket、accept 新连接,并将建立的 socket 分派给 subReactor;subReactor 负责多路分离已连接的 socket,读写网络数据;而对业务处理的功能,交给 worker 线程池来完成。

Reactive Programming 的核心概念

在前文相关信息的基础上,这一部分我们将进一步抽象、总结一下 Reactive Programming 的一些核心概念[16][17]。

Reactive Programming 和 Reactive Streams

Reactive programming 是一种编程范式,它已经存在很久了,并不是什么新的东西。就像 面向过程编程、面向对象编程、函数式编程等,它只是另外一种编程范式。

而 Reactive Streams 指的是一套规范,对于 Java 开发者来讲,Reactive Streams 具体来说就是一套 API。Reactive Streams 给我们提供了一套通用的 API,让我们可以使用 Java 进行 Reactive Programming。

Reactive Programming 的基本要素

响应式宣言(Reactive Manifesto)[16] 描述了响应式系统(reactive systems)应该具备的四个关键属性:Responsive(灵敏的)、Resilient(可故障恢复的)、Elastic(可伸缩的)、Message Driven(消息驱动的)。

  • Responsive(灵敏的):只要有可能,系统就会及时响应。灵敏性是系统可用性的基石,除此之外,灵敏性也意味着系统的问题可以被快速地探测和解决。具有灵敏性的系统关注做出快速和一致的响应,提供可靠和一致的服务质量。
  • Resilient(可故障恢复的):在出现故障时,系统仍然可以保持响应。一个不具可恢复性的系统一旦出现故障,就会变得无法正常响应。可恢复性可以通过复制、围控、隔离和委派等方式实现。在可恢复性的系统中,故障被包含在每个组件中,各组件之间相互隔离,从而允许系统的某些部分出故障并且在不连累整个系统的前提下进行恢复。
  • Elastic(可伸缩的):在不同的工作负载下,系统保持响应。系统可以根据输入的工作负载,动态地增加或减少系统使用的资源。这意味着系统在设计上可以通过分片、复制等途径来动态申请系统资源并进行负载均衡,从而去中心化,避免节点瓶颈。
  • Message Driven(消息驱动的):响应式系统依赖异步消息传递机制,从而在组件之间建立边界,这些边界可以保证组件之间的松耦合、隔离性、位置透明性,还提供了以消息的形式把故障委派出去的手段。

前三种属性(Responsive, Resilient, Elastic)更多的是跟你的架构选型有关,我们可以很容易理解像 microservices、Docker 和 Kubernetes 这样的技术对建立响应式系统的重要性。作为系统的开发者,我们最感兴趣的还是最后一点 Message Driven。Reactive Manifesto 提到了 Message Driven 的三个主要方面:failures at messages, back-pressure, and non-blocking。

Failures at messages:在 Reactive 编程中,我们通常需要处理流式的信息,我们最不希望看到的是突然抛出一个异常,然后处理过程终止了。理想的解决办法是我们记下这个错误,然后开始执行某种重试或恢复的逻辑。在 Reactive Streams 中,异常是一等公民,异常不会被粗鲁地抛出,错误处理是正式建立在 Reactive Streams API 规范之内的。

Back-pressure:有没有听过这个谚语:“Drinking from the firehose”?John Thompson 在他的 What Are Reactive Streams in Java? [17] 一文中给出了非常生动的比喻,请看下图。

Back-pressure,中文一般翻译成“背压”、“回压”,意思是当消费端的消费能力跟不上生产端的生产速度时,消息流下游的消费方对上游的生产方说:“我喝饱了,请你慢点”。在 Reactive 的世界里,我们希望下游的消费方可以有某种机制按需请求一定数量的消息来消费(这类似消息队列中的 pull 的概念)。而不是上游把大量的消息一股脑灌给下游消费方,然后阻塞式等待,throttling(节流) is done programmatically rather than blocking threads。

Non-blocking:非阻塞对于一个响应式系统的重要性,也许是我们开发者最关心的一点了。John Thompson 用 Node.js Server 来举例,用它来和传统的 Java 多线程服务相对比。

Node.js Server 有着一个非阻塞的事件循环,访问请求以一种非阻塞的方式被处理,线程不会因为等待别的处理过程而卡住。

与之相对的是典型的 Java 多线程服务,并发性依靠多线程来获得。

John Thompson 把这两种方式比喻成了超级高速公路和有着许多红绿灯的城市道路:“With a single thread event loop, your process is cruising quickly along on a super highway. In a Multi-threaded server, your process is stuck on city streets in stop and go traffic.”

Reactive Stream API 介绍

让我们来简要看一下 Reactive Stream API。它只提供了四个接口。

Publisher:是元素(消息)序列的提供者,根据它的订阅者的需求,来发布这些元素(消息)。
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
Subscriber:当通过 Publisher.subscribe(Subscriber) 注册后,它将通过 Subscriber.onSubscribe(Subscription) 来接收消息。
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
Subscription:代表了消息从 PublisherSubscriber的生命周期。
public interface Subscription {
    public void request(long n);
    public void cancel();
}
Processor:继承了 PublisherSubscriber,用于转换发布者到订阅者之间管道中的元素。Processor<T,R> 订阅类型为 T 的数据元素,接收并转换为类型为 R 的数据,然后发布变换后的数据。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

下图显示了发布者和订阅者之间的典型交互顺序。

Spring Boot 2.0 构建 WebFlux 应用

Reactor 框架简介

Reactive Streams API 只是一套接口,并没有具体的实现。Reactor 项目是它的具体实现之一,并且也是 Spring Framework 5 中新增模块 WebFlux 默认集成的框架,Spring Framework 5 的响应式编程模型主要依赖 Reactor [18][19]。下面我们来简要介绍一下 Reactor 框架。

Reactor 有两种模型,Flux 和 Mono,提供了非阻塞、支持回压机制的异步流处理能力。当数据生成缓慢时,整个流自然进入推送模式;而当生产高峰来临数据生产速度加快时,整个流又进入了拉取模式。Flux 可以触发 0 到多个事件,用于异步地处理流式的信息;Mono 至多可以触发一个事件,通常用于在异步任务完成时发出通知。Flux 和 Mono 之间可以相互转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象,把两个 Mono 序列合并在一起,得到的是一个 Flux 对象[20][21]。

创建 Flux 序列的方式

通过 Flux 类的静态方法,例如:
List<String> words = Arrays.asList("aa","bb","cc","dd");
Flux<String> listWords = Flux.fromIterable(words);    //从集合获取
Flux<String> justWords = Flux.just("Hello","World");  //指定序列中包含的全部元素
listWords.subscribe(System.out::println);
justWords.subscribe(System.out::println);
使用 generate() 方法生成 Flux 序列:
generate() 方法通过同步和逐的方式来产生 Flux 序列,通过调用 SynchronousSink 对象的 next(),complete() 和 error(Throwable) 方法来完成。
Flux.generate(sink -> {
    sink.next("Hello");   //通过 next()方法产生一个简单的值,至多调用一次
    sink.complete();      //然后通过 complete()方法来结束该序列
}).subscribe(System.out::println);
使用 create() 方法生成 Flux 序列:
与 generate() 方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在次调用中产生多个元素。
Flux.create(sink -> {
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::println);

创建 Mono 的方式

// Mono 的创建方式与 Flux 类似。除了相同的几个静态方法之外,Mono 还有一些独有的静态方法。例如:
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

流的操作函数

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);  // 只输出满足filter条件的元素
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);  // zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。
// reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
etc.

消息处理

当需要处理 FluxMono 中的消息时,可以通过 subscribe 方法来添加相应的订阅逻辑。例如:
通过 subscribe()方法处理正常和错误消息:
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

出现错误时返回默认值0Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);

出现错误时使用另外的流:
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .switchOnError(Mono.just(0))
        .subscribe(System.out::println);

使用 retry 操作符进行重试:
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);

以上只是简略地列举了一些 Reactor 的操作函数,更详细的介绍请见官方文档[22]。

WebFlux 模块简介

WebFlux 是 Spring Framework 5 的一个新模块,包含了响应式 HTTP 和 WebSocket 的支持,另外在上层服务端支持两种不同的编程模型:第一种是 Spring MVC 中使用的基于 Java 注解的方式,第二种是基于 Java 8 的 lambda 表达式的函数式编程模型。这两种编程模型只是在代码编写方式上存在不同,它们运行在同样的反应式底层架构之上,因此在运行时是相同的。

代码示例

1. 首先,引入 parent :
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

2. 引入 WebFlux 依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. 入口文件跟普通的 Spring Boot 项目样:
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

4. 创建 RouterFunction 风格的路由:
@Configuration
public class Routes {
    @Bean
    public RouterFunction<?> routerFunction() {
        return route(
            GET("/api/city").and(accept(MediaType.APPLICATION_JSON)), cityService:: findAllCity
        ).and(route(
            GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), cityService:: findCityById)
        );
    }
}

5. 创建 Spring MVC 注解风格的路由:
@RestController
public class ReactiveController {
    @GetMapping("/hello_world")
    public Mono<String> sayHelloWorld() {
        return Mono.just("Hello World");
    }
}

Reactive Controller 操作的是异步的 ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。Mono 和 Flux 是异步的,当流中的数据没有就绪时,方法也能立即返回(返回的是对象引用)。当数据就绪后,web server 会扫描到这个就绪事件。[23][24]

更加详细完整的示例代码请见: https://github.com/ZhongyangMA/webflux-streaming-demo

参考文献

[1] Spring Boot 2.0 Release Notes: https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.0-Release-Notes
[2] What's New in Spring Framework 5.x: https://github.com/spring-projects/spring-framework/wiki/What%27s-New-in-Spring-Framework-5.x
[3] Reactive Programming vs. Reactive stream: https://www.tuicool.com/articles/UZfMB3M
[4] Reactive Programming with JDK 9 Flow API: https://community.oracle.com/docs/DOC-1006738
[5] Socket IO 模型: https://blog.csdn.net/qq_23100787/article/details/60751795
[6] Java I/O 模型从 BIO 到 NIO 和 Reactor 模式: http://www.jasongj.com/java/nio_reactor/
[7] Java NIO、BIO、AIO 全面剖析: https://blog.csdn.net/jiaomingliang/article/details/47684713
[8] Java NIO:浅析I/O模型: http://www.cnblogs.com/dolphin0520/p/3916526.html
[9] Java IO:BIO和NIO区别及各自应用场景: https://blog.csdn.net/jiyiqinlovexx/article/details/51204726
[10] Java NIO:NIO概述: http://www.cnblogs.com/dolphin0520/p/3919162.html
[11] JAVA nio selector 入门: http://www.open-open.com/lib/view/open1452776436495.html
[12] Java NIO Socket通信: http://www.open-open.com/lib/view/open1343002247880.html
[13] Java NIO 代码示例: https://blog.csdn.net/u013967628/article/details/77841327
[14] Scalable IO in Java (Doug Lea): http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
[15] Netty(RPC高性能之道)原理剖析: https://blog.csdn.net/zhiguozhu/article/details/50517551
[16] The Reactive Manifesto: https://www.reactivemanifesto.org/
[17] What Are Reactive Streams in Java: https://dzone.com/articles/what-are-reactive-streams-in-java
[18] Project Reactor 官网: http://projectreactor.io/
[19] Reactor 实例解析: http://www.open-open.com/lib/view/open1482286087274.html
[20] 使用 Reactor 进行反应式编程: https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html
[21] Java ProjectReactor 框架之 Flux 篇: https://www.jianshu.com/p/8cb66e912641
[22] Reactor 3 Reference Guide: http://projectreactor.io/docs/core/release/reference/docs/index.html
[23] 聊聊 Spring Boot 2.0 的 WebFlux: https://www.bysocket.com/?p=1987
[24] 使用 Spring 5 的 WebFlux 开发反应式 Web 应用: https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html