当前位置:   article > 正文

基于java实现tcp长链接,自定义消息协议_java tcp长连接

java tcp长连接

现在即时通信在移动领域应用的非常广了,通过TCP长链接实现数据的即时更新,通知,在日常开发中常会用的。今天我就把在在平常开发中的基本实现思路跟大家一起交流下。转载请注明出处:http://blog.csdn.net/mr_oorange/article/details/52353626
源码地址:https://github.com/Ooorange/java-TCP-long-connection

一:准备阶段
android客户端用于消息的接受和发送,一个java的本地的服务端实现消息的转发推送。
二:实现思路
如何做到消息的即时通信,1是通过http轮训;2是通过http的长链接服务(跟tcp有点像,但是还是会有断开的可能性,很大),3是基于tcp的长连接。先比较下3中实现方案的优缺点:第一种,缺点:耗费流量,损耗性能,tcp会不断的开启停止,优点:实现简单; 第二种,缺点:需要服务端配合,而且http断开的偶发性很高,不易控制,优点:可实现可接受的即时通信;第三种:通过心跳维持的连接不会经常断开,即可实现即时的通信,而且可自定义头,减小流量的耗用。缺点需要后台配合,实现较复杂(理解了都还好其实)。我选择的是第三种方案。即基于java的tcp长链接。


客户端和服务端的约定条件:

服务端通过设置timeout终止消息的接收,以及客户端消息的发送。具体值可以自己设置,其实就是感知客户端心跳。服务端的socket保持打开,可接受多个客户端的连接;每个新加入的客户端标识唯一的ID,用于定向发送;离线消息的存储,发送;自定义协议的封装以及解包;


android 客户端实现:通过一个线程池维护消息任务的发送以及终止。任务包括:通过socket获得输入输出流,设置timeout时间,当时间到达后输入输出都不可用,起3个线程保持消息的接收,发送以及分发,另外还有一个心跳线程,用于维持跟服务端的连接(要不然服务端就认为你timeout GG了,就给你断开);还有一个重要的地方就是,实现自定义协议将不同的业务线分解,可以用于推送,即时聊天,等等。。。客户端还是现实了离线的消息存储(ORM模式的greenDao)

服务端实现:同样的通过serverSocket获得输入输出流,并保持连接open状态,为每个新加入的客户端标时唯一的ID,用于定向发送消息,当客户端主动或者是timeout之后关闭流管道, 

三:上代码
客户端:

  1. import com.orange.blog.net.protocol.ChatMsgProcotol;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. /**
  5. * 长链接:问题描述
  6. * Q1:中服务端与客户端socket的输入输出流都不会关闭,所以需要考虑资源得释放长链接时机
  7. *
  8. * Q2何时关闭链接,客户端和服务器端,不能关闭任一socket的输入流或者输出流,否则socket通信会关闭;
  9. * 由于都不关闭连接,而read方法又是阻塞的,会一直读取数据,不知道何时读取结束(何时知道本次数据读取结束);
  10. * *
  11. *
  12. * Q3如果有多个业务需要使用链接,如何兼容其他的业务,使得解析流程一致
  13. *
  14. *
  15. * Q1解决方案:服务器端都是会设定超时时间的,也就是timeout,如果超过timeout服务器没有接收到任何数据,
  16. * 那么该服务器就会关闭该连接,从而使得服务器资源得到有效地使用。
  17. *
  18. * Q2解决方案:约定通信协议,如特定字符,或者使用包头+包体的方式,传递数据,包头固定长度,
  19. * 里面保存包体长度等信息,这样服务端就知道读取到何时结束了.(本文使用此种方式)以下是代码:
  20. *
  21. * Q3解决方案:不同业务定义不同的协议,比如心跳协议,业务协议; 另外一种方案就是实用json数据格式进行传输
  22. * @Link(http://blog.csdn.net/ljl157011/article/details/19291611)
  23. * 短链接:建立完一次通信后将被释放,下次发送得重新链接建立,浪费资源
  24. * Created by orange on 16/6/8.
  25. */
  26. public class TCPLongConnectClient {
  1. //通过缓存线程池实现消息的加入以及断开
  2. ExecutorService executorService= Executors.newCachedThreadPool();
  3. RequestTask requestTask;
  4. public TCPLongConnectClient(TCPRequestCallBack tcpRequestCallBack){
  5. requestTask=new RequestTask(tcpRequestCallBack);
  6. executorService.execute(requestTask);
  7. }
  8. public void addNewRequest(ChatMsgProcotol data){
  9. if (requestTask!=null)
  10. requestTask.addRequest(data);
  11. }
  12. public void closeConnect() {
  13. requestTask.stop();
  14. }
  15. }

客户端核心任务代码:

  1. /**
  2. * 实现5秒的定时发送一个心跳
  3. * Created by orange on 16/6/8.
  4. */
  5. public class HeartBeatTask implements Runnable {
  6. private static final int REPEATTIME = 4000;
  7. private volatile boolean isKeepAlive = true;
  8. private OutputStream outputStream;
  9. private String uuid;
  10. public HeartBeatTask(OutputStream outputStream,String uuid) {
  11. this.outputStream = outputStream;
  12. this.uuid=uuid;
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. while (isKeepAlive) {
  18. SocketUtil.writeContent2Stream(new HeartBeatProtocol(), outputStream);
  19. try {
  20. Thread.sleep(REPEATTIME);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. if (outputStream != null) {
  26. SocketUtil.closeStream(outputStream);
  27. }
  28. } catch (Exception e) {
  29. Log.d("exception", " : Time is out, request" + " has been closed.");
  30. e.printStackTrace();
  31. } finally {
  32. if (outputStream != null) {
  33. SocketUtil.closeStream(outputStream);
  34. }
  35. }
  36. }
  37. public void setKeepAlive(boolean isKeepAlive) {
  38. this.isKeepAlive = isKeepAlive;
  39. }
  40. }

消息的发送接收

  1. @Override
  2. public void run() {
  3. uuid = ProjectApplication.getUUID();
  4. try {
  5. failedMessage(0, "服务器连接中");
  6. try {
  7. socket = SocketFactory.getDefault().createSocket(ADDRESS, PORT);
  8. }catch (ConnectException e){
  9. failedMessage(-1, "服务区器连接异常,请检查网络");
  10. return;
  11. }
  12. sendData.add(new RegisterProcotol());
  13. sendData.add(new UserFriendReuqestProcotol());
  14. sendTask=new SendTask();
  15. sendTask.outputStreamSend = socket.getOutputStream();
  16. sendTask.start();
  17. reciverTask = new ReciverTask();
  18. reciverTask.inputStreamReciver = socket.getInputStream();
  19. reciverTask.start();
  20. if (isLongConnection) {
  21. heartBeatTask = new HeartBeatTask(sendTask.outputStreamSend, uuid);
  22. executorService = Executors.newCachedThreadPool();
  23. executorService.execute(heartBeatTask);
  24. }
  25. } catch (IOException e) {
  26. failedMessage(-1, "IOException");
  27. e.printStackTrace();
  28. }
  29. }


通信协议的的自定义以及解包

  1. public abstract class BasicProtocol {
  2. public static final int VERSION_LEN=2;//协议的版本
  3. public static final int COMMEND_LEN=4;//协议的类型: 0000心跳,0001普通文字聊天,0002服务端返回协议,0003好友列表请求,0004用户注册连接协议
  4. public static String VERSION="00"; //目前版本号死的
  5. public static String paraseCommend(byte[] data){
  6. return new String(data,VERSION_LEN,COMMEND_LEN);
  7. }
  8. public abstract String getCommend();
  9. public byte[] getContentData(){
  10. ByteArrayOutputStream baos=new ByteArrayOutputStream(VERSION_LEN+COMMEND_LEN);
  11. baos.write(VERSION.getBytes(),0,VERSION_LEN);
  12. baos.write(getCommend().getBytes(),0,COMMEND_LEN);
  13. return baos.toByteArray();
  14. }
  15. public int parseBinary(byte[] data) throws ProtocolException {
  16. String version=new String(data,0,VERSION_LEN);
  17. VERSION=version;
  18. if (!version.equals("00")){
  19. throw new ProtocolException("income version is error"+version);
  20. }
  21. return VERSION_LEN+COMMEND_LEN;
  22. }
  23. }

代码有点乱,服务端代码就不贴来,需要的话可以大家请点这里是源码,https://github.com/Ooorange/java-TCP-long-connection  欢迎fork,star

转载请注明出处

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号