动态加载
之前如果我们已经完成了OAuth的项目,这时候我们就可以用网关进行代理。
在application.yml中添加:
# Gateway configuration that proxies /oauth/** to the shareprog-auth service.
zuul:
  ignored-services: '*'
  host:
    connect-timeout-millis: 15000
    socket-timeout-millis: 60000
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      serviceId: shareprog-auth
      strip-prefix: false
  # Gateway-level switch (not a per-route property).
  add-host-header: true
一般来说,配置是这样的:
# Typical form: route id "auth" with an explicit serviceId.
zuul:
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      serviceId: shareprog-auth
      strip-prefix: false
与简单路由类似,serviceId 也可以被省略。当省略时,将会使用 routeId 作为 serviceId,下面的配置片段,效果等同于上面的配置:
# serviceId omitted: the route id (shareprog-auth) is used as the serviceId.
zuul:
  routes:
    shareprog-auth:
      path: /oauth/**
      sensitive-headers: null
      strip-prefix: false
需要注意的是,如果提供的 url 配置项不是简单路由格式(不以 http: 或 https: 开头),也不是跳转路由格式(以 forward: 开头),那么将会执行 Ribbon 路由过滤器,将 url 看作 serviceId。下面的配置片段,效果也等同于前面的配置:
# url is neither http(s): nor forward:, so it is treated as a serviceId.
zuul:
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      strip-prefix: false
      url: shareprog-auth
这个时候可以发现shareprog-auth是对应的。但是有时候我们需要通过数据库动态加载,那么就得做一些代码上的配置了。
创建一个自定义路由表结构,DDL如下:
-- ----------------------------
-- Gateway routes
-- One row per Zuul route; the column names mirror the route attributes
-- (path, service_id, url, strip_prefix, retryable, sensitive_headers, ...).
-- ----------------------------
-- WARNING: the next statement drops any existing zuul_route table together with its rows.
DROP TABLE IF EXISTS `zuul_route`;
CREATE TABLE `zuul_route` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '路由的路径表达式,例如/foo/**',
`service_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '映射到此路由的服务ID',
`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由映射到完整的URL',
`strip_prefix` tinyint(1) NOT NULL DEFAULT 1 COMMENT '转发之前是否应删除此路由的前缀',
`retryable` tinyint(1) NULL DEFAULT 0 COMMENT '路由转发是否重试,默认false',
`sensitive_headers` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '转发请求的敏感头的列表(Cookie,Set-Cookie,Authorization)',
`custom_sensitive_headers` tinyint(1) NULL DEFAULT 0 COMMENT '是否自定义敏感头,默认false',
-- Only rows with enabled = 1 are meant to be loaded as routes.
`enabled` tinyint(1) NULL DEFAULT 1 COMMENT '是否启用',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '网关路由配置' ROW_FORMAT = Compact;
代码如下:
package com.shareprog.zuul.config.zuul;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.cloud.netflix.zuul.filters.RefreshableRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.SimpleRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties.ZuulRoute;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import com.shareprog.common.core.constants.MarkConstant;
import lombok.extern.slf4j.Slf4j;
/**
 * Route locator that merges the routes declared in the application configuration with
 * the enabled rows of the {@code zuul_route} table, so routes can be changed at runtime
 * by editing the table and triggering a refresh.
 *
 * @author cl
 */
@Slf4j
@Component
public class ZuulRouteLocator extends SimpleRouteLocator implements RefreshableRouteLocator {

    /** Selects every enabled route row; also used by RibbonApplicationContextInit. */
    protected static final String FIND_ZUUL_STATEMENT = "SELECT id, path, service_id, url, strip_prefix, retryable, sensitive_headers, custom_sensitive_headers"
            + " FROM zuul_route WHERE enabled = true";

    private final ZuulProperties properties;
    private final JdbcTemplate jdbcTemplate;

    /**
     * @param jdbcTemplate     used to read the {@code zuul_route} table
     * @param serverProperties supplies the servlet context path handed to the parent locator
     * @param properties       the {@code zuul.*} configuration (static routes, prefix, ...)
     */
    public ZuulRouteLocator(JdbcTemplate jdbcTemplate, ServerProperties serverProperties,
            ZuulProperties properties) {
        super(serverProperties.getServlet().getContextPath(), properties);
        this.properties = properties;
        this.jdbcTemplate = jdbcTemplate;
        log.info("servletPath:{}", serverProperties.getServlet().getContextPath());
    }

    /** Rebuilds the route table by delegating to the parent's {@code doRefresh()}. */
    @Override
    public void refresh() {
        doRefresh();
    }

    /**
     * Builds the route map: configured routes first, then database routes (a database
     * route replaces a configured route that has the same path), with every key
     * normalised to start with "/" and to carry the global {@code zuul.prefix}.
     *
     * @return routes keyed by their full path pattern, in insertion order
     */
    @Override
    protected Map<String, ZuulRoute> locateRoutes() {
        Map<String, ZuulRoute> merged = new LinkedHashMap<>();
        // Routes from application.yml.
        merged.putAll(super.locateRoutes());
        // Routes from the database; same path => database wins.
        merged.putAll(locateRoutesFromDB());

        String prefix = this.properties.getPrefix();
        // The prefix must be applied only when one IS configured. The previous check
        // was inverted (isBlank), which ignored a configured prefix and concatenated
        // the literal "null" when the prefix was null.
        boolean hasPrefix = StringUtils.isNotBlank(prefix);

        Map<String, ZuulRoute> normalized = new LinkedHashMap<>();
        for (Map.Entry<String, ZuulRoute> entry : merged.entrySet()) {
            String path = entry.getKey();
            // Prepend with slash if not already present.
            if (!path.startsWith(MarkConstant.FORWARD_SLASH)) {
                path = MarkConstant.FORWARD_SLASH + path;
            }
            if (hasPrefix) {
                path = prefix + path;
                // The prefix itself may have been configured without a leading slash.
                if (!path.startsWith(MarkConstant.FORWARD_SLASH)) {
                    path = MarkConstant.FORWARD_SLASH + path;
                }
            }
            normalized.put(path, entry.getValue());
        }
        return normalized;
    }

    /**
     * Loads the enabled routes from the database.
     *
     * <p>Rows without a path, or with neither a serviceId nor a url, cannot be routed
     * and are skipped.
     *
     * @return routes keyed by the path stored in the row, in query order
     */
    private Map<String, ZuulRoute> locateRoutesFromDB() {
        Map<String, ZuulRoute> routes = new LinkedHashMap<>();
        List<ZuulRoute> rows = jdbcTemplate.query(FIND_ZUUL_STATEMENT,
                new BeanPropertyRowMapper<ZuulRoute>(ZuulRoute.class));
        for (ZuulRoute row : rows) {
            boolean hasTarget = StringUtils.isNotBlank(row.getServiceId())
                    || StringUtils.isNotBlank(row.getUrl());
            if (!hasTarget || StringUtils.isBlank(row.getPath())) {
                continue;
            }
            routes.put(row.getPath(), row);
        }
        return routes;
    }
}
还要配置监听器:
package com.shareprog.zuul.config.zuul;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.cloud.client.discovery.event.HeartbeatMonitor;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.event.ParentHeartbeatEvent;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.netflix.zuul.RoutesRefreshedEvent;
import org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
/**
 * Application listener that marks the Zuul handler mapping dirty (so routes are
 * re-read) on selected refresh events, and deliberately does not do so on discovery
 * heartbeats.
 *
 * <p>NOTE(review): this class carries no stereotype annotation and its constructor is
 * private, so how it is registered as a bean is not visible here - confirm.
 */
public class ZuulRefreshListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    private ZuulHandlerMapping zuulHandlerMapping;

    private HeartbeatMonitor heartbeatMonitor;

    private ZuulRefreshListener() {
        this.heartbeatMonitor = new HeartbeatMonitor();
    }

    /**
     * Dispatches on the event type:
     * <ul>
     * <li>{@code ContextRefreshedEvent}: reset only when the refreshed context's class is
     * exactly {@code AnnotationConfigApplicationContext};</li>
     * <li>{@code RefreshScopeRefreshedEvent} / {@code RoutesRefreshedEvent}: always reset;</li>
     * <li>{@code InstanceRegisteredEvent}: ignored;</li>
     * <li>heartbeat events: only the heartbeat monitor is updated.</li>
     * </ul>
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            ApplicationContext refreshedContext = ((ContextRefreshedEvent) event).getApplicationContext();
            String contextClassName = refreshedContext.getClass().getName();
            // Checked so that routes are loaded from the database only once at
            // service start-up.
            if (contextClassName.equals("org.springframework.context.annotation.AnnotationConfigApplicationContext")) {
                this.reset();
            }
            return;
        }
        if (event instanceof RefreshScopeRefreshedEvent || event instanceof RoutesRefreshedEvent) {
            // The original framework code called this.reset() for these events too.
            this.reset();
            return;
        }
        if (event instanceof InstanceRegisteredEvent) {
            // Intentionally no reset for instance registration.
            return;
        }
        if (event instanceof ParentHeartbeatEvent) {
            resetIfNeeded(((ParentHeartbeatEvent) event).getValue());
            return;
        }
        if (event instanceof HeartbeatEvent) {
            resetIfNeeded(((HeartbeatEvent) event).getValue());
        }
    }

    /**
     * Records the heartbeat value without reloading routes.
     *
     * <p>The original framework code was
     * {@code if (this.heartbeatMonitor.update(value)) { this.reset(); }}, which reloaded
     * the routes whenever the registry state changed so newly registered services were
     * picked up. New services are now expected to be configured in the database, and
     * reloading on every heartbeat (roughly every 30 seconds) is unnecessary, so only
     * the monitor state is updated.
     */
    private void resetIfNeeded(Object value) {
        this.heartbeatMonitor.update(value);
    }

    /** Flags the handler mapping as dirty. */
    private void reset() {
        this.zuulHandlerMapping.setDirty(true);
    }
}
同时,我们可以通过api请求实时刷新网关的路由。
Controller层
package com.shareprog.zuul.controller;
import org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.shareprog.common.core.domain.ResponseResult;
import com.shareprog.zuul.service.RefreshRouteService;
import lombok.AllArgsConstructor;
/**
 * REST endpoints for refreshing and inspecting the gateway routes.
 *
 * @author cl
 */
@RestController
@AllArgsConstructor
public class RefreshController {

    private final RefreshRouteService refreshRouteService;
    private final ZuulHandlerMapping zuulHandlerMapping;

    /**
     * Triggers a route refresh through {@link RefreshRouteService#refreshRoute()}.
     *
     * @return an empty success result
     */
    @SuppressWarnings("rawtypes")
    @GetMapping("/refreshRoute")
    public ResponseResult refresh() {
        this.refreshRouteService.refreshRoute();
        return ResponseResult.success();
    }

    /**
     * Debugging aid: returns the handler map currently held by the Zuul handler mapping.
     *
     * @return a success result wrapping the handler map
     */
    @SuppressWarnings("rawtypes")
    @GetMapping("/watchRoute")
    public ResponseResult watchRoute() {
        return ResponseResult.success(this.zuulHandlerMapping.getHandlerMap());
    }
}
Service层
package com.shareprog.zuul.service;
import org.springframework.cloud.netflix.zuul.RoutesRefreshedEvent;
import org.springframework.cloud.netflix.zuul.filters.RouteLocator;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.jdbc.core.JdbcTemplate;
import com.shareprog.zuul.config.zuul.RibbonApplicationContextInit;
import lombok.AllArgsConstructor;
/**
 * Service behind the route-refresh endpoint.
 */
@Service
@AllArgsConstructor
public class RefreshRouteService {

    private final ApplicationEventPublisher publisher;
    private final RouteLocator routeLocator;
    private final RibbonApplicationContextInit ribbonApplicationContextInit;
    private final JdbcTemplate jdbcTemplate;

    /**
     * Publishes a {@link RoutesRefreshedEvent} for the route locator, then asks
     * {@link RibbonApplicationContextInit} to initialize itself from the database.
     */
    public void refreshRoute() {
        RoutesRefreshedEvent refreshedEvent = new RoutesRefreshedEvent(this.routeLocator);
        this.publisher.publishEvent(refreshedEvent);
        this.ribbonApplicationContextInit.initialize(this.jdbcTemplate);
    }
}
到这里结束,一切都没有问题。
假如又有新的需求或者架构原因,我们不能通过Eureka进行识别负载均衡的服务,我们又该如何做呢?
在application-dev.yml中添加:
# Static Ribbon client configuration for the shareprog-auth service (no Eureka lookup).
shareprog-auth:
  ribbon:
    NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.AvailabilityFilteringRule
    listOfServers: 127.0.0.1:8004
    ConnectTimeout: 6000
    ReadTimeout: 6000
    MaxTotalConnections: 500
    MaxConnectionsPerHost: 100
上面还不行,还必须在application.yml中添加中断ribbon和Eureka的配置。
ribbon.eureka.enabled: false # disable the link between Ribbon and Eureka
但是如果又需要从数据库动态加载呢?
添加代码如下:
package com.shareprog.zuul.config.zuul;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.netflix.ribbon.RibbonApplicationContextInitializer;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties.ZuulRoute;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import com.netflix.client.ClientException;
import com.netflix.config.ConfigurationManager;
/**
 * Registers every serviceId found in the gateway route table as a Ribbon client name
 * and publishes the matching row of the {@code ribbon} table as
 * {@code <serviceId>.ribbon.<Key>} configuration properties.
 *
 * @author cl
 */
@Component
public class RibbonApplicationContextInit extends RibbonApplicationContextInitializer {

    private static final String FIND_RIBBON_STATEMENT = "SELECT * FROM ribbon WHERE service_id = ?";
    public static final boolean DEFAULT_GZIP_PAYLOAD = true;
    public static final String RIBBON_PREFIX = "ribbon";

    /**
     * Reads the route table once and hands the resulting serviceIds to the parent
     * initializer as client names; the Ribbon properties of each serviceId are
     * published as a side effect of that read.
     *
     * @param springClientFactory passed through to the parent initializer
     * @param jdbcTemplate        used to read the route and ribbon tables
     */
    public RibbonApplicationContextInit(SpringClientFactory springClientFactory,
            JdbcTemplate jdbcTemplate) {
        super(springClientFactory, getServiceIdsFromZuulDB(jdbcTemplate));
    }

    /**
     * Re-reads the route and ribbon tables and republishes the Ribbon properties of
     * every routed serviceId. This is the entry point called by
     * {@code RefreshRouteService.refreshRoute()}.
     *
     * <p>NOTE(review): whether Ribbon clients that were already created pick up the
     * republished values is not established by this class - verify before relying on
     * it for live reconfiguration.
     *
     * @param jdbcTemplate used to read the route and ribbon tables
     */
    public void initialize(JdbcTemplate jdbcTemplate) {
        getServiceIdsFromZuulDB(jdbcTemplate);
    }

    /**
     * Collects the distinct serviceIds of all enabled routes that have both a path and
     * a serviceId, publishing each one's Ribbon properties along the way.
     *
     * @param jdbcTemplate used to read the route and ribbon tables
     * @return the distinct serviceIds in the order they were first seen
     */
    private static List<String> getServiceIdsFromZuulDB(JdbcTemplate jdbcTemplate) {
        List<ZuulRoute> routes = jdbcTemplate.query(ZuulRouteLocator.FIND_ZUUL_STATEMENT,
                new BeanPropertyRowMapper<ZuulRoute>(ZuulRoute.class));
        List<String> serviceIds = new ArrayList<>();
        for (ZuulRoute route : routes) {
            if (StringUtils.isBlank(route.getPath()) || StringUtils.isBlank(route.getServiceId())) {
                continue;
            }
            String serviceId = route.getServiceId();
            // Several routes may point at the same service; register it only once.
            if (serviceIds.contains(serviceId)) {
                continue;
            }
            serviceIds.add(serviceId);
            register(jdbcTemplate, serviceId);
        }
        return serviceIds;
    }

    /**
     * Publishes the Ribbon properties stored for one serviceId; does nothing when the
     * {@code ribbon} table has no row for it.
     *
     * @param jdbcTemplate used to read the ribbon table
     * @param serviceId    the Ribbon client name
     */
    private static void register(JdbcTemplate jdbcTemplate, String serviceId) {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(FIND_RIBBON_STATEMENT, serviceId);
        if (rows.isEmpty()) {
            return;
        }
        setRibbonProp(serviceId, rows);
    }

    /**
     * Writes the columns of the first row as {@code <serviceId>.ribbon.<key>}
     * properties. The {@code id} and {@code service_id} columns are not settings and
     * are skipped, as are columns whose value is NULL (treated as "not configured").
     *
     * @param serviceId    the Ribbon client name
     * @param queryForList rows of the ribbon table for this serviceId; must not be empty
     */
    private static void setRibbonProp(String serviceId, List<Map<String, Object>> queryForList) {
        AbstractConfiguration config = ConfigurationManager.getConfigInstance();
        Map<String, Object> row = queryForList.get(0);
        String keyPrefix = serviceId + "." + RIBBON_PREFIX + ".";
        for (Entry<String, Object> column : row.entrySet()) {
            String columnName = column.getKey();
            if ("id".equals(columnName) || "service_id".equals(columnName)) {
                continue;
            }
            Object value = column.getValue();
            if (value == null) {
                // Never publish a null property value for an unset column.
                continue;
            }
            config.setProperty(keyPrefix + toRibbonKey(columnName), value);
        }
    }

    /**
     * Maps a column name of the ribbon table to the Ribbon property key it configures.
     * Columns without a known mapping keep their own name.
     * TODO the mapping could be replaced by a generic snake_case to camel-case conversion.
     *
     * @param columnName column name as returned by the query
     * @return the Ribbon property key
     */
    private static String toRibbonKey(String columnName) {
        switch (columnName) {
        case "max_total_connections":
            return "MaxTotalConnections";
        case "connect_timeout":
            return "ConnectTimeout";
        case "max_connections_per_host":
            return "MaxConnectionsPerHost";
        case "list_of_servers":
            return "listOfServers";
        case "niws_server_list_class_name":
            return "NIWSServerListClassName";
        case "read_timeout":
            return "ReadTimeout";
        case "nf_load_balancer_rule_class_name":
            return "NFLoadBalancerRuleClassName";
        default:
            return columnName;
        }
    }
}
这时候我们看到,是没有监听器的,所以只是从项目的一开始加载进去的,不算是动态加载,关于这一点,还有待研究。
在zuul预加载Ribbon
调用集群服务时,会使用 Ribbon的客户端。默认情况下,客户端相关的 Bean 会延迟加载,在第一次调用集群服务时,才会初始化这些对象。
如果想提前加载 Ribbon客户端,可以在配置文件中进行以下配置:
# Create the Ribbon clients at start-up instead of on the first request.
zuul:
  ribbon:
    eager-load:
      enabled: true
IP_HASH负载均衡
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
/**
 * Load-balancer rule that picks a server from the hash of the caller's IP address, so
 * the same client keeps reaching the same server while the server list is unchanged.
 */
@Slf4j
public class IpHashRule extends AbstractLoadBalancerRule {

    /** Upper bound on selection attempts in {@link #choose(ILoadBalancer, Object)}. */
    private static final int MAX_ATTEMPTS = 10;

    public IpHashRule() {}

    public IpHashRule(ILoadBalancer lb) {
        this.setLoadBalancer(lb);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {}

    @Override
    public Server choose(Object o) {
        return this.choose(this.getLoadBalancer(), o);
    }

    /**
     * Selects the server whose index equals the client-IP hash modulo the number of
     * known servers, and returns it only if it is alive, ready and reachable.
     *
     * @param lb the load balancer to pick from; may be {@code null}
     * @param o  the load-balancer key (unused by this rule)
     * @return the selected server, or {@code null} when there is no load balancer, no
     *         server is up, or the selected server stayed unavailable for every attempt
     */
    public Server choose(ILoadBalancer lb, Object o) {
        if (lb == null) {
            log.warn("No load balancer!");
            return null;
        }
        // The caller's address does not change between attempts; resolve it once.
        String clientIp = getRemoteAddr();
        int count = 0;
        while (count++ < MAX_ATTEMPTS) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int serverCount = allServers.size();
            if (reachableServers.isEmpty() || serverCount == 0) {
                log.warn("No up servers available from load balancer: {}", lb);
                return null;
            }
            Server server = allServers.get(this.ipHashIndex(clientIp, serverCount));
            log.info(">>> IP_Hash 策略预选[{}]!", server);
            if (server == null) {
                Thread.yield();
                continue;
            }
            // Besides liveness and readiness, also require the server to be in the
            // reachable list.
            if (server.isAlive() && server.isReadyToServe() && reachableServers.contains(server)) {
                log.info("IP_Hash 策略选择[{}]服务!<<<", server);
                return server;
            }
        }
        log.warn("No available alive servers after {} tries from load balancer: {}", MAX_ATTEMPTS, lb);
        return null;
    }

    /**
     * Maps a client IP to an index in {@code [0, serverCount)}.
     *
     * <p>The remainder is taken before the absolute value: {@code Math.abs} of a hash
     * equal to {@code Integer.MIN_VALUE} is still negative and would produce a negative
     * index. Every other hash maps to the same index as before.
     *
     * @param clientIp    the caller's address, or {@code null} when it is unknown
     * @param serverCount number of servers; must be positive
     * @return the index; 0 when the address is unknown
     */
    private int ipHashIndex(String clientIp, int serverCount) {
        if (clientIp == null) {
            return 0;
        }
        return Math.abs(clientIp.hashCode() % serverCount);
    }

    /**
     * Resolves the caller's address from the request bound to the current thread. The
     * first entry of {@code X-FORWARDED-FOR} is preferred over the socket address,
     * because the header may hold a comma-separated chain of proxies.
     *
     * <p>NOTE(review): the header is supplied by the client and is only trustworthy
     * behind a proxy that overwrites it.
     *
     * @return the address, or {@code null} when no servlet request is bound to this
     *         thread (for example when running on a worker thread)
     */
    private String getRemoteAddr() {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        if (!(attributes instanceof ServletRequestAttributes)) {
            log.warn("No servlet request bound to the current thread; client IP is unknown");
            return null;
        }
        HttpServletRequest request = ((ServletRequestAttributes) attributes).getRequest();
        String remoteAddr = request.getRemoteAddr();
        String forwardedFor = request.getHeader("X-FORWARDED-FOR");
        if (forwardedFor != null) {
            int comma = forwardedFor.indexOf(',');
            String firstHop = (comma >= 0 ? forwardedFor.substring(0, comma) : forwardedFor).trim();
            if (!firstHop.isEmpty()) {
                remoteAddr = firstHop;
            }
        }
        log.debug("RemoteAddr: {}", remoteAddr);
        return remoteAddr;
    }
}
上面getRemoteAddr获取的request可能为null,所以需要处理一下。
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.Callable;
/**
 * Hystrix concurrency strategy that carries the caller's Spring
 * {@link RequestAttributes} over to the thread that executes a Hystrix command, so
 * {@link RequestContextHolder} works inside the command.
 */
@Slf4j
@Component // register in the Spring container
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    /** Strategy that was registered before this one; its callable wrapping is kept. */
    private final HystrixConcurrencyStrategy delegate;

    /**
     * Registers this instance as the Hystrix concurrency strategy, following the same
     * approach as {@link HystrixSecurityAutoConfiguration#init()}.
     *
     * <p>Only one strategy of each kind can be registered, and replacing one requires
     * {@code HystrixPlugins.reset()}, which clears all plugin kinds. The other plugins
     * are therefore captured first and registered again afterwards, instead of being
     * silently discarded.
     */
    public RequestAttributeHystrixConcurrencyStrategy() {
        HystrixPlugins plugins = HystrixPlugins.getInstance();
        HystrixConcurrencyStrategy existing = plugins.getConcurrencyStrategy();
        // If an instance of this class is already registered (e.g. after a context
        // restart), chain to ITS delegate so wrappers do not pile up.
        this.delegate = existing instanceof RequestAttributeHystrixConcurrencyStrategy
                ? ((RequestAttributeHystrixConcurrencyStrategy) existing).delegate
                : existing;
        HystrixCommandExecutionHook commandExecutionHook = plugins.getCommandExecutionHook();
        HystrixEventNotifier eventNotifier = plugins.getEventNotifier();
        HystrixMetricsPublisher metricsPublisher = plugins.getMetricsPublisher();
        HystrixPropertiesStrategy propertiesStrategy = plugins.getPropertiesStrategy();

        HystrixPlugins.reset();

        HystrixPlugins fresh = HystrixPlugins.getInstance();
        fresh.registerConcurrencyStrategy(this);
        fresh.registerCommandExecutionHook(commandExecutionHook);
        fresh.registerEventNotifier(eventNotifier);
        fresh.registerMetricsPublisher(metricsPublisher);
        fresh.registerPropertiesStrategy(propertiesStrategy);
    }

    /**
     * Captures the request attributes of the calling thread and returns a callable
     * that installs them on whichever thread eventually runs it.
     *
     * @param callable the task Hystrix is about to schedule
     * @return the task wrapped with request-context propagation (and with whatever
     *         wrapping the previously registered strategy applies)
     */
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        // Runs on the parent thread: remember its context for the child task.
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        Callable<T> wrapped = new WrappedCallable<>(callable, requestAttributes);
        return this.delegate != null ? this.delegate.wrapCallable(wrapped) : wrapped;
    }

    /** Callable that runs its target with a given set of request attributes bound. */
    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            // Remember what the executing thread had bound, so it can be put back
            // instead of being wiped.
            RequestAttributes previous = RequestContextHolder.getRequestAttributes();
            try {
                // Runs on the child thread: install the parent's context.
                RequestContextHolder.setRequestAttributes(this.requestAttributes);
                return this.target.call();
            } finally {
                // Passing null clears the thread-local, so this also covers the case
                // where nothing was bound before.
                RequestContextHolder.setRequestAttributes(previous);
            }
        }
    }
}
当实现了request数据的传递,上述中的IpHashRule不写也可以。