当前位置:   article > 正文

Netty网络通信之Socket_netty socket

netty socket

一.什么是Socket

1.Socket起源于Unix,而Unix/Linux基本思想之一就是“一切皆文件”,也称为文件描述符

2.既然一切都是文件,那么就可以把对Socket的操作就是对“open—write/read—close”模式的一种实现

3.Socket是对TCP/IP协议的封装,Socket本身不是协议,通过Socket才能使用TCP/IP协议

二.Java四种IO模型

1.BIO(阻塞IO) 

  1. package com.lx.netty.bio;
  2. import java.io.*;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. public class BlockingServer {
  8. final static ExecutorService exec = Executors.newCachedThreadPool();
  9. public static void main(String[] args) {
  10. try {
  11. // 监听端口
  12. ServerSocket serverSocket = new ServerSocket(8080);
  13. // 等待客户端的连接过来,如果没有连接过来,就会阻塞
  14. while (true){
  15. // 阻塞IO中一个线程只能处理一个连接
  16. Socket socket = serverSocket.accept();
  17. exec.execute(() ->{
  18. // 获取数据
  19. String line = null;
  20. try {
  21. // 获取Socket中的输入流
  22. BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  23. line = bufferedReader.readLine();
  24. System.out.println("客户端的数据:"+ line);
  25. BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
  26. bufferedWriter.write("ok\n");
  27. bufferedWriter.flush();
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. });
  32. }
  33. } catch (IOException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  1. package com.lx.netty.bio;
  2. import java.io.*;
  3. import java.net.Socket;
  4. public class BlockingClient {
  5. public static void main(String[] args) {
  6. try {
  7. // 建立连接
  8. Socket socket = new Socket("localhost",8080);
  9. // 向服务端写数据
  10. BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
  11. bufferedWriter.write("我是客户端,收到请回答!!\n");
  12. bufferedWriter.flush();
  13. BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  14. String line = bufferedReader.readLine();
  15. System.out.println(line);
  16. } catch (IOException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

2.Non Blocking IO(非阻塞)

  1. package com.lx.netty.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.ServerSocketChannel;
  6. import java.nio.channels.SocketChannel;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class NoBlockingServer {
  10. public static List<SocketChannel> channelList = new ArrayList<>();
  11. public static void main(String[] args) {
  12. try {
  13. // 相当于serverSocket
  14. // 1.支持非阻塞 2.数据总是写入buffer,读取也是从buffer中去读 3.可以同时读写
  15. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  16. serverSocketChannel.configureBlocking(false);
  17. serverSocketChannel.socket().bind(new InetSocketAddress(8080));
  18. while (true){
  19. // 这里将不再阻塞
  20. SocketChannel socketChannel = serverSocketChannel.accept();
  21. if(socketChannel != null){
  22. socketChannel.configureBlocking(false);
  23. channelList.add(socketChannel);
  24. }else {
  25. System.out.println("没有请求过来!!!");
  26. }
  27. for (SocketChannel client : channelList){
  28. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  29. // 也不阻塞
  30. int num = client.read(byteBuffer);
  31. if(num>0){
  32. System.out.println("客户端端口:"+ client.socket().getPort()+",客户端收据:"+new String(byteBuffer.array()));
  33. }else {
  34. System.out.println("等待客户端写数据");
  35. }
  36. }
  37. }
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }

3.NIO

  1. package com.lx.netty.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. public class NewIOServer {
  12. static Selector selector;
  13. public static void main(String[] args) {
  14. try {
  15. selector = Selector.open();
  16. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  17. serverSocketChannel.configureBlocking(false);
  18. serverSocketChannel.socket().bind(new InetSocketAddress(8080));
  19. // 需要把serverSocketChannel注册到多路复用器上
  20. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  21. while (true) {
  22. // 阻塞
  23. selector.select();
  24. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  25. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  26. while (iterator.hasNext()) {
  27. SelectionKey key = iterator.next();
  28. iterator.remove();
  29. if (key.isAcceptable()) {
  30. handlerAccept(key);
  31. } else if (key.isReadable()) {
  32. handlerRead(key);
  33. }else if(key.isWritable()){
  34. }
  35. }
  36. }
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. private static void handlerRead(SelectionKey key) {
  42. SocketChannel socketChannel = (SocketChannel) key.channel();
  43. ByteBuffer allocate = ByteBuffer.allocate(1024);
  44. try {
  45. socketChannel.read(allocate);
  46. System.out.println("server msg:" + new String(allocate.array()));
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. private static void handlerAccept(SelectionKey key) {
  52. ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
  53. // 不阻塞
  54. try {
  55. SocketChannel socketChannel = serverSocketChannel.accept();
  56. socketChannel.configureBlocking(false);
  57. socketChannel.write(ByteBuffer.wrap("I am wentai,is shuai".getBytes()));
  58. // 读取客户端的数据
  59. socketChannel.register(selector, SelectionKey.OP_READ);
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  1. package com.lx.netty.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Iterator;
  9. import java.util.Set;
  10. public class NewIOClient {
  11. static Selector selector;
  12. public static void main(String[] args) {
  13. try {
  14. selector = Selector.open();
  15. SocketChannel socketChannel = SocketChannel.open();
  16. socketChannel.configureBlocking(false);
  17. socketChannel.connect(new InetSocketAddress("localhost", 8080));
  18. // 需要把socketChannel注册到多路复用器上
  19. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  20. while (true) {
  21. // 阻塞
  22. selector.select();
  23. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  24. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  25. while (iterator.hasNext()) {
  26. SelectionKey key = iterator.next();
  27. iterator.remove();
  28. if (key.isConnectable()) {
  29. handlerConnect(key);
  30. } else if (key.isReadable()) {
  31. handlerRead(key);
  32. } else if (key.isWritable()) {
  33. }
  34. }
  35. }
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. private static void handlerRead(SelectionKey key) {
  41. SocketChannel socketChannel = (SocketChannel) key.channel();
  42. ByteBuffer allocate = ByteBuffer.allocate(1024);
  43. try {
  44. socketChannel.read(allocate);
  45. System.out.println("client msg:" + new String(allocate.array()));
  46. } catch (IOException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. private static void handlerConnect(SelectionKey key) throws IOException {
  51. SocketChannel socketChannel = (SocketChannel) key.channel();
  52. if (socketChannel.isConnectionPending()) {
  53. socketChannel.finishConnect();
  54. }
  55. socketChannel.configureBlocking(false);
  56. socketChannel.write(ByteBuffer.wrap("client I am wentai,is shuai".getBytes()));
  57. socketChannel.register(selector,SelectionKey.OP_READ);
  58. }
  59. }

4.AIO(异步IO)

  1. package com.lx.netty.aio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousServerSocketChannel;
  6. import java.nio.channels.AsynchronousSocketChannel;
  7. import java.nio.channels.CompletionHandler;
  8. // 服务端
  9. public class AIOServer {
  10. public static void main(String[] args) throws Exception {
  11. // 创建一个SocketChannel并绑定了8080端口
  12. final AsynchronousServerSocketChannel serverChannel =
  13. AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
  14. serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
  15. @Override
  16. public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
  17. try {
  18. // 打印线程的名字
  19. System.out.println("2--"+Thread.currentThread().getName());
  20. System.out.println(socketChannel.getRemoteAddress());
  21. ByteBuffer buffer = ByteBuffer.allocate(1024);
  22. // socketChannel异步的读取数据到buffer中
  23. socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
  24. @Override
  25. public void completed(Integer result, ByteBuffer buffer) {
  26. // 打印线程的名字
  27. System.out.println("3--"+Thread.currentThread().getName());
  28. buffer.flip();
  29. System.out.println(new String(buffer.array(), 0, result));
  30. socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
  31. }
  32. @Override
  33. public void failed(Throwable exc, ByteBuffer buffer) {
  34. exc.printStackTrace();
  35. }
  36. });
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. @Override
  42. public void failed(Throwable exc, Object attachment) {
  43. exc.printStackTrace();
  44. }
  45. });
  46. System.out.println("1--"+Thread.currentThread().getName());
  47. Thread.sleep(Integer.MAX_VALUE);
  48. }
  49. }
  1. package com.lx.netty.aio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.nio.channels.CompletionHandler;
  7. public class AIOClient {
  8. private final AsynchronousSocketChannel client;
  9. public AIOClient() throws IOException {
  10. client = AsynchronousSocketChannel.open();
  11. }
  12. public static void main(String[] args) throws Exception {
  13. new AIOClient().connect("localhost",8080);
  14. }
  15. public void connect(String host, int port) throws Exception {
  16. // 客户端向服务端发起连接
  17. client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
  18. @Override
  19. public void completed(Void result, Object attachment) {
  20. try {
  21. client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
  22. System.out.println("已发送到服务端");
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. @Override
  28. public void failed(Throwable exc, Object attachment) {
  29. exc.printStackTrace();
  30. }
  31. });
  32. final ByteBuffer bb = ByteBuffer.allocate(1024);
  33. // 客户端接收服务端的数据,获取的数据写入到bb中
  34. client.read(bb, null, new CompletionHandler<Integer, Object>() {
  35. @Override
  36. public void completed(Integer result, Object attachment) {
  37. // 服务端返回数据的长度result
  38. System.out.println("I/O操作完成:" + result);
  39. System.out.println("获取反馈结果:" + new String(bb.array()));
  40. }
  41. @Override
  42. public void failed(Throwable exc, Object attachment) {
  43. exc.printStackTrace();
  44. }
  45. });
  46. try {
  47. Thread.sleep(Integer.MAX_VALUE);
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }

5.多路复用IO:多路复用有三种模型,一种时select()模型,一种poll模型,还一种就是epoll模型。

6.select()模型: 在这个socket中,每个socket都是有接收队列,发送队列,等待队列组成,所以每次调 用selector.select()方法的时候,都会把线程的引用放入到所有socket中的等待队列中,当接收到客户端的数 据后,把每一个socket上的等待队列中的线程移除,并放入到就绪队列中,然后去遍历文件列表,找出所有 接收到数据的socket。

select()的缺点:

1. 每次调用select都需要将线程加入到所有socket对象的等待队列中,每次唤醒进程又要将线程从所有socket对象的等待队列中移除。这里涉及到对socket列表的两次遍历,而且每次都要将整个fds列表传 递给内核,有一定的开销。正因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数 量,默认只能监视1024个socket(强行修改也是可以的);

2. 进程被唤醒后,程序并不知道socket列表中的那些socket上收到数据,因此在用户空间内需要对socket列表再做一次遍历。poll模型和select相似,只是对监听的socket没有限制。 我们发现其实select模型和poll模型其实每次都是 遍历所有的socket,有些socket其实没有事件,还是回去遍历,如果socket越多,那么遍历事件就越长,在高 并发的情况下,select模型的效率其实比较低,那么有没有一种模型,可以只返回有事件的socket呢,而不 需要遍历那么多的socket,答案就是epoll模型

7.epoll模型

其实epoll有两种工作方式,一种时LT,另外一种时ET。

LT模式意思就是通过是支持阻塞和非阻塞的socket,其实select和poll都是这种模式,我们今天主要谈的时ET模式,

ET模式只支持non-block socket,我们来看下epoll再ET模式下的工作原理。

1.NIO中调用Selector.open()得到一个selector,实际上就是调用底层的epoll_create函数,创建一个     eventpoll的事件文件对象,假设为ep_fd

2.NIO中调用register(selector, SelectionKey.OP_ACCEPT),把需要注册的fd存储在jvm内存

3.循环调用selector.select(),底层触发epoll_ctl(ep_fd,add,fd,OP_ACCEPT),把需要注册的fd放到ep_fd对应 的红黑树上,注册OP_ACCEPT事件,并且对应到eventpoll文件对象中的rbr索引树中,然后调用epoll_wait()方法,然后把等待的线程的引用加入的eventpoll的等待队列中

4.一当接收到客户端的连接,那么将会触发socket上面的callback函数,并且把有数据处理的socket放入到eventpoll中的rdlist就序列表,同时唤醒等待队列中的线程,然后线程直接循环就绪列表中准备好的socket。

所以epoll模型的优点是:

1.支持一个进程打开很大数目的socket描述符

2.IO效率不随FD数目增加而线性下降

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/973947
推荐阅读
相关标签
  

闽ICP备14008679号