Nacos源码解读08——集群节点间的健康检查

news/2024/6/16 21:40:13 标签: java

集群间的节点健康检查

引言

1.当集群间 新加入一个节点或者宕掉一个节点时其他节点怎么感知这个节点的操作的
2.当节点宕机之后 又是怎么做剔除的
3.新加入一个节点怎么让集群其他成员感知到
带着这些问题来看下面的代码

服务健康检查

在ServerMemberManager容器启动 的时候会调用到onApplicationEvent方法上 然后会判断非单机模式的情况下 启动一个定时线程池 第一次延迟5秒执行 这个线程池的任务就是负责节点间的心跳

java">    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        String serverNamespace = event.getApplicationContext().getServerNamespace();
        if (SPRING_MANAGEMENT_CONTEXT_NAMESPACE.equals(serverNamespace)) {
            // ignore
            // fix#issue https://github.com/alibaba/nacos/issues/7230
            return;
        }
        getSelf().setState(NodeState.UP);
        //非单机模式执行infoReportTask任务
        if (!EnvUtil.getStandaloneMode()) {
            GlobalExecutor.scheduleByCommon(this.infoReportTask, DEFAULT_TASK_DELAY_TIME);
        }
        EnvUtil.setPort(event.getWebServer().getPort());
        EnvUtil.setLocalAddress(this.localAddress);
        Loggers.CLUSTER.info("This node is ready to provide external services");
    }

MemberInfoReportTask

java">class MemberInfoReportTask extends Task {
 
    private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
    };
 
    private int cursor = 0;
 
    @Override
    protected void executeBody() {
       // 获取集群中除了自身以外的其他节点列表
        List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
        if (members.isEmpty()) {
            return;
        }
        // 定义一个游标
        this.cursor = (this.cursor + 1) % members.size();
        // 获取每个节信息
        Member target = members.get(cursor);
        Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
        // 构造每个节点的上报url请求路径为「/cluster/report」
        final String url = HttpUtils
                .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                        "/cluster/report");
        try {
           // 发起Post健康检查请求,请求内容为自身信息Member
            asyncRestTemplate
                    .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                            Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() { 
                                @Override
                                public void onReceive(RestResult<String> result) { // 处理健康检查返回结果,有以下三种类型
                                    // 版本过低错误,这个可能在集群中版本不一致出现
                                    if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                            || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                        // ...
                                        Member memberNew = target.copy();
                                        if (memberNew.getAbilities() != null
                                                && memberNew.getAbilities().getRemoteAbility() != null && memberNew
                                                .getAbilities().getRemoteAbility().isSupportRemoteConnection()) {
                                            memberNew.getAbilities().getRemoteAbility()
                                                    .setSupportRemoteConnection(false);
                                            update(memberNew); // 更新节点属性
                                        }
                                        return;
                                    }
                                    // 处理成功上报,更新该节点member的状态为UP表示科通信,设置失败次数为0,并发布成员变更事件
                                    if (result.ok()) {
                                        MemberUtil.onSuccess(ServerMemberManager.this, target);
                                    } else {
                                     //  处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件
                                        MemberUtil.onFail(ServerMemberManager.this, target);
                                    }
                                }
 
                                @Override
                                public void onError(Throwable throwable) {
                                   //  处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件
                                     MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                                }
 
                                @Override
                                public void onCancel() {
 
                                }
                            });
        } catch (Throwable ex) {
            // ...
        }
    }
 
    @Override
    protected void after() {
        GlobalExecutor.scheduleByCommon(this, 2_000L);  //执行完executeBody后延迟2秒继续执行executeBody,也就是检查健康检查的心跳频率为2秒,一轮全部节点检查结束后延迟2秒接着下一轮
    }
}

上报成功处理

java">public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    member.setState(NodeState.UP); // 状态为UP可通信状态
    member.setFailAccessCnt(0); // 失败次数为0
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange(); // 发布成员变更事件
    }
}

上报失败处理

java">public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();
 
    // 设置该节点为「不信任」
    member.setState(NodeState.SUSPICIOUS);
    // 失败次数递增+1
    member.setFailAccessCnt(member.getFailAccessCnt() + 1);
    // 默认最大失败重试次数为3
    int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
 
    // If the number of consecutive failures to access the target node reaches
    // a maximum, or the link request is rejected, the state is directly down
    // 超过重试次数设置节点状态为「下线」
    if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
            .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
        member.setState(NodeState.DOWN);
    }
 
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange(); // 发布成员变更事件
    }
}

发送成员变更事件

当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。 而状态的变更均会触发成员变更事件MembersChangeEvent

java">    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange(); // 发布成员变更事件
    }
java">void notifyMemberChange() {
    NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}

ClusterRpcClientProxy监听到MembersChangeEvent变更事件

java">    @Override
    public void onEvent(MembersChangeEvent event) {
        try {
            List<Member> members = serverMemberManager.allMembersWithoutSelf();
            refresh(members);
        } catch (NacosException e) {
            Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage());
        }
    }
java">private void refresh(List<Member> members) throws NacosException {
    for (Member member : members) {
        if (MemberUtil.isSupportedLongCon(member)) {
            // 集群中每个节点member创建rcp client,在client启动时会先目标节点发送HealthCheckRequest,如果非健康节点将会被移除
            createRpcClientAndStart(member, ConnectionType.GRPC);
        }
    }
    Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
    Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
    List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
            .map(a -> memberClientKey(a)).collect(Collectors.toList());
    // 关闭旧的grpc连接
    while (iterator.hasNext()) {
        Map.Entry<String, RpcClient> next1 = iterator.next();
        if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
            Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
            RpcClientFactory.getClient(next1.getKey()).shutdown();
            iterator.remove();
        }
    }
 
}

总结

1.当在集群模式的情况下 每个集群节点启动的时候会启动一个定时的线程池去执行心跳任务 第一次延迟五秒执行每一轮频率为2秒 他会去把除了自己节点外的全部其他集群节点查出来然后挨个去发信号,如果成功则设置这个节点是可信任的,如果这个节点失败 则设置成不可信任的 然后会去重试 如果超过了三次重试都是失败 则会把这个节点设置成down
2.当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。 而状态的变更均会触发成员变更事件MembersChangeEvent 都会去发送一个节点变更的一个事件 去发送GRPC请求, 刷新本节点与集群中其他节点的RPC状态,如果这个节点是挂掉的情况下会将这个down掉的节点去做移除 关闭无效连接 如果这个节点是新增加的情况则增加新的GRPC连接


http://www.niftyadmin.cn/n/5249581.html

相关文章

LeetCode 每日一题 2023/12/4-2023/12/10

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 12/4 1038. 从二叉搜索树到更大和树12/5 2477. 到达首都的最少油耗12/6 2646. 最小化旅行的价格总和12/7 1466. 重新规划路线12/8 2008. 出租车的最大盈利12/9 2048. 下一个…

clickhouse优化汇总

记录下clickhouse平常用到的几个优化点&#xff1a; 1.数据类型的选择&#xff0c;不要都把每一列都设置成字符串类型&#xff0c;并且对于枚举值很少的字符串类型&#xff0c;可以使用LowCardinality编码&#xff0c;使用整数代替字符串存储 2.字段类型最好不要运行为null&…

朝花夕拾华山平台流水账

2021年8月25日&#xff0c;我加入了诚迈科技&#xff08;南京&#xff09;&#xff0c;加入了华山平台。 跟我一起入职平台的还有三个小伙伴&#xff1a;小帅、小阳、小甘。 小帅能力很强&#xff0c;前后端都会&#xff0c;入职各种考试工具人。 小阳毕业没多久&#xff0c;一…

Kubernetes - 为什么 K8S 在容器里不能调用自己?

问题描述 最近遇到一个神奇的现象&#xff0c;在 K8S 的 POD 容器中&#xff0c;比如 pod name&#xff1a;mini-appnamespace&#xff1a;devport&#xff1a;5050 那么&#xff0c;是无法在 mini-app 容器里执行以下命令&#xff0c;如果执行&#xff0c;会一直卡在这条命…

【Docker】进阶之路:(一)容器技术发展史

【Docker】进阶之路&#xff1a;&#xff08;一&#xff09;容器技术发展史 什么是容器为什么需要容器容器技术的发展历程Docker容器是如何工作的 什么是容器 容器作为一种先进的虚拟化技术&#xff0c;已然成为了云原生时代软件开发和运维的标准基础设施。在了解容器技术之前…

ubuntu22.04 安装cuda

CUDA&#xff08;Compute Unified Device Architecture&#xff09;是由 NVIDIA 开发的一种并行计算平台和编程模型。它允许开发者利用 NVIDIA 的 GPU&#xff08;图形处理单元&#xff09;进行高效的计算处理。CUDA 通过提供一系列的 C、C 和 Fortran 扩展&#xff0c;使得开发…

Python 模板引擎 Jinja2 的安装和使用

目录 一、概述 二、安装 Jinja2 三、使用 Jinja2 四、Jinja2的强大功能和优点 五、总结 一、概述 Jinja2 是 Python 中广泛使用的一种模板引擎&#xff0c;它具有灵活的语法、强大的控制结构、方便的 API&#xff0c;以及高效的渲染速度。通过使用 Jinja2&#xff0c;开发…

2020年系统分析师综合知识历年真题(1-10) + 详解

【2020下系分真题第01题&#xff1a;绿色】 01.系统结构化分析模型包括数据模型、功能模型和行为模型&#xff0c;这些模型的核心是( )。 - A. 实体联系图 - B. 状态转换图 - C. 数据字典 - D. 流程图 答案&#xff1a;C 解答&#xff1a;结构化分析方法建立数据模型、功能模型…