Spring Cloud Gateway实现灰度发布实现原理
2023-02-20 10:17:59来源:实战案例锦集
灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。
灰度发布类型金丝雀发布将少量的请求引流到新版本上,因此部署新版本服务只需极小数的机器。验证新版本符合预期后,逐步调整流量权重比例,使得流量慢慢从老版本迁移至新版本,期间可以根据设置的流量比例,对新版本服务进行扩容,同时对老版本服务进行缩容,使得底层资源得到最大化利用。
(相关资料图)
金丝雀发布的优点:
按比例将流量无差别地导向新版本,新版本故障影响范围小;发布期间逐步对新版本扩容,同时对老版本缩容,资源利用率高。金丝雀发布的缺点:
流量无差别地导向新版本,可能会影响重要用户的体验;发布周期长。A/B测试A/B 测试基于用户请求的元信息将流量路由到新版本,这是一种基于请求内容匹配的灰度发布策略。只有匹配特定规则的请求才会被引流到新版本,常见的做法包括基于 Header 和 Cookie。基于 Header 方式例子,例如 User-Agent 的值为 Android 的请求 (来自安卓系统的请求)可以访问新版本,其他系统仍然访问旧版本。基于 Cookie 方式的例子,Cookie 中通常包含具有业务语义的用户信息,例如普通用户可以访问新版本,VIP 用户仍然访问旧版本。
蓝绿发布蓝绿发布需要对服务的新版本进行冗余部署,一般新版本的机器规格和数量与旧版本保持一致,相当于该服务有两套完全相同的部署环境,只不过此时只有旧版本在对外提供服务,新版本作为热备。当服务进行版本升级时,我们只需将流量全部切换到新版本即可,旧版本作为热备。由于冗余部署的缘故,所以不必担心新版本的资源不够。如果新版本上线后出现严重的程序 BUG,那么我们只需将流量全部切回至旧版本,大大缩短故障恢复的时间。
Gateway实现灰度发布本篇将文章将通过A/B测试方式实现灰度发布。接下来将展示在Spring Cloud Gateway中实现A/B测试核心组件。
引入依赖自定义负载均衡器org.springframework.cloud spring-cloud-starter-gateway 3.1.4 org.springframework.cloud spring-cloud-loadbalancer 3.1.4
自定义负载均衡器作用是根据请求的header中的v进行服务实例的筛选。
public class GrayRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer { private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class); final AtomicInteger position; final String serviceId; ObjectProviderserviceInstanceListSupplierProvider; public GrayRoundRobinLoadBalancer(ObjectProvider serviceInstanceListSupplierProvider, String serviceId) { this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000)); } public GrayRoundRobinLoadBalancer(ObjectProvider serviceInstanceListSupplierProvider, String serviceId, int seedPosition) { this.serviceId = serviceId; this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider; this.position = new AtomicInteger(seedPosition); } @SuppressWarnings("rawtypes") @Override public Mono > choose(Request request) { ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get(request).next() .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request)); } @SuppressWarnings("rawtypes") private Response processInstanceResponse(ServiceInstanceListSupplier supplier, List serviceInstances, Request request) { Response serviceInstanceResponse = getInstanceResponse(serviceInstances, request); if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer()); } return serviceInstanceResponse; } @SuppressWarnings("rawtypes") private Response getInstanceResponse(List instances, Request request) { if (instances.isEmpty()) { if (log.isWarnEnabled()) { log.warn("No servers available for service: " + serviceId); } return new EmptyResponse(); } List result = instances.stream().filter(instance -> { Map metadata = instance.getMetadata(); Object orgId = metadata.get("v"); RequestDataContext context = (RequestDataContext) request.getContext() ; RequestData requestData = context.getClientRequest() ; String v = null ; if (requestData instanceof GrayRequestData) { GrayRequestData grayRequestData = (GrayRequestData) requestData ; queryV = grayRequestData.getQueryParams().getFirst("v") ; } String value = requestData.getHeaders().getFirst("v") ; return v != null && (v.equals(value) || v.equals(queryV)) ; }).collect(Collectors.toList()); if (result.isEmpty()) { result = instances; } int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; ServiceInstance instance = result.get(pos % result.size()); return new DefaultResponse(instance); }}
以上负载均衡器将从header或者请求参数中获取v参数,然后根据v参数的值从服务实例列表中获取metadata信息进行比对。
全局过滤器该过滤器的作用是通过上面的负载均衡器从其中选择一个服务实例进行服务的调用
@SuppressWarnings({ "rawtypes", "unchecked" })@Componentpublic class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered { private static final Log log = LogFactory.getLog(GrayReactiveLoadBalancerClientFilter.class); /** * Order of filter. */ public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150; private final LoadBalancerClientFactory clientFactory; private final GatewayLoadBalancerProperties properties; public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, GatewayLoadBalancerProperties properties) { this.clientFactory = clientFactory; this.properties = properties; } @Override public int getOrder() { return LOAD_BALANCER_CLIENT_FILTER_ORDER; } @Override public Mono配置filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"packlb".equals(url.getScheme()) && !"packlb".equals(schemePrefix))) { return chain.filter(exchange); } // preserve the original url addOriginalRequestUrl(exchange, url); URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String serviceId = requestUri.getHost(); Set supportedLifecycleProcessors = LoadBalancerLifecycleValidator .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); DefaultRequest lbRequest = new DefaultRequest<>( new RequestDataContext(new GrayRequestData(exchange.getRequest()), getHint(serviceId))); LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId); return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> { if (!response.hasServer()) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response))); throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } ServiceInstance retrievedInstance = response.getServer(); URI uri = exchange.getRequest().getURI(); // if the `lb: ` mechanism was used, use ` ` as the default, // if the loadbalancer doesn"t provide one. String overrideScheme = retrievedInstance.isSecure() ? "https" : "http"; if (schemePrefix != null) { overrideScheme = url.getScheme(); } DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme); URI requestUrl = reconstructURI(serviceInstance, uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response)); }).then(chain.filter(exchange)) .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext ( CompletionContext.Status.FAILED, throwable, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR))))) .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext ( CompletionContext.Status.SUCCESS, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR), buildResponseData(exchange, loadBalancerProperties.isUseRawStatusCodeInResponseData()))))); } @SuppressWarnings("deprecation") private ResponseData buildResponseData(ServerWebExchange exchange, boolean useRawStatusCodes) { if (useRawStatusCodes) { return new ResponseData(new GrayRequestData(exchange.getRequest()), exchange.getResponse()); } return new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest())); } protected URI reconstructURI(ServiceInstance serviceInstance, URI original) { return LoadBalancerUriTools.reconstructURI(serviceInstance, original); } private Mono > choose(Request lbRequest, String serviceId, Set supportedLifecycleProcessors) { ReactorLoadBalancer loadBalancer = this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class); if (loadBalancer == null) { throw new NotFoundException("No loadbalancer available for " + serviceId); } supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); return loadBalancer.choose(lbRequest); } private String getHint(String serviceId) { LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId); Map hints = loadBalancerProperties.getHint(); String defaultHint = hints.getOrDefault("default", "default"); String hintPropertyValue = hints.get(serviceId); return hintPropertyValue != null ? hintPropertyValue : defaultHint; }}
@Configurationpublic class GrayDefaultConfiguration { @Bean public GrayRoundRobinLoadBalancer grayRandomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new GrayRoundRobinLoadBalancer( loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); } // 由于没有使用服务注册及发现,这里通过编码的方式定义服务实例 @Bean public ServiceInstanceListSupplier sscServiceInstanceListSupplier() { return new ServiceInstanceListSupplier() { @Override public Flux> get() { List
instances = new ArrayList<>() ; Map metadata1 = new HashMap<>() ; metadata1.put("v", "1") ; ServiceInstance s1 = new DefaultServiceInstance("s1", "ssc", "localhost", 8088 , false, metadata1) ; instances.add(s1) ; Map metadata2 = new HashMap<>() ; metadata2.put("v", "2") ; ServiceInstance s2 = new DefaultServiceInstance("s2", "ssc", "localhost", 8099 , false, metadata2) ; instances.add(s2) ; return Flux.just(instances) ; } @Override public String getServiceId() { return "ssc" ; } }; } }
负载均衡客户端设置默认的配置
@LoadBalancerClients(defaultConfiguration = GrayDefaultConfiguration.class)public class SpringCloudGatewayApplication {}
以上就是实现灰度发布的核心组件。
测试,设置不同的v返回不同服务的结果数据
完毕!!!