赞
踩
目录
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.77.Final</version>
- </dependency>
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-all</artifactId>
- <version>5.5.8</version>
- </dependency>
- server:
- port: 8001
- servlet:
- context-path: /netty
- netty:
- url: 0.0.0.0 #0.0.0.0表示绑定任意ip
- port: 20004
- package com.tlxy.lhn.controller.netty;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- public class NettyServer {
- public static void main(String[] args) throws InterruptedException {
- //创建两个线程组bossGroup和workerGroup,含有的子线程NioEventLoop的个数默认是CPU的两倍
- //bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup(1);
-
- try {
- //创建服务器端的启动对象
- ServerBootstrap bootstrap = new ServerBootstrap();
- //使用链式编程来配置参数
- bootstrap.group(bossGroup, workerGroup)//设置两个线程组
- .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
- //初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
- //多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- //对workerGroup的SocketChannel设置处理器
- channel.pipeline().addLast(new NettyServerHandler());
- }
- });
-
- System.out.println("netty server start..");
-
- //绑定一个端口并且同步生成一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
- //启动服务器(并绑定的端口),bind是异步操作,sync方法是等待异步操作执行完毕
- ChannelFuture cf = bootstrap.bind(9000).sync();
-
- //给cf注册监听器,监听我们关心的事件
- cf.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (cf.isSuccess()) {
- System.out.println("监听端口9000成功");
- } else {
- System.out.println("监听端口9000失败");
- }
- }
- });
- //等待服务端监听端口关闭,closeFuture是异步操作
- //通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
- cf.channel().closeFuture().sync();
-
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
-
-
- }
- }
NettyServer类中的channel.pipeline().addLast(new NettyServerHandler());对应以下的处理器。
- package com.tlxy.lhn.controller.netty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
- import lombok.extern.slf4j.Slf4j;
-
- @Slf4j
- public class NettyServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
- // 读取byteBuf
- // 业务处理
- // 回消息给客户端
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
- ctx.writeAndFlush(buf);
- }
-
- //只要Netty抛出错误就会执行,Netty断会开连接会抛出连接超时的错误
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.info("关闭通道");
- cause.printStackTrace();
- ctx.close();
- }
- }
- package com.tlxy.lhn.controller.netty;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- public class NettyClient {
- public static void main(String[] args) throws InterruptedException {
- //客户端需要一个事件循环组
- NioEventLoopGroup group = new NioEventLoopGroup();
-
- try {
- //创建客户端启动对象
- //注意客户端使用的不是SocketBootstrap而是Bootstrap
- Bootstrap bootstrap = new Bootstrap();
-
- // 设置相关参数
- bootstrap.group(group) //设置线程组
- .channel(NioSocketChannel.class)// 使用NioSocketChannel作为客户端的通道实现
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new NettyClientHandler());
- }
- });
-
- System.out.println("netty client start..");
- ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
- cf.channel().closeFuture().sync();
- }finally {
- group.shutdownGracefully();
- }
-
-
- }
- }
NettyClient类中ch.pipeline().addLast(new NettyClientHandler());为处理器。
- package com.tlxy.lhn.controller.netty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
- import lombok.extern.slf4j.Slf4j;
-
- @Slf4j
- public class NettyClientHandler extends ChannelInboundHandlerAdapter {
-
- /**
- * 客户端连接标识
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
- ctx.writeAndFlush(buf);
- }
-
- //当通道建立后有事件时会触发,即服务端发送数据给客户端
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- System.out.println("收到服务端的消息是:" + buf.toString(CharsetUtil.UTF_8));
- System.out.println("服务端地址是:" + ctx.channel().remoteAddress());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- log.info("关闭通道");
- cause.printStackTrace();
- ctx.close();
- }
-
- }
客户端和服务端都是固定的框架,我们只需写处理器。
粘包和拆包问题,可以自己手写通过固定长度发送数据,或者使用Google的Protostuff。
- <dependency>
- <groupId>com.dyuproject.protostuff</groupId>
- <artifactId>protostuff-api</artifactId>
- <version>1.0.8</version>
- </dependency>
- <dependency>
- <groupId>com.dyuproject.protostuff</groupId>
- <artifactId>protostuff-core</artifactId>
- <version>1.0.8</version>
- </dependency>
- <dependency>
- <groupId>com.dyuproject.protostuff</groupId>
- <artifactId>protostuff-runtime</artifactId>
- <version>1.0.8</version>
- </dependency>
- package com.tlxy.lhn.controller.netty;
-
- import com.dyuproject.protostuff.LinkedBuffer;
- import com.dyuproject.protostuff.ProtostuffIOUtil;
- import com.dyuproject.protostuff.Schema;
- import com.dyuproject.protostuff.runtime.RuntimeSchema;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
-
- public class ProtostuffUtil {
- private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
-
- private static <T> Schema<T> getSchema(Class<T> clazz) {
- @SuppressWarnings("unchecked")
- Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
- if (schema == null) {
- schema = RuntimeSchema.getSchema(clazz);
- if (schema != null) {
- cachedSchema.put(clazz, schema);
- }
- }
- return schema;
- }
-
- /**
- * 序列化
- *
- * @param obj
- * @return
- */
- public static <T> byte[] serializer(T obj) {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) obj.getClass();
- LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
- try {
- Schema<T> schema = getSchema(clazz);
- return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- } finally {
- buffer.clear();
- }
- }
-
- /**
- * 反序列化
- *
- * @param data
- * @param clazz
- * @return
- */
- public static <T> T deserializer(byte[] data, Class<T> clazz) {
- try {
- T obj = clazz.newInstance();
- Schema<T> schema = getSchema(clazz);
- ProtostuffIOUtil.mergeFrom(data, obj, schema);
- return obj;
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。