如何用Spring WebFlux构建Reactive REST API

栏目: IT技术 · 发布时间: 3年前

内容简介:【51CTO.com快译】在本文中,我们将讨论如何使用下图简要地罗列了传统应用和现代应用系统的主要特点。如今的系统讲求的是:分布式应用、云原生、高可用性和可扩展性。因此,有效地利用系统现有的资源是至关重要的。

【51CTO.com快译】

在本文中,我们将讨论如何使用 Spring WebFlux 来构建响应式 REST API 。在正式讨论之前,让我们首先来看看系统的开发,传统 REST 在实现中遇到的问题,以及当前 API 的普遍需求。

下图简要地罗列了传统应用和现代应用系统的主要特点。如今的系统讲求的是:分布式应用、云原生、高可用性和可扩展性。因此,有效地利用系统现有的资源是至关重要的。

如何用Spring WebFlux构建Reactive REST API

应用程序 API 需求的演变

那么传统的 REST API 请求处理又是如何工作的呢?

如何用Spring WebFlux构建Reactive REST API

传统 REST API 模型

如上图所示,传统 REST API 会带来如下问题:

  •   阻塞和同步 → 通常,请求线程会去等待各种阻塞的 I/O 直至结束之后,才能被释放,进而将响应返回给调用方。
  •   每个请求的线程数 → Web 容器会用到基于请求的线程( thread-per-request )模型。该模型限制了待处理的并发请求数量。也就是说,容器会对请求进行排队,进而最终影响到 API 的性能。
  • 处理高并发用户的限制 → 正是由于 Web 容器使用了基于请求的线程模型,因此我们无法去处理那些高并发量的请求。
  • 无法更好地利用系统资源 → 阻塞的 I/O 会造成线程处于空闲状态,进而导致 Web 容器无法接受更多的请求,我们也就无法有效地利用现有的系统资源。
  •   没有背压( backpressure )支持 → 由于我们无法从客户端或服务器处施加背压,因此应用程序在负载量大时,无法维持正常运行。也就是说,倘若应用突然面临大量的请求,则服务器或客户端可能会由于中断,而无法访问到该应用。

下面,让我们来看看响应式 API 的优势,以及如何使用响应式编程,来解决上述问题。

  • 异步和无阻塞 → 响应式编程为编写异步和非阻塞应用程序提供了灵活性。
  • 事件 / 消息驱动 → 系统能够为任何活动生成对应的事件或消息。例如,那些来自数据库的数据会被视为事件流。
  • 支持背压 → 我们可以通过施加背压,来 优雅地”处理从一个系统到另一个系统的压力,从而避免了拒绝服务的出现。
  • 可预测的应用响应时间 → 由于线程是异步且非阻塞的,因此我们可以预测负载下的应用响应时间。
  • 更好地利用系统资源 → 同样由于线程是异步且非阻塞的,因此各种线程不会被 I/O 所占用,它们能够支持更多的用户请求。
  • 基于负载的扩容方式
  • 摆脱基于请求的线程 → 借助响应式 API ,并得益于异步且非阻塞的线程,我们可以摆脱基于请求的线程模型。在请求被产生后,模型会与服务器一起创建事件,并通过请求线程,去处理其他的请求。

那么,响应式编程的具体流程是怎样的呢?如下图所示,一旦应用程序调用了从某个数据源获取数据的操作,那么就会立即返回一个线程,并且让来自该数据源的数据作为数据 / 事件流出现。在此,应用程序是订阅者( subscriber ),数据源是发布者( publisher )。一旦数据流完成后, onComplete 事件就会被触发。

如何用Spring WebFlux构建Reactive REST API

数据流工作流程

如下图所示,如果发生了任何异常情况,发布者将会触发 onError 事件。

如何用Spring WebFlux构建Reactive REST API

数据流工作流程

在某些情况下,例如:从数据库中删除一个条目,发布者只会立即触发 onComplete/onError 事件,而不会调用 onNext 事件,毕竟没有任何数据可以返回。

如何用Spring WebFlux构建Reactive REST API

数据流工作流程

下面,我们进一步讨论:什么是背压,以及如何将背压应用于响应流。例如,我们有一个客户端应用正在向另一个服务请求数据。该服务能够以 1000 TPS (吞吐量)的速率发布事件,而客户端应用只能以 200 TPS 的速率处理事件。

那么在这种情况下,客户端应用程序需要通过缓冲数据来进行处理。而在随后的调用中,客户端应用程序可能会缓冲更多的数据,以致最终耗尽内存。显然,这对于那些依赖该客户端应用的其他程序,会造成级联效应。为了避免此类情况,客户端应用可以要求服务在事件的末尾进行缓冲,并以客户端应用的速率去推送各种事件。这就是所谓的背压,具体流程请见下图。

如何用Spring WebFlux构建Reactive REST API

背压示例

下面,我们将介绍响应流的规范(请参见 -- https://www.reactive-streams.org/ ),以及一个实现案例 --Project Reactor (请参见 -- https://projectreactor.io/ )。通常,响应流的规范定义了如下接口类型:

  • 发布者( Publisher ) → 发布者是那些具有无限数量顺序元素的提供者。它可以按照订阅者的要求进行发布。其 Java 代码如下所示:   
public interface Publisher<T> {    
     public void subscribe(Subscriber<? super T> s); 
}  
  •   订阅者( Subscriber ) → 订阅者恰好是那些具有无限数量顺序元素的使用者。其 Java 代码如下所示:   
public interface Subscriber<T> { 
    public void onSubscribe(Subscription s); 
     public void onNext(T t); 
     public void onError(Throwable t); 
     public void onComplete(); 
}  
  •   订阅( Subscription ) → 表示订阅者向发布者订阅的某个一对一的周期。其 Java 代码如下所示:
public interface Subscription { 
    public void request(long n); 
    public void cancel(); 
}
  •   处理器( Processor ) → 表示一个处理阶段,即订阅者和发布者之间根据约定进行处理。

下面是响应流规范的类图:

如何用Spring WebFlux构建Reactive REST API

响应流规范

其实,响应流规范具有许多种实现方式,上述 Project Reactor 只是其中的一种。 Reactor 可以完全实现无阻塞、且有效的请求管理。它能够提供两个响应式和可组合的 API ,即: Flux [N] (请参见 -- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html )和 Mono [0|1] (请参见 -- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html )。它们广泛地实现了响应式扩展( Reactive Extensions )。 ReactorHTTP (包括 Websocket )提供了非阻塞的背压式网络引擎、 TCPUDP 。它也非常适合于微服务的架构。

  •   Flux → 它是发布者带有各种 rx 运算符的响应流( Reactive Streams ),它会发出 0N 个元素,然后输出成功、或带有某个错误的完成结果。其流程图如下所示:  

如何用Spring WebFlux构建Reactive REST API

图片来源: https://projectreactor.io

  •   Mono → 它也是发布者具有各种基本 rx 运算符的响应流,能够通过发出 01 个元素,输出成功、或带有某个错误的完成结果。其流程图如下所示:  

如何用Spring WebFlux构建Reactive REST API

图片来源: https://projectreactor.io

由于 Reactor 的实施往往涉及到 Spring 5.x ,因此,我们可以使用带有 Spring servlet 栈的命令式编程,来构建 REST API 。下图展示了 Spring 如何支持响应式和 servlet 栈的实现。

如何用Spring WebFlux构建Reactive REST API

图片来源: spring.io

下面是一个公布了响应式 REST API 的应用。在该应用中,我们使用到了:

  •   带有 WebFluxSpring Boot
  •   具有响应式支持的 Spring 数据
  •   Cassandra 数据库

下图是该应用的整体架构:

如何用Spring WebFlux构建Reactive REST API

下面是 build.gradle 文件的 Groovy 代码,它包含了与 Spring WebFlux 协同使用的各种依赖项。

plugins { 
     id 'org.springframework.boot' version '2.2.6.RELEASE' 
     id 'io.spring.dependency-management' version '1.0.9.RELEASE' 
     id 'java' 
} 
group = 'org.smarttechie' 
version = '0.0.1-SNAPSHOT' 
sourceCompatibility = '1.8' 
repositories { 
    mavenCentral() 
}  
dependencies { 
   implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive' 
   implementation 'org.springframework.boot:spring-boot-starter-webflux' 
   testImplementation('org.springframework.boot:spring-boot-starter-test') { 
   exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' 
   } 
   testImplementation 'io.projectreactor:reactor-test' 
 } 
test { 
  useJUnitPlatform() 
}  

在此应用程序中,我公布了如下 API 。您可以通过 GitHub 的相关链接 -- https://github.com/2013techsmarts/Spring-Reactive-Examples ,下载源代码。

如何用Spring WebFlux构建Reactive REST API

在构建响应式 API 时,我们可以使用功能性样式编程模型来构建 API ,而无需使用 RestController 。当然,您需要具有如下的 routerhandler 组件:

Router

package org.smarttechie.router; 
import org.smarttechie.handler.ProductHandler; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.http.MediaType; 
import org.springframework.web.reactive.function.server.RouterFunction; 
import org.springframework.web.reactive.function.server.RouterFunctions; 
import org.springframework.web.reactive.function.server.ServerResponse; 
import static org.springframework.web.reactive.function.server.RequestPredicates.*; 
@Configuration 
public class ProductRouter { 
    /** 
     * The router configuration for the product handler. 
     * @param productHandler 
     * @return 
     */ 
    @Bean 
public RouterFunction<ServerResponse>    productsRoute(ProductHandler productHandler){ 
    return RouterFunctions.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct); 
 } 
}

Handler

package org.smarttechie.handler; 
  import org.smarttechie.model.Product; 
  import org.smarttechie.service.ProductService; 
  import org.springframework.beans.factory.annotation.Autowired; 
  import org.springframework.http.MediaType; 
  import org.springframework.stereotype.Component;    
  import org.springframework.web.reactive.function.server.ServerRequest; 
   import org.springframework.web.reactive.function.server.ServerResponse; 
  import reactor.core.publisher.Mono; 
   import static org.springframework.web.reactive.function.BodyInserters.fromObject;     
   @Component 
   public class ProductHandler { 
    @Autowired 
    private ProductService productService; 
     static Mono<ServerResponse> notFound = ServerResponse.notFound().build(); 
   /** 
     * The handler to get all the available products. 
     * @param serverRequest 
     * @return - all the products info as part of ServerResponse 
     */ 
    public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) { 
         return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(), Product.class); 
  } 
 
    /** 
      * The handler to create a product 
      * @param serverRequest 
      * @return - return the created product as part of ServerResponse 
     */ 
    public Mono<ServerResponse> createProduct(ServerRequest serverRequest) { 
        Mono<Product> productToSave = serverRequest.bodyToMono(Product.class); 
        return productToSave.flatMap(product -> 
                ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.save(product), Product.class));   
 
   }    
 
    /** 
     * The handler to delete a product based on the product id.          
     * @param serverRequest 
     * @return - return the deleted product as part of ServerResponse 
     */    
    public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) { 
        String id = serverRequest.pathVariable("id");  
        Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id)); 
         return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(deleteItem, Void.class);   
   } 
    /** 
      * The handler to update a product. 
      * @param serverRequest   
      * @return - The updated product as part of ServerResponse 
     */ 
    public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) { 
      return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(product))).switchIfEmpty(notFound);  
    } 
} 

至止,我们已经对如何公布响应式 REST API 有所了解。针对上述实现,我们使用了 Gatling (译者注:是一款功能强大的负载测试工具),在响应式 API 和非响应式 API (使用 Spring RestController 构建非响应式 API )上,进行了简单的基准化测试。其结果比较如下图所示。具体的 Gatling 负载测试脚本,请参考 GitHub 上的链接: https://github.com/2013techsmarts/Spring-Reactive-Examples

如何用Spring WebFlux构建Reactive REST API

负载测试结果比较  

原标题: Build Reactive REST APIs With Spring WebFlux, 作者 :Siva Prasad Rao Janapati

【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】

【责任编辑:庞桂玉 TEL:(010)68476606】


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

信息简史

信息简史

[美] 詹姆斯·格雷克 / 高博 / 人民邮电出版社 / 2013-10 / 69.00元

人类与信息遭遇的历史由来已久。詹姆斯•格雷克笔下的这段历史出人意料地从非洲的鼓语讲起(第1章)。非洲土著部落在尚未直接跨越到移动电话之前,曾用鼓声来传递讯息,但他们是如何做到的呢?后续章节进而讲述了这段历史上几个影响深远的关键事件,包括文字的发明(第2章)、罗伯特•考德里的第一本英语词典(第3章)、查尔斯•巴贝奇的差分机与爱达•拜伦的程序(第4章)、沙普兄弟的信号塔与摩尔斯电码(第5章)。 ......一起来看看 《信息简史》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

SHA 加密
SHA 加密

SHA 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具