大数跨境
0
0

Spring Cloud Gateway网关全局核心过滤器路由执行过程详解

Spring Cloud Gateway网关全局核心过滤器路由执行过程详解 Spring全家桶实战案例
2023-04-14
0
导读:Spring Cloud Gateway网关全局核心过滤器路由执行过程详解

环境:SpringBoot2.7.10 + Spring Cloud gateway3.1.6


1 RouteToRequestUrlFilter

根据路由配置的url信息,构建成为要访问的目标地址,如下路由配置:

spring:  cloud:    gateway:      enabled: true      # 全局超时配置      httpclient:        connect-timeout: 10000        response-timeout: 5000      discovery:        locator:          enabled: true          lowerCaseServiceId: true      # 这里是全局过滤器,也就是下面在介绍过滤器执行的时候一定会执行StripPrefixGatewayFilterFactory#apply      # 返回的过滤器,如下路由配置:该过滤器会将你的请求转换为:http://localhost:8088/demos,保存到上下文中      # ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI())      default-filters:      - StripPrefix=1      routes:      - id: R001        uri: http://localhost:8787        predicates:        - Path=/api-1/**,/api-2/**        metadata:          akf: "dbc"          #局部超时设置          connect-timeout: 10000          response-timeout: 5000      - id: st001        uri: lb://storage-service        predicates:        - Path=/api-x/**          - id: o001        uri: lb://order-service        predicates:        - Path=/api-a/**, /api-b/**        metadata:          akf: "dbc"          #局部超时设置          connect-timeout: 10000          response-timeout: 5000

访问:
http://localhost:8088/api-1/demos

转换后:
http://localhost:8787/demos

该过滤器最后会将转换后的url保存到上下文中

 ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);

注意:上面的
StripPrefixGatewayFilterFactory#apply
过滤器执行完后,才会执行该过滤器。

总结:

访问:
http://localhost:9090/api-x/orders ,路由地址:lb://order-service

  1. 转换地址
    转换后:http://localhost:9090/orders

  2. 合并地址
    将上一步的地址进一步合并为:lb://order-service/orders
    将地址存储到上下文中:exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);

2 ReactiveLoadBalancerClientFilter

如果URL有一个lb(例如lb://order-service),它使用Spring Cloud ReactorLoadBalancer将名称(在本例中为order-service)解析为一个实际的主机和端口,并替换相同属性中的URI。

public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {  private final LoadBalancerClientFactory clientFactory;  private final GatewayLoadBalancerProperties properties;  private final LoadBalancerProperties loadBalancerProperties;
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 从上下文中获取,如:lb://order-service/orders URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } // preserve the original url addOriginalRequestUrl(exchange, url); // 再次获取 URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); // 获取服务名;order-service String serviceId = requestUri.getHost(); // clientFactory.getInstances方法会从NamedContextFactory.contexts集合中查找以order-service为key对应的 // AnnotationConfigApplicationContext,然后从这个容器中查找LoadBalancerLifecycle,默认返回{} // ------------------------------------------------------------ /** * 每个服务对应的ApplicationContext包含如下13个Bean * org.springframework.context.annotation.internalConfigurationAnnotationProcessor * org.springframework.context.annotation.internalAutowiredAnnotationProcessor * org.springframework.context.annotation.internalCommonAnnotationProcessor * org.springframework.context.event.internalEventListenerProcessor * org.springframework.context.event.internalEventListenerFactory * propertyPlaceholderAutoConfiguration loadBalancerClientConfiguration * propertySourcesPlaceholderConfigurer * LoadBalancerClientConfiguration$ReactiveSupportConfiguration * discoveryClientServiceInstanceListSupplier * LoadBalancerClientConfiguration$BlockingSupportConfiguration, * reactorServiceInstanceLoadBalancer */ // 这里集合返回{} Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext( new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint()))); // choose负载查找指定服务(order-server) return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> { if (!response.hasServer()) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response))); throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } ServiceInstance retrievedInstance = response.getServer(); URI uri = exchange.getRequest().getURI(); // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default, // if the loadbalancer doesn't provide one. String overrideScheme = retrievedInstance.isSecure() ? "https" : "http"; if (schemePrefix != null) { overrideScheme = url.getScheme(); } DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme); URI requestUrl = reconstructURI(serviceInstance, uri); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response)); }).then(chain.filter(exchange)) .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete( new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(CompletionContext.Status.FAILED, throwable, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR))))) .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach( lifecycle -> lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>( CompletionContext.Status.SUCCESS, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR), new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest())))))); }
protected URI reconstructURI(ServiceInstance serviceInstance, URI original) { return LoadBalancerUriTools.reconstructURI(serviceInstance, original); }
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId, Set<LoadBalancerLifecycle> supportedLifecycleProcessors) { // 从order-service对应的ApplicationContext中查找ReactorServiceInstanceLoadBalancer ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class); if (loadBalancer == null) { throw new NotFoundException("No loadbalancer available for " + serviceId); } supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); // 查找服务实例 return loadBalancer.choose(lbRequest); }
private String getHint(String serviceId, Map<String, String> hints) { String defaultHint = hints.getOrDefault("default", "default"); String hintPropertyValue = hints.get(serviceId); return hintPropertyValue != null ? hintPropertyValue : defaultHint; }}
// 轮询算分public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer { final AtomicInteger position; ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public Mono<Response<ServiceInstance>> choose(Request request) { // 接下面ClientFactoryObjectProvider中获取ServiceInstanceListSupplier ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances)); }
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) { Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances); if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer()); } return serviceInstanceResponse; }
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) { if (instances.isEmpty()) { return new EmptyResponse(); } // TODO: enforce order? int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); }}
class ClientFactoryObjectProvider<T> implements ObjectProvider<T> { private final NamedContextFactory<?> clientFactory; // type = ServiceInstanceListSupplier private final Class<T> type; // name = order-service private final String name;
private ObjectProvider<T> delegate() { if (this.provider == null) { // 从order-service对应ApplicationContext中获取ServiceInstanceListSupplier // 这里最终返回的是:DiscoveryClientServiceInstanceListSupplier this.provider = this.clientFactory.getProvider(this.name, this.type); } return this.provider; }}
public class LoadBalancerClientConfiguration { @Configuration(proxyBeanMethods = false) @ConditionalOnReactiveDiscoveryEnabled @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER) public static class ReactiveSupportConfiguration {
@Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default", matchIfMissing = true) public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( ConfigurableApplicationContext context) { // 这里最终构建的是:DiscoveryClientServiceInstanceListSupplier return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context); } }}
public final class ServiceInstanceListSupplierBuilder { public ServiceInstanceListSupplierBuilder withDiscoveryClient() { this.baseCreator = context -> { // 先从order-service对应的ApplicationContext中查找ReactiveDiscoveryClient,如果你没有自定义,那么就会从 // 父容器中查找,如果你使用的nacos,那么会返回NacosReactiveDiscoveryClient ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class); return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment()); }; return this; }}

总结:

  1. 获取地址
    获取上一步中保存在上下文的地址
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);

  2. 获取LoadBalancerLifecycle
    取得当前服务(order-service),对应的AnnotationConfigApplicationContext中配置的LoadBalancerLifecycle,该负载均衡生命周期能够监控负载均衡的执行过程。该类是泛型类,3个泛型参数,类型依次为:RequestDataContext.class, ResponseData.class, ServiceInstance.class。

  3. 获取ReactorServiceInstanceLoadBalancer
    获取当前服务(order-server),对应的AnnotationConfigApplicationContext中配置的ReactorServiceInstanceLoadBalancer。每一个服务都有一个对应的默认配置类LoadBalancerClientConfiguration,该配置类中有默认的RoundRobinLoadBalancer。我们可以为具体的服务提供LoadBalancerClientSpecification 类型的Bean,该类我们可以指定你要配置的serviceId及配置类,在配置类中我们可以自定义ReactorServiceInstanceLoadBalancer 的实现类Bean。

  4. 选择服务
    在上一步中获得ReactorServiceInstanceLoadBalancer后,接下来就是选取一个服务实例了。

  5. 重构URI
    上一步中获取了ServiceInstance 后就能够重构URL了,当前的URL为: http://localhost:9090/orders 构建后:http://localhost:9093/storages ,将该URL保存到上下文中 exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);

3 NettyRoutingFilter

public class NettyRoutingFilter implements GlobalFilter {  private final HttpClient httpClient;
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 从上下文中获取解析后的目标地址 URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); // ... // 获取上下文中的路由信息 Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); // getHttpClient获取客户端信息 Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> { // ... }).request(method).uri(url).send((req, nettyOutbound) -> { // 发送网络请求 return nettyOutbound.send(request.getBody().map(this::getByteBuf)); }).responseConnection((res, connection) -> { exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); // 建立的Connection对象保存到上下文中,在后续的NettyWriteResponseFilter中会获取该对象获取响应数据 exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); ServerHttpResponse response = exchange.getResponse(); HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } setResponseStatus(res, response); HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); return Mono.just(res); });
// 从路由中的元数据中获取response-timeout响应超时时间 Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux // 设置超时时间 .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); }
protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) { // 从路由的元数据中获取配置的连接超时时间:connect-timeout Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR); if (connectTimeoutAttr != null) { Integer connectTimeout = getInteger(connectTimeoutAttr); // 设置Netty的连接超时时间 // io.netty.channel.ChannelOption return this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); } return httpClient; }}

总结:

  1. 获取URL
    获取上一步保存在上下文中的URL
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

  2. 设置当前路由状态
    设置当前路由已经路由状态
    setAlreadyRouted(exchange);
    exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);

  3. 获取路由
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    获取当前的Route信息。主要就用来获取配置路由时提供的配置信息,比如:超时时间设置,如上配置。RoutePredicateHandlerMapping#getHandlerInternal方法中保存路由到上下文中

  4. 构建HttpClient
    通过上一步取得的Route对象,配置HttpClient相关属性,比如:超时时间。配置基本的http相关信息,建立连接后将Connection对象保存到上下文中,供下一个过滤器获取响应数据

4 NettyWriteResponseFilter

该过滤器的作用是处理由NettyRoutingFilter中建立的HTTP请求(包括:请求参数,请求头,建立连接);在NettyRoutingFilter中会将建立连接后的Connection保存到ServerWebExchange上下文中。

public class NettyWriteResponseFilter implements GlobalFilter, Ordered {  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {    // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added    // until the NettyRoutingFilter is run    // @formatter:off    return chain.filter(exchange)        .doOnError(throwable -> cleanup(exchange))        .then(Mono.defer(() -> {          Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);          if (connection == null) {            return Mono.empty();          }          ServerHttpResponse response = exchange.getResponse();          // TODO: needed?          final Flux<DataBuffer> body = connection              .inbound()              .receive()              .retain()              .map(byteBuf -> wrap(byteBuf, response));          MediaType contentType = null;          try {            contentType = response.getHeaders().getContentType();          }          // 根据不同的ContentType做不同的响应          return (isStreamingMediaType(contentType)              ? response.writeAndFlushWith(body.map(Flux::just))              : response.writeWith(body));        })).doOnCancel(() -> cleanup(exchange));    // @formatter:on  }
protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) { DataBufferFactory bufferFactory = response.bufferFactory(); if (bufferFactory instanceof NettyDataBufferFactory) { NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory; return factory.wrap(byteBuf); } // MockServerHttpResponse creates these else if (bufferFactory instanceof DefaultDataBufferFactory) { DataBuffer buffer = ((DefaultDataBufferFactory) bufferFactory).allocateBuffer(byteBuf.readableBytes()); buffer.write(byteBuf.nioBuffer()); byteBuf.release(); return buffer; } throw new IllegalArgumentException("Unkown DataBufferFactory type " + bufferFactory.getClass()); }
private void cleanup(ServerWebExchange exchange) { Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); if (connection != null && connection.channel().isActive() && !connection.isPersistent()) { connection.dispose(); } }
private boolean isStreamingMediaType(@Nullable MediaType contentType) { return (contentType != null && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith)); }}

总结:

  1. 取得Connection
    取得上一步中保存的Connection
    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

  2. 响应内容
    输出微服务端响应的数据

final Flux<DataBuffer> body = connection  .inbound()  .receive()  .retain()  .map(byteBuf -> wrap(byteBuf, response));

以上就是Gateway在处理一个路由请求的执行流程

完毕!!!


【声明】内容源于网络
0
0
Spring全家桶实战案例
Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
内容 832
粉丝 0
Spring全家桶实战案例 Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
总阅读38
粉丝0
内容832