QConfig配置中心Server端架构设计,有哪些关键点需要注意?

摘要:声明 原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/18055933 配置中心系列文章 《【架构师视角系列】风控场景下的配置中心设计思考》 https://www.cnblogs.com/boycelee/p/18355942
声明 原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/18055933 配置中心系列文章 《【架构师视角系列】风控场景下的配置中心设计思考》 https://www.cnblogs.com/boycelee/p/18355942 《【架构师视角系列】Apollo配置中心之架构设计(一)》https://www.cnblogs.com/boycelee/p/17967590 《【架构师视角系列】Apollo配置中心之Client端(二)》https://www.cnblogs.com/boycelee/p/17978027 《【架构师视角系列】Apollo配置中心之Server端(ConfigSevice)(三)》https://www.cnblogs.com/boycelee/p/18005318 《【架构师视角系列】QConfig配置中心系列之架构设计(一)》https://www.cnblogs.com/boycelee/p/18013653 《【架构师视角系列】QConfig配置中心系列之Client端(二)》https://www.cnblogs.com/boycelee/p/18033286 《【架构师视角系列】QConfig配置中心系列之Server端(三)》https://www.cnblogs.com/boycelee/p/18055933 一、通知与配置拉取 二、设计思考 1、Admin如何通知Server所有实例配置发生变更? 2、Server如何通知Client端配置发生变更? 3、Client如何拉取配置? 三、源码分析 1、Admin配置推送 1.1、主动推送 1.1.1、逻辑描述 QConfig的Server配置发现有两种方式,一种是主动推送,另一种是被动扫描。 主动发现是Admin(管理平台)通过注册中心获取到已经注册的Server实例相关IP与Port信息,然后通过遍历的方式调用Server接口通知实例此时有配置更新。 被动发现是Server实例中自主定时进行数据库扫描,当发现新版本时通知Client端有配置变更。 1.1.2、时序图 1.1.3、代码位置 1.1.3.1、NotifyServiceImpl#notifyPush 当用户在操作平台进行配置修改时,会调用该接口进行配置变更推送,由于需要通知所有已经部署的Servers有配置更新,所以需要从注册中心中获取到对应的Host信息,然后通过遍历的方式进行配置推送。 @Service public class NotifyServiceImpl implements NotifyService, InitializingBean { /** * 管理平台操作,配置变更通知 */ @Override public void notifyPush(final ConfigMeta meta, final long version, List<PushItemWithHostName> destinations) { // 从注册中心(Eureka)获取Server实例的Hosts信息 List<String> serverUrls = getServerUrls(); if (serverUrls.isEmpty()) { logger.warn("notify push server, {}, version: {}, but no server, {}", meta, version, destinations); return; } // Server中接收变更推送的接口URL String uri = this.notifyPushUrl; logger.info("notify push server, {}, version: {}, uri: {}, servers: {}, {}", meta, version, uri, serverUrls, destinations); StringBuilder sb = new StringBuilder(); for (PushItemWithHostName item : destinations) { sb.append(item.getHostname()).append(',') .append(item.getIp()).append(',') .append(item.getPort()).append(Constants.LINE); } final String destinationsStr = sb.toString(); // 根据已注册Server的Host列表,配置信息、配置版本等信息,执行通知推送动作 
doNotify(serverUrls, uri, "push", new Function<String, Request>() { @Override public Request apply(String url) { AsyncHttpClient.BoundRequestBuilder builder = getBoundRequestBuilder(url, meta, version, destinationsStr); return builder.build(); } }); } /** * 获取注册中心中已注册的Server Hosts信息 */ private List<String> getServerUrls() { return serverListService.getOnlineServerHosts(); } private void doNotify(List<String> serverUrls, String uri, String type, Function<String, Request> requestBuilder) { List<ListenableFuture<Response>> futures = Lists.newArrayListWithCapacity(serverUrls.size()); for (String oneServer : serverUrls) { String url = "http://" + oneServer + "/" + uri; Request request = requestBuilder.apply(url); ListenableFuture<Response> future = HttpListenableFuture.wrap(httpClient.executeRequest(request)); futures.add(future); } dealResult(futures, serverUrls, type); } } 1.1.3.2、LongPollingStoreImpl#manualPush @Service public class LongPollingStoreImpl implements LongPollingStore { private static final ConcurrentMap<ConfigMeta, Cache<Listener, Listener>> listenerMappings = Maps.newConcurrentMap(); private static final int DEFAULT_THREAD_COUNT = 4; private static final long DEFAULT_TIMEOUT = 60 * 1000L; private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool( DEFAULT_THREAD_COUNT, new NamedThreadFactory("qconfig-config-listener-push")); private static ExecutorService onChangeExecutor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("config-on-change")); @Override public void manualPush(ConfigMeta meta, long version, final Set<IpAndPort> ipAndPorts) { logger.info("push client file: {}, version {}, {}", meta, version, ipAndPorts); Set<String> ips = Sets.newHashSetWithExpectedSize(ipAndPorts.size()); for (IpAndPort ipAndPort : ipAndPorts) { ips.add(ipAndPort.getIp()); } manualPushIps(meta, version, ips); } @Override public void manualPushIps(ConfigMeta meta, long version, final 
Set<String> ips) { logger.info("push client file: {}, version {}, {}", meta, version, ips); Stopwatch stopwatch = Stopwatch.createStarted(); try { doChange(meta, version, Constants.PULL, new Predicate<Listener>() { @Override public boolean apply(Listener input) { return ips.contains(input.getContextHolder().getIp()); } }); } finally { Monitor.filePushOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } @Override public void onChange(final ConfigMeta meta, final long version) { logger.info("file change: {}, version {}", meta, version); onChangeExecutor.execute(new Runnable() { @Override public void run() { Stopwatch stopwatch = Stopwatch.createStarted(); try { doChange(meta, version, Constants.UPDATE, Predicates.<Listener>alwaysTrue()); } finally { Monitor.fileOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } }); } private void doChange(ConfigMeta meta, long newVersion, String type, Predicate<Listener> needChange) { List<Listener> listeners = getListeners(meta, needChange); if (listeners.isEmpty()) { return; } Changed change = new Changed(meta, newVersion); // 如果没超过直接推送数量,则直接推送 if (listeners.size() <= pushConfig.getDirectPushLimit()) { directDoChange(listeners, change, type); } else { // 如果超过一定数量,则scheduled定时,通过一定节奏来推送,避免惊群 PushItem pushItem = new PushItem(listeners, type, change); scheduledExecutor.execute(new PushRunnable(pushItem)); } } private void directDoChange(List<Listener> listeners, Changed change, String type) { Stopwatch stopwatch = Stopwatch.createStarted(); try { for (Listener listener : listeners) { logger.debug("return {}, {}", listener, change); returnChange(change, listener, type); } } catch (Exception e) { Monitor.batchReturnChangeFailCounter.inc(); logger.error("batch direct return changes error, type {}, change {}", type, change, e); } finally { Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } private static class PushRunnable implements 
Runnable { private final PushItem pushItem; private PushRunnable(PushItem pushItem) { this.pushItem = pushItem; } @Override public void run() { Stopwatch stopwatch = Stopwatch.createStarted(); try { long start = System.currentTimeMillis(); PushConfig config = pushConfig; int num = Math.min(pushItem.getListeners().size(), config.getPushMax()); for (int i = 0; i < num; ++i) { Listener listener = pushItem.getListeners().poll(); returnChange(pushItem.getChange(), listener, pushItem.getType()); } if (!pushItem.getListeners().isEmpty()) { long elapsed = System.currentTimeMillis() - start; long delay; if (elapsed >= config.getPushInterval()) { delay = 0; } else { delay = config.getPushInterval() - elapsed; } //一次推送后,以这次推送时间为起始时间,延迟一定时间后再次推送。这里的PushRunnable递归执行 scheduledExecutor.schedule(new PushRunnable(pushItem), delay, TimeUnit.MILLISECONDS); } } catch (Exception e) { Monitor.batchReturnChangeFailCounter.inc(); logger.error("batch return changes error, {}", pushItem, e); } finally { Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } } private static void returnChange(Changed change, Listener listener, String type) { Stopwatch stopwatch = Stopwatch.createStarted(); try { // 通知注册的监听器,响应client,返回版本信息 listener.onChange(change, type); } finally { Monitor.returnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } } 1.2、被动推送 1.2.1、逻辑描述 首次启动或启动后每3分钟,刷新一次配置的最新版本,如果出现最新版本,则触发推送逻辑,将配置最新的版本推送至Client端中。 1.2.2、代码位置 1.2.2.1、CacheConfigVersionServiceImpl#freshConfigVersionCache @Service public class CacheConfigVersionServiceImpl implements CacheConfigVersionService { private volatile ConcurrentMap<ConfigMeta, Long> cache = Maps.newConcurrentMap(); /** * 首次启动或启动后每3分钟,刷新一次配置的最新版本 */ @PostConstruct public void init() { freshConfigVersionCache(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); // 每3分钟执行一次缓存刷新,判断配置是否有最新版本 
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { Thread.currentThread().setName("fresh-config-version-thread"); try { freshConfigVersionCache(); } catch (Throwable e) { logger.error("fresh config version error", e); } } }, 3, 3, TimeUnit.MINUTES); } @Override public Optional<Long> getVersion(ConfigMeta meta) { return Optional.fromNullable(cache.get(meta)); } /** * 定时刷新配置最新版本,如果出现最新版本,则触发推送逻辑 */ private void freshConfigVersionCache() { Stopwatch stopwatch = Stopwatch.createStarted(); try { logger.info("fresh config version cache"); List<VersionData<ConfigMeta>> configIds = configDao.loadAll(); ConcurrentMap<ConfigMeta, Long> newCache = new ConcurrentHashMap<ConfigMeta, Long>(configIds.size()); ConcurrentMap<ConfigMeta, Long> oldCache = this.cache; // 判断是否有最新版本 synchronized (this) { for (VersionData<ConfigMeta> configId : configIds) { long newVersion = configId.getVersion(); Long oldVersion = cache.get(configId.getData()); // 暂时不考虑delete的情况 // 从数据库load数据先于配置更新 if (oldVersion != null && oldVersion > newVersion) { newVersion = oldVersion; } // 如果有最新版本则刷新缓存 newCache.put(configId.getData(), newVersion); } this.cache = newCache; } logger.info("fresh config version cache successOf, count [{}]", configIds.size()); int updates = 0; for (Map.Entry<ConfigMeta, Long> oldEntry : oldCache.entrySet()) { ConfigMeta meta = oldEntry.getKey(); Long oldVersion = oldEntry.getValue(); Long newVersion = newCache.get(meta); if (newVersion != null && newVersion > oldVersion) { updates += 1; // 配置变更,通知Client端 longPollingStore.onChange(meta, newVersion); } } logger.info("fresh size={} config version cache from db", updates); } finally { Monitor.freshConfigVersionCacheTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS); } } } 2、变更监听 2.1.1、逻辑描述 Client端与Server端建立长轮询,长轮询建立完成之后会为当前请求建立一个监听器,当配置发生变更时就会触发监听器,然后通过监听机制结束长轮询并返回最新的配置版本。如果没有版本变更,长轮询会每分钟断开重新建立一次。 2.1.2、时序图 2.1.3、代码位置 2.1.3.1、AbstractCheckVersionServlet#doPost public abstract 
class AbstractCheckVersionServlet extends AbstractServlet { private static final long serialVersionUID = -8278568383506314625L; @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { ... checkVersion(requests, req, resp); } } 2.1.3.2、LongPollingCheckServlet#checkVersion public class LongPollingCheckServlet extends AbstractCheckVersionServlet { @Override protected void checkVersion(List<CheckRequest> checkRequests, HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { ... try { // 异步 AsyncContext context = req.startAsync(); // (核心流程,重点关注),执行版本检查(长轮询) getLongPollingProcessService().process(context, checkRequests); } catch (Throwable e) { // never come here !!! logger.error("服务异常", e); } } } 2.1.3.3、LongPollingProcessServiceImpl#process @Service public class LongPollingProcessServiceImpl implements LongPollingProcessService { @PostConstruct public void init() { MapConfig config = MapConfig.get("config.properties"); config.asMap(); // 向config中添加监听器 config.addListener(new Configuration.ConfigListener<Map<String, String>>() { @Override public void onLoad(Map<String, String> conf) { String newTimeout = conf.get("longPolling.server.timeout"); if (!Strings.isNullOrEmpty(newTimeout)) { timeout = Numbers.toLong(newTimeout, DEFAULT_TIMEOUT); } } }); } // 核心逻辑,重点关注 @Override public void process(AsyncContext context, List<CheckRequest> requests) { IpAndPort address = new IpAndPort(clientInfoService.getIp(), clientInfoService.getPort()); AsyncContextHolder contextHolder = new AsyncContextHolder(context, address); // 设置超时 context.setTimeout(timeout); // 设置监听器 context.addListener(new TimeoutServletListener(contextHolder)); processCheckRequests(requests, clientInfoService.getIp(), contextHolder); } private void processCheckRequests(List<CheckRequest> requests, String ip, AsyncContextHolder contextHolder) { CheckResult result = checkService.check(requests, ip, qFileFactory); 
logger.info("profile:{}, result change list {} for check request {}", clientInfoService.getProfile(), result.getChanges(), requests); if (!result.getChanges().isEmpty()) { returnChanges(AbstractCheckConfigServlet.formatOutput(CheckUtil.processStringCase(result.getChanges())), contextHolder, Constants.UPDATE); return; } // 为该请求注册监听器,并存放至longPollingStore中 addListener(result.getRequestsNoChange(), contextHolder); // 注册client registerOnlineClients(result, contextHolder); } private void addListener(Map<CheckRequest, QFile> requests, AsyncContextHolder contextHolder) { for (Map.Entry<CheckRequest, QFile> noChangeEntry : requests.entrySet()) { CheckRequest request = noChangeEntry.getKey(); QFile qFile = noChangeEntry.getValue(); if (!contextHolder.isComplete()) { // 根据请求创建监听器 Listener listener = qFile.createListener(request, contextHolder); // 将监听器存储至longPollingStore longPollingStore.addListener(listener); } } } private void registerOnlineClients(CheckResult result, AsyncContextHolder contextHolder) { Map<CheckRequest, QFile> noChanges = Maps.newHashMapWithExpectedSize( result.getRequestsNoChange().size() + result.getRequestsLockByFixVersion().size()); noChanges.putAll(result.getRequestsNoChange()); noChanges.putAll(result.getRequestsLockByFixVersion()); for (Map.Entry<CheckRequest, QFile> noChangeEntry : noChanges.entrySet()) { CheckRequest request = noChangeEntry.getKey(); QFile qFile = noChangeEntry.getValue(); if (!contextHolder.isComplete()) { long version = request.getVersion(); ConfigMeta meta = qFile.getRealMeta(); String ip = contextHolder.getIp(); if (qFile instanceof InheritQFileV2) { InheritQFileV2 inheritQFile = (InheritQFileV2) qFile; Optional<Long> optional = inheritQFile.getCacheConfigInfoService().getVersion(inheritQFile.getRealMeta()); version = optional.isPresent() ? 
optional.get() : version; onlineClientListService.register(inheritQFile.getRealMeta(), ip, version); } else { // 注册client,admin(管理平台)获取已经连接的client信息,其中包括ip、配置版本 onlineClientListService.register(meta, ip, version); } } } } /** * 配置变化,执行返回 */ private void returnChanges(String change, AsyncContextHolder contextHolder, String type) { contextHolder.completeRequest(new ChangeReturnAction(change, type)); } } 2.1.3.4、CheckService#check @Service public class CheckServiceImpl implements CheckService { ... @Override public CheckResult check(List<CheckRequest> requests, String ip, QFileFactory qFileFactory) { List<CheckRequest> requestsNoFile = Lists.newArrayList(); Map<CheckRequest, Changed> changes = Maps.newHashMap(); Map<CheckRequest, QFile> requestNoChange = Maps.newHashMap(); Map<CheckRequest, QFile> requestsLockByFixVersion = Maps.newHashMap(); for (CheckRequest request : requests) { ConfigMeta meta = new ConfigMeta(request.getGroup(), request.getDataId(), request.getProfile()); Optional<QFile> qFileOptional = qFileFactory.create(meta, cacheConfigInfoService); if (!qFileOptional.isPresent()) { requestsNoFile.add(request); continue; } QFile qFile = qFileOptional.get(); // 核心逻辑,检测版本 Optional<Changed> changedOptional = qFile.checkChange(request, ip); if (changedOptional.isPresent()) { Optional<Changed> resultChange = repairChangeWithFixVersion(qFile, request, ip, changedOptional.get()); if (resultChange.isPresent()) { changes.put(request, resultChange.get()); } else { requestsLockByFixVersion.put(request, qFile); } } else { requestNoChange.put(request, qFile); } } return new CheckResult(requestsNoFile, changes, requestNoChange, requestsLockByFixVersion); } } 2.1.3.5、QFileEntityV1#checkChange public class QFileEntityV1 extends AbstractQFileEntity implements QFile { public QFileEntityV1(ConfigMeta meta, CacheConfigInfoService cacheConfigInfoService, ConfigStore configStore, LogService logService, ClientInfoService clientInfoService) { super(meta, cacheConfigInfoService, 
configStore, logService, clientInfoService); } @Override public Optional<Changed> checkChange(CheckRequest request, String ip) { ConfigMeta meta = getSourceMeta(); // 从缓存中获取配置文件的最新版本 Optional<Long> version = getCacheConfigInfoService().getVersion(meta, ip); if (!version.isPresent()) { return Optional.absent(); } if (version.get() <= request.getVersion()) { return Optional.absent(); } return Optional.of(new Changed(meta.getGroup(), meta.getDataId(), meta.getProfile(), version.get())); } } 2.1.3.6、CacheConfigInfoService#getVersion @Service("cacheConfigInfoService") public class CacheConfigInfoService implements ConfigInfoService { ... @Override public Optional<Long> getVersion(ConfigMeta meta, String ip) { // 获取配置已发布的最新版本 Optional<Long> publishVersion = getVersion(meta); // 获取推送给该IP的配置的最新灰度版本 Optional<Long> pushVersion = getPushVersion(meta, ip); return VersionUtil.getLoadVersion(publishVersion, pushVersion); } } 3、Client配置拉取 3.1.1、逻辑描述 根据长轮询后Client端获取到的配置文件对应的最新版本信息,查询最新的配置数据。查询顺序是先查询缓存,如果查找不到则通过本地文件查找,如果再查不到则查询数据库。这样可以有效缓解数据库压力。 3.1.2、代码位置 3.1.2.1、ConfigStoreImpl#findConfig @Service public class ConfigStoreImpl implements ConfigStore { private LoadingCache<VersionData<ConfigMeta>, ChecksumData<String>> configCache; @PostConstruct private void init() { configCache = CacheBuilder.newBuilder() .maximumSize(5000) // 最大数量 .expireAfterAccess(10, TimeUnit.SECONDS) // 访问失效时间 .recordStats() .build(new CacheLoader<VersionData<ConfigMeta>, ChecksumData<String>>() { @Override public ChecksumData<String> load(VersionData<ConfigMeta> configId) throws ConfigNotFoundException { return loadConfig(configId); } }); Metrics.gauge("configFile_notFound_cache_hitRate", new Supplier<Double>() { @Override public Double get() { return configCache.stats().hitRate(); } }); } /** * 查本地guava cache */ @Override public ChecksumData<String> findConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException { try { return configCache.get(configId); } catch (ExecutionException e) { if 
(e.getCause() instanceof ConfigNotFoundException) { throw (ConfigNotFoundException) e.getCause(); } else { log.error("find config error, configId:{}", configId, e); throw new RuntimeException(e.getCause()); } } } /** * 从本地文件或数据库中获取配置信息 */ private ChecksumData<String> loadConfig(VersionData<ConfigMeta> configId) throws ConfigNotFoundException { // 从本地配置文件中查询配置信息 ChecksumData<String> config = findFromDisk(configId); if (config != null) { return config; } String groupId = configId.getData().getGroup(); Monitor.notFoundConfigFileFromDiskCounterInc(groupId); log.warn("config not found from disk: {}", configId); // 从数据库中加载配置数据 config = findFromDb(configId); if (config != null) { return config; } Monitor.notFoundConfigFileFromDbCounterInc(groupId); throw new ConfigNotFoundException(); } private ChecksumData<String> findFromDb(VersionData<ConfigMeta> configId) { ChecksumData<String> config = configDao.loadFromCandidateSnapshot(configId); if (config != null) { saveToFile(configId, config); } return config; } } 四、最后 《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。 懂得不多,做得太少。欢迎批评、指正。