Linux下Netty实现高性能UDP服务

news/2024/5/18 16:08:57 标签: udp, netty

前言

近期笔者基于Netty接收UDP报文进行业务数据统计的功能,因为Netty默认情况下处理UDP收包只能由一个线程负责,无法像TCP协议那种基于主从reactor模型实现多线程监听端口,所以笔者查阅网上资料查看是否有什么方式可以接收UDP收包的性能瓶颈,遂以此文来记录一下笔者的解决过程。

简介Linux内核3.9的新特性对Netty的影响

常规的Netty处理UDP包我们只能用按个NIOEventLoop线程接收传输的数据包,从底层来看即只使用一个socket线程监听网络端口,通过这一个线程将数据传输到应用层上,这一切使得我们唯一能够调优的方式就是在Socket监听传输时尽可能快速将发送给应用程序,让应用程序及时处理完以便NIOEventLoop线程能够及时处理下一个UDP数据包。亦或者,我们也可以直接通过增加服务器的数量通过集群的方式提升系统整体的吞吐量。

在这里插入图片描述

然而事实真是如此吗?在Linux内核3.9版本新增了一个SO_REUSEPORT的特性,它使得单台Linux的端口可以被多个Socket线程监听,这一特性使得Netty在高并发场景下的UDP数据包能够及时被多个线程及时处理,尽可能的避免了丢包线程且最大化的利用了CPU核心,实现内核层面的负载均衡。

在这里插入图片描述

Netty实现Linux下UDP端口复用步骤

引入Netty依赖

为了使用Netty我们必须先引入对应的maven依赖,这里笔者选择了4.1.58的最终版,读者可以按需选择自己的版本。

 <!--netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.58.Final</version>
        </dependency>

编写启动类和启动逻辑

然后我我们需要编写Netty的启动类,代码模板如下,因为Netty默认使用的是Java NIO,而在Linux支持epoll模型,相比与常规的Java NIO这种通过来回在用户态和内核态来回拷贝事件数组fd的方式,epoll内部自己维护了事件的数组并可以将自行去询问连接状态并将结果返回到用户态显得更加高效。
所以笔者在启动类的编写时会判断当前服务器是否支持epoll的逻辑,并通过该判断顺手解决了是否基于SO_REUSEPORT开启多线程监听的功能(注:这段代码读者必须自行查阅一下服务器内核版本是否大于等于3.9)。

/**
 * netty服务
 */
@Component
public class NettyUdpServer {

    private static final Logger LOG = LoggerFactory.getLogger(NettyUdpServer.class);

    private EventLoopGroup bossLoopGroup;

    private Channel serverChannel;


    /**
     * netty初始化
     */
    public void init(int port) {

        LOG.info("Epoll.isAvailable():{}", Epoll.isAvailable());


        //表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
        bossLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();

        try {
            //1、创建netty bootstrap 启动类
            Bootstrap serverBootstrap = new Bootstrap();
            //2、设置boostrap 的eventLoopGroup线程组
            serverBootstrap.group(bossLoopGroup)
                    //3、设置NIO UDP连接通道
                    .channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class)
                    //4、设置通道参数 SO_BROADCAST广播形式
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                    //5、设置处理类 装配流水线
                    .handler(new NettyUdpHandler());

            // linux平台下支持SO_REUSEPORT特性以提高性能
            if (Epoll.isAvailable()) {
                LOG.info("SO_REUSEPORT");
                serverBootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
            }

            // 如果支持epoll则说明是Linux版本,则利用SO_REUSEPORT创建多个线程
            if (Epoll.isAvailable()) {
                // linux系统下使用SO_REUSEPORT特性,使得多个线程绑定同一个端口
                int cpuNum = Runtime.getRuntime().availableProcessors();
                LOG.info("using epoll reuseport and cpu:" + cpuNum);
                for (int i = 0; i < cpuNum; i++) {
                    LOG.info("worker-{} bind", i);
                    //6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功
                    ChannelFuture future = serverBootstrap.bind(port).sync();
                    if (!future.isSuccess()) {
                        LOG.error("bootstrap bind fail port is " + port);
                        throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", "192.168.2.128", port), future.cause());
                    } else {
                        LOG.info("bootstrap bind success ");
                    }
                }
            } else {
                ChannelFuture future = serverBootstrap.bind(port).sync();
                if (!future.isSuccess()) {
                    LOG.error("bootstrap bind fail port is " + port);
                    throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", "127.0.0.1", port), future.cause());
                } else {
                    LOG.info("bootstrap bind success ");
                }
            }

        } catch (Exception e) {
            LOG.error("报错了,错误原因:{}", e.getMessage(), e);
        }


    }


}

因为该代码是编写在spring boot项目中,所以我们还需要添加一下启动的逻辑。

@Component
public class InitTask implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(InitTask.class);



    @Autowired
    private NettyUdpServer nettyUdpServer;






    @Override
    public void run(String... args) {

        LOG.info("netty服务器初始化成功,端口号:{}", 7000);
        nettyUdpServer.init(7000);

    }

}

封装业务处理类

处理类的逻辑比较简单了,收到内容后打印后,原子类自增一下,该原子类是用于后续压测统计是否丢包用的。

/**
 * 报文处理器
 */
@Component
@ChannelHandler.Sharable
public class NettyUdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger LOG = LoggerFactory.getLogger(NettyUdpHandler.class);

    private static AtomicInteger atomicInteger=new AtomicInteger(0);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {
        try {

            int length = dp.content().readableBytes();
            //分配一个新的数组来保存具有该长度的字节数据
            byte[] array = new byte[length];
            //将字节复制到该数组
            dp.content().getBytes(dp.content().readerIndex(), array);
            LOG.info("收到UDP报文,报文内容:{} 包处理个数:{}", new String(array),atomicInteger.incrementAndGet());


        } catch (Exception e) {
            LOG.error("报文处理失败,失败原因:{}", e.getMessage(), e);
        }
    }
}

基于jmeter完成压测统计丢包率

自此我们项目都编写完成了,我们不妨使用jmeter进行一次压测,可以看到笔者会一次性发送100w个数据包查看最终的收包数。

在这里插入图片描述

而UDP包的格式以及目的地址和内容如下

在这里插入图片描述

最终压测结果如下,可以看到服务器都及时的收到了数据包,并不存在丢包的现象。

在这里插入图片描述

为了可以看到性能的提升,笔者将代码还原回单线程监听的老代码段:

/**
     * netty初始化
     */
    public void init(int port) {

        LOG.info("Epoll.isAvailable():{}", Epoll.isAvailable());


        //表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
        bossLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();

        try {
            //1、创建netty bootstrap 启动类
            Bootstrap serverBootstrap = new Bootstrap();
            //2、设置boostrap 的eventLoopGroup线程组
            serverBootstrap.group(bossLoopGroup)
                    //3、设置NIO UDP连接通道
                    .channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class)
                    //4、设置通道参数 SO_BROADCAST广播形式
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                    //5、设置处理类 装配流水线
                    .handler(new NettyUdpHandler());

           


            ChannelFuture future = serverBootstrap.bind(port).sync();
            if (!future.isSuccess()) {
                LOG.error("bootstrap bind fail port is " + port);
                throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", "127.0.0.1", port), future.cause());
            } else {
                LOG.info("bootstrap bind success ");
            }


        } catch (Exception e) {
            LOG.error("报错了,错误原因:{}", e.getMessage(), e);
        }


    }

根据老的压测结果来看,单线程监听的情况下,确实会存在一定的丢包,所以如果在高并发场景下使用Netty接收UDP数据包的小伙伴,建立利用好Linux内核3.9的特性提升程序的吞吐量哦。

在这里插入图片描述

参考文献

Linux下Netty实现高性能UDP服务(SO_REUSEPORT): https://blog.csdn.net/monokai/article/details/108453746

Netty网络传输简记: https://www.sharkchili.com/pages/710071/#前言


http://www.niftyadmin.cn/n/5272160.html

相关文章

2024上半年软考别轻易尝试!先了未发布

最近几年&#xff0c;软件考试变得非常受欢迎&#xff01;不论你的专业、学历或工作时间如何&#xff0c;你都可以报名参加&#xff0c;而且通过考试取得证书还能用来抵扣个人所得税、评职称、帮助落户和参与招投标等等。 身边的朋友们纷纷参加软考&#xff0c;这让我也产生了…

ubuntu添加路由

ip route show 查看当前路由表 sudo ip route add /mask via 添加一条路由 目标ip 1.1.1.1/100 下一跳 2.2.2.2 sudo ip route add 1.1.1.1/100 via 2.2.2.2 dev ens160 proto static metric 100这是一条Linux命令&#xff0c;用于添加一个静态路由。具体含义如下&#xff1…

QT-可拖拉绘图工具

QT-可拖拉绘图工具 一、演示效果二、关键程序三、下载链接 一、演示效果 二、关键程序 #include "diagramscene.h" #include "arrow.h"#include <QTextCursor> #include <QGraphicsSceneMouseEvent> #include <QDebug>QPen const Diagr…

DataGrip 2023.3 新功能速递!

1 数据可视化 自 DataGrip 2023.3 发布以来&#xff0c;已整合 Lets-Plot 库&#xff0c;实现数据可视化。该可视化功能可用于所有三种类型的网格&#xff1a; 主选项卡&#xff1a;在打开表、视图或 CSV 文件时&#xff0c;在分割模式下显示图表。结果选项卡&#xff1a;在 服…

Postman/Apifox使用教程

Postman/Apifox使用教程 1. 界面导航说明2.发送第一个请求3. 工具的基础功能3.1 常见类型的接口请求3.1.1 查询参数的接口请求3.1.2 表单类型的接口请求3.1.3 上传文件的表单请求3.1.4 json类型的接口请求 3.2 接口响应数据解析 附录 1. 界面导航说明 2.发送第一个请求 http:/…

排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序

排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序 文章目录 排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序前言&#xff1a;冒泡排序插入排序希尔排序选择排序堆排序快速排序--交换排序三数取中快速排序hoare版本快速排序挖坑法快速排序前后指…

python 内置数据结构

python内置的数据结构有&#xff1a; 列表(list) 元组(tuple) 字典(dict) 集合(set) 在python语言中&#xff0c;以上4种数据结构和基础数据类型&#xff08;整数、浮点数等&#xff09;统称为“内置类型”&#xff08;Built-in Types&#xff09;。 1. 列表(list) 参考&…

为什么企业有了银企直联还需要智能网银?

银企直联&#xff0c;又称银企互联&#xff0c;是指企业通过在内部建立自己的资金管理系统&#xff0c;与银行进行数据与信息的交互&#xff0c;方便企业实时查询账户信息、交易明细&#xff0c;以及办理结算、贷款、票据管理等业务。 由于银企直联为企业财资管理带来了更多的…