概述
开启数据同步
soul-admin
支持在配置文件 application.yml
中指定与网关的数据同步策略(默认采用 websocket)。还支持 http
、zookeeper
soul:
database:
dialect: mysql
init_script: "META-INF/schema.sql"
sync:
websocket:
enabled: true
# zookeeper:
# url: localhost:2181
# sessionTimeout: 5000
# connectionTimeout: 2000
# http:
# enabled: true
# nacos:
# url: localhost:8848
# namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
# acm:
# enabled: false
# endpoint: acm.aliyun.com
# namespace:
# accessKey:
# secretKey:
采用 websocket
同步数据,需要在 soul-admin
的 pom.xml
文件中引入以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
需要在 soul-bootstrap
的配置文件application-local.yml
中配置 soul-admin
的地址
soul :
file:
enabled: true
corss:
enabled: true
dubbo :
parameter: multi
sync:
websocket :
urls: ws://localhost:9095/websocket
使用 websocket
同步数据的优点:
-
当建立连接以后会全量获取一次数据,以后的数据都是增量的更新与新增,性能好。
-
支持断线重连 (默认30秒)
Websocket
同步数据
soul-admin
在用户发生配置变更之后,会通过 EventPublisher
发出配置变更通知,由 EventDispatcher
处理该变更通知,然后根据配置的同步策略(http
、weboscket
、zookeeper
),将配置发送给对应的事件处理器
- 如果是
websocket
同步策略,则将变更后的数据主动推送给soul-web
,并且在网关层,会有对应的WebsocketCacheHandler
处理器处理来处admin
的数据推送 - 如果是
zookeeper
同步策略,将变更数据更新到zookeeper
,而ZookeeperSyncCache
会监听到zookeeper
的数据变更,并予以处理 - 如果是
http
同步策略,soul-web
主动发起长轮询请求,默认有 90s 超时时间,如果soul-admin
没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求
soul-admin
如何同步数据到 soul-web
执行顺序:
soul-admin
的DataSyncConfiguration
配置类中,当soul.sync.websocket.enabled
配置为true
时,会自动装载 WebsocketDataChangedListener
类型的bean
(WebsocketDataChangedListener
实现了 DataChangedListener
).
所以这行代码可以从上下文中取出 DataChangedListene
r 类型的 bean
:WebsocketDataChangedListener
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
如图:
实现了 DataChangedListener
接口的类,可以监听插件的改动、选择器的改动、规则的改动、用户权限的改动以及元数据的改动。比如 WebsocketDataChangedListener
类:
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
// 变更的数据
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
// 将变更的数据通过 websocket 连接发送给 soul-web
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
WebsocketData<RuleData> configData =
new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
WebsocketData<AppAuthData> configData =
new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
WebsocketData<MetaData> configData =
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
WebsocketDataChangedListener
监听到配置的变更之后,便会通过建立好的 websocket
连接将变更的数据发送给soul-web
:
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
soul-admin
同步数据到 soul-web
的图示如下:
soul-bootstrap
是如何处理 soul-admin
发送过来的数据
总体上依靠 soul-spring-boot-starter-sync-data-websocket
来创建一个 websocket
客户端,然后通过引入 soul-sync-data-websocket
依赖来同步soul-admin
发过来的数据。
具体就是:
-
soul-spring-boot-starter-sync-data-websocket
的配置类WebsocketSyncDataConfiguration
初始化bean
时,创建了WebsocketSyncDataService
用于同步数据(WebsocketSyncDataService
的构造函数中创建了SoulWebsocketClient
来处理websocket
数据)/** * Websocket sync data service. * * @param websocketConfig the websocket config * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use websocket sync soul data......."); return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
-
soul-sync-data-websocket
是同步数据的具体操作。其中有很多Handler
,顶级接口DataHandler
中定了一个handle()
方法,抽象类AbstractDataHandler
实现了DataHandler
接口,并封装了很多抽象方法,包括 覆盖数据,刷新数据REFRESH,更新数据UPDATE,删除数据DELETE,并实现了handle()
方法。其中WebsocketDataHandler
用来处理web socket
协议推送过来的数据。
/**
* Instantiates a new Websocket data handler.
*
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
依赖关系:
soul-bootstrap
中引入了 soul-spring-boot-starter-sync-data-websocket
依赖来创建websocket
客户端:
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
soul-spring-boot-starter-sync-data-websocket
中引入了 soul-sync-data-websocket
依赖来同步soul-admin
发过来的数据:
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
总结
-
当建立连接以后会全量获取一次数据,以后的数据都是增量的更新与新增,性能好。
-
支持断线重连 (默认30秒)----- 心跳机制如何实现待研究
最后
以上就是自由外套为你收集整理的Soul网关(六)---- Websocket 同步数据的全部内容,希望文章能够帮你解决Soul网关(六)---- Websocket 同步数据所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复