分享程序网
首页
  • java
微服务
微前端
环境搭建
数据库
设计模式
算法
软件
解决问题
链接
首页
  • java
微服务
微前端
环境搭建
数据库
设计模式
算法
软件
解决问题
链接
  • 微服务

    • 介绍
  • 微服务搭建

    • 初步搭建
  • 服务发现

    • Eureka
    • nacos
  • 网关

    • zuul
    • 网关配置
    • 过滤器
    • 动态加载
  • 认证(Oauth)

    • 初始化项目
    • Oauth2配置
    • 对外接口
  • 通用服务

    • 通用功能父模块
    • redis
  • 任务调度

    • 任务调度服务
    • xxl-job示例
  • 业务服务

    • 业务设计

动态加载

之前如果我们已经完成了OAuth的项目,这时候我们就可以用网关进行代理。

在application.yml中添加:

zuul:
  ignored-services: '*'
  host:
    connect-timeout-millis: 15000
    socket-timeout-millis: 60000
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      serviceId: shareprog-auth
      strip-prefix: false
  add-host-header: true

一般来说,配置是这样的:

zuul:
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      serviceId: shareprog-auth
      strip-prefix: false

与简单路由类似, serviceId 也可以被省略。当省略时,将会使用 routeId 作为 serviceId,下面的配置片断,效果等同于上面的配置:

zuul:
  routes:
    shareprog-auth:
      path: /oauth/**
      sensitive-headers: null
      strip-prefix: false

需要注意的是,如果提供的 url 配置项不是简单路由格式(不以 http: 或 https: 开头),也不是跳转路由格式(以 forward: 开头),那么将会执行 Ribbon 路由过滤器,将 url 看作 serviceId。下面的配置片断,效果也等同于前面的配置:

zuul:
  routes:
    auth:
      path: /oauth/**
      sensitive-headers: null
      strip-prefix: false
      url: shareprog-auth

这个时候可以发现shareprog-auth是对应的。但是有时候我们需要通过数据库动态加载,那么就得做一些代码上的配置了。

创建一个自定义路由表结构,DDL如下:

-- ----------------------------
-- 网关路由
-- ----------------------------
DROP TABLE IF EXISTS `zuul_route`;
CREATE TABLE `zuul_route`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '路由的路径表达式,例如/foo/**',
  `service_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '映射到此路由的服务ID',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由映射到完整的URL',
  `strip_prefix` tinyint(1) NOT NULL DEFAULT 1 COMMENT '转发之前是否应删除此路由的前缀',
  `retryable` tinyint(1) NULL DEFAULT 0 COMMENT '路由转发是否重试,默认false',
  `sensitive_headers` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '转发请求的敏感头的列表(Cookie,Set-Cookie,Authorization)',
  `custom_sensitive_headers` tinyint(1) NULL DEFAULT 0 COMMENT '是否自定义敏感头,默认false',
  `enabled` tinyint(1) NULL DEFAULT 1 COMMENT '是否启用',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '网关路由配置' ROW_FORMAT = Compact;

代码如下:

package com.shareprog.zuul.config.zuul;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.cloud.netflix.zuul.filters.RefreshableRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.SimpleRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties.ZuulRoute;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import com.shareprog.common.core.constants.MarkConstant;

import lombok.extern.slf4j.Slf4j;

/**
 * Route locator that merges the routes declared in {@code application.yml} with the
 * enabled rows of the {@code zuul_route} table, so that routes can be changed in the
 * database and picked up on the next {@link #refresh()}.
 *
 * @author cl
 * @date 2021-01-20
 */
@Slf4j
@Component
public class ZuulRouteLocator extends SimpleRouteLocator implements RefreshableRouteLocator {

	/** Selects every enabled route; also reused by other classes in this package. */
	protected static final String FIND_ZUUL_STATEMENT = "SELECT id, path, service_id, url, strip_prefix, retryable, sensitive_headers, custom_sensitive_headers"
			+ " FROM zuul_route WHERE enabled = true";

	private final ZuulProperties properties;

	private final JdbcTemplate jdbcTemplate;

	/**
	 * @param jdbcTemplate     used to read the {@code zuul_route} table
	 * @param serverProperties supplies the servlet context path handed to the parent locator
	 * @param properties       the {@code zuul.*} configuration (static routes, global prefix)
	 */
	public ZuulRouteLocator(JdbcTemplate jdbcTemplate, ServerProperties serverProperties,
			ZuulProperties properties) {
		super(serverProperties.getServlet().getContextPath(), properties);
		this.properties = properties;
		this.jdbcTemplate = jdbcTemplate;
		log.info("servletPath:{}", serverProperties.getServlet().getContextPath());
	}

	/** Rebuilds the route table by delegating to {@link SimpleRouteLocator#doRefresh()}. */
	@Override
	public void refresh() {
		doRefresh();
	}

	/**
	 * Builds the complete route table.
	 *
	 * <p>Routes from the configuration file are loaded first and routes from the database
	 * second, so a database row replaces a configured route with the same path. Every key
	 * is then normalised to start with a slash and, when a global {@code zuul.prefix} is
	 * configured, to start with that prefix.
	 *
	 * @return the routes keyed by their normalised path pattern, in load order
	 */
	@Override
	protected Map<String, ZuulRoute> locateRoutes() {
		Map<String, ZuulRoute> merged = new LinkedHashMap<>();
		// Routes declared in application.yml.
		merged.putAll(super.locateRoutes());
		// Routes stored in the database; same path => the database row wins.
		merged.putAll(locateRoutesFromDB());

		String prefix = this.properties.getPrefix();
		// The prefix must only be applied when one is actually configured. The previous
		// isBlank() check was inverted: it skipped every real prefix and would have
		// concatenated a null prefix as the literal text "null".
		boolean hasPrefix = StringUtils.isNotBlank(prefix);

		LinkedHashMap<String, ZuulProperties.ZuulRoute> values = new LinkedHashMap<>();
		for (Map.Entry<String, ZuulProperties.ZuulRoute> entry : merged.entrySet()) {
			String path = ensureLeadingSlash(entry.getKey());
			if (hasPrefix) {
				path = ensureLeadingSlash(prefix + path);
			}
			values.put(path, entry.getValue());
		}
		return values;
	}

	/** Returns {@code path} unchanged if it starts with a slash, otherwise with one prepended. */
	private static String ensureLeadingSlash(String path) {
		return path.startsWith(MarkConstant.FORWARD_SLASH) ? path : MarkConstant.FORWARD_SLASH + path;
	}

	/**
	 * Loads the enabled routes from the {@code zuul_route} table.
	 *
	 * <p>Rows without a path, or with neither a service id nor a url, cannot be routed
	 * and are skipped.
	 *
	 * @return the usable database routes keyed by their path, in query order
	 */
	private Map<String, ZuulRoute> locateRoutesFromDB() {
		LinkedHashMap<String, ZuulRoute> routes = new LinkedHashMap<>();
		List<ZuulRoute> results = jdbcTemplate.query(FIND_ZUUL_STATEMENT,
				new BeanPropertyRowMapper<ZuulRoute>(ZuulRoute.class));
		for (ZuulRoute zuulRoute : results) {
			boolean hasTarget = StringUtils.isNotBlank(zuulRoute.getServiceId())
					|| StringUtils.isNotBlank(zuulRoute.getUrl());
			if (!hasTarget || StringUtils.isBlank(zuulRoute.getPath())) {
				continue;
			}
			routes.put(zuulRoute.getPath(), zuulRoute);
		}
		return routes;
	}

}

还要配置监听器:

package com.shareprog.zuul.config.zuul;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.cloud.client.discovery.event.HeartbeatMonitor;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.event.ParentHeartbeatEvent;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.netflix.zuul.RoutesRefreshedEvent;
import org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

/**
 * Application listener that decides when the Zuul handler mapping must be marked dirty
 * so that the route table is rebuilt.
 *
 * <p>Compared with the behaviour described in the inline notes as "original code", this
 * variant does not rebuild the routes on heartbeat events: new services are expected to
 * be configured as routes in the database instead.
 */
public class ZuulRefreshListener implements ApplicationListener<ApplicationEvent> {

	/** Context class name for which a {@link ContextRefreshedEvent} triggers a reload. */
	private static final String RELOADING_CONTEXT_CLASS =
			"org.springframework.context.annotation.AnnotationConfigApplicationContext";

	@Autowired
	private ZuulHandlerMapping zuulHandlerMapping;
	private HeartbeatMonitor heartbeatMonitor;

	private ZuulRefreshListener() {
		this.heartbeatMonitor = new HeartbeatMonitor();
	}

	/**
	 * Dispatches on the event type:
	 * <ul>
	 * <li>{@link ContextRefreshedEvent}: reload only when the refreshed context is an
	 * {@code AnnotationConfigApplicationContext} (the original author's guard for
	 * loading the database routes only once at startup);</li>
	 * <li>{@link RefreshScopeRefreshedEvent} / {@link RoutesRefreshedEvent}: always reload;</li>
	 * <li>{@link InstanceRegisteredEvent}: ignored;</li>
	 * <li>{@link ParentHeartbeatEvent} / {@link HeartbeatEvent}: only record the heartbeat.</li>
	 * </ul>
	 */
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (event instanceof ContextRefreshedEvent) {
			ApplicationContext refreshedContext = ((ContextRefreshedEvent) event).getApplicationContext();
			String contextClassName = refreshedContext.getClass().getName();
			if (contextClassName.equals(RELOADING_CONTEXT_CLASS)) {
				this.reset();
			}
		} else if (event instanceof RefreshScopeRefreshedEvent || event instanceof RoutesRefreshedEvent) {
			// The original code called this.reset() here as well.
			this.reset();
		} else if (event instanceof InstanceRegisteredEvent) {
			// Deliberately no reload for newly registered instances.
			return;
		} else if (event instanceof ParentHeartbeatEvent) {
			resetIfNeeded(((ParentHeartbeatEvent) event).getValue());
		} else if (event instanceof HeartbeatEvent) {
			resetIfNeeded(((HeartbeatEvent) event).getValue());
		}
	}

	/**
	 * Records the latest heartbeat value without reloading the routes.
	 *
	 * <p>The original code was
	 * {@code if (this.heartbeatMonitor.update(value)) { this.reset(); }}, which reloaded
	 * the whole route table whenever the registry reported a change, so newly registered
	 * services were picked up automatically. Routes for new services are now expected to
	 * be configured in the database, and per the original author the heartbeat check is
	 * sent every 30 seconds, so reloading on each one is unnecessary.
	 *
	 * @param value the heartbeat value carried by the event
	 */
	private void resetIfNeeded(Object value) {
		this.heartbeatMonitor.update(value);
	}

	/** Marks the handler mapping dirty. */
	private void reset() {
		this.zuulHandlerMapping.setDirty(true);
	}
}

同时,我们可以通过api请求实时刷新网关的路由。

Controller层

package com.shareprog.zuul.controller;

import org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.shareprog.common.core.domain.ResponseResult;
import com.shareprog.zuul.service.RefreshRouteService;

import lombok.AllArgsConstructor;

/**
 * REST endpoints for refreshing and inspecting the gateway routes at runtime.
 *
 * @author cl
 * @date 2021-01-18
 */
@RestController
public class RefreshController {

	private final RefreshRouteService refreshRouteService;
	private final ZuulHandlerMapping zuulHandlerMapping;

	/**
	 * @param refreshRouteService service that performs the route refresh
	 * @param zuulHandlerMapping  handler mapping whose registered handlers can be inspected
	 */
	public RefreshController(RefreshRouteService refreshRouteService, ZuulHandlerMapping zuulHandlerMapping) {
		this.refreshRouteService = refreshRouteService;
		this.zuulHandlerMapping = zuulHandlerMapping;
	}

	/**
	 * Triggers a route refresh through {@link RefreshRouteService#refreshRoute()}.
	 *
	 * @return a success result without payload
	 */
	@SuppressWarnings("rawtypes")
	@GetMapping("/refreshRoute")
	public ResponseResult refresh() {
		this.refreshRouteService.refreshRoute();
		return ResponseResult.success();
	}

	/**
	 * Debugging aid: exposes the handler map currently held by the Zuul handler mapping.
	 *
	 * @return a success result wrapping the handler map
	 */
	@SuppressWarnings("rawtypes")
	@GetMapping("/watchRoute")
	public ResponseResult watchRoute() {
		return ResponseResult.success(this.zuulHandlerMapping.getHandlerMap());
	}

}

Service层

package com.shareprog.zuul.service;

import org.springframework.cloud.netflix.zuul.RoutesRefreshedEvent;
import org.springframework.cloud.netflix.zuul.filters.RouteLocator;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.jdbc.core.JdbcTemplate;
import com.shareprog.zuul.config.zuul.RibbonApplicationContextInit;

import lombok.AllArgsConstructor;

/**
 * Service behind the route refresh endpoint: announces that the routes changed and
 * re-runs the Ribbon client initialisation.
 */
@Service
public class RefreshRouteService {

	private final ApplicationEventPublisher publisher;
	private final RouteLocator routeLocator;
	private final RibbonApplicationContextInit ribbonApplicationContextInit;
	private final JdbcTemplate jdbcTemplate;

	/**
	 * @param publisher                    used to publish the refresh event
	 * @param routeLocator                 route locator passed as the source of the event
	 * @param ribbonApplicationContextInit Ribbon initialiser invoked after the event
	 * @param jdbcTemplate                 database access handed to the Ribbon initialiser
	 */
	public RefreshRouteService(ApplicationEventPublisher publisher, RouteLocator routeLocator,
			RibbonApplicationContextInit ribbonApplicationContextInit, JdbcTemplate jdbcTemplate) {
		this.publisher = publisher;
		this.routeLocator = routeLocator;
		this.ribbonApplicationContextInit = ribbonApplicationContextInit;
		this.jdbcTemplate = jdbcTemplate;
	}

	/**
	 * Publishes a {@link RoutesRefreshedEvent} for the route locator and then asks the
	 * Ribbon initialiser to initialise itself from the database.
	 */
	public void refreshRoute() {
		RoutesRefreshedEvent routesRefreshed = new RoutesRefreshedEvent(this.routeLocator);
		this.publisher.publishEvent(routesRefreshed);
		this.ribbonApplicationContextInit.initialize(this.jdbcTemplate);
	}
}

到这里结束,一切都没有问题。

假如又有新的需求或者架构原因,我们不能通过Eureka进行识别负载均衡的服务,我们又该如何做呢?

在application-dev.yml中添加:

shareprog-auth: 
  ribbon: 
    NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.AvailabilityFilteringRule
    listOfServers: 127.0.0.1:8004
    ConnectTimeout: 6000
    ReadTimeout: 6000
    MaxTotalConnections: 500
    MaxConnectionsPerHost: 100

上面还不行,还必须在application.yml中添加中断ribbon和Eureka的配置。

ribbon.eureka.enabled: false  #禁用ribbon和eureka的关联

但是如果又需要从数据库动态加载呢?

添加代码如下:

package com.shareprog.zuul.config.zuul;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.netflix.ribbon.RibbonApplicationContextInitializer;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties.ZuulRoute;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import com.netflix.client.ClientException;
import com.netflix.config.ConfigurationManager;

/**
 * Registers every service id found in the gateway route table as a Ribbon client and
 * copies that client's settings from the {@code ribbon} table into the Archaius
 * configuration.
 *
 * @author cl
 * @date 2021-01-18
 */
@Component
public class RibbonApplicationContextInit extends
		RibbonApplicationContextInitializer {

	private static final String FIND_RIBBON_STATEMENT = "SELECT * FROM ribbon WHERE service_id = ?";

	public static final boolean DEFAULT_GZIP_PAYLOAD = true;

	public static final String RIBBON_PREFIX = "ribbon";

	/**
	 * Reads the route table once, publishes the Ribbon properties of every routed service
	 * and hands the service ids to the parent initialiser as client names.
	 *
	 * @param springClientFactory factory for the per-client Ribbon contexts
	 * @param jdbcTemplate        database access for the route and ribbon tables
	 */
	public RibbonApplicationContextInit(SpringClientFactory springClientFactory,
			JdbcTemplate jdbcTemplate) {
		super(springClientFactory, getServiceIdsFromZuulDB(jdbcTemplate));
	}

	/**
	 * Re-reads the route table and publishes the Ribbon properties of every routed
	 * service again, so that rows added or changed after startup reach the configuration.
	 *
	 * <p>NOTE(review): this only rewrites configuration properties; whether a Ribbon
	 * client that was already created picks up the new values depends on Ribbon itself
	 * and should be verified.
	 *
	 * @param jdbcTemplate database access for the route and ribbon tables
	 */
	public void initialize(JdbcTemplate jdbcTemplate) {
		getServiceIdsFromZuulDB(jdbcTemplate);
	}

	/**
	 * Collects the distinct service ids of all enabled routes that have both a path and a
	 * service id, registering the Ribbon properties of each one on the way.
	 *
	 * @param jdbcTemplate database access for the route and ribbon tables
	 * @return the service ids in first-seen order, without duplicates
	 */
	private static List<String> getServiceIdsFromZuulDB(JdbcTemplate jdbcTemplate) {
		List<ZuulRoute> results = jdbcTemplate.query(ZuulRouteLocator.FIND_ZUUL_STATEMENT,
				new BeanPropertyRowMapper<ZuulRoute>(ZuulRoute.class));
		List<String> serviceIds = new ArrayList<>();
		for (ZuulRoute result : results) {
			String serviceId = result.getServiceId();
			if (StringUtils.isBlank(result.getPath()) || StringUtils.isBlank(serviceId)) {
				continue;
			}
			// Several routes may point at the same service; one registration is enough.
			if (serviceIds.contains(serviceId)) {
				continue;
			}
			serviceIds.add(serviceId);
			register(jdbcTemplate, serviceId);
		}
		return serviceIds;
	}

	/**
	 * Publishes the Ribbon settings stored for one service, if any.
	 *
	 * @param jdbcTemplate database access for the ribbon table
	 * @param serviceId    the service whose settings are looked up
	 */
	private static void register(JdbcTemplate jdbcTemplate, String serviceId) {
		List<Map<String, Object>> queryForList = jdbcTemplate.queryForList(FIND_RIBBON_STATEMENT, serviceId);
		if (queryForList.isEmpty()) {
			return;
		}
		setRibbonProp(serviceId, queryForList);
	}

	/**
	 * Writes the first matching row as {@code <serviceId>.ribbon.<key>} properties.
	 *
	 * <p>Each column is written exactly once: under its Ribbon property name when the
	 * column is known, otherwise under the raw column name. The {@code id} and
	 * {@code service_id} columns and columns holding NULL are not written.
	 *
	 * @param serviceId    the service the properties belong to
	 * @param queryForList the rows found for the service; only the first one is used
	 */
	private static void setRibbonProp(String serviceId, List<Map<String, Object>> queryForList) {
		AbstractConfiguration configInstance = ConfigurationManager.getConfigInstance();
		String keyPrefix = serviceId + "." + RIBBON_PREFIX + ".";
		for (Entry<String, Object> entry : queryForList.get(0).entrySet()) {
			String column = entry.getKey();
			Object value = entry.getValue();
			if ("id".equals(column) || "service_id".equals(column)) {
				continue;
			}
			// A NULL column carries no setting; leave Ribbon's default in place instead
			// of pushing a null value into the configuration.
			if (value == null) {
				continue;
			}
			configInstance.setProperty(keyPrefix + toRibbonKey(column), value);
		}
	}

	/**
	 * Maps a column of the {@code ribbon} table to the Ribbon property name it stands for.
	 * TODO: the explicit mapping could be replaced by a snake_case to camel-case conversion.
	 *
	 * @param column the column name as returned by the query
	 * @return the Ribbon property name, or the column name itself when it is not mapped
	 */
	private static String toRibbonKey(String column) {
		switch (column) {
		case "max_total_connections":
			return "MaxTotalConnections";
		case "connect_timeout":
			return "ConnectTimeout";
		case "max_connections_per_host":
			return "MaxConnectionsPerHost";
		case "list_of_servers":
			return "listOfServers";
		case "niws_server_list_class_name":
			return "NIWSServerListClassName";
		case "read_timeout":
			return "ReadTimeout";
		case "nf_load_balancer_rule_class_name":
			return "NFLoadBalancerRuleClassName";
		default:
			return column;
		}
	}

}

这时候我们看到,是没有监听器的,所以只是从项目的一开始加载进去的,不算是动态加载,关于这一点,还有待研究。

在zuul预加载Ribbon

调用集群服务时,会使用 Ribbon的客户端。默认情况下,客户端相关的 Bean 会延迟加载,在第一次调用集群服务时,才会初始化这些对象。

如果想提前加载 Ribbon客户端,可以在配置文件中进行以下配置:

zuul:
  ribbon:
    eager-load:
      enabled: true

IP_HASH负载均衡

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.List;

/**
 * Ribbon load-balancing rule that maps a client IP address to a fixed position in the
 * load balancer's server list, so that requests from one client keep reaching the same
 * server for as long as the server list does not change.
 */
@Slf4j
public class IpHashRule extends AbstractLoadBalancerRule {

    /** Number of selection attempts before giving up. */
    private static final int MAX_TRIES = 10;

    public IpHashRule() {}

    public IpHashRule(ILoadBalancer lb) {
        this.setLoadBalancer(lb);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        // This rule has no configurable settings.
    }

    @Override
    public Server choose(Object o) {
        return this.choose(this.getLoadBalancer(), o);
    }

    /**
     * Picks the server at the hashed position of the full server list and returns it only
     * if it is alive, ready to serve and also listed as reachable.
     *
     * @param lb the load balancer to pick from
     * @param o  the load-balancing key; not used by this rule
     * @return the selected server, or {@code null} when there is no load balancer, no
     *         server is up, or no usable server was found after {@value #MAX_TRIES} tries
     */
    public Server choose(ILoadBalancer lb, Object o) {
        if (lb == null) {
            log.warn("No load balancer!");
            return null;
        }
        for (int attempt = 0; attempt < MAX_TRIES; attempt++) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            if (reachableServers.isEmpty() || allServers.isEmpty()) {
                log.warn("No up servers available from load balancer: {}", lb);
                return null;
            }

            Server server = allServers.get(this.ipHashIndex(allServers.size()));
            log.info(">>> IP_Hash 策略预选[{}]!", server);
            if (server == null) {
                Thread.yield();
                continue;
            }
            // Besides being alive and ready, the server must also be part of the
            // reachable list reported by the load balancer.
            if (server.isAlive() && server.isReadyToServe() && reachableServers.contains(server)) {
                log.info("IP_Hash 策略选择[{}]服务!<<<", server);
                return server;
            }
        }
        log.warn("No available alive servers after 10 tries from load balancer: {}", lb);
        return null;
    }

    /**
     * Maps the current client address to an index in {@code [0, serverCount)}.
     *
     * <p>The remainder is taken before the absolute value: {@code Math.abs(hash)} stays
     * negative for {@code Integer.MIN_VALUE} and would produce a negative index. For every
     * other hash the result is the same as with the previous {@code Math.abs(hash) % n}.
     * When no client address is available, index 0 is used.
     */
    private int ipHashIndex(int serverCount) {
        String userIp = getRemoteAddr();
        int hashCode = (userIp == null) ? 0 : userIp.hashCode();
        return Math.abs(hashCode % serverCount);
    }

    /**
     * Resolves the client address of the request bound to the current thread.
     *
     * <p>The first entry of {@code X-FORWARDED-FOR} is preferred over the socket address,
     * because the header may carry a comma-separated chain of proxies and only its first
     * entry identifies the client. Note that this header is supplied by the caller, so it
     * is only as trustworthy as the proxies in front of the gateway.
     *
     * @return the client address, or {@code null} when no servlet request is bound to the
     *         current thread (for example when the rule runs on a different thread than
     *         the one that received the request)
     */
    private String getRemoteAddr() {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        if (!(attributes instanceof ServletRequestAttributes)) {
            log.debug("RemoteAddr: {}", (Object) null);
            return null;
        }
        HttpServletRequest request = ((ServletRequestAttributes) attributes).getRequest();
        String remoteAddr = request.getRemoteAddr();
        String forwardedFor = request.getHeader("X-FORWARDED-FOR");
        if (forwardedFor != null) {
            int comma = forwardedFor.indexOf(',');
            String firstHop = (comma >= 0 ? forwardedFor.substring(0, comma) : forwardedFor).trim();
            if (!firstHop.isEmpty()) {
                remoteAddr = firstHop;
            }
        }
        log.debug("RemoteAddr: {}", remoteAddr);
        return remoteAddr;
    }
}

上面getRemoteAddr获取的request可能为null,所以需要处理一下。

import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.Callable;

@Slf4j
@Component // 注册到Spring容器
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    /**
     * 注册并发策略
     * {@link HystrixSecurityAutoConfiguration#init()}
     */
    public RequestAttributeHystrixConcurrencyStrategy() {
        // 通过HystrixPlugins注册当前扩展的HystrixConcurrencyStrategy实现
        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
    }

    /*此方法应写入配置类中 - 上面的构造方法作用与此方法相同
    @PostConstruct
    public void init() {
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestAttributeHystrixConcurrencyStrategy());
    }*/

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
    	// 切换线程后,将父线程中上下文信息,记录到子线程任务
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                // 切换线程后,将父线程中上下文信息,记录到子线程任务
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
            	// 任务执行完成后,清空线程本地缓存
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

当实现了request数据的传递,上述中的IpHashRule不写也可以。

Last Updated:
Contributors: chengli, clcheng
Prev
过滤器