当前位置:   article > 正文

Springboot中使用netty 实现 WebSocket 服务_springboot netty websocket

springboot netty websocket

依赖


    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.77.Final</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
创建启动类
package com.message.after;

import com.message.websocket.WebSocketServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * @author kuaiting
 */
@Component
public class AfterExecuteMethods implements CommandLineRunner {
	/**
	 * 项目启动之后立即执行的方法,可以做些初始化项目的操作以及需要启动项目立即执行的任务
	 * @param args
	 * @throws Exception
	 */
	@Override
	public void run(String... args) throws Exception {
		/**
		 * 启动WebSocketServer 服务使用netty实现
		 */
		new WebSocketServer().start();
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
创建WebSocket 服务
package com.message.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author kuaiting
 * WebSocket
 */

public class WebSocketServer {


	public void start() {
		// 一个主线程组
		NioEventLoopGroup mainGroup = new NioEventLoopGroup();
		//一个工作线程组
		NioEventLoopGroup subGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(mainGroup, subGroup)
					//设置队列大小
					.option(ChannelOption.SO_BACKLOG, 1024)
					.channel(NioServerSocketChannel.class)
					// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
					.childOption(ChannelOption.SO_KEEPALIVE, true)
					//添加自定义初始化处理器
					.childHandler(new WsServerInitialzer());
			ChannelFuture channelFuture = serverBootstrap.bind(8082).sync();
			channelFuture.channel().closeFuture().sync();

		}catch (Exception e){
			e.printStackTrace();
		}finally {
			//关闭主线程组
			mainGroup.shutdownGracefully();
			//关闭工作线程组
			subGroup.shutdownGracefully();
		}


	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
WsServerInitialzer 初始化
package com.message.websocket;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * @author kuaiting
 */
public class WsServerInitialzer extends ChannelInitializer<SocketChannel> {
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {

		ChannelPipeline pipeline = ch.pipeline();
		//websocket基于http协议,所以需要http编解码器
		pipeline.addLast(new HttpServerCodec());
		//添加对于读写大数据流的支持
		pipeline.addLast(new ChunkedWriteHandler());
		//对httpMessage进行聚合
		pipeline.addLast(new HttpObjectAggregator(1024*64));

		// ================= 上述是用于支持http协议的 ==============

		//websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
		//比如处理一些握手动作(ping,pong)
		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
		//自定义handler
		pipeline.addLast(new ChatHandler());

	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
创建信息ChatHandler 处理类
package com.message.websocket;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

/**
 * @author kuaiting
 */
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
	private static  ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		log.info("已创建WebSocket链接:{}", ctx.channel().remoteAddress());
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
		String text = msg.text();
		sendAllMessages(ctx,text);
	}

	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		channels.add(ctx.channel());
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		log.info("断开链接的ID", ctx.channel().id().asLongText());

	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.channel().closeFuture();
	}


	//给每个人发送消息,除发消息人外
	private void sendAllMessages(ChannelHandlerContext ctx, String msg) {
		for (Channel channel : channels) {
			if (!channel.id().asLongText().equals(ctx.channel().id().asLongText())) {
				channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
			}
		}
	}

	//给每个人发送消息,除发消息人外
	private void sendMessages(String msg) {
		for (Channel channel : channels) {
			channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
		}
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/404165
推荐阅读
相关标签
  

闽ICP备14008679号