[Nacos] Source Code: Server-Side Service Health Checks and Service Queries
admin
2024-03-29 02:43:58

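When an instance registers, the Nacos server initializes the corresponding Service and, as part of that initialization, starts a client heartbeat check task, ClientBeatCheckTask.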

Adding the ClientBeatCheckTask

Each Service adds its own ClientBeatCheckTask when it is initialized.

com.alibaba.nacos.naming.core.Service#init

public void init() {
    // Start the client heartbeat check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

By default this task runs every 5 seconds, with an initial delay of 5 seconds.
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
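The "schedule at most once per key" pattern used by scheduleCheck can be tried in isolation. The following is a minimal sketch, not the Nacos implementation: CheckSchedulerSketch, its fields, and the boolean return value are hypothetical names introduced for illustration, and a plain ScheduledExecutorService with a fixed delay stands in for GlobalExecutor.scheduleNamingHealth, whose internals are not shown in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CheckSchedulerSketch {
    // Hypothetical stand-ins for HealthCheckReactor's futureMap and for GlobalExecutor.
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /**
     * Schedules the task only when no task is registered under taskKey.
     * Returns true if this call created the schedule, false if one already existed.
     */
    boolean scheduleCheck(String taskKey, Runnable task, long periodMillis) {
        boolean[] created = {false};
        futureMap.computeIfAbsent(taskKey, k -> {
            created[0] = true;
            // Assumption: fixed delay between runs; the initial delay equals the period.
            return executor.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    int scheduledCount() {
        return futureMap.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling scheduleCheck twice with the same key leaves a single scheduled task, which is why initializing the same Service repeatedly would not stack up duplicate heartbeat checks.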

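Running the ClientBeatCheckTask

ClientBeatCheckTask implements the Runnable interface, so the run() method is the place to look.

It iterates over all instances of the service. If an instance's last heartbeat is more than 15 s old (the default heartbeat timeout), its health status is set to false; if the last heartbeat is more than 30 s old (the default delete timeout), the instance is deleted.

When an instance is marked unhealthy, two events are published: InstanceHeartbeatTimeoutEvent and ServiceChangeEvent.

Nothing listens for InstanceHeartbeatTimeoutEvent.

ServiceChangeEvent is handled by PushService, the same class that publishes it.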

com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run

public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            // No heartbeat for more than 15 s (default): mark the instance unhealthy
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            // No heartbeat for more than 30 s (default): delete the instance
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
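The two timeout comparisons in run() can be isolated as a pure function, which makes the boundary behaviour easy to check. This is a simplified sketch under stated assumptions: BeatTimeoutSketch, Action, and decide are hypothetical names, the sketch reports only the strongest applicable action (the real task marks an instance unhealthy in a first pass and deletes it in a second pass), and it ignores the isMarked() check and the global switches.

```java
class BeatTimeoutSketch {
    enum Action { NONE, MARK_UNHEALTHY, DELETE }

    // Default values described in the article: 15 s and 30 s.
    static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IP_DELETE_TIMEOUT_MS = 30_000L;

    /**
     * Decides what a heartbeat check would do for one instance.
     * Both comparisons are strict, mirroring the '>' used in run().
     */
    static Action decide(long nowMs, long lastBeatMs, long heartbeatTimeoutMs, long ipDeleteTimeoutMs) {
        long silenceMs = nowMs - lastBeatMs;
        if (silenceMs > ipDeleteTimeoutMs) {
            return Action.DELETE;
        }
        if (silenceMs > heartbeatTimeoutMs) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.NONE;
    }
}
```

Because the comparison is strict, an instance whose last heartbeat is exactly 15 000 ms old is still treated as healthy in this sketch; it is marked unhealthy only on a later run.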

ClientBeatCheckTask#deleteIp

The server deregisters the instance by sending an HTTP DELETE request to its own /nacos/v1/ns/instance endpoint.

com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#deleteIp

private void deleteIp(Instance instance) {

    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

        // Call this server's own /nacos/v1/ns/instance endpoint to deregister the instance
        String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort()
                + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?"
                + request.toUrl();

        // delete instance asynchronously:
        HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJson(), result.getMessage(), result.getCode());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}",
                        instance.toJson(), throwable);
            }

            @Override
            public void onCancel() {

            }
        });

    } catch (Exception e) {
        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}",
                instance.toJson(), e);
    }
}
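To make the shape of that self-call concrete, the sketch below assembles a comparable URL from a host, a port, a context path, and the query parameters. It is an illustration only: DeregisterUrlSketch and buildUrl are hypothetical names, the "/v1/ns/instance" suffix is taken from the endpoint path quoted in this article, and URL-encoding each parameter is an assumption of the sketch (how NamingProxy.Request.toUrl() encodes values is not shown here).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

class DeregisterUrlSketch {

    /** Builds "http://host:port{contextPath}/v1/ns/instance?k1=v1&k2=v2" in parameter iteration order. */
    static String buildUrl(String host, int port, String contextPath, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return "http://" + host + ":" + port + contextPath + "/v1/ns/instance?" + query;
    }

    // Assumption of this sketch: values are form-encoded as UTF-8.
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Passing the parameters in a LinkedHashMap keeps them in insertion order, which makes the resulting URL predictable when reading logs.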

InstanceController#deregister

com.alibaba.nacos.naming.controllers.InstanceController#deregister

public String deregister(HttpServletRequest request) throws Exception {
    Instance instance = getIpAddress(request);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    // Look up the Service
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
        return "ok";
    }

    // Remove the instance
    serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    return "ok";
}

ServiceManager#removeInstance

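This step only takes the instance list out of the registry, removes the instance from that list, and puts the result into the cache through the consistency service. It does not yet remove the instance from the registry itself.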

com.alibaba.nacos.naming.core.ServiceManager#removeInstance(java.lang.String, java.lang.String, boolean, com.alibaba.nacos.naming.core.Instance...)

public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        removeInstance(namespaceId, serviceName, ephemeral, service, ips);
    }
}

private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
        Instance... ips) throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    // Remove the instances from the cached instance list
    List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);

    Instances instances = new Instances();
    instances.setInstanceList(instanceList);

    // Update the cache
    consistencyService.put(key, instances);
}

private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
        throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
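The "edit a copy, then publish the copy" idea behind removeInstance can be shown with plain collections. This is a minimal sketch rather than the Nacos code: RemoveInstanceSketch is a hypothetical class, the store map is a stand-in for what consistencyService.put writes to, and instances are reduced to address strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveInstanceSketch {
    // Hypothetical stand-in for the key/value store behind the consistency service.
    final Map<String, List<String>> store = new ConcurrentHashMap<>();

    /**
     * Copies the current registry view, removes one address from the copy,
     * and publishes the copy under the given key. The registry view itself is not modified.
     */
    void removeInstance(String key, List<String> registryView, String address) {
        List<String> copy = new ArrayList<>(registryView);
        copy.remove(address);
        store.put(key, copy);
    }
}
```

After the call, the list passed in as registryView still contains the removed address; only the published copy has changed. Applying the published list back to the registry is a separate step.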

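The actual removal of the instance from the registry is done by the same asynchronous task that handles instance registration.

Listening for ServiceChangeEvent

PushService listens for the ServiceChangeEvent event.

When a service changes, the Nacos server notifies every client that is watching that service over UDP. The aim is to let clients learn about instance status changes promptly without adding load to the server. UDP fits because it needs no connection: the server simply sends a datagram, whether or not the client receives it. Even if a client misses the datagram, it still has a scheduled task that queries the service's instance list every 5 s.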

com.alibaba.nacos.naming.push.PushService#onApplicationEvent

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            /**
             * When are clients added? When a client queries the instance list of the service.
             * @see InstanceController#doSrvIpxt(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, int, java.lang.String, boolean, java.lang.String, java.lang.String, boolean)
             */
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Iterate over all clients that have queried this service
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();

                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }

                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                // Send the changed service's instance list to the client over UDP
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);

    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
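The connectionless nature of UDP that motivates this design can be seen with the JDK's DatagramSocket alone. The following sketch is an illustration, not the udpPush implementation: UdpPushSketch and its methods are hypothetical names, the payload is an arbitrary string, and no acknowledgement tracking or compression is modelled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

class UdpPushSketch {

    /** Sends one datagram to the client address. No connection is set up beforehand. */
    static void push(DatagramSocket socket, InetSocketAddress client, String payload) throws IOException {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(data, data.length, client));
    }

    /** Waits up to timeoutMs for one datagram and returns its content as a UTF-8 string. */
    static String receiveOnce(DatagramSocket socket, int timeoutMs) throws IOException {
        socket.setSoTimeout(timeoutMs);
        byte[] buffer = new byte[1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        socket.receive(packet);
        return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
    }

    /** Demo helper: pushes the payload to a receiver on the loopback interface and returns what arrived. */
    static String loopbackRoundTrip(String payload) {
        try (DatagramSocket receiver = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket sender = new DatagramSocket()) {
            InetSocketAddress target =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), receiver.getLocalPort());
            push(sender, target, payload);
            return receiveOnce(receiver, 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sender returns as soon as the datagram is handed to the network stack. If nothing were listening on the target port, push would still complete normally, which is why the client-side periodic query remains necessary as a fallback.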

Querying a service's instance list

Nacos clients query the server for a service's instance list by calling the /nacos/v1/ns/instance/list endpoint.

com.alibaba.nacos.naming.controllers.InstanceController#list

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {

    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    // Query the instances that belong to serviceName
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the service by namespace and service name
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // Add the client that watches for instance changes to clientMap. When the service's
            // instance list changes later, this map is iterated and a UDP datagram is sent to each client.
            pushService.addClient(namespaceId, serviceName, clusters, agent,
                    new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }

    checkIfDisabled(service);

    List<Instance> srvedIPs;

    // Read the instance list from the service
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }

    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());

    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }

    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();

    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }

        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }

    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
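The protect-threshold branch in doSrvIpxt is easy to miss in a method this long, so here is the same decision reduced to a small, self-contained sketch. ProtectThresholdSketch, Inst, and select are hypothetical names, instances are reduced to an address and a health flag, and the enabled-instance filter and JSON assembly are left out.

```java
import java.util.ArrayList;
import java.util.List;

class ProtectThresholdSketch {

    /** Simplified instance: just an address and a health flag. */
    static final class Inst {
        final String ip;
        final boolean healthy;

        Inst(String ip, boolean healthy) {
            this.ip = ip;
            this.healthy = healthy;
        }
    }

    /**
     * Returns the instances a query would see. When the healthy ratio is at or below
     * the threshold, unhealthy instances are moved into the healthy bucket, so they
     * are returned even when healthyOnly is true.
     */
    static List<Inst> select(List<Inst> all, double protectThreshold, boolean healthyOnly) {
        List<Inst> healthy = new ArrayList<>();
        List<Inst> unhealthy = new ArrayList<>();
        for (Inst inst : all) {
            (inst.healthy ? healthy : unhealthy).add(inst);
        }
        if (all.isEmpty()) {
            return healthy;
        }
        // Same comparison as in doSrvIpxt: float ratio, '<=' against the threshold.
        if ((float) healthy.size() / all.size() <= protectThreshold) {
            healthy.addAll(unhealthy);
            unhealthy.clear();
        }
        List<Inst> result = new ArrayList<>(healthy);
        if (!healthyOnly) {
            result.addAll(unhealthy);
        }
        return result;
    }
}
```

With one healthy instance out of four and a threshold of 0.5, the ratio 0.25 is below the threshold, so all four instances are returned even for a healthyOnly query. In doSrvIpxt the instances moved this way are also reported with "healthy" set to true, because that field is filled from the bucket key rather than from the instance.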

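Querying a service's instance list reads the registry directly. Services in different namespaces or in different groups cannot call each other, while different Clusters under the same service can.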
