• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

Spring Cloud(20):Gateway 动态路由(金丝雀发布 / 灰度发布)

Spring Cloud winrains 来源:windmt 12个月前 (11-14) 57次浏览

为什么需要动态路由?

之前说过 Gateway 的路由配置,常用的有两种方式:

  • Fluent API
  • 配置文件

这两者之间因为配置文件的方式修改起来比较灵活,然后通过 Stream+Bus 的方式刷新路由配置,所以大家使用的比较多。
但是如果我们在网关层需要类似于 Canary Release(金丝雀发布,也称灰度发布)这样的能力的话,那么以上两种配置路由的方式就都显得太笨拙了。

矿井中的金丝雀
17 世纪,英国矿井工人发现,金丝雀对瓦斯这种气体十分敏感。空气中哪怕有极其微量的瓦斯,金丝雀也会停止歌唱;而当瓦斯含量超过一定限度时,虽然鲁钝的人类毫无察觉,金丝雀却早已毒发身亡。当时在采矿设备相对简陋的条件下,工人们每次下井都会带上一只金丝雀作为 “瓦斯检测指标”,以便在危险状况下紧急撤离。

Spring Cloud Gateway 中虽然已经提供了关于权重的断言,我们在配置文件中可以直接这样配置

spring:
  application:
    name: cloud-gateway
  cloud:
    gateway:
      routes:
      - id: service1_prod
        uri: http://localhost:8081
        predicates:
        - Path=/test
        - Weight=service1, 90
      - id: service1_canary
        uri: http://localhost:8082
        predicates:
        - Path=/test
        - Weight=service1, 10


以实现 Canary Release 的能力,但是每次发布都配置一遍未免太过麻烦了。
出于 “懒” 的本性,我们当然希望在发布脚本里能在运行时直接动态修改 service1_prodservice1_canary 的权重,这样我们就不用手动修改还提心吊胆的担心改错了。
这其实就是 “动态路由” 了。

Spring Cloud Gateway 默认动态路由实现

Spring Cloud Gateway 在去年 6 月份发布了 2.0 第一个 release 版本,其实已经自带动态路由了, 但是官方文档并没有讲如何动态配置。
不过我们翻看 Spring Cloud Gateway 源码,会发现类 org.springframework.cloud.gateway.actuate.GatewayControllerEndpoint 中提供了网关配置的 RESTful 接口,默认是没有启用的。
在配置类 org.springframework.cloud.gateway.config.GatewayAutoConfiguration 中配置了 GatewayControllerEndpoint

@Configuration
@ConditionalOnClass(Health.class)
protected static class GatewayActuatorConfiguration {
    @Bean
    @ConditionalOnEnabledEndpoint
    public GatewayControllerEndpoint gatewayControllerEndpoint(RouteDefinitionLocator routeDefinitionLocator, List<GlobalFilter> globalFilters,
                                                            List<GatewayFilterFactory> GatewayFilters, RouteDefinitionWriter routeDefinitionWriter,
                                                            RouteLocator routeLocator) {
        return new GatewayControllerEndpoint(routeDefinitionLocator, globalFilters, GatewayFilters, routeDefinitionWriter, routeLocator);
    }
}

也就是说在存在 org.springframework.boot.actuate.health.Health 时启用,我们想用自带的接口就需要添加 actuator 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

并且还要将 actuator 的端点暴露出来

management:
  endpoints:
    web:
      exposure:
        include: "*"

然后我们就能通过自带的 GatewayControllerEndpoint 的 RESTful API 修改运行时的路由了
GatewayControllerEndpoint

GatewayControllerEndpoint

此时我们已经能实现之前的目标了
JVM 级别的动态路由

JVM 级别的动态路由

但是 Gateway 自带的这套是仅仅支持了 JVM 级别的动态路由,不能序列化存储的。
默认的实现:

// GatewayAutoConfiguration
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
    return new InMemoryRouteDefinitionRepository();
}
// InMemoryRouteDefinitionRepository
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
    private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());
    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap( r -> {
            routes.put(r.getId(), r);
            return Mono.empty();
        });
    }
    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            if (routes.containsKey(id)) {
                routes.remove(id);
                return Mono.empty();
            }
            return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
        });
    }
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return Flux.fromIterable(routes.values());
    }
}

这样就导致我们的路由配置要分散存储在两个地方:Config Server 和 内存中,非常不利于管理。
另外在生产环境使用的话,Gateway 一定是一个集群,一个个去调用每个实例的 refresh 端口并不利于扩展。

Spring Cloud Gateway 路由加载过程

看了上面的源码后,是不是感觉其实我们完全可以替换掉 InMemoryRouteDefinitionRepository 来用 DB 或 Redis 做持久化存储,来实现持久化的动态路由。
不过在动手之前,我们还是要先看一下 Gateway 的路由加载过程,这样才更好的实现我们的需求。
Gateway 路由加载过程

Gateway 路由加载过程

  1. DispatcherHandler 接管用户请求
  2. RoutePredicateHandlerMapping 路由匹配
    1. 根据 RouteLocator 获取 RouteDefinitionLocator
    2. 返回多个 RouteDefinitionLocator.getRouteDefinitions() 的路由定义信息
  3. FilteringWebHandler 执行路由定义中的 filter 最后路由到具体的业务服务中

从加载流程上可以看出,我们要扩展动态路由的话,最核心的是要从 RouteDefinitionLocator 上入手。

持久化的分布式动态路由组件

我们现在可以对 Gateway 做一些扩展来改善上述的问题。

扩展思路

  1. 增加一个路由管理模块
    • 参考 GatewayControllerEndpoint 实现
    • 路由配置全部存储在 MySQL 中(Config Server 还需要,但不再存储路由配置了)
    • 启动时将路由配置加载到 Redis 中,运行时双写
    • 提供 RESTful API 以便脚本调用
    • 前端页面可以配合 JSON Viewer 或类似插件,便于修改展示
  2. 网关模块扩展
    • 提供一个 RouteDefinitionRepository,使它直接从 Redis 获取路由配置
    • 网关集群刷新路由配置,这里用 Redis Pub/Sub 来充当 MQ 来实现

持久化的动态路由

持久化的动态路由

注:用 Redis 一方面是为了支持 WebFlux (Reactor) 的背压(Backpressure),另一方面是为了刷新 Gateway 集群。

具体实现

路由管理模块

数据库的表结构
表结构

表结构

定义相关实体,这里参考 Gateway 源码的相关定义,涉及到三个类:

  • org.springframework.cloud.gateway.route.RouteDefinition
  • org.springframework.cloud.gateway.handler.predicate.PredicateDefinition
  • org.springframework.cloud.gateway.filter.FilterDefinition
public class GatewayRoute {
    private String                           routeId;
    private String                           uri;
    private Integer                          order;
    private List<GatewayPredicateDefinition> predicates;
    private List<GatewayFilterDefinition>    filters;
    private Long                             id;
    private LocalDateTime                    createTime;
    private LocalDateTime                    updateTime;
    private EntityStatus                     status;
}
public class GatewayPredicateDefinition {
    private String name;
    private Map<String, String> args = new LinkedHashMap<>();
}
public class GatewayFilterDefinition {
    private String name;
    private Map<String, String> args = new LinkedHashMap<>();
}

Controller 参考 GatewayControllerEndpoint 实现即可。因为我的实现是软删除,所以对创建 / 更新做了明确区分。
注意里边有个 refresh() 方法,并不是像 GatewayControllerEndpoint 一样发 RefreshRoutesEvent,而是往 Redis publish 了一条消息。

@Slf4j
@RestController
@RequestMapping("")
public class GatewayDynamicRouteController {
    @Autowired
    private GatewayRouteService gatewayRouteService;
    /**
     * 创建路由
     *
     * @param model
     * @return
     */
    @PostMapping("/routes")
    public Mono<ResponseEntity<Map>> create(@RequestBody Mono<GatewayRoute> model) {
        return model.flatMap(r -> {
            String routeId = r.getRouteId();
            return gatewayRouteService.findOneByRouteId(routeId)
                    .defaultIfEmpty(new GatewayRoute())
                    .flatMap(old -> {
                        if (old.getId() != null) {
                            return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.FORBIDDEN).body(buildRetBody(403, "routeId " + routeId + " 已存在", null))));
                        }
                        log.info("[ROUTE] <biz> creating. {}", defer(() -> JsonUtils.toJSON(r)));
                        return gatewayRouteService.insert(Mono.just(r))
                                .flatMap(id -> {
                                    return Mono.just((ResponseEntity.created(URI.create("/routes/" + id))
                                            .body(buildRetBody(0, "success", ImmutableMap.of("id", id)))));
                                });
                    });
        });
    }
    /**
     * 修改路由
     *
     * @param id
     * @param model
     * @return
     */
    @PutMapping("/routes/{id}")
    public Mono<ResponseEntity<Map>> update(@PathVariable Long id, @RequestBody Mono<GatewayRoute> model) {
        return model.flatMap(r -> {
            String routeId = r.getRouteId();
            return gatewayRouteService.findOneById(id)
                    .flatMap(old -> {
                        if (old == null) {
                            return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.FORBIDDEN).body(buildRetBody(403, "routeId " + routeId + " 还未创建", null))));
                        }
                        log.info("[ROUTE] <biz> updating. id:{}\n  before:{}\n  after:{}",
                                id, defer(() -> JsonUtils.toJSON(old)), defer(() -> JsonUtils.toJSON(r)));
                        return gatewayRouteService.update(Mono.just(r))
                                .then(Mono.defer(() -> Mono.just((ResponseEntity.ok(buildRetBody(0, "success", null))))));
                    });
        });
    }
    /**
     * @param id
     * @param status 0 正常,1 删除
     * @return
     */
    @PutMapping("/routes/{id}/{status}")
    public Mono<ResponseEntity<Object>> updateStatus(@PathVariable Long id, @PathVariable Integer status) {
        EntityStatus entityStatus = EntityStatus.fromValue(status);
        if (entityStatus == null) {
            return Mono.defer(() -> Mono.just(ResponseEntity.status(HttpStatus.BAD_REQUEST).build()));
        }
        return gatewayRouteService.updateStatus(id, entityStatus)
                .then(Mono.defer(() -> Mono.just(ResponseEntity.ok().build())))
                .onErrorResume(t -> t instanceof NotFoundException, t -> Mono.just(ResponseEntity.notFound().build()));
    }
    /**
     * 获取单个路由信息
     *
     * @param id
     * @return
     */
    @GetMapping("/routes/{id}")
    public Mono<ResponseEntity<GatewayRoute>> route(@PathVariable Long id) {
        return gatewayRouteService.findOneById(id)
                .map(ResponseEntity::ok)
                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }
    /**
     * 刷新路由
     *
     * @return
     */
    @PostMapping("/routes/refresh")
    public Mono<ResponseEntity<Object>> refresh() {
        return gatewayRouteService.refresh()
                .map(aLong -> {
                    if (aLong > 0) {
                        return ResponseEntity.ok().build();
                    } else {
                        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
                    }
                });
    }
    private Map<String, Object> buildRetBody(int code, String msg, Object data) {
        Map<String, Object> map = new HashMap<>();
        map.put("code", code);
        map.put("message", msg);
        map.put("data", data);
        return map;
    }
}

网关模块

重写一个新的 RouteDefinitionRepository,主要是要实现 getRouteDefinitions() 方法。
对于 savedelete 这两个方法,我是故意不处理的,因为路由的管理均在上边的路由管理模块实现了,网关模块只关注路由的获取。

@Slf4j
@Component
public class DynamicRouteDefinitionRepository implements RouteDefinitionRepository {
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return redisTemplate.opsForHash()
                .values(GATEWAY_ROUTES)
                .map(json -> JsonUtils.fromJson(json.toString(), RouteDefinition.class));
    }
    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return Mono.empty();
    }
    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return Mono.empty();
    }
}

除此之外,为了配合路由管理模块实现网关集群的刷新路由配置,网关模块里还需要加一个 Redis 的配置以订阅刷新消息。

@Slf4j
@Configuration
public class RedisConfig {
    @Bean
    ReactiveRedisMessageListenerContainer container(GatewayRouteService routeService, ReactiveRedisConnectionFactory connectionFactory) {
        ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            container.destroyLater().subscribe();
        }));
        container.receive(topic())
                .map(p -> p.getMessage())
                .subscribe(message -> {
                    log.info("Received <{}>", message);
                    routeService.publishRefreshEvent();
                });
        return container;
    }
    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("gateway-route-refresh-topic");
    }
}

自此也就大功告成了~

配置格式

这样的动态路由,是用 JSON 格式来配置的,如果格式不对,可是要报 500 错误的!
这里简单举个栗子:
如果我们在配置文件里要配的路由是这样

spring:
  cloud:
    gateway:
      routes:
        - id: user-api
          uri: http://user-api:8080
          order: 0
          predicates:
            - Path=/user/**
            - Weight=user-service, 90
          filters:
            - StripPrefix=1

那么翻译成 JSON 格式就是要这样(其中 status 是我自己加的,可以忽略)

{
  "routeId": "user-api",
  "uri": "http://user-api:8080",
  "order": 0,
  "predicates": [
    {
      "name": "Path",
      "args": {
        "pattern": "/user/**"
      }
    },
    {
      "name": "Weight",
      "args": {
        "weight.group": "user-service",
        "weight.weight": "90"
      }
    }
  ],
  "filters": [
    {
      "name": "StripPrefix",
      "args": {
        "parts": "1"
      }
    }
  ],
  "status": 0
}

至于其中 predicatefiltername 字段都还好理解,即使是 yaml 格式的我们也是要写的。这个有相关的文档,目前的规则就是 RoutePredicateFactoryGatewayFilterFactory 这两个接口下所有的实现类去掉这两个后缀后的名字(见 org.springframework.cloud.gateway.support.NameUtils)。
那么 args 里边的 key 的名字又是哪来的呢?
这个没有文档,翻看源码发现此处的 key 有两种配置方式:

  1. _genkey_0_genkey_1_genkey_n 这种形式,比较方便但是可读性比较差,还得注意顺序。(这个的源码也在 NameUtils 里)
  2. 另一种就是像我上边例子中写的,这需要去各个 RoutePredicateFactoryGatewayFilterFactory 的源码找对应的命名规则。(还需要参考 org.springframework.cloud.gateway.support.ShortcutConfigurable

作者:windmt

来源:https://windmt.com/2019/01/20/spring-cloud-20-gateway-dynamic-routing/


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)