Gateway路由请求执行流程如下:
1 核心处理类DispatcherHandler
public class DispatcherHandler implements WebHandler {public Mono<Void> handle(ServerWebExchange exchange) {// ...return Flux.fromIterable(this.handlerMappings)// 找到合适的HandlerMapping// concatMap同步执行.concatMap(mapping -> mapping.getHandler(exchange))// 获取第一个元素.next()// 如果没有则创建一个错误.switchIfEmpty(createNotFoundError())// 找到合适的处理器HandlerAdapter// 异步执行.flatMap(handler -> invokeHandler(exchange, handler))// 找到合适的HandlerResult.flatMap(result -> handleResult(exchange, result));}}
2 查找HandlerMapping对象
路由请求找到的对象为:RoutePredicateHandlerMapping
这里面会做如下两个事:
找到的路由保存到上下文中
ServerWebExchange#.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
返回Mono.just(webHandler)
webHandler对象为:
FilteringWebHandler,该对象创建如下:
// 获取所有的全局过滤器public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {return new FilteringWebHandler(globalFilters);}
3 查找HandlerAdapter对象
在上一步在查找路由后返回的是一个FilteringWebHandler对象,该对象是WebHandler子类
最后确定为:SimpleHandlerAdapter处理器
public class SimpleHandlerAdapter implements HandlerAdapter {public boolean supports(Object handler) {return WebHandler.class.isAssignableFrom(handler.getClass());}public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {WebHandler webHandler = (WebHandler) handler;// 通过上面知道了这里的handler = FilteringWebHandler对象Mono<Void> mono = webHandler.handle(exchange);return mono.then(Mono.empty());}}
转换过滤器
public class FilteringWebHandler implements WebHandler {protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);private final List<GatewayFilter> globalFilters;public FilteringWebHandler(List<GlobalFilter> globalFilters) {// 将全局过滤器转换为GatewayFilter,通过GatewayFilterAdapter进行适配this.globalFilters = loadFilters(globalFilters);}private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {return filters.stream().map(filter -> {GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);if (filter instanceof Ordered) {int order = ((Ordered) filter).getOrder();return new OrderedGatewayFilter(gatewayFilter, order);}return gatewayFilter;}).collect(Collectors.toList());}private static class GatewayFilterAdapter implements GatewayFilter {private final GlobalFilter delegate;GatewayFilterAdapter(GlobalFilter delegate) {this.delegate = delegate;}public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}public String toString() {final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");sb.append("delegate=").append(delegate);sb.append('}');return sb.toString();}}}
4 执行FilteringWebHandler
public class FilteringWebHandler implements WebHandler {public Mono<Void> handle(ServerWebExchange exchange) {// 取得在上面第2步中路由(从上下文ServerwebExchange)Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);// 获取路由中配置的过滤器List<GatewayFilter> gatewayFilters = route.getFilters();// 将路由中配置的过滤器与全局过滤器(自定义的GlobalFilter和系统的过滤器)List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// 排序AnnotationAwareOrderComparator.sort(combined);// 构建过滤器执行链执行过滤器return new DefaultGatewayFilterChain(combined).filter(exchange);}}
5 核心过滤器
上一步中构建了过滤器链后,接下来就是执行过滤器,这里介绍几个核心的过滤器
RouteToRequestUrlFilter
根据路由配置的url信息,构建成为要访问的目标地址,如下路由配置:
spring:cloud:gateway:enabled: true# 全局超时配置httpclient:: 10000: 5000discovery:locator:enabled: truelowerCaseServiceId: true# 这里是全局过滤器,也就是下面在介绍过滤器执行的时候一定会执行StripPrefixGatewayFilterFactory#apply# 返回的过滤器,如下路由配置:该过滤器会将你的请求转换为:http://localhost:8088/demos,保存到上下文中# ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI()):StripPrefix=1routes:id: R001uri: http://localhost:8787predicates:Path=/api-1/**,/api-2/**metadata:akf: "dbc"#局部超时设置: 10000: 5000
访问:http://localhost:8088/api-1/demos
转换后:http://localhost:8787/demos
该过滤器最后会将转换后的url保存到上下文中
ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
注意:上面的StripPrefixGatewayFilterFactory#apply过滤器执行完后,才会执行该过滤器。
NettyRoutingFilter
public class NettyRoutingFilter implements GlobalFilter {private final HttpClient httpClient;public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// 从上下文中获取解析后的目标地址URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);// ...// 获取上下文中的路由信息Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);// getHttpClient获取客户端信息Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {// ...}).request(method).uri(url).send((req, nettyOutbound) -> {// 发送网络请求return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) -> {exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();HttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);}setResponseStatus(res, response);HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,Type.RESPONSE);if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);}exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});// 从路由中的元数据中获取response-timeout响应超时时间Duration responseTimeout = getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux// 设置超时时间.timeout(responseTimeout,Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class,th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));}return responseFlux.then(chain.filter(exchange));}protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {// 从路由的元数据中获取配置的连接超时时间:connect-timeoutObject connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);if (connectTimeoutAttr != null) {Integer connectTimeout = getInteger(connectTimeoutAttr);// 设置Netty的连接超时时间// io.netty.channel.ChannelOptionreturn this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);}return httpClient;}}
完毕!!!





