内容简介:网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.7) 之 NettyRoutingFilter
摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-netty-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Spring-Cloud-Gateway 2.0.X M4
关注**微信公众号:【芋道源码】**有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言 都 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢 。
- 新的 源码解析文章 实时 收到通知。 每周更新一篇左右 。
- 认真的 源码交流微信群。
1. 概述
本文主要分享 NettyRoutingFilter 的代码实现 。
NettyRoutingFilter ,Netty 路由 网关过滤器。其根据 http://
或 https://
前缀( Scheme )过滤处理,使用基于 Netty 实现的 HttpClient 请求后端 Http 服务。
NettyWriteResponseFilter ,与 NettyRoutingFilter 成对使用 的网关过滤器。其将 NettyRoutingFilter 请求后端 Http 服务的 响应 写回客户端。
大体流程如下 :
另外,Spring Cloud Gateway 实现了 WebClientHttpRoutingFilter / WebClientWriteResponseFilter ,功能上和 NettyRoutingFilter / NettyWriteResponseFilter 相同 ,差别在于基于 org.springframework.cloud.gateway.filter.WebClient
实现的 HttpClient 请求后端 Http 服务。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter》 ,我们会详细解析。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版, 等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与 Docker 微服务架构实战》
- 两书齐买,京东包邮。
2. NettyRoutingFilter
org.springframework.cloud.gateway.filter.NettyRoutingFilter
,Netty 路由 网关过滤器。
构造方法,代码如下 :
public class NettyRoutingFilter implements GlobalFilter, Ordered { private final HttpClient httpClient; public NettyRoutingFilter(HttpClient httpClient) { this.httpClient = httpClient; } }
-
httpClient
属性,基于 Netty 实现的 HttpClient 。通过该属性, 请求后端的 Http 服务 。
#getOrder()
方法,代码如下 :
@Override public int getOrder(){ return Ordered.LOWEST_PRECEDENCE; }
- 返回顺序为
Integer.MAX_VALUE
。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.1) 之 GatewayFilter 一览》「3. GlobalFilter」 ,我们列举了所有 GlobalFilter 的顺序。
#filter(ServerWebExchange, GatewayFilterChain)
方法,代码如下 :
1: @Override 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){ 3: // 获得 requestUrl 4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); 5: 6: // 判断是否能够处理 7: String scheme = requestUrl.getScheme(); 8: if (isAlreadyRouted(exchange) || (!scheme.equals("http") && !scheme.equals("https"))) { 9: return chain.filter(exchange); 10: } 11: 12: // 设置已经路由 13: setAlreadyRouted(exchange); 14: 15: ServerHttpRequest request = exchange.getRequest(); 16: 17: // Request Method 18: final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); 19: 20: // 获得 url 21: final String url = requestUrl.toString(); 22: 23: // Request Header 24: final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); 25: request.getHeaders().forEach(httpHeaders::set); 26: 27: // 请求 28: return this.httpClient.request(method, url, req -> { 29: final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) 30: .failOnClientError(false) // // 是否请求失败,抛出异常 31: .headers(httpHeaders); 32: 33: // Request Form 34: if (MediaType.APPLICATION_FORM_URLENCODED.includes(request.getHeaders().getContentType())) { 35: return exchange.getFormData() 36: .flatMap(map -> proxyRequest.sendForm(form -> { 37: for (Map.Entry<String, List<String>> entry: map.entrySet()) { 38: for (String value : entry.getValue()) { 39: form.attr(entry.getKey(), value); 40: } 41: } 42: }).then()) 43: .then(chain.filter(exchange)); 44: } 45: 46: // Request Body 47: return proxyRequest.sendHeaders() //I shouldn't need this 48: .send(request.getBody() 49: .map(DataBuffer::asByteBuffer) // Flux<DataBuffer> => ByteBuffer 50: .map(Unpooled::wrappedBuffer)); // ByteBuffer => Flux<DataBuffer> 51: }).doOnNext(res -> { 52: ServerHttpResponse response = exchange.getResponse(); 53: // Response Header 54: // put headers and status so filters can modify the response 55: HttpHeaders headers = new HttpHeaders(); 56: res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); 57: response.getHeaders().putAll(headers); 58: 59: // Response Status 60: response.setStatusCode(HttpStatus.valueOf(res.status().code())); 61: 62: // 设置 Response 到 CLIENT_RESPONSE_ATTR 63: // Defer committing the response until all route filters have run 64: // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter 65: exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); 66: }).then(chain.filter(exchange)); 67: }
-
第 4 行 :获得
requestUrl
。 -
第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :
-
http://
或者https://
前缀( Scheme ) 。 -
调用
ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange)
方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :public static boolean isAlreadyRouted(ServerWebExchange exchange){ return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false); }
-
-
第 13 行 :设置该请求已经被处理。代码如下 :
public static void setAlreadyRouted(ServerWebExchange exchange){ exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true); }
-
第 18 行 :创建 Netty Request Method 对象。
request#getMethod()
返回的不是io.netty.handler.codec.http.HttpMethod
,所以需要进行转换。 -
第 21 行 :获得
url
。 -
第 24 至 25 行 :创建 Netty Request Header 对象(
io.netty.handler.codec.http.DefaultHttpHeaders
),将请求的 Header 设置给它。 -
--------- 第 28 至 50 行 :调用
HttpClient#request(HttpMethod, String, Function)
方法,请求后端 Http 服务。 -
第 29 至 31 行 :创建 Netty Request 对象(
reactor.ipc.netty.http.client.HttpClientRequest
)。-
第 29 行 :TODO 【3024】 NettyPipeline.SendOptions::flushOnEach
-
第 30 行 :设置请求失败( 后端服务返回响应状体码
>= 400
)时,不抛出异常。相关代码如下 :// HttpClientOperations#checkResponseCode(HttpResponse response) // ... 省略无关代码 if (code >= 400) { if (clientError) { if (log.isDebugEnabled()) { log.debug("{} Received Request Error, stop reading: {}", channel(), response.toString()); } Exception ex = new HttpClientException(uri(), response); parentContext().fireContextError(ex); receive().subscribe(); return false; } return true; }
- 通过设置
clientError = false
,第 51 行可以调用Mono#doNext(Consumer)
方法, 统一订阅处理 返回的reactor.ipc.netty.http.client.HttpClientResponse
对象。
- 通过设置
-
第 31 行 :设置 Netty Request 对象的 Header 。
-
-
第 34 至 44 行 :【TODO 3025】目前是一个 BUG ,在 2.0.X 版本修复。见 FormIntegrationTests#formUrlencodedWorks() 单元测试的注释说明。
-
第 47 至 50 行 :请求后端的 Http 服务。
- 第 47 行 :发送请求 Header 。
- 第 48 至 50 行 :发送请求 Body 。其中中间的
#map(...)
的过程为Flux<DataBuffer> => ByteBuffer => Flux<DataBuffer>
。
-
--------- 第 51 至 65 行 :请求后端 Http 服务 完成 ,将 Netty Response 赋值给响应
response
。 -
第 53 至 57 行 :创建
org.springframework.http.HttpHeaders
对象,将 Netty Response Header 设置给它,而后设置回给响应response
。 -
第 60 行 :设置响应
response
的状态码。 -
第 65 行 :设置 Netty Response 到
CLIENT_RESPONSE_ATTR
。后续 NettyWriteResponseFilter 将 Netty Response 写回给客户端。 -
--------- 第 66 行 :提交过滤器链继续过滤。
3. NettyWriteResponseFilter
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter
,Netty 回写 响应 网关过滤器。
#getOrder()
方法,代码如下 :
public static final int WRITE_RESPONSE_FILTER_ORDER = -1; @Override public int getOrder(){ return WRITE_RESPONSE_FILTER_ORDER; }
- 返回顺序为
-1
。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.1) 之 GatewayFilter 一览》「3. GlobalFilter」 ,我们列举了所有 GlobalFilter 的顺序。
#filter(ServerWebExchange, GatewayFilterChain)
方法,代码如下 :
1: @Override 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){ 3: // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added 4: // until the WebHandler is run 5: return chain.filter(exchange).then(Mono.defer(() -> { 6: // 获得 Response 7: HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); 8: // HttpClientResponse clientResponse = getAttribute(exchange, CLIENT_RESPONSE_ATTR, HttpClientResponse.class); 9: if (clientResponse == null) { 10: return Mono.empty(); 11: } 12: log.trace("NettyWriteResponseFilter start"); 13: ServerHttpResponse response = exchange.getResponse(); 14: 15: // 将 Netty Response 写回给客户端。 16: NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); 17: //TODO: what if it's not netty 18: final Flux<NettyDataBuffer> body = clientResponse.receive() 19: .retain() // ByteBufFlux => ByteBufFlux 20: .map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer> 21: return response.writeWith(body); 22: })); 23: }
- 第 5 行 :调用
#then(Mono)
方法,实现 After Filter 逻辑。 - 第 7 至 11 行 :从
CLIENT_RESPONSE_ATTR
中,获得 Netty Response 。 - 第 15 至 21 行 :将 Netty Response 写回给客户端。因为
org.springframework.http.server.reactive#writeWith(Publisher<? extends DataBuffer>)
需要的参数类型是Publisher<? extends DataBuffer>
,所以【第 18 至 20 行】的转换过程是ByteBufFlux => Flux<NettyDataBuffer>
。- 第 19 行 :TODO 【3024】ByteBufFlux#retain()
666. 彩蛋
下一篇 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter》 走起!
胖友,分享一波朋友圈可好!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Spring Cloud(七):服务网关zuul过滤器
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.3) 之 RouteToRequestUrlFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.5) 之 ForwardRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.4) 之 LoadBalancerClientFilter 负载均衡
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Head First Rails
David Griffiths / O'Reilly Media / 2008-12-30 / USD 49.99
Figure its about time that you hop on the Ruby on Rails bandwagon? You've heard that it'll increase your productivity exponentially, and allow you to created full fledged web applications with minimal......一起来看看 《Head First Rails》 这本书的介绍吧!
CSS 压缩/解压工具
在线压缩/解压 CSS 代码
XML 在线格式化
在线 XML 格式化压缩工具