当前位置 博文首页 > 腾业:【Soul网关探秘】http数据同步-Web端处理变更通知

    腾业:【Soul网关探秘】http数据同步-Web端处理变更通知

    作者:腾业 时间:2021-01-31 06:18

    个人知识库

    引言

    上一篇,梳理http 数据同步策略的变更通知机制,本篇开始探究配置变更通知到达后, soul-web 端的处理响应。

    不同数据变更的通知机制应当是一致的,故本篇以 selector 配置变更通知为切入点进行深入。

    通知处理入口

    上回我们说到 HttpSyncDataService 的 doLongPolling,在其内部发起通知订阅并接收响应通知:

    private void doLongPolling(final String server) {
        ...
        String listenerUrl = server + "/configs/listener";
        ...
        try {
          	// 发起监听请求
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            log.debug("listener result: [{}]", json);
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            ...
        }
      	// 处理变更通知
        if (groupJson != null) {
            // fetch group configuration async.
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                // 获取组配置
              	this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }
    

    在收到变更通知时,若存在配置组变更,则按变更组获取相应配置。

    获取配置

    获取组配置处理如下:

    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        ...
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        ...
        try {
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            ...
        }
        // update local cache
        boolean updated = this.updateCacheWithJson(json);
        ...
    }
    

    内部发起配置获取请求并更新本地缓存。

    更新配置组缓存

    由 HttpSyncDataService 实现本地缓存更新:

      private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // if the config cache will be updated?
        return factory.executor(data);
    }
    

    转成 Json 对象后交由 DataRefreshFactory 进行处理。

    DataRefreshFactory 处理如下:

    public boolean executor(final JsonObject data) {
        final boolean[] success = {false};
        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
        return success[0];
    }
    

    调用相应数据刷新类刷新数据。

    统一由 AbstractDataRefresh 的 refresh 进行处理:

    public Boolean refresh(final JsonObject data) {
        boolean updated = false;
        JsonObject jsonObject = convert(data);
        if (null != jsonObject) {
            ConfigData<T> result = fromJson(jsonObject);
            if (this.updateCacheIfNeed(result)) {
                updated = true;
                refresh(result.getData());
            }
        }
        return updated;
    }
    

    先更新本地缓存,再调用子类实现的 refresh。

    此处的更新本地缓存处理,由子类 SelectorDataRefresh 的 updateCacheIfNeed 实现:

    protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) {
        return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR);
    }
    

    向父类 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置组。

    父类 AbstractDataRefresh 的 updateCacheIfNeed 处理:

    protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
        // 首次初始化缓存
        if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
            return true;
        }
        ResultHolder holder = new ResultHolder(false);
        GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
            // 必须比较最后更新时间
            if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
                ...
                holder.result = true;
                return newVal;
            }
            ...
            return oldVal;
        });
        return holder.result;
    }
    

    通过比较新老缓存的 MD5 值来判定是否发生变更,存在变更则更新本地缓存(注意还有最后更新时间判定)。

    处理刷新事件

    SelectorDataRefresh 的 refresh 实现:

    protected void refresh(final List<SelectorData> data) {
        if (CollectionUtils.isEmpty(data)) {
            log.info("clear all selector cache, old cache");
            data.forEach(pluginDataSubscriber::unSelectorSubscribe);
            pluginDataSubscriber.refreshSelectorDataAll();
        } else {
            // update cache for UpstreamCacheManager
            pluginDataSubscriber.refreshSelectorDataAll();
            data.forEach(pluginDataSubscriber::onSelectorSubscribe);
        }
    }
    
    • 若最新数据为空,则循环取消订阅并刷新所有选择器数据,实际是清空选择器缓存。
    • 若最新数据不为空,则刷新所有选择器数据并循环响应选择器订阅事件处理,实际是更新上游服务缓存。

    取消订阅

    CommonPluginDataSubscriber 实现订阅取消:

    public void unSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
    }
    

    subscribeDataHandler 对 selectorData 的 delete 处理:

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                ...
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    ...
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                ...
            }
        });
    }
    

    从 BaseDataCache 删除目标选择器数据,并移除选择器。

    此处由 DividePluginDataHandler 提供 removeSelector 实现:

    public void removeSelector(final SelectorData selectorData) {
        UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
    }
    

    根据 selector id 移除缓存的上游服务,注意只是从 UPSTREAM_MAP_TEMP 移除

    public void removeByKey(final String key) {
        UPSTREAM_MAP_TEMP.remove(key);
    }
    

    刷新数据

    CommonPluginDataSubscriber 实现数据刷新:

    public void refreshSelectorDataAll() {
        BaseDataCache.getInstance().cleanSelectorData();
    }
    

    注意这里的 refresh all 实际是做的 clean 操作。

    BaseDataCache 的 cleanSelectorData 处理:

    public void cleanSelectorData() {
        SELECTOR_MAP.clear();
    }
    

    直接清除 SELECTOR_MAP 所有数据。

    响应订阅

    CommonPluginDataSubscriber 实现订阅响应:

    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }
    

    subscribeDataHandler 对 selectorData 的 update 处理:

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                ...
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    ...
                }
            } else if (data instanceof RuleData) {
                ...
            }
        });
    }
    

    缓存选择器数据到 BaseDataCache,并处理选择器。

    此处由 DividePluginDataHandler 提供 handlerSelector 实现:

    public void handlerSelector(final SelectorData selectorData) {
        UpstreamCacheManager.getInstance().submit(selectorData);
    }
    

    提交选择器数据到 UpstreamCacheManager。

    UpstreamCacheManager 的 submit 处理:

    public void submit(final SelectorData selectorData) {
        final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
        if (null != upstreamList && upstreamList.size() > 0) {
            UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
            UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
        } else {
            UPSTREAM_MAP.remove(selectorData.getId());
            UPSTREAM_MAP_TEMP.remove(selectorData.getId());
        }
    }
    

    根据 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。

    总结

    本篇梳理和分析了配置变更通知到达后 soul-web 端的处理流程,最终处理主要是更新本地配置缓存以及维护上游服务散列表。

    soul-web收到变更通知后处理流程如下:

    soul-web 端收到响应

    • 若配置组数据存在变更,则发起获取配置请求获取最新配置信息
      • 更新配置组缓存
      • 循环处理配置数据刷新事件
        • 若最新配置数据为空,则删除本地配置数据并移除上游服务
        • 若最新配置数据不为空,则缓存配置组数据并更新上游服务
    • 若配置组数据无变更,不作处理
    bk
    下一篇:没有了