[Nacos] Nacos Server与Nacos Client间的UDP通信 (十)

news/2024/5/18 13:00:06 标签: udp, java, spring cloud, nacos

文章目录

      • 1.Nacos Server与Nacos Client间的UDP通信
        • 1.1 Nacos Server向Nacos Client进行UDP推送
        • 1.2 Nacos Client接收Nacos Server的UDP推送

1.Nacos Server与Nacos Client间的UDP通信

  • Nacos Server向Nacos Client进行UDP推送
  • Nacos Client接收Nacos Server的UDP推送

1.1 Nacos Server向Nacos Client进行UDP推送

在这里插入图片描述

之前的文章中Nacos Server处理心跳请求的时候, 最后有一个向其他服务发布服务变更事件。

InstanceController#beat() -> Service#processClientBeat()

在这里插入图片描述

在这里插入图片描述

创建一个处理器,其是一个任务 (ClientBeatProcessor), 开启一个立即执行的任务, 执行clientBeatProcessor任务的run()

在这里插入图片描述

在这里插入图片描述

PushService#onApplicationEvent(): 发布服务变更事件给Server

java">    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        // 启动一个定时操作,异步执行相关内容
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有Nacos Client的
                // UDP客户端PushClient
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                // 更新最后引用时间
                long lastRefTime = System.nanoTime();
                // 遍历所有PushClient,向所有该服务的订阅者Nacos Client进行UDP推送
                for (PushClient client : clients.values()) {
                    // 若当前PushClient为僵尸客户端
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        // 将该PushClient干掉
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));

                    // UDP通信
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

启动一个定时操作,异步执行相关内容:

  1. 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有Nacos Client的UDP客户端PushClient
  2. 遍历所有PushClient,向所有该服务的订阅者Nacos Client进行UDP推送 udpPush(ackEntry);

PushService#udpPush(): udp通信, Nacos Server向Nacos Client进行UDP推送

java">    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        // 若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;  // 失败计数器加一
            return ackEntry;
        }

        try {
            // 计数器加一
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            // 发送UDP
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            // 开启定时任务,进行UPD通信失败后的重新推送
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                    ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉, ackMap和udpSendTimeMap。
最后通过DatagramSocket.send()方法发送数据并开启定时任务,进行UPD通信失败后的重新推送。

在这里插入图片描述

1.2 Nacos Client接收Nacos Server的UDP推送

在这里插入图片描述
在这里插入图片描述

在之前的分析中, 会在项目启动后注入AbstractAutoServiceRegistration

AbstractAutoServiceRegistration#bind()

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

PushReceiver#run()

java">    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

在这里插入图片描述

最后也是通过DatagramSocket.send()方法发送的数据, 表示Nacos Client接收Nacos Server的UDP推送成功


http://www.niftyadmin.cn/n/372543.html

相关文章

如何在Vue中进行表单验证?

Vue是一种非常强大的JavaScript框架&#xff0c;可以帮助我们在前端实现数据绑定、事件监听等特性&#xff0c;实现类似于MVVM的数据绑定机制。表单验证是Vue应用中非常常见的需求&#xff0c;下面是一个简单的示例&#xff0c;演示如何在Vue中进行表单验证。 首先&#xff0c…

帕累托改进和帕累托最优、卡尔多-希克斯改进

根据目标个数&#xff0c;分为单目标规划&#xff0c;以及多目标规划。多目标的规划是去找折中的解集合&#xff0c;既pareto最优解集合。对优化目标超过3个以上的&#xff0c;称之为超多目标优化问题。 帕累托改进描述的就是在没有人变得不好的前提下让有些人更好的过程。帕累…

Springboot整合OSS并实现文件上传和下载

目录 一.OSS服务器开通并创建账户 二.Springboot整合OSS 1.创建springboot项目 2.整合OSS 三.postman测试 一.OSS服务器开通并创建账户 参考阿里云OSS的使用(全程请登陆)_zhz小白的博客-CSDN博客https://blog.csdn.net/zhouhengzhe/article/details/112077301 二.Springb…

盛元广通生物样本库管理系统

生物样本库管理系统&#xff0c;作为一种高效、智能化的信息管理工具&#xff0c;在现代生物研究中扮演着重要的角色。随着科学技术的不断进步&#xff0c;生物样本的采集、保存和管理变得越来越重要&#xff0c;而盛元广通生物样本库管理系统正是为了解决这一问题而应运而生的…

windwos 安装 pysqlcipher3

windwos 安装 pysqlcipher3 安装软件 参考别人的文章&#xff0c;有的要安装Tcl Windows(64-bit, x64)&#xff0c;下载地址 但是我没有安装也可以用 安装python 推荐安装 python3.7 日常使用够了&#xff0c;不要追求新出来的版本&#xff0c;不太完善。 安装visual studio…

数字IC验证高频面试问题整理(附答案)

后台有同学私信想要验证的面试题目&#xff0c;这不就来了~ Q1.权重约束中”:”和 /”的区别 : 操作符表示值范围内的每一个值的权重是相同的,比如[1:3]:40,表示1&#xff0c;2&#xff0c;3取到的概率为40/120&#xff1b; :&#xff0f;操作符表示权重要平均分到值范围内的…

js中some和every用法

some用法 some() 方法用于检测数组中的元素是否满足指定条件&#xff0c;如果有一个元素满足条件&#xff0c;则表达式返回true , 剩余的元素不会再执行检测。如果没有满足条件的元素&#xff0c;则返回false。 注意&#xff1a; some() 不会对空数组进行检测。 some() 不会改…

中级软件设计师考试总结

目录 前言考前学习宏观什么是软考涉及的知识范围软考整体导图总结 微观我的分享——希尔排序学习过程结构化做题 考试阶段确定不确定 考后总结 前言 作为一名中级软件设计师&#xff0c;考试是衡量自己技能和水平的一项重要指标。在备考和考试过程中&#xff0c;我通过总结经验…