集群间的节点健康检查
引言
1.当集群间 新加入一个节点或者宕掉一个节点时其他节点怎么感知这个节点的操作的
2.当节点宕机之后 又是怎么做剔除的
3.新加入一个节点怎么让集群其他成员感知到
带着这些问题来看下面的代码
服务健康检查
在ServerMemberManager容器启动 的时候会调用到onApplicationEvent方法上 然后会判断非单机模式的情况下 启动一个定时线程池 第一次延迟5秒执行 这个线程池的任务就是负责节点间的心跳
java"> @Override
public void onApplicationEvent(WebServerInitializedEvent event) {
String serverNamespace = event.getApplicationContext().getServerNamespace();
if (SPRING_MANAGEMENT_CONTEXT_NAMESPACE.equals(serverNamespace)) {
// ignore
// fix#issue https://github.com/alibaba/nacos/issues/7230
return;
}
getSelf().setState(NodeState.UP);
//非单机模式执行infoReportTask任务
if (!EnvUtil.getStandaloneMode()) {
GlobalExecutor.scheduleByCommon(this.infoReportTask, DEFAULT_TASK_DELAY_TIME);
}
EnvUtil.setPort(event.getWebServer().getPort());
EnvUtil.setLocalAddress(this.localAddress);
Loggers.CLUSTER.info("This node is ready to provide external services");
}
MemberInfoReportTask
java">class MemberInfoReportTask extends Task {
private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
};
private int cursor = 0;
@Override
protected void executeBody() {
// 获取集群中除了自身以外的其他节点列表
List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
if (members.isEmpty()) {
return;
}
// 定义一个游标
this.cursor = (this.cursor + 1) % members.size();
// 获取每个节信息
Member target = members.get(cursor);
Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
// 构造每个节点的上报url请求路径为「/cluster/report」
final String url = HttpUtils
.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
"/cluster/report");
try {
// 发起Post健康检查请求,请求内容为自身信息Member
asyncRestTemplate
.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) { // 处理健康检查返回结果,有以下三种类型
// 版本过低错误,这个可能在集群中版本不一致出现
if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
|| result.getCode() == HttpStatus.NOT_FOUND.value()) {
// ...
Member memberNew = target.copy();
if (memberNew.getAbilities() != null
&& memberNew.getAbilities().getRemoteAbility() != null && memberNew
.getAbilities().getRemoteAbility().isSupportRemoteConnection()) {
memberNew.getAbilities().getRemoteAbility()
.setSupportRemoteConnection(false);
update(memberNew); // 更新节点属性
}
return;
}
// 处理成功上报,更新该节点member的状态为UP表示科通信,设置失败次数为0,并发布成员变更事件
if (result.ok()) {
MemberUtil.onSuccess(ServerMemberManager.this, target);
} else {
// 处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件
MemberUtil.onFail(ServerMemberManager.this, target);
}
}
@Override
public void onError(Throwable throwable) {
// 处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件
MemberUtil.onFail(ServerMemberManager.this, target, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Throwable ex) {
// ...
}
}
@Override
protected void after() {
GlobalExecutor.scheduleByCommon(this, 2_000L); //执行完executeBody后延迟2秒继续执行executeBody,也就是检查健康检查的心跳频率为2秒,一轮全部节点检查结束后延迟2秒接着下一轮
}
}
上报成功处理
java">public static void onSuccess(final ServerMemberManager manager, final Member member) {
final NodeState old = member.getState();
manager.getMemberAddressInfos().add(member.getAddress());
member.setState(NodeState.UP); // 状态为UP可通信状态
member.setFailAccessCnt(0); // 失败次数为0
if (!Objects.equals(old, member.getState())) {
manager.notifyMemberChange(); // 发布成员变更事件
}
}
上报失败处理
java">public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
manager.getMemberAddressInfos().remove(member.getAddress());
final NodeState old = member.getState();
// 设置该节点为「不信任」
member.setState(NodeState.SUSPICIOUS);
// 失败次数递增+1
member.setFailAccessCnt(member.getFailAccessCnt() + 1);
// 默认最大失败重试次数为3
int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
// If the number of consecutive failures to access the target node reaches
// a maximum, or the link request is rejected, the state is directly down
// 超过重试次数设置节点状态为「下线」
if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
member.setState(NodeState.DOWN);
}
if (!Objects.equals(old, member.getState())) {
manager.notifyMemberChange(); // 发布成员变更事件
}
}
发送成员变更事件
当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。 而状态的变更均会触发成员变更事件MembersChangeEvent
java"> if (!Objects.equals(old, member.getState())) {
manager.notifyMemberChange(); // 发布成员变更事件
}
java">void notifyMemberChange() {
NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}
ClusterRpcClientProxy监听到MembersChangeEvent变更事件
java"> @Override
public void onEvent(MembersChangeEvent event) {
try {
List<Member> members = serverMemberManager.allMembersWithoutSelf();
refresh(members);
} catch (NacosException e) {
Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage());
}
}
java">private void refresh(List<Member> members) throws NacosException {
for (Member member : members) {
if (MemberUtil.isSupportedLongCon(member)) {
// 集群中每个节点member创建rcp client,在client启动时会先目标节点发送HealthCheckRequest,如果非健康节点将会被移除
createRpcClientAndStart(member, ConnectionType.GRPC);
}
}
Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
.map(a -> memberClientKey(a)).collect(Collectors.toList());
// 关闭旧的grpc连接
while (iterator.hasNext()) {
Map.Entry<String, RpcClient> next1 = iterator.next();
if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
RpcClientFactory.getClient(next1.getKey()).shutdown();
iterator.remove();
}
}
}
总结
1.当在集群模式的情况下 每个集群节点启动的时候会启动一个定时的线程池去执行心跳任务 第一次延迟五秒执行每一轮频率为2秒 他会去把除了自己节点外的全部其他集群节点查出来然后挨个去发信号,如果成功则设置这个节点是可信任的,如果这个节点失败 则设置成不可信任的 然后会去重试 如果超过了三次重试都是失败 则会把这个节点设置成down
2.当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。 而状态的变更均会触发成员变更事件MembersChangeEvent 都会去发送一个节点变更的一个事件 去发送GRPC请求, 刷新本节点与集群中其他节点的RPC状态,如果这个节点是挂掉的情况下会将这个down掉的节点去做移除 关闭无效连接 如果这个节点是新增加的情况则增加新的GRPC连接