大数跨境
0
0

Spring Cloud Gateway路由处理流程

Spring Cloud Gateway路由处理流程 Spring全家桶实战案例
2022-11-14
0
导读:Spring Cloud Gateway路由处理流程

Gateway路由请求执行流程如下:

1 核心处理类DispatcherHandler

public class DispatcherHandler implements WebHandler {  public Mono<Void> handle(ServerWebExchange exchange) {    // ...    return Flux.fromIterable(this.handlerMappings)        // 找到合适的HandlerMapping        // concatMap同步执行        .concatMap(mapping -> mapping.getHandler(exchange))        // 获取第一个元素        .next()        // 如果没有则创建一个错误        .switchIfEmpty(createNotFoundError())        // 找到合适的处理器HandlerAdapter        // 异步执行        .flatMap(handler -> invokeHandler(exchange, handler))        // 找到合适的HandlerResult        .flatMap(result -> handleResult(exchange, result));  }}


2 查找HandlerMapping对象

路由请求找到的对象为:RoutePredicateHandlerMapping

这里面会做如下两个事:

  • 找到的路由保存到上下文中

    ServerWebExchange#.getAttributes().put(GATEWAY_ROUTE_ATTR, r);

  • 返回Mono.just(webHandler)

    webHandler对象为:FilteringWebHandler,该对象创建如下:

// 获取所有的全局过滤器@Beanpublic FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {  return new FilteringWebHandler(globalFilters);}


3 查找HandlerAdapter对象

在上一步在查找路由后返回的是一个FilteringWebHandler对象,该对象是WebHandler子类

最后确定为:SimpleHandlerAdapter处理器

public class SimpleHandlerAdapter implements HandlerAdapter {
@Override public boolean supports(Object handler) { return WebHandler.class.isAssignableFrom(handler.getClass()); }
@Override public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { WebHandler webHandler = (WebHandler) handler; // 通过上面知道了这里的handler = FilteringWebHandler对象 Mono<Void> mono = webHandler.handle(exchange); return mono.then(Mono.empty()); }
}

转换过滤器

public class FilteringWebHandler implements WebHandler {
protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);
private final List<GatewayFilter> globalFilters;
public FilteringWebHandler(List<GlobalFilter> globalFilters) { // 将全局过滤器转换为GatewayFilter,通过GatewayFilterAdapter进行适配 this.globalFilters = loadFilters(globalFilters); }
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) { return filters.stream().map(filter -> { GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter); if (filter instanceof Ordered) { int order = ((Ordered) filter).getOrder(); return new OrderedGatewayFilter(gatewayFilter, order); } return gatewayFilter; }).collect(Collectors.toList()); } private static class GatewayFilterAdapter implements GatewayFilter {
private final GlobalFilter delegate;
GatewayFilterAdapter(GlobalFilter delegate) { this.delegate = delegate; }
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return this.delegate.filter(exchange, chain); }
@Override public String toString() { final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{"); sb.append("delegate=").append(delegate); sb.append('}'); return sb.toString(); }
}}

4 执行FilteringWebHandler

public class FilteringWebHandler implements WebHandler {  public Mono<Void> handle(ServerWebExchange exchange) {    // 取得在上面第2步中路由(从上下文ServerwebExchange)    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);    // 获取路由中配置的过滤器    List<GatewayFilter> gatewayFilters = route.getFilters();    // 将路由中配置的过滤器与全局过滤器(自定义的GlobalFilter和系统的过滤器)    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);    combined.addAll(gatewayFilters);    // 排序    AnnotationAwareOrderComparator.sort(combined);    // 构建过滤器执行链执行过滤器    return new DefaultGatewayFilterChain(combined).filter(exchange);  }}


5 核心过滤器

上一步中构建了过滤器链后,接下来就是执行过滤器,这里介绍几个核心的过滤器

  • RouteToRequestUrlFilter

根据路由配置的url信息,构建成为要访问的目标地址,如下路由配置:

spring:  cloud:    gateway:      enabled: true      # 全局超时配置      httpclient:        connect-timeout: 10000        response-timeout: 5000      discovery:        locator:          enabled: true          lowerCaseServiceId: true      # 这里是全局过滤器,也就是下面在介绍过滤器执行的时候一定会执行StripPrefixGatewayFilterFactory#apply      # 返回的过滤器,如下路由配置:该过滤器会将你的请求转换为:http://localhost:8088/demos,保存到上下文中      # ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI())      default-filters:      - StripPrefix=1      routes:      - id: R001        uri: http://localhost:8787        predicates:        - Path=/api-1/**,/api-2/**        metadata:          akf: "dbc"          #局部超时设置          connect-timeout: 10000          response-timeout: 5000

访问:http://localhost:8088/api-1/demos

转换后:http://localhost:8787/demos

该过滤器最后会将转换后的url保存到上下文中

ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);

注意:上面的StripPrefixGatewayFilterFactory#apply过滤器执行完后,才会执行该过滤器。

  • NettyRoutingFilter

public class NettyRoutingFilter implements GlobalFilter {  private final HttpClient httpClient;  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {    // 从上下文中获取解析后的目标地址    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);    // ...    // 获取上下文中的路由信息    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);    // getHttpClient获取客户端信息    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {      // ...    }).request(method).uri(url).send((req, nettyOutbound) -> {      // 发送网络请求      return nettyOutbound.send(request.getBody().map(this::getByteBuf));    }).responseConnection((res, connection) -> {      exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);      exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse(); HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } setResponseStatus(res, response); HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); }
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res); });
// 从路由中的元数据中获取response-timeout响应超时时间 Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux // 设置超时时间 .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); } protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) { // 从路由的元数据中获取配置的连接超时时间:connect-timeout Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR); if (connectTimeoutAttr != null) { Integer connectTimeout = getInteger(connectTimeoutAttr); // 设置Netty的连接超时时间 // io.netty.channel.ChannelOption return this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); } return httpClient; }}

完毕!!!


【声明】内容源于网络
0
0
Spring全家桶实战案例
Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
内容 832
粉丝 0
Spring全家桶实战案例 Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
总阅读38
粉丝0
内容832