diff --git "a/blog-site/content/posts/essays/Netty\350\257\246\350\247\243.md" "b/blog-site/content/posts/essays/Netty\350\257\246\350\247\243.md" index 62f53bcc0..9cbed8adf 100644 --- "a/blog-site/content/posts/essays/Netty\350\257\246\350\247\243.md" +++ "b/blog-site/content/posts/essays/Netty\350\257\246\350\247\243.md" @@ -62,774 +62,408 @@ slug: "java-netty" 在低并发环境下,这种模型可以有效地处理IO操作,降低了系统的复杂性和开发难度。 这个模型的局限性包括处理能力的限制。在高并发场景中,单线程可能成为性能瓶颈,无法处理大量的并发请求。此外,如果某个IO操作发生阻塞,可能会影响到其他事件的处理,导致整体系统性能下降。 - 单`Reactor`单线程模型适用于连接数目较少、负载不高的应用场景,例如小型网络服务或低并发的应用。在处理高并发、大流量的应用时,可能需要使用多 Reactor 或多线程模型,以更好地满足性能需求。 #### 单Reactor多线程 -[//]: # (写到了这里) -![单Reactor多线程](/iblog/posts/annex/images/essays/单Reactor多线程.png) - -步骤: -- Reactor 对象通过对 select 监听请求事件,收到请求事件后交给 dispath 进行转发 -- 如果是建立连接请求,则通过 accept 处理连接请求,然后创建一个handler对象处理完成连接后的事件 -- 如果不是建立连接请求,则直接交给 handler 对象 -- handler 只负责响应事件,不做具体的业务处理,通过 read 读取完后,分发给下面的 worker线程池中某个线程处理 -- worker 线程负责处理具体业务,处理完成后会将具体的结果返回给 handler -- handler 线程通过send方法返回给客户端 +[单Reactor多线程](/iblog/posts/annex/images/essays/单Reactor多线程.png) -缺点: -- 多线程访问比较复杂,需要处理线程之间的竞争,资源共享 -- Reactor 对象在处理所有事件的监听和响应都是单线程的,在高并发场景容易出现性能瓶颈 +工作原理: +- `Reactor`对象通过对`select`监听请求事件,收到请求事件后交给`Dispatch`进行转发; +- 如果是建立连接请求,则通过`accept`处理连接请求,然后创建一个`handler`对象处理完成连接后的事件; +- 如果不是建立连接请求,则直接交给`handler`对象; +- `handler`只负责响应事件,不做具体的业务处理,通过`read`读取完后,分发给下面的 worker线程池中某个线程处理; +- `worker` 线程负责处理具体业务,处理完成后会将具体的结果返回给`handler`; +- `handler`线程通过`send`方法返回给客户端; -优点: -- 可以充分利用CPU资源 +单`Reactor`多线程模型通过引入多个工作线程来处理IO操作,能够更好地利用多核CPU的资源,提升了系统的并发处理能力和整体性能。 +但是`Reactor`对象在处理所有事件的监听和响应都是单线程的,在高并发场景容易出现性能瓶颈。 #### 主从Reactor多线程 -![主从Reactor线程](/iblog/posts/annex/images/essays/主从Reactor线程.png) +`Reactor`主线程可以对应多个`Reactor`子线程,即`MainRecator`可以关联多个`SubReactor`,从而解决了`Reactor` 在单线程中运行,高并发场景下容易成为性能瓶颈。 -Reactor 主线程可以对应多个 Reactor 子线程,即 MainRecator 可以关联多个 SubReactor,从而解决了Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈。 -步骤: -- Reactor 主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件 -- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor -- SubReactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理 -- 当有新事件发生时,SubReactor 就会调用对应的 handler 处理 -- handler 只负责响应事件,不做具体的业务处理,通过 read 读取完后,分发给下面的 worker线程池中某个线程处理 -- worker 线程负责处理具体业务,处理完成后会将具体的结果返回给 handler -- handler 线程通过send方法返回给客户端 +![主从Reactor线程](/iblog/posts/annex/images/essays/主从Reactor线程.png) -优点: -- 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据 -- 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理 +工作原理: +- `Reactor`主线程`MainReactor`对象通过`select`监听连接事件,收到事件后,通过`Acceptor`处理连接事件; +- 当`Acceptor`处理连接事件后,`MainReactor` 将连接分配给`SubReactor`; +- `SubReactor`将连接加入到连接队列进行监听,并创建`handler`进行各种事件处理; +- 当有新事件发生时,`SubReactor`就会调用对应的`handler`处理; +- `handler`只负责响应事件,不做具体的业务处理,通过`read`读取完后,分发给下面的`worker`线程池中某个线程处理; +- `worker`线程负责处理具体业务,处理完成后会将具体的结果返回给`handler`; +- `handler`线程通过`send`方法返回给客户端; -缺点: -- 编程复杂度较高 +主从`Reactor`多线程模型在`Reactor`架构中进一步优化了性能和扩展性。在这种模型中,主`Reactor`线程负责监听和接收所有IO事件,而从`Reactor`线程池则处理具体的事件和业务逻辑。 +通过将事件的监听和处理分离,主`Reactor`线程可以专注于高效地接受新的连接和事件,而从`Reactor`线程池则可以并行处理实际的IO操作。这种设计充分利用了多核CPU的资源,显著提升了系统的并发处理能力和响应速度。 -这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持。 +这种模型在许多项目中广泛使用,包括 `Nginx`主从`Reactor` 多进程模型,`Memcached`主从多线程,`Netty`主从多线程模型的支持。 +尽管该模型的实现复杂度增加,但它在处理大量并发请求时提供了卓越的性能和扩展性。 +### Netty模型 +`Netty`的线程模型设计旨在高效处理大量并发连接,并最大限度地利用CPU资源。其核心思想是将IO操作和业务逻辑处理分开,通过线程池和事件循环机制实现异步处理。 -### Netty 模型 ![Netty模型](/iblog/posts/annex/images/essays/Netty模型.png) -![netty结构](/iblog/posts/annex/images/essays/netty结构.png) +Netty的线程模型设计高效地支持大规模并发网络通信。核心包括两个主要类型的线程: +- 主事件循环线程(`Boss Thread`):主要负责监听和接受新的网络连接。`Netty`中的`ServerBootstrap`配置了一个或多个主事件循环线程,这些线程负责绑定网络端口并接收客户端的连接请求。当新的连接到达时,主线程会将这些连接注册到工作事件循环线程上进行进一步处理。 +- 工作事件循环线程(`Worker Thread`):负责处理已经建立连接的IO操作,包括读取和写入数据。每个连接的IO操作由工作线程负责处理,这些线程通常以线程池的形式存在,可以并行处理多个连接的IO事件。`Netty`允许为工作线程配置多个线程组,从而能够高效地处理大量并发请求。 -- Netty 抽象出两组线程池 `BossGroup` 专门负责接收客户端的连接,`WorkerGroup` 专门负责网络的读写;`BossGroup` 和 `WorkerGroup` 类型都是 `NioEventLoopGroup` -- `NioEventLoopGroup` 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 `NioEventLoop`,每个 `NioEventLoop` 都有一个 `Selector`,用于监听绑定在其上的 `socket` 的网络通讯 -- `NioEventLoopGroup` 可指定多个 `NioEventLoop` -- 每个 `BossNioEventLoop` 循环执行的步骤 - - 轮询 accept 事件 - - 处理 `accept` 事件,与 client 建立连接,生成 `NioSocketChannel`,并将其注册到某个 `workerNioEventLoop` 上的 `Selector` - - 处理任务队列的任务,即 `runAllTasks` -- 每个 `WorkerNioEventLoop` 循环执行的步骤 - - 轮询 `read,write` 事件 - - 处理IO事件,即 `read,write` 事件,在对应 `NioSocketChannel` 处理 - - 处理任务队列的任务,即 `runAllTasks` -- 每个 `WorkerNioEventLoop` 处理业务时,会使用 `pipeline`(管道),`pipeline` 中包含了 `channel`(通道),即通过 `pipeline` 可以获取到对应通道,管道中维护了很多的处理器 +![netty结构](/iblog/posts/annex/images/essays/netty结构.png) +工作原理: +- `Netty`抽象出两组线程池 `BossGroup`专门负责接收客户端的连接,`WorkerGroup`专门负责网络的读写;`BossGroup`和`WorkerGroup`类型都是`NioEventLoopGroup`; +- `NioEventLoopGroup`相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是`NioEventLoop`,每个`NioEventLoop`都有一个`Selector`,用于监听绑定在其上的`socket`的网络通讯; +- `NioEventLoopGroup`可指定多个`NioEventLoop`; +- 每个`BossNioEventLoop`循环执行的步骤: + 1. 轮询`accept`事件; + 2. 处理`accept`事件,与`client`建立连接,生成`NioSocketChannel`,并将其注册到某个`workerNioEventLoop`上的`Selector`; + 3. 处理任务队列的任务,即`runAllTasks`; +- 每个`WorkerNioEventLoop`循环执行的步骤: + 1. 轮询`read`,`write`事件; + 2. 处理IO事件,即`read`,`write`事件,在对应`NioSocketChannel`处理; + 3. 处理任务队列的任务,即`runAllTasks`; +- 每个`WorkerNioEventLoop`处理业务时,会使用`pipeline`,`pipeline`中包含了`channel`,即通过`pipeline`可以获取到对应通道,管道中维护了很多的处理器; ## 使用Netty -1.[导入依赖](https://netty.io/downloads.html) -``` - - io.netty - netty-all - 4.1.36.Final - - - org.projectlombok - lombok - -``` - -2.服务器端代码演示 -``` -/** - * netty 服务端测试 - */ -public class MainTestServer { - public static void main(String[] args) { - // 启动器, 负责组装netty组件 启动服务器 - new ServerBootstrap() - // BossEventLoop WorkEventLoop 每个 EventLoop 就是 一个选择器 + 一个线程 - .group(new NioEventLoopGroup()) - // 选择服务器 ServerSocketChannel 具体实现 - .channel(NioServerSocketChannel.class) - // 决定了 workEventLoop 能做那些操作 - .childHandler( - // 建立连接后会被调用; 作用: 初始化 + 添加其他的 handler - new ChannelInitializer() { - // 当客户端请求发过来时 才会调用 - @Override - protected void initChannel(NioSocketChannel channel) throws Exception { - channel.pipeline().addLast(new StringDecoder()); - // 自定义handler - channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("服务器端接收数据:" + msg); - } - }); - } - }) - // 绑定监听端口 - .bind(8090); - } -} -``` - -3.客户端代码演示 -``` -/** - * netty 客户端测试 - */ -public class MainTestClient { - public static void main(String[] args) throws InterruptedException { - new Bootstrap() - .group(new NioEventLoopGroup()) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - // 初始化 在与服务器建立链接的时候 调用 - @Override - protected void initChannel(NioSocketChannel channel) throws Exception { - // 添加编码器 只有当向服务端发送请求数据时 才会执行 - channel.pipeline().addLast(new StringEncoder()); - } - }) - .connect(new InetSocketAddress("127.0.0.1", 8090)) - //阻塞方法,直到与服务器端连接建立 - .sync() - .channel() - // 向服务器端发送数据 - .writeAndFlush("hello word"); - } -} -``` -### 任务队列 -- 用户程序自定义的普通任务 -- 用户自定义定时任务 -- 非当前 Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。 - 最终的 Write 会提交到任务队列中后被异步消费 - -客户端 -``` -/** - * netty 客户端测试 - */ -public class MainTestClient { - public static void main(String[] args) throws InterruptedException { - new Bootstrap() - .group(new NioEventLoopGroup()) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - - // 初始化 在与服务器建立链接的时候 调用 - @Override - protected void initChannel(NioSocketChannel channel) throws Exception { - // 添加编码器 只有当向服务端发送请求数据时 才会执行 - channel.pipeline().addLast("decoder", new StringDecoder()); - channel.pipeline().addLast("encoder", new StringEncoder()); - channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("服务端响应数据:" + msg); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("客户端Active ....."); - } - - /** - * 客户端异常时触发 - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - ctx.close(); - } - - }); - } - - }) - .connect(new InetSocketAddress("127.0.0.1", 8090)) - //阻塞方法,直到与服务器端连接建立 - .sync() - .channel() - // 向服务器端发送数据 - .writeAndFlush("hello word"); - } -} -``` -服务端 -``` -/** - * netty 服务端测试 - */ -public class MainTestServer { - public static void main(String[] args) { - //new 一个主线程组 - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - //new 一个工作线程组 - EventLoopGroup workGroup = new NioEventLoopGroup(200); - // 启动器, 负责组装netty组件 启动服务器 - try { - ServerBootstrap serverBootstrap = new ServerBootstrap() +`Netty`是一个高性能的网络通信框架,广泛应用于需要高并发和高吞吐量的场景。它适用于构建各种类型的网络服务,包括但不限于高性能的`TCP`服务器、实时的`WebSocket`应用、轻量级的`UDP`服务和高效的HTTP/HTTPS 服务。 +`Netty`的异步非阻塞IO模型使其特别适合处理大量并发连接和高负载情况,广泛用于金融系统、在线游戏、实时聊天、流媒体传输、分布式系统和微服务架构等领域。凭借其灵活的架构和高效的性能,`Netty`能够满足对延迟敏感的应用需求,并且支持复杂的协议和自定义业务逻辑。 + +### 搭建TCP服务 +1. 在项目的构建工具中添加`Netty`依赖。 + ```xml + + io.netty + netty-all + 4.1.36.Final + + ``` +2. 使用`ServerBootstrap`类来引导服务器端。配置主要包括设置主事件循环线程和工作事件循环线程的线程池,以及处理各种连接和IO事件的处理器。 + ```java + /** + * netty 服务端测试 + */ + public class MainTestServer { + public static void main(String[] args) { + // 启动器, 负责组装netty组件 启动服务器 + new ServerBootstrap() // BossEventLoop WorkEventLoop 每个 EventLoop 就是 一个选择器 + 一个线程 - .group(bossGroup, workGroup) + .group(new NioEventLoopGroup()) // 选择服务器 ServerSocketChannel 具体实现 .channel(NioServerSocketChannel.class) - //设置队列大小 - .option(ChannelOption.SO_BACKLOG, 1024) - // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 - .childOption(ChannelOption.SO_KEEPALIVE, true) // 决定了 workEventLoop 能做那些操作 .childHandler( // 建立连接后会被调用; 作用: 初始化 + 添加其他的 handler new ChannelInitializer() { - // 当客户端请求发过来时 才会调用 + // 当客户端请求发过来时 才会调用 + @Override + protected void initChannel(NioSocketChannel channel) throws Exception { + channel.pipeline().addLast(new StringDecoder()); + // 自定义handler + channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override - protected void initChannel(NioSocketChannel channel) throws Exception { - channel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); - channel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); - // 自定义handler - channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { - - /** - * 客户端连接会触发 - */ - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("服务端 Active......"); - } - - /** - * 客户端发消息会触发 - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("服务器收到消息: " + msg.toString()); - // 比如这里我们有一个非常耗时长的业务-> 应该一步执行 -> 提交该对应的channel - // 将任务放在 taskQueue 中 - ctx.channel().eventLoop().execute(() -> { - try { - Thread.sleep(10 * 1000); - ctx.writeAndFlush("业务1处理完成"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - // 放在TaskQueue 中的任务是由一个线程来进行处理的 所以执行完上一个任务才会执行下一个任务 时间是累加的 - ctx.channel().eventLoop().execute(() -> { - try { - Thread.sleep(20 * 1000); - ctx.writeAndFlush("业务2处理完成"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - // 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中 - - ctx.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - - try { - Thread.sleep(5 * 1000); - ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8)); - System.out.println("channel code=" + ctx.channel().hashCode()); - } catch (Exception ex) { - System.out.println("发生异常" + ex.getMessage()); - } - } - }, 5, TimeUnit.SECONDS); - } - - /** - * 给客户端发送消息 - */ - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.writeAndFlush("over"); - } - - /** - * 发生异常触发 - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - ctx.close(); - } - - - }); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("服务器端接收数据:" + msg); } }); + } + }) // 绑定监听端口 - ChannelFuture future = serverBootstrap.bind(8090).sync(); - // 对关闭通道进行监听 - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - - } finally { - //关闭主线程组 - bossGroup.shutdownGracefully(); - //关闭工作线程组 - workGroup.shutdownGracefully(); + .bind(8090); } } -} -``` -### 异步模型 -异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。 - -`Netty` 中的IO操作是异步的,包括 `Bind、Write、Connect` 等操作会简单的返回一个 `ChannelFuture`。 调用者并不能立刻获得结果,而是通过 `Future-Listener` 机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。 - -当 `Future` 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 `ChannelFuture` 来获取操作执行的状态,注册监听函数来执行完成后的操作。 - -常见有如下操作: -- 通过 `isDone` 方法来判断当前操作是否完成; -- 通过 `isSuccess` 方法来判断已完成的当前操作是否成功; -- 通过 `getCause` 方法来获取已完成的当前操作失败的原因; -- 通过 `isCancelled` 方法来判断已完成的当前操作是否被取消; -- 通过 `addListener` 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果 `Future` 对象已完成,则通知指定的监听器 - -``` -//绑定一个端口并且同步,生成了一个ChannelFuture对象 -//启动服务器(并绑定端口) -ChannelFuture cf = bootstrap.bind(6668).sync(); -//给cf注册监听器,监控我们关心的事件 -cf.addListener(new ChannelFutureListener() { - @Override - public void operationComplete (ChannelFuture future) throws Exception { - if (cf.isSuccess()) { - System.out.println("监听端口6668成功"); - } else { - System.out.println("监听端口6668失败"); + ``` +3. 使用`Bootstrap`类来引导客户端,配置包括事件循环线程池、处理器等。 + ```java + /** + * netty 客户端测试 + */ + public class MainTestClient { + public static void main(String[] args) throws InterruptedException { + new Bootstrap() + .group(new NioEventLoopGroup()) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + // 初始化 在与服务器建立链接的时候 调用 + @Override + protected void initChannel(NioSocketChannel channel) throws Exception { + // 添加编码器 只有当向服务端发送请求数据时 才会执行 + channel.pipeline().addLast(new StringEncoder()); + } + }) + .connect(new InetSocketAddress("127.0.0.1", 8090)) + //阻塞方法,直到与服务器端连接建立 + .sync() + .channel() + // 向服务器端发送数据 + .writeAndFlush("hello word"); } - } -}); - -``` -### Netty搭建Http服务 -服务端 -``` -public class MainTestServer { - public static void main(String[] args) { - //new 一个主线程组 - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - //new 一个工作线程组 - EventLoopGroup workGroup = new NioEventLoopGroup(200); - // 启动器, 负责组装netty组件 启动服务器 - try { - ServerBootstrap serverBootstrap = new ServerBootstrap() - // BossEventLoop WorkEventLoop 每个 EventLoop 就是 一个选择器 + 一个线程 - .group(bossGroup, workGroup) - // 选择服务器 ServerSocketChannel 具体实现 - .channel(NioServerSocketChannel.class) - //设置队列大小 - .option(ChannelOption.SO_BACKLOG, 1024) - // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 - .childOption(ChannelOption.SO_KEEPALIVE, true) - // 决定了 workEventLoop 能做那些操作 - .childHandler( - // 建立连接后会被调用; 作用: 初始化 + 添加其他的 handler - new ChannelInitializer() { - // 当客户端请求发过来时 才会调用 - @Override - protected void initChannel(NioSocketChannel channel) throws Exception { - channel.pipeline().addLast("MyHttpServerCodec", new HttpServerCodec()); - // 自定义handler - channel.pipeline().addLast(new SimpleChannelInboundHandler() { - @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { - System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx.pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel()); - - System.out.println("当前ctx的handler=" + ctx.handler()); - - //判断 msg 是不是 httprequest请求 - if (msg instanceof HttpRequest) { - - System.out.println("ctx 类型=" + ctx.getClass()); - - System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode()); - - System.out.println("msg 类型=" + msg.getClass()); - System.out.println("客户端地址" + ctx.channel().remoteAddress()); - - //获取到 - HttpRequest httpRequest = (HttpRequest) msg; - //获取uri, 过滤指定的资源 - URI uri = new URI(httpRequest.uri()); - if ("/favicon.ico".equals(uri.getPath())) { - System.out.println("请求了 favicon.ico, 不做响应"); - return; - } - //回复信息给浏览器 [http协议] - - ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8); - - //构造一个http的相应,即 httpresponse - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); - - response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); - - //将构建好 response返回 - ctx.writeAndFlush(response); - } - } - }); - } - }); - // 绑定监听端口 - ChannelFuture future = serverBootstrap.bind(8090).sync(); - future.addListener( future1 -> { - if (future.isSuccess()) { - System.out.println("监听端口8090成功"); - }else{ - System.out.println("监听端口8090失败"); - } - }); - // 对关闭通道进行监听 - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - - } finally { - //关闭主线程组 - bossGroup.shutdownGracefully(); - //关闭工作线程组 - workGroup.shutdownGracefully(); + } + ``` +4. 启动服务器端应用和客户端应用,进行网络通信测试。 + +### 搭建HTTP服务 +1. 在项目的构建工具中添加`Netty`依赖。 + ```xml + + io.netty + netty-all + 4.1.36.Final + + ``` +2. 创建一个主类用于启动`Netty`服务器。在此类中,配置`ServerBootstrap`来设置服务器的基本属性,并绑定端口以开始监听请求。 + ```java + public class NettyHttpServer { + private final int port; + + public NettyHttpServer(int port) { + this.port = port; + } + + public void start() throws Exception { + // Event loop groups for handling I/O operations + final NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + final NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(new HttpContentCompressor()); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new SimpleHttpHandler()); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // Bind and start to accept incoming connections + ChannelFuture f = bootstrap.bind(port).sync(); + f.channel().closeFuture().sync(); + } finally { + // Shut down all event loops to terminate all threads + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + public static void main(String[] args) throws Exception { + new NettyHttpServer(8080).start(); } } -} -``` -### TCP粘包、拆包及解决方案 -TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,由于 TCP 无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。 - -拆包和粘包是在socket编程中经常出现的情况,在socket通讯过程中,如果通讯的一端一次性连续发送多条数据包,tcp协议会将多个数据包打包成一个tcp报文发送出去,这就是所谓的粘包。而如果通讯的一端发送的数据包超过一次tcp报文所能传输的最大值时,就会将一个数据包拆成多个最大tcp长度的tcp报文分开传输,这就叫做拆包。 - -对于粘包的情况,要对粘在一起的包进行拆包。对于拆包的情况,要对被拆开的包进行粘包,即将一个被拆开的完整应用包再组合成一个完整包。比较通用的做法就是每次发送一个应用数据包前在前面加上四个字节的包长度值,指明这个应用包的真实长度。 - -使用netty解决拆包、粘包问题代码示例: - -客户端代码 -```` -@SpringBootApplication -public class NettyClientApplication { - - public static void main(String[] args) { - SpringApplication.run(NettyClientApplication.class, args); + ``` +3. 实现一个`SimpleHttpHandler`处理 HTTP 请求和响应。这个处理器将负责处理客户端的请求并生成响应。 + ```java + public class SimpleHttpHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { + // Create a simple HTTP response + String content = "Hello, Netty!"; + FullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + // Write the response and close the connection + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } } -} + ``` +4. 启动服务器。服务器会监听在指定的端口,并可以处理到达该端口的HTTP请求。 + ```java + public static void main(String[] args) throws Exception { + new NettyHttpServer(8080).start(); + } + ``` +5. 启动服务器后,可以使用浏览器或工具访问`http://localhost:8080`来测试服务是否正常工作。如果一切正常,应该会看到 “Hello, Netty!” 的响应。 + ```text + curl http://localhost:8080 + ``` -@Slf4j -@Component -public class StartNetty implements CommandLineRunner { +### TCP粘包拆包 +TCP是面向连接的,面向流的,提供高可靠性服务。客户端和服务器端都要有成对的`Socket`,因此发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化算法,将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。 +这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,由于`TCP`无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。 - private final NettyClient nettyClient; +粘包就是多个数据包被合并成一个包进行发送。在接收端,数据包无法区分它们的边界,导致接收到的数据流是混合的。 +拆包是一个数据包被拆分成多个包进行发送。在接收端,可能会将多个数据包的内容错误地合并为一个包。 - public StartNetty(NettyClient nettyClient) { - this.nettyClient = nettyClient; - } +对于粘包的情况,要对粘在一起的包进行拆包。对于拆包的情况,要对被拆开的包进行粘包,即将一个被拆开的完整应用包再组合成一个完整包。 +比较通用的做法就是每次发送一个应用数据包前在前面加上四个字节的包长度值,指明这个应用包的真实长度。 - @Override - public void run(String... args) throws Exception { - log.info("启动netty客户端 ..."); - nettyClient.start(); - } -} +在每个数据包的开头添加一个固定长度的字段,用于指明整个包的长度。这样,接收端可以根据这个长度信息正确地拆分和组装数据包,从而解决粘包和拆包问题。数据包格式: +```text +| Length (4 bytes) | Data Body | +``` -@Slf4j -@Component -public class NettyClient { - /** - * Netty客户端启动 - */ - public void start() { - EventLoopGroup group = new NioEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap() - .group(group) - //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输 - .option(ChannelOption.TCP_NODELAY, true) - .channel(NioSocketChannel.class) - .handler(new NettyClientInitializer()); +服务端代码 +```java +public class TcpServer { + public static void main(String[] args) { try { - ChannelFuture future = bootstrap.connect("127.0.0.1", 9000).sync(); - log.info("客户端成功...."); - //发送消息 - future.channel().writeAndFlush("客户端请求数据"); - // 等待连接被关闭 - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.bind(new java.net.InetSocketAddress(8080)); + System.out.println("服务器启动,监听端口8080"); + + while (true) { + SocketChannel clientChannel = serverSocketChannel.accept(); + System.out.println("客户端已连接"); + + // 使用线程处理客户端请求 + new Thread(() -> handleClient(clientChannel)).start(); + } + } catch (IOException e) { e.printStackTrace(); - } finally { - group.shutdownGracefully(); } } -} - - - -public class NettyClientInitializer extends ChannelInitializer { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast("decoder", new MyMessageDecoder()); - socketChannel.pipeline().addLast("encoder", new MyMessageEncoder()); - socketChannel.pipeline().addLast(new NettyClientHandler()); - } -} - -@Slf4j -public class NettyClientHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("客户端Active ....."); - // 模拟tcp粘包 - for (int i = 0; i < 5; i++) { - String mes = "今天天气冷,吃火锅"; - byte[] content = mes.getBytes(Charset.forName("utf-8")); - int length = mes.getBytes(Charset.forName("utf-8")).length; - - // 解决tcp粘包问题 - MessageProtocol messageProtocol = new MessageProtocol(); - messageProtocol.setLen(length); - messageProtocol.setContent(content); - ctx.writeAndFlush(messageProtocol); + private static void handleClient(SocketChannel channel) { + try { + while (true) { + byte[] data = receiveMessage(channel); + if (data == null) break; // 连接关闭 + + String message = new String(data); + System.out.println("收到消息: " + message); + + // 回显消息给客户端 + sendMessage(channel, data); + } + } catch (IOException e) { + e.printStackTrace(); } } - /** - * 收到服务端的消息 - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("客户端收到消息: {}", msg.toString()); - MessageProtocol mp = (MessageProtocol)msg; - int len = mp.getLen(); - byte[] content = mp.getContent(); - - System.out.println("客户端接收到消息如下"); - System.out.println("长度=" + len); - System.out.println("内容=" + new String(content, StandardCharsets.UTF_8)); - - + private static void sendMessage(SocketChannel channel, byte[] data) throws IOException { + ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + lengthBuffer.putInt(data.length + 4); // 包含长度字段的总长度 + lengthBuffer.flip(); + + ByteBuffer dataBuffer = ByteBuffer.wrap(data); + channel.write(lengthBuffer); + channel.write(dataBuffer); } - /** - * 客户端异常时触发 - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - ctx.close(); + private static byte[] receiveMessage(SocketChannel channel) throws IOException { + ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + int bytesRead = channel.read(lengthBuffer); + if (bytesRead == -1) return null; // 读取结束 + + lengthBuffer.flip(); + int length = lengthBuffer.getInt(); + + ByteBuffer dataBuffer = ByteBuffer.allocate(length - 4); + channel.read(dataBuffer); + dataBuffer.flip(); + + byte[] data = new byte[dataBuffer.remaining()]; + dataBuffer.get(data); + return data; } } -```` - -服务端代码 -```` -@SpringBootApplication -public class NettyServerApplication { +``` +客户端代码 +```java +public class TcpClient { public static void main(String[] args) { - SpringApplication.run(NettyServerApplication.class, args); - } -} - -@Slf4j -@Component -public class StartNetty implements CommandLineRunner { - - private final NettyServer nettyServer; - - public StartNetty(NettyServer nettyServer) { - this.nettyServer = nettyServer; - } + try { + SocketChannel socketChannel = SocketChannel.open(new java.net.InetSocketAddress("localhost", 8080)); + System.out.println("连接到服务器"); - /** - * 启动netty 让netty随着项目一起启动 - */ - @Override - public void run(String... args) throws Exception { - log.info("netty 服务端启动 ..."); - nettyServer.start(new InetSocketAddress("127.0.0.1", 9000)); - } -} + // 发送消息到服务器 + String message = "Hello, Server!"; + sendMessage(socketChannel, message.getBytes()); -@Slf4j -@Component -public class NettyServer { + // 接收服务器的响应 + byte[] response = receiveMessage(socketChannel); + System.out.println("收到来自服务器的消息: " + new String(response)); - /** - * Netty服务启动 - */ - public void start(InetSocketAddress socketAddress) { - //new 一个主线程组 - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - //new 一个工作线程组 - EventLoopGroup workGroup = new NioEventLoopGroup(200); - - ServerBootstrap bootstrap = new ServerBootstrap() - .group(bossGroup, workGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ServerChannelInitializer()) - .localAddress(socketAddress) - //设置队列大小 - .option(ChannelOption.SO_BACKLOG, 1024) - // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 - .childOption(ChannelOption.SO_KEEPALIVE, true); - //绑定端口,开始接收进来的连接 - try { - // 绑定端口 生成一个ChannelFuture 对象 启动服务器 - ChannelFuture future = bootstrap.bind(socketAddress).sync(); - log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); - // 对关闭通道进行监听 - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { + socketChannel.close(); + } catch (IOException e) { e.printStackTrace(); - } finally { - //关闭主线程组 - bossGroup.shutdownGracefully(); - //关闭工作线程组 - workGroup.shutdownGracefully(); } } -} - -public class ServerChannelInitializer extends ChannelInitializer { - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast("decoder", new MyMessageDecoder()); - socketChannel.pipeline().addLast("encoder", new MyMessageEncoder()); - socketChannel.pipeline().addLast(new NettyServerHandler()); - } -} - -@Slf4j -public class NettyServerHandler extends ChannelInboundHandlerAdapter { - /** - * 客户端连接会触发 - */ - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("服务端 Active......"); - } - /** - * 客户端发消息会触发 - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("服务器收到消息: {}", msg.toString()); - MessageProtocol mp = (MessageProtocol)msg; - int len = mp.getLen(); - byte[] content = mp.getContent(); - - System.out.println(); - System.out.println(); - System.out.println(); - System.out.println("服务器接收到信息如下"); - System.out.println("长度=" + len); - System.out.println("内容=" + new String(content, Charset.forName("utf-8"))); - - //回复消息 - String responseContent = UUID.randomUUID().toString(); - int responseLen = responseContent.getBytes("utf-8").length; - byte[] responseContent2 = responseContent.getBytes("utf-8"); - //构建一个协议包 - MessageProtocol messageProtocol = new MessageProtocol(); - messageProtocol.setLen(responseLen); - messageProtocol.setContent(responseContent2); - - ctx.writeAndFlush(messageProtocol); + private static void sendMessage(SocketChannel channel, byte[] data) throws IOException { + ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + lengthBuffer.putInt(data.length + 4); // 包含长度字段的总长度 + lengthBuffer.flip(); + + ByteBuffer dataBuffer = ByteBuffer.wrap(data); + channel.write(lengthBuffer); + channel.write(dataBuffer); } - /** - * 给客户端发送消息 - */ - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.writeAndFlush("hello client"); - } - - /** - * 发生异常触发 - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - ctx.close(); + private static byte[] receiveMessage(SocketChannel channel) throws IOException { + ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + channel.read(lengthBuffer); + lengthBuffer.flip(); + int length = lengthBuffer.getInt(); + + ByteBuffer dataBuffer = ByteBuffer.allocate(length - 4); + channel.read(dataBuffer); + dataBuffer.flip(); + + byte[] data = new byte[dataBuffer.remaining()]; + dataBuffer.get(data); + return data; } } -```` - -公共代码部分 -```` -public class MessageProtocol { +``` - private int len; +## Netty零拷贝 +零拷贝是计算机的一种技术,目的是减少数据在内存和存储设备之间的拷贝操作。这种技术优化了数据的传输过程,通过减少数据拷贝的次数来提高性能并降低资源消耗。 - private byte[] content; +假设我们有一个大文件,需要将其内容发送到客户端。传统的文件传输流程如下: +1. 将文件内容从磁盘读取到内存中的缓冲区。这一步需要将文件的数据从磁盘拷贝到操作系统的内核空间,再拷贝到用户空间。 +2. 将内存中的数据通过网络发送到客户端。这需要将数据从用户空间再次拷贝到内核空间,并通过网络传输。 - public int getLen() { - return len; - } +在这个过程中,文件内容被拷贝了两次,这会消耗CPU和内存资源,特别是对于大文件和高频率传输的情况。 - public void setLen(int len) { - this.len = len; - } +在零拷贝的优化下,减少了内存拷贝操作。以下是使用`Netty`的零拷贝进行文件传输的流程: +1. 创建直接内存缓冲区:`Netty`使用`DirectByteBuf`创建直接内存缓冲区,该缓冲区不在Java堆内存中,而是在操作系统的内存中。允许`Netty`直接操作内核空间的缓冲区。 +2. 使用`FileRegion`直接传输文件内容:`Netty`提供了`FileRegion`接口和`DefaultFileRegion`实现,可以直接将文件内容从磁盘传输到网络通道。 +`FileRegion`可以通过`FileChannel`的`transferTo`方法实现数据的零拷贝传输,不需要将文件内容加载到内存中。 - public byte[] getContent() { - return content; - } - - public void setContent(byte[] content) { - this.content = content; - } +数据从直接内存缓冲区通过网络传输到客户端,避免了额外的内存拷贝操作。 +```java +public class FileServerHandler extends SimpleChannelInboundHandler { @Override - public String toString() { - return "MessageProtocol{" + - "len=" + len + - ", content=" + new String(content) + - '}'; - } -} - -public class MyMessageDecoder extends ReplayingDecoder { - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - //需要将得到二进制字节码-> MessageProtocol 数据包(对象) - int length = in.readInt(); - byte[] content = new byte[length]; - in.readBytes(content); - - //封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理 - MessageProtocol messageProtocol = new MessageProtocol(); - messageProtocol.setLen(length); - messageProtocol.setContent(content); - out.add(messageProtocol); + protected void channelRead0(ChannelHandlerContext ctx, String fileName) throws Exception { + // 打开文件 + RandomAccessFile file = new RandomAccessFile(fileName, "r"); + FileChannel fileChannel = file.getChannel(); + + // 创建 FileRegion,用于零拷贝文件传输 + FileRegion fileRegion = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); + + // 将文件内容写入到客户端 + ctx.writeAndFlush(fileRegion).addListener(ChannelFutureListener.CLOSE); } } +``` -public class MyMessageEncoder extends MessageToByteEncoder { +`Netty`的零拷贝实现可以从两个方面来说: +1. `Netty`的零拷贝实现依赖于底层操作系统对零拷贝的支持。例如,在`UNIX`和`Linux`系统中,`Netty`使用`sendfile()`系统调用,将数据直接从内核缓冲区传输到套接字缓冲区,而不经过用户空间的拷贝。 +2. 通过直接内存缓冲区`Netty`可以减少内存拷贝,提升数据传输效率。`FileRegion`和`DirectByteBuf`等类使得数据可以高效地在网络通道和文件系统之间传输。 - @Override - protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { - System.out.println("MyMessageEncoder encode 方法被调用"); - out.writeInt(msg.getLen()); - out.writeBytes(msg.getContent()); - } -} -```` +`Netty`的零拷贝实现首先依赖于底层操作系统对零拷贝的支持。在`UNIX`和`Linux`系统中,这种支持主要体现在`sendfile()`方法的调用上。 +当`Netty`调用`sendfile()`时,数据从文件系统的内核缓冲区直接被传输到网络接口的内核缓冲区。这个过程完全在内核空间完成,不涉及用户空间,从而减少了内存拷贝和 CPU 的使用。 -## Netty零拷贝 +`Netty`使用`DirectByteBuf`类来创建直接内存缓冲区。这些缓冲区分配在操作系统的直接内存中,而不是Java堆内存中。 +数据在网络通道和直接内存缓冲区之间传输,避免了数据在堆内存和直接内存之间的拷贝。 +```text +ByteBuf buffer = Unpooled.directBuffer(1024); // 创建一个直接内存缓冲区 +buffer.writeBytes(data); // 直接写入数据到内存 +``` ## mpsc无锁编程 +[//]: # (写到了这里) + ## Netty对象池 diff --git a/blog-site/public/index.xml b/blog-site/public/index.xml index fffe7cd1e..6b1d707d2 100644 --- a/blog-site/public/index.xml +++ b/blog-site/public/index.xml @@ -839,7 +839,7 @@ http://localhost:1313/iblog/posts/essays/java-netty/ Fri, 09 Apr 2021 00:00:00 +0000 http://localhost:1313/iblog/posts/essays/java-netty/ - Netty 参考文章:https://dongzl.github.io/netty-handbook/#/ 概述 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers &amp; clients. Netty 是一个异步的、基于事 + 概述 Netty是一个高性能、异步的网络应用框架,用于开发高效的网络通信程序。它是Java NIO的一个抽象,简化了网络编程的复杂性,并提供了一系列高级功能,使网络 JVM中的方法区 diff --git a/blog-site/public/page/8/index.html b/blog-site/public/page/8/index.html index 3a08c7eb9..1f4d014bb 100644 --- a/blog-site/public/page/8/index.html +++ b/blog-site/public/page/8/index.html @@ -570,7 +570,7 @@

Netty详解

-

Netty 参考文章:https://dongzl.github.io/netty-handbook/#/ 概述 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty 是一个异步的、基于事......

+

概述 Netty是一个高性能、异步的网络应用框架,用于开发高效的网络通信程序。它是Java NIO的一个抽象,简化了网络编程的复杂性,并提供了一系列高级功能,使网络......

diff --git a/blog-site/public/posts/essays/java-netty/index.html b/blog-site/public/posts/essays/java-netty/index.html index b377a9c38..c52aa337b 100644 --- a/blog-site/public/posts/essays/java-netty/index.html +++ b/blog-site/public/posts/essays/java-netty/index.html @@ -274,8 +274,8 @@ @@ -326,8 +326,8 @@ @@ -354,8 +354,8 @@ @@ -382,36 +382,8 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - -