概述
今天我们分析 Spring Cloud Alibaba 之 Nacos 心跳功能的源码。进入正题之前,建议先阅读上一篇博客《服务注册源码分析》。
一、客户端:
1、心跳入口
/**
 * Registers an instance with the naming server.
 *
 * <p>For an ephemeral instance a {@link BeatInfo} is built from the instance's address, cluster,
 * weight and metadata and handed to the beat reactor before the registration request is sent.
 * Non-ephemeral instances skip the heartbeat setup entirely.
 *
 * @param serviceName plain service name (combined with {@code groupName} via NamingUtils)
 * @param groupName   group the service belongs to
 * @param instance    instance to register
 * @throws NacosException declared by the method; propagated from the registration call
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        final String beatServiceName = NamingUtils.getGroupedName(serviceName, groupName);

        BeatInfo heartbeat = new BeatInfo();
        heartbeat.setServiceName(beatServiceName);
        heartbeat.setIp(instance.getIp());
        heartbeat.setPort(instance.getPort());
        heartbeat.setCluster(instance.getClusterName());
        heartbeat.setWeight(instance.getWeight());
        heartbeat.setMetadata(instance.getMetadata());
        heartbeat.setScheduled(false);

        // An interval of 0 means "not configured": fall back to the default period.
        final long configuredInterval = instance.getInstanceHeartBeatInterval();
        if (configuredInterval == 0) {
            heartbeat.setPeriod(DEFAULT_HEART_BEAT_INTERVAL);
        } else {
            heartbeat.setPeriod(configuredInterval);
        }

        // Entry point of the heartbeat flow: hand the beat to the reactor.
        beatReactor.addBeatInfo(beatServiceName, heartbeat);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
2、进入 BeatReactor 类
/**
 * Records a heartbeat descriptor and schedules its first beat.
 *
 * @param serviceName service name used, together with the beat's ip and port, to build the map key
 * @param beatInfo    heartbeat descriptor to store and schedule
 */
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);

    // Remember the beat under its service/ip/port key.
    final String beatKey = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    dom2Beat.put(beatKey, beatInfo);

    // Delay 0: the first BeatTask is submitted for immediate execution.
    final BeatTask firstBeat = new BeatTask(beatInfo);
    executorService.schedule(firstBeat, 0, TimeUnit.MILLISECONDS);

    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
3、点击 BeatTask,进入 BeatReactor 的内部类 BeatTask
/**
 * One heartbeat round: sends a beat for {@link #beatInfo} and re-schedules itself.
 */
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    /**
     * Sends the beat unless it has been stopped, then schedules the next round.
     * The delay is the value returned by {@code sendBeat} when positive, otherwise the beat's own period.
     */
    @Override
    public void run() {
        if (!beatInfo.isStopped()) {
            // Send the heartbeat to the server.
            final long serverInterval = serverProxy.sendBeat(beatInfo);

            final long delayMillis;
            if (serverInterval > 0) {
                delayMillis = serverInterval;
            } else {
                delayMillis = beatInfo.getPeriod();
            }

            // Chain the next beat after delayMillis.
            executorService.schedule(new BeatTask(beatInfo), delayMillis, TimeUnit.MILLISECONDS);
        }
    }
}
4、来到 NamingProxy类
/**
 * Sends one heartbeat to the server and returns the beat interval carried in the response.
 *
 * <p>Any exception raised while building or sending the request is logged and swallowed, so the
 * caller always receives a value.
 *
 * @param beatInfo heartbeat descriptor; serialized to JSON as the {@code beat} parameter
 * @return the {@code clientBeatInterval} value from the response, or {@code 0} when the request
 *         failed, the response could not be parsed into an object, or the field is absent
 */
public long sendBeat(BeatInfo beatInfo) {
    try {
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
        }
        Map<String, String> params = new HashMap<String, String>(4);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        // Call the server's beat endpoint.
        String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
        JSONObject jsonObject = JSON.parseObject(result);
        if (jsonObject != null) {
            // getLong yields a boxed Long; unboxing a missing field directly would throw an NPE
            // that the catch below would misreport as a failed beat.
            Long interval = jsonObject.getLong("clientBeatInterval");
            if (interval != null) {
                return interval;
            }
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
    }
    return 0L;
}
5、点击 reqAPI 方法
/**
 * Sends a request using the currently known server addresses.
 *
 * <p>The explicitly configured {@code serverList} is preferred; when it is empty the addresses in
 * {@code serversFromEndpoint} are used instead.
 *
 * @param api    API path to request
 * @param params request parameters
 * @param method HTTP method name
 * @return the response body returned by the four-argument overload
 * @throws NacosException declared by the method
 */
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
    final List<String> candidates = CollectionUtils.isEmpty(serverList) ? serversFromEndpoint : serverList;
    // Delegate to the overload that takes the server list.
    return reqAPI(api, params, candidates, method);
}
来到重载的 reqAPI 方法:
/**
 * Sends a request to one of the given servers, falling back to the configured domain.
 *
 * <p>When {@code servers} is non-empty, a random starting index is chosen and every server is tried
 * once in round-robin order until one call succeeds. When {@code servers} is empty, the request is
 * retried against {@code nacosDomain} up to {@code UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT} times.
 *
 * <p>The domain fallback forwards {@code method} to {@code callServer}, so the HTTP verb requested
 * by the caller is honoured on both paths. The last failure is attached as the cause of the
 * exception raised once every attempt has failed.
 *
 * @param api     API path to request
 * @param params  request parameters; the namespace id entry is overwritten with {@code getNamespaceId()}
 * @param servers candidate server addresses, may be null or empty
 * @param method  HTTP method name
 * @return the response body of the first successful call
 * @throws IllegalArgumentException if neither a server list nor a domain is available
 * @throws IllegalStateException    if every attempt failed
 */
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new IllegalArgumentException("no server available");
    }
    // Most recent failure; reported in, and chained to, the final exception.
    Exception lastFailure = null;
    if (servers != null && !servers.isEmpty()) {
        Random random = new Random(System.currentTimeMillis());
        // Pick a random server to start from.
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, server, method);
            } catch (Exception e) {
                lastFailure = e;
                NAMING_LOGGER.error("request {} failed.", server, e);
            }
            // Move on to the next server, wrapping around at the end of the list.
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
            + (lastFailure == null ? null : lastFailure.getMessage()), lastFailure);
    }
    for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
        try {
            // Forward the caller's HTTP method instead of dropping it.
            return callServer(api, params, nacosDomain, method);
        } catch (Exception e) {
            lastFailure = e;
            NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
        }
    }
    throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
        + (lastFailure == null ? null : lastFailure.getMessage()), lastFailure);
}
来到 callServer 方法:
/**
 * Performs a single HTTP call against one server and records its latency.
 *
 * <p>If {@code curServer} already starts with an HTTP or HTTPS scheme it is used as given;
 * otherwise the default port is appended when the address has no port separator and the
 * prefix from {@code HttpClient.getPrefix()} is prepended.
 *
 * @param api       API path appended to the server address
 * @param params    request parameters
 * @param curServer server address, with or without scheme and port
 * @param method    HTTP method name
 * @return the response body on HTTP 200, or an empty string on HTTP 304
 * @throws NacosException with {@code SERVER_ERROR} for any other response code
 */
public String callServer(String api, Map<String, String> params, String curServer, String method)
    throws NacosException {
    final long startedAt = System.currentTimeMillis();

    checkSignature(params);
    final List<String> requestHeaders = builderHeaders();

    final String targetUrl;
    final boolean hasScheme = curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP);
    if (hasScheme) {
        targetUrl = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        targetUrl = HttpClient.getPrefix() + curServer + api;
    }

    // The actual HTTP call.
    final HttpClient.HttpResult response =
        HttpClient.request(targetUrl, requestHeaders, params, UtilAndComs.ENCODING, method);

    final long finishedAt = System.currentTimeMillis();
    MetricsMonitor.getNamingRequestMonitor(method, targetUrl, String.valueOf(response.code))
        .observe(finishedAt - startedAt);

    if (HttpURLConnection.HTTP_OK == response.code) {
        return response.content;
    } else if (HttpURLConnection.HTTP_NOT_MODIFIED == response.code) {
        return StringUtils.EMPTY;
    }

    throw new NacosException(NacosException.SERVER_ERROR, "failed to req API:"
        + curServer + api + ". code:"
        + response.code + " msg: " + response.content);
}
6、进入 HttpClient
/**
 * Executes an HTTP request over {@link HttpURLConnection} and returns the parsed result.
 *
 * <p>The encoded parameters are always appended to the URL as a query string; for POST and PUT
 * they are additionally written to the request body. Any exception is logged and converted into
 * an {@code HttpResult} with code 500 whose content is the exception's string form. The
 * connection is disconnected in all cases.
 *
 * @param url         target URL without query string
 * @param headers     request headers passed to {@code setHeaders}
 * @param paramValues request parameters
 * @param encoding    encoding used for the parameters and headers
 * @param method      HTTP method name
 * @return the server's response, or a synthetic 500 result on failure
 */
public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String encoding, String method) {
    HttpURLConnection conn = null;
    try {
        final String query = encodingParams(paramValues, encoding);
        if (!StringUtils.isEmpty(query)) {
            url = url + "?" + query;
        }

        conn = (HttpURLConnection) new URL(url).openConnection();
        setHeaders(conn, headers, encoding);
        conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
        conn.setReadTimeout(TIME_OUT_MILLIS);
        conn.setRequestMethod(method);
        conn.setDoOutput(true);

        final boolean carriesBody = POST.equals(method) || PUT.equals(method);
        if (carriesBody) {
            // fix: apache http nio framework must set some content to request body
            final byte[] payload = query.getBytes();
            conn.setRequestProperty("Content-Length", String.valueOf(payload.length));
            conn.getOutputStream().write(payload, 0, payload.length);
            conn.getOutputStream().flush();
            conn.getOutputStream().close();
        }

        conn.connect();
        NAMING_LOGGER.debug("Request from server: " + url);
        // Read status, headers and body.
        return getResult(conn);
    } catch (Exception failure) {
        try {
            if (conn != null) {
                NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                    + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
            }
        } catch (Exception lookupFailure) {
            // Resolving the host for the warning is best-effort; log and carry on.
            NAMING_LOGGER.error("[NA] failed to request ", lookupFailure);
        }
        NAMING_LOGGER.error("[NA] failed to request ", failure);
        return new HttpResult(500, failure.toString(), Collections.<String, String>emptyMap());
    } finally {
        if (conn != null) {
            conn.disconnect();
        }
    }
}
点击getResult(conn)
/**
 * Reads the response code, headers and body from a connection.
 *
 * <p>The regular input stream is used for HTTP 200, HTTP 304 and the write-redirect code; every
 * other code reads the error stream. {@link HttpURLConnection#getErrorStream()} may return
 * {@code null} when the server sent no error body; in that case an empty body is returned together
 * with the real response code and headers, rather than failing while reading a null stream.
 *
 * @param conn connection whose response should be read
 * @return result holding the response code, the decoded body and the first value of each header
 * @throws IOException if reading the response fails
 */
private static HttpResult getResult(HttpURLConnection conn) throws IOException {
    int respCode = conn.getResponseCode();
    InputStream inputStream;
    if (HttpURLConnection.HTTP_OK == respCode
        || HttpURLConnection.HTTP_NOT_MODIFIED == respCode
        || Constants.WRITE_REDIRECT_CODE == respCode) {
        inputStream = conn.getInputStream();
    } else {
        inputStream = conn.getErrorStream();
    }

    // Flatten the multi-valued header map, keeping the first value of each header.
    Map<String, List<String>> rawHeaders = conn.getHeaderFields();
    Map<String, String> respHeaders = new HashMap<String, String>(rawHeaders.size());
    for (Map.Entry<String, List<String>> entry : rawHeaders.entrySet()) {
        respHeaders.put(entry.getKey(), entry.getValue().get(0));
    }

    if (inputStream == null) {
        // No body available (error response without content): keep the real status code.
        return new HttpResult(respCode, "", respHeaders);
    }

    String encodingGzip = "gzip";
    if (encodingGzip.equals(respHeaders.get(HttpHeaders.CONTENT_ENCODING))) {
        inputStream = new GZIPInputStream(inputStream);
    }
    // The data finally returned to the caller.
    return new HttpResult(respCode, IoUtils.toString(inputStream, getCharset(conn)), respHeaders);
}
二、服务端:
1、接收请求入口
/**
 * Handles a client heartbeat sent as an HTTP PUT to the "/beat" mapping.
 *
 * <p>Reads the optional "beat" payload plus the cluster, ip, port, namespace and service-name
 * parameters from the request, registers the instance again when the service manager no longer
 * knows it, hands the beat to the owning service for processing, and returns a JSON node carrying
 * the result code, the client beat interval and the light-beat switch.
 *
 * @param request incoming HTTP request; the service-name parameter is required
 * @return JSON node populated with the response code and heartbeat settings
 * @throws Exception if the service cannot be found after the compensation step, or if a called
 *         helper fails
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// Start with the globally configured beat interval; it may be overridden per instance below.
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
// Stays null when the request carries no "beat" payload.
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
// Values inside the beat payload take precedence over the plain request parameters.
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// Look up the service instance.
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// Instance unknown: register it again. Per the original author's note this covers e.g. an
// instance taken offline after a network failure, or one lost across a server restart.
if (instance == null) {
// Without a beat payload there is nothing to rebuild the instance from.
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
// NOTE(review): assigns the instance id to itself; looks like a no-op unless
// getInstanceId() derives a value from the fields set above - confirm.
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// No payload was sent: build a minimal beat from the request parameters.
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}// Hand the beat to the service for processing.
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
// A per-instance heartbeat interval in the metadata overrides the global value set above.
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
2、点击 service.processClientBeat(clientBeat);进入Service 类
/**
 * Wraps a client beat in a {@code ClientBeatProcessor} bound to this service and passes it to
 * {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo beat information received from the client
 */
public void processClientBeat(final RsInfo rsInfo) {
    final ClientBeatProcessor beatProcessor = new ClientBeatProcessor();
    beatProcessor.setService(this);
    beatProcessor.setRsInfo(rsInfo);
    // The processor's run() refreshes the matching instance's last-beat time.
    HealthCheckReactor.scheduleNow(beatProcessor);
}
3、进入ClientBeatProcessor类
/**
 * Applies the received beat to every instance of the beat's cluster that has the same ip and port:
 * the last-beat time is refreshed, and an instance that is neither marked nor healthy is set back
 * to healthy, followed by a service-changed notification to the push service.
 */
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    final String beatIp = rsInfo.getIp();
    final String beatCluster = rsInfo.getCluster();
    final int beatPort = rsInfo.getPort();
    final Cluster cluster = service.getClusterMap().get(beatCluster);

    for (Instance candidate : cluster.allIPs(true)) {
        final boolean sameEndpoint = candidate.getIp().equals(beatIp) && candidate.getPort() == beatPort;
        if (!sameEndpoint) {
            continue;
        }

        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
        }
        // Record the time of this beat.
        candidate.setLastBeat(System.currentTimeMillis());

        // Only an unmarked, currently unhealthy instance is flipped back to healthy.
        if (candidate.isMarked() || candidate.isHealthy()) {
            continue;
        }

        candidate.setHealthy(true);
        Loggers.EVT_LOG
            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                cluster.getService().getName(), beatIp, beatPort, cluster.getName(),
                UtilsAndCommons.LOCALHOST_SITE);
        getPushService().serviceChanged(service);
    }
}
到此nacos心跳分析结束,下一篇我们分析服务列表拉取源码,敬请期待!
最后
以上就是高挑哑铃为你收集整理的SpringCloudAlibaba 之 nacos 心跳功能源码分析的全部内容,希望文章能够帮你解决SpringCloudAlibaba 之 nacos 心跳功能源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复