当前位置:   article > 正文

c/c++开发,c++无可避免的TCP/Socket通信开发实战案例_c++ socket通信案例

c++ socket通信案例

目录

一、案例项目设计概括

二,案例整体设计

三、TCP/Socket 的服务端与客户端通信代码设计

        3.1 tcp/socket通信

       3.2 客户端信息管理

        3.3 数据的读取与写入

四、tcp/socket通信扩展

       4.1 线程类及消息队列类设计

        4.2 读取及写入数据处理类

        4.3 更上层集成的Socket-API接口

五、tcp/socket业务数据通信处理

        5.1  数据编解码设计(序列化及反序列化)

        5.2 结构化数据传递

六、项目最终呈现

        6.1 增加日志记录模块

        6.2 项目完整目录结构

6.3 程序编译如下

 6.4 测试

        6.4 结语

七、源码附录

       7.1  commom文件夹

       7.2  srv_IO目录

       7.4  client_IO目录

        7.4 main.cpp


一、案例项目设计概括

        我们c/c++开发人员在实际开发过程中,不可不知TCP通信,也不可不知标准库针对TCP通信提供的Socket实现。也行在很多稍大一些的项目中,我们会借用其他通信中间件来实现TCP通信,但是在日常很多小项目、原型验证项目、本地化局部通信项目等项目中,TCP/Socket通信都是优先想到和使用的方法。

        在本文中,将抛开关于TCP/Socket通信的请求、握手等通信过程和各种socket相关函数及结构体的原理说明,这种技术文档太多太多了,本文将用实际使用的角度,直接设计一个项目使用TCP/Socket来阐述其如何通信、如何应用细化、如何切换项目需要的由简入繁的项目开发工程。

        假定一:

        现在需要做一个简单的TCP/Socket服务端程序,和一个TCP/Socket客户端的程序,服务端和客户端均支持win/linux平台,客户端链接上服务端,并发送一串字符串。

        假定二:

        基于假定一,实现一个支持多个客户端链接的TCP/Socket服务端程序,和一个TCP/Socket客户端程序,各个客户端链接上服务端,并发送一串字符串到服务端,服务端也可以给各个客户端统一发送字符串或给特定客户端发送字符串。

        假定三:

        基于假定二,实现服务端接收数据和处理数据分离,为其建立独立线程,为了减少socket读写数据等待处理,为写入数据和读取数据建立消息队列缓存。

        假定四:

        基于假定三,实现通信数据的编码、解码设计,并实现结构化数据传输。

二,案例整体设计

        上面的假设就是开发案例的业务需求描述,较简单,会有欠缺很多细节业务描述,其实正如我们实际项目中来自客户简单口述的需求一样。

        按上述假设,先进行初步展开,其整体概念如下图,

        1)核心是基于TCP/Socket建立服务端和客户端的接口,实现TCP连接及通信;

        2)基于该接口,分别创建服务端和客户端的业务数据处理的数据写入socket和从socket读取数据的独立线程,可以为写入数据和读取数据建立缓存消息队列,该队列可以视业务复杂性,放置独立线程内或单独建立一个数据集处理类;

        3)为了应对不断连接或断开连接的客户端,还需要建立一个客户端端管理功能类或模块;这些类或模块集成在一起,构成服务端或客户端的socket-API(外观模式),统一给业务应用模块调用实现TCP通信;

        4)服务端和客户端的Socket-API基于共同的功能模块如线程、互斥锁、消息队列、编解码、日志等功能模块,以及共同的结构化数据模块,尤其是服务端和客户端在进行数据结构化传输的共同达成一致的数据结构。

         本文采用vs2015+cmake(win)和g++ +cmake(Linux)进行代码编译,按上述设计,构建开发项目的目录框架如下:

  1. #
  2. TCP_StructData
  3. bin #编译输出结果
  4. client_IO #客户端Socket的API功能源码
  5. client_test #客户端业务功能源码
  6. build_win #客户端windows编辑中间文件输出目录
  7. build_linux #客户端linux编辑中间文件输出目录
  8. main.cpp
  9. CMakeLists.txt #cmake配置文件
  10. common #共同功能模块或数据结构源码
  11. srv_IO #服务端Socket的API功能源码
  12. svr_test #服务端业务功能源码
  13. build_win #客户端windows编辑中间文件输出目录
  14. build_linux #客户端linux编辑中间文件输出目录
  15. main.cpp #
  16. CMakeLists.txt #cmake配置文件

三、TCP/Socket 的服务端与客户端通信代码设计

        Socket 通常是服务端先启动,服务端启动后建立socket结构体,然后初始化就进行bind函数调用绑定,然后调用listen函数建立客户端连接侦听能力,而实际获取客户端连接信息,是通过调用accept函数来实现的。

        3.1 tcp/socket通信

        服务端:

        先构建两个函数,onConnect函数和Accept函数,onConnect函数用于建立及初始化socket结构体,并进行bind绑定和listen侦听。Accept函数则是监测是否由新客户端连接进来。部分代码如下图:

  1. int MySocketPrivate::onConnect()
  2. {
  3. if (m_OnListen) //服务器Socket是否已经创建
  4. {
  5. //your code
  6. return 1;
  7. }
  8. else {
  9. #ifdef WIN32
  10. m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
  11. SOCKADDR_IN addrServ;
  12. addrServ.sin_family = AF_INET;
  13. addrServ.sin_port = htons(m_Port);
  14. addrServ.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
  15. bind(m_SSocket, (SOCKADDR*)&addrServ, sizeof(SOCKADDR));
  16. //如果创建Socket失败则提示,成功则开始监听
  17. if (listen(m_SSocket, 20) == SOCKET_ERROR)
  18. {
  19. closesocket(m_SSocket);
  20. //your code
  21. return -1;
  22. }
  23. else {
  24. //your code
  25. m_OnListen = true;
  26. return 1;
  27. }
  28. #else
  29. m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
  30. if (MY_SOCKET_NULL == m_SSocket)
  31. {
  32. //your code
  33. return -1;
  34. }
  35. struct sockaddr_in s_add;
  36. bzero(&s_add, sizeof(struct sockaddr_in));
  37. s_add.sin_family = AF_INET;
  38. s_add.sin_addr.s_addr = htonl(INADDR_ANY);
  39. s_add.sin_port = htons(m_Port);
  40. if (-1 == bind(m_SSocket, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  41. {
  42. //your code
  43. return -1;
  44. }
  45. else {
  46. //your code
  47. }
  48. if (-1 == listen(m_SSocket, 5))
  49. {
  50. //your code
  51. return -1;
  52. }
  53. else {
  54. //your code
  55. }
  56. m_OnListen = true;
  57. return 1;
  58. #endif
  59. }
  60. }
  61. bool MySocketPrivate::Accept()
  62. {
  63. bool bRet = true;
  64. if (m_OnListen)
  65. {
  66. #ifdef WIN32
  67. SOCKADDR_IN cliAddr;
  68. int length = sizeof(SOCKADDR);
  69. SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
  70. if (INVALID_SOCKET == cliSock)
  71. {
  72. closesocket(cliSock);
  73. //your code
  74. bRet = false;
  75. }
  76. else {
  77. char _ipport[64] = { 0 };
  78. sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
  79. int nNetTimeout = 100; //1秒
  80. setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));//这是对客户端句柄进行设置
  81. //your code
  82. }
  83. #else
  84. int sin_size = sizeof(struct sockaddr_in);
  85. struct sockaddr_in c_add;
  86. int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
  87. if (-1 == nfp)
  88. {
  89. //your code
  90. bRet = false;
  91. }
  92. else {
  93. char _ipport[64] = { 0 };
  94. std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
  95. int _port = static_cast<int>(htons(c_add.sin_port));
  96. //your code
  97. }
  98. #endif
  99. }
  100. else {
  101. bRet = false;
  102. //your code
  103. }
  104. return bRet;
  105. };

                通常需要根据项目实际情况调用setsockopt函数设置相关链接参数,需要在注意的是,在onConnect内对服务端部分进行配置,在Accept函数可对成功链接进来的客户端部分进行配置:

  1. /*
  2. *windows 参数设置描述
  3. *3.在send(),recv()过程中有时由于网络状况等原因,收发不能预期进行,可以设置收发时限:
  4. *int nNetTimeout = 1000; //1秒
  5. *发送时限
  6. *setsockopt( socket, SOL_SOCKET, SO_SNDTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
  7. *接收时限
  8. *setsockopt( socket, SOL_SOCKET, SO_RCVTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
  9. *4.在send()的时候,返回的是实际发送出去的字节(同步)或发送到socket缓冲区的字节(异步);系统默认的状态发送和接收一次为8688字节(约
  10. *为8.5K);在实际的过程中如果发送或是接收的数据量比较大,可以设置socket缓冲区,避免send(),recv()不断的循环收发:
  11. * 接收缓冲区
  12. *int nRecvBufLen = 32 * 1024; //设置为32K
  13. *setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( const char* )&nRecvBufLen, sizeof( int ) );
  14. *发送缓冲区
  15. *int nSendBufLen = 32*1024; //设置为32K
  16. *setsockopt( s, SOL_SOCKET, SO_SNDBUF, ( const char* )&nSendBufLen, sizeof( int ) );
  17. *5.在发送数据的时,不执行由系统缓冲区到socket缓冲区的拷贝,以提高程序的性能:
  18. *int nZero = 0;
  19. *setsockopt( socket, SOL_SOCKET, SO_SNDBUF, ( char * )&nZero, sizeof( nZero ) );
  20. *6.在接收数据时,不执行将socket缓冲区的内容拷贝到系统缓冲区:
  21. *int nZero = 0;
  22. *setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( char * )&nZero, sizeof( int ) );
  23. */

        客户端部分:

        客户端是通过指定的ip和port链接到服务端的,在socket通信这方面,仅需要建立及初始化sokcet结构体(配置ip及port),调用sokcet的connect api链接服务端。

  1. int MySocketClient::onConnect()
  2. {
  3. if (m_OnConnect) //
  4. {
  5. //your code
  6. return 0;
  7. }
  8. //防止链接冲突调用
  9. if (m_OnConnecting)
  10. {
  11. return 0;
  12. }
  13. try {
  14. m_OnConnecting = true;
  15. #ifdef WIN32
  16. sock_fd = static_cast<int>(socket(AF_INET, SOCK_STREAM, 0));
  17. SOCKADDR_IN ser_addr;
  18. memset(&ser_addr, 0, sizeof(ser_addr));
  19. ser_addr.sin_family = AF_INET;
  20. ser_addr.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
  21. ser_addr.sin_port = htons(static_cast<unsigned short>(m_Port));
  22. if (connect(sock_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr)) < 0)
  23. {
  24. //your code
  25. m_OnConnecting = false;
  26. return -1;
  27. }
  28. int nNetTimeout = 10; //10毫秒
  29. setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
  30. setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
  31. //KeepAlive
  32. bool bKeepAlive = true;
  33. int nRet = setsockopt(sock_fd, SOL_SOCKET, SO_KEEPALIVE,(char*)&bKeepAlive, sizeof(bKeepAlive));
  34. if (nRet == SOCKET_ERROR)
  35. {
  36. //your code
  37. }
  38. // set KeepAlive parameter
  39. tcp_keepalive alive_in;
  40. tcp_keepalive alive_out;
  41. alive_in.keepalivetime = 1000; // 1s
  42. alive_in.keepaliveinterval = 3000; //3s
  43. alive_in.onoff = true;
  44. unsigned long ulBytesReturn = 0;
  45. nRet = WSAIoctl(sock_fd, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
  46. &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
  47. if (nRet == SOCKET_ERROR)
  48. {
  49. //your code
  50. }
  51. m_OnConnect = true;
  52. m_OnConnecting = false;
  53. m_ConnectONLog = true;
  54. return 1;
  55. #else
  56. sock_fd = socket(PF_INET, SOCK_STREAM, 0);
  57. if (-1 == sock_fd)
  58. {
  59. //your code
  60. m_OnConnecting = false;
  61. return -1;
  62. }
  63. struct sockaddr_in s_add;
  64. bzero(&s_add, sizeof(struct sockaddr_in));
  65. s_add.sin_family = PF_INET;
  66. s_add.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
  67. s_add.sin_port = htons(m_Port);
  68. if (-1 == connect(sock_fd, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  69. {
  70. //your code
  71. m_OnConnecting = false;
  72. return -1;
  73. }
  74. int x = fcntl(sock_fd, F_GETFL, 0);
  75. fcntl(sock_fd, F_SETFL, x | O_NONBLOCK);
  76. //your code
  77. m_OnConnect = true;
  78. m_OnConnecting = false;
  79. return 1;
  80. #endif
  81. }
  82. catch (...) {
  83. //your code
  84. m_OnConnecting = false;
  85. return -2;
  86. }
  87. }

       3.2 客户端信息管理

         在服务端,针对多客户端链接,服务端是无法预知那些客户端连接进来的,只有在accept函数成功返回时,才能获得客户端句柄及具体信息,对于程序而言和那个客户端通信,知道客户端句柄就可以,而对于业务应用(客户)而言,需要知道的是客户端的IP和端口。因此就需要将客户端句柄和网络ip及端口映射起来,并将这些相关信息进行缓存管理。

        accept函数返回的客户端句柄数据类型在windows和linux系统是不一致的,因此需要统一

  1. #ifdef WIN32
  2. #define MY_SOCKET SOCKET
  3. #define MY_SOCKET_NULL NULL
  4. #endif
  5. #ifdef __linux__
  6. #define MY_SOCKET int
  7. #define MY_SOCKET_NULL (-1)
  8. #endif

        构建和管理网络信息(IP、port)和客户端句柄的映射关系,本人采用了std::map管理,将网络信息集成到一个KeyObj_Client类里面,在common文件夹建立hashmap.h/cpp源码文件,实现KeyObj_Client类,并将该类实例作为map的key,因此为该类添加相关的比较运算操作符函数,以满足作为map的key及排序需要。

  1. class KeyObj_Client
  2. {
  3. public:
  4. KeyObj_Client(std::string _ipStr, int _port);
  5. //
  6. static long cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2);
  7. std::string m_ipStr;
  8. int m_port;
  9. int linkFlag;
  10. long m_ip; //网络地址整型表述
  11. private:
  12. };
  13. inline bool operator==(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) == 0; }
  14. inline bool operator!=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) != 0; }
  15. inline bool operator>=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) >= 0; }
  16. inline bool operator<=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) <= 0; }
  17. inline bool operator>(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) > 0; }
  18. inline bool operator<(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) < 0; }
  19. /*------------------------------------------------------------------------*/
  20. std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端

        在实现通信逻辑处理时,字符串处理相对不方便,为此还将字符串ip信息转换为整型IP信息。关于网络ip的字符串和整型之间的互换功能,在common文件夹建立myFunc.h/cpp源文件,实现该ip的转换功能函数。

  1. //myFunc.h
  2. namespace PFunc
  3. {
  4. //
  5. bool ipCheck(std::string ip_str);
  6. long ipToInt(std::string ip_str);
  7. std::string intToIp(long ip_int);
  8. };
  9. //myFunc.cpp
  10. bool PFunc::ipCheck(std::string ip_str)
  11. {
  12. if (INADDR_NONE != inet_addr(ip_str.c_str()))
  13. {
  14. return true;
  15. }
  16. return false;
  17. };
  18. long PFunc::ipToInt(std::string ip_str)
  19. {
  20. if (INADDR_NONE != inet_addr(ip_str.c_str()))
  21. {
  22. return ntohl(inet_addr(ip_str.c_str()));
  23. }
  24. else {
  25. CLogger::createInstance()->Log(eConfigError
  26. , "ip format [%s] error: %s %s %d,please check the file format and code!"
  27. , ip_str.c_str(), __FILE__, __FUNCTION__, __LINE__);
  28. return 0;
  29. }
  30. };
  31. std::string PFunc::intToIp(long ip_int)
  32. {
  33. char ip[64] = { 0 };
  34. #ifdef WIN32
  35. strcpy_s(ip, inet_ntoa(*(in_addr*)&ip_int));
  36. #else
  37. strcpy(ip, inet_ntoa(*(in_addr*)&ip_int));
  38. #endif
  39. return std::string(ip);
  40. };

        因此,KeyObj_Client类在显式实例时,就调用ip的转换功能函数,并将字符串ip和整型ip信息都存储起来,便于使用

  1. //hashmap.cpp
  2. KeyObj_Client::KeyObj_Client(std::string _ipStr, int _port)
  3. : m_ipStr(_ipStr), m_port(_port), linkFlag(0)
  4. {
  5. m_ip = PFunc::ipToInt(_ipStr);
  6. };
  7. //
  8. long KeyObj_Client::cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2)
  9. {
  10. long diff = obj1.m_ip - obj2.m_ip;
  11. if (diff != 0) return diff;
  12. diff = obj1.m_port - obj2.m_port;
  13. if (diff != 0) return diff;
  14. return 0;
  15. };

        因此,在accept函数成功返回及获得客户端句柄及信息,就将按这些信息创建KeyObj_Client实例,并存入到std::map的m_CSockets缓存容器中,Accept-API函数的部分代码调整如下。

  1. bool MySocketPrivate::Accept()
  2. {
  3. bool bRet = true;
  4. if (m_OnListen)
  5. {
  6. #ifdef WIN32
  7. SOCKADDR_IN cliAddr;
  8. int length = sizeof(SOCKADDR);
  9. SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
  10. if (INVALID_SOCKET == cliSock)
  11. {
  12. closesocket(cliSock);
  13. //your code
  14. bRet = false;
  15. }
  16. else {
  17. //cliAddr.sin_addr.S_un.S_addr;
  18. char _ipport[64] = { 0 };
  19. sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
  20. KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
  21. int nNetTimeout = 100; //1秒
  22. setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
  23. m_MyMutex.Lock();
  24. m_CSockets[_linkInfo] = cliSock;//添加客户端
  25. m_MyMutex.Unlock();
  26. }
  27. #else
  28. int sin_size = sizeof(struct sockaddr_in);
  29. struct sockaddr_in c_add;
  30. // printf("MySocketPrivate::Accept 1\n");
  31. int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
  32. if (-1 == nfp)
  33. {
  34. //your code
  35. bRet = false;
  36. }
  37. else {
  38. char _ipport[64] = { 0 };
  39. std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
  40. int _port = static_cast<int>(htons(c_add.sin_port));
  41. sprintf(_ipport, "%s:%d", _ipStr.c_str(), _port);
  42. int x = fcntl(nfp, F_GETFL, 0);
  43. fcntl(nfp, F_SETFL, x | O_NONBLOCK);
  44. KeyObj_Client _linkInfo(_ipStr, _port);
  45. m_MyMutex.Lock();
  46. m_CSockets[_linkInfo] = nfp;
  47. m_MyMutex.Unlock();
  48. //your code
  49. }
  50. #endif
  51. }
  52. else {
  53. bRet = false;
  54. //your code
  55. }
  56. return bRet;
  57. };

        如上述代码,除了Accept成功时操作m_CSockets缓存容器外,在发送、读取数据失败,客户端端主动断开、异常断开等都会操作m_CSockets缓存容器,为了确保m_CSockets缓存容器的一致性,因此添加了互斥锁对m_CSockets缓存容器进行一致性保护。

  1. PYMutex m_MyMutex;
  2. std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端

        因此在common文件夹建立Mutex.h/cpp源文件,创建了PYMutex类,实现互斥锁。

  1. //Mutex.h
  2. typedef void *HANDLE;
  3. class IMutex
  4. {
  5. public:
  6. virtual ~IMutex() {}
  7. virtual void Lock() const = 0;
  8. virtual bool TryLock() const = 0;
  9. virtual void Unlock() const = 0;
  10. };
  11. class PYMutex : public IMutex
  12. {
  13. public:
  14. PYMutex();
  15. ~PYMutex();
  16. virtual void Lock() const;
  17. virtual bool TryLock() const;
  18. virtual void Unlock() const;
  19. private:
  20. #ifdef _WIN32
  21. HANDLE m_mutex;
  22. #else
  23. mutable pthread_mutex_t m_mutex;
  24. #endif
  25. };

        因此,需要提供对基于m_CSockets缓存容器的写入,删除,清空等操作实现对接入客户端的管理:

  1. //实例销毁时调用
  2. void MySocketPrivate::disConnect()
  3. {
  4. deleteCSocket();//删除客户端
  5. deleteSSocket();//删除服务端
  6. #ifdef WIN32
  7. WSACleanup();
  8. #endif
  9. }
  10. //删除所有客户端
  11. void MySocketPrivate::deleteCSocket()
  12. {
  13. m_MyMutex.Lock();
  14. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  15. while (it != m_CSockets.end())
  16. {
  17. //删除链接
  18. deleteCSocket(it->second);
  19. #ifdef WIN32
  20. it = m_CSockets.erase(it);
  21. #else
  22. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  23. m_CSockets.erase(ittemp);
  24. #endif
  25. }
  26. m_MyMutex.Unlock();
  27. }
  28. //删除指定客户端
  29. void MySocketPrivate::deleteCSocket(MY_SOCKET m_CSocket)
  30. {
  31. try {
  32. if (MY_SOCKET_NULL != m_CSocket)
  33. {
  34. #ifdef WIN32
  35. closesocket(m_CSocket);
  36. #else
  37. close(m_CSocket);
  38. #endif
  39. m_CSocket = MY_SOCKET_NULL;
  40. }
  41. }
  42. catch (...) {
  43. //your code
  44. }
  45. }
  46. //删除服务端
  47. void MySocketPrivate::deleteSSocket()
  48. {
  49. m_OnListen = false;
  50. try {
  51. if (MY_SOCKET_NULL != m_SSocket)
  52. {
  53. #ifdef WIN32
  54. closesocket(m_SSocket);
  55. #else
  56. close(m_SSocket);
  57. #endif
  58. m_SSocket = MY_SOCKET_NULL;
  59. }
  60. }
  61. catch (...) {
  62. //your code
  63. }
  64. };

        3.3 数据的读取与写入

        数据读取和写入类似于管道或文件操作,主要是基于socket句柄,调用read/wirte函数或recv/send函数来实现,如一下代码,可以指定客户端读取数据,也可以指定向客户端发送数据,在出现读取数据或写入数据异常时,需要删除该客户端。

  1. //MySocketPrivate.cpp
  2. //return success read count
  3. int MySocketPrivate::Read(std::map<KeyObj_Client, RDClient> &bufs)
  4. {
  5. int ret = 0;
  6. m_MyMutex.Lock();
  7. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  8. while (it != m_CSockets.end())
  9. {
  10. char _buf[512] = { 0 };
  11. #ifdef WIN32
  12. int re_one = recv(it->second, _buf, 512, 0);
  13. if (re_one <= 0)
  14. {
  15. int _error = GetLastError();
  16. if (_error != 10060)
  17. {
  18. //other code
  19. //删除链接
  20. deleteCSocket(it->second);
  21. it = m_CSockets.erase(it);
  22. continue;
  23. }
  24. else {
  25. re_one = 0;
  26. }
  27. }
  28. #else
  29. int re_one = recv(it->second, _buf, 256, MSG_DONTWAIT);
  30. if (re_one <= 0)
  31. {
  32. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  33. {
  34. usleep(10);
  35. re_one = 0;
  36. }
  37. else {
  38. //other code
  39. //删除连接
  40. deleteCSocket(it->second);
  41. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  42. m_CSockets.erase(ittemp);
  43. continue;
  44. }
  45. }
  46. #endif
  47. if (re_one>0)
  48. {
  49. ret += 1;
  50. std::map<KeyObj_Client, RDClient>::iterator itrd = bufs.find(it->first);
  51. if (itrd != bufs.end())
  52. {
  53. itrd->second.add((unsigned char*)_buf, re_one);
  54. }
  55. else {
  56. bufs[it->first] = RDClient((unsigned char*)_buf, re_one);
  57. }
  58. }
  59. it++;
  60. }
  61. m_MyMutex.Unlock();
  62. return ret;
  63. };
  64. //return success count
  65. int MySocketPrivate::Write(const char* buf, int size)//向全部客户端发送数据
  66. {
  67. int ret = 0;
  68. m_MyMutex.Lock();
  69. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  70. while (it != m_CSockets.end())
  71. {
  72. // printf_s("write data %d to client is started!\r\n",size);
  73. #ifdef WIN32
  74. int re = send(it->second, buf, size, 0);
  75. if (re <= 0)
  76. {
  77. int _error = GetLastError();
  78. if (_error != 10060)
  79. {
  80. //other code
  81. //删除连接
  82. deleteCSocket(it->second);
  83. it = m_CSockets.erase(it);
  84. continue;
  85. }
  86. else {
  87. re = 0;
  88. }
  89. }
  90. #else
  91. int re = send(it->second, buf, size, MSG_DONTWAIT);
  92. if (re <= 0)
  93. {
  94. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  95. {
  96. usleep(10);
  97. re = 0;
  98. }
  99. else {
  100. //other code
  101. //删除连接
  102. deleteCSocket(it->second);
  103. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  104. m_CSockets.erase(ittemp);
  105. continue;
  106. }
  107. }
  108. #endif
  109. else{
  110. ret += 1;
  111. }
  112. it++;
  113. }
  114. m_MyMutex.Unlock();
  115. return ret;
  116. };
  117. //return success count
  118. int MySocketPrivate::Write(unsigned long long ipInt, const char* buf, int size)
  119. {
  120. int ret = 0;
  121. m_MyMutex.Lock();
  122. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  123. while (it != m_CSockets.end())
  124. {
  125. //当前版本只针对网络地址做判断,即一台电脑多个客户端连接,都会被发送数据
  126. if ((unsigned long long)it->first.m_ip == ipInt)
  127. {
  128. #ifdef WIN32
  129. int re = send(it->second, buf, size, 0);
  130. if (re < 0)
  131. {
  132. int _error = GetLastError();
  133. if (_error != 10060)
  134. {
  135. //other code
  136. //删除连接
  137. deleteCSocket(it->second);
  138. it = m_CSockets.erase(it);
  139. continue;
  140. }
  141. else {
  142. re = 0;
  143. }
  144. }
  145. #else
  146. int re = send(it->second, buf, size, MSG_DONTWAIT);
  147. if (re <= 0)
  148. {
  149. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  150. {
  151. usleep(10);
  152. re = 0;
  153. }
  154. else {
  155. //other code
  156. //删除连接
  157. deleteCSocket(it->second);
  158. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  159. m_CSockets.erase(ittemp);
  160. continue;
  161. }
  162. }
  163. #endif
  164. else {
  165. ret += 1;
  166. }
  167. }
  168. it++;
  169. }
  170. m_MyMutex.Unlock();
  171. return ret;
  172. }

        由于Read-API是支持从多个客户端读取数据,因此其数据采用一个RDClient结构体来缓存,需要注意的是,每次Read调用时,每个客户端都可能会一次性读取多帧数据,也有可能读取非完整帧数据,这是在实际项目中受到读取的效率、读取间隔、处理效率等影响。在common目录下创建DataDef.h源文件。

  1. //DataDef.h
  2. #define RDCSIZE 1024
  3. struct RDClient
  4. {
  5. RDClient()
  6. : len(0)
  7. {
  8. memset(Buf,0,RDCSIZE);
  9. };
  10. RDClient(unsigned char *buf,int nlen)
  11. {
  12. memset(Buf,0,RDCSIZE);
  13. memcpy(Buf,buf,nlen);
  14. len = nlen;
  15. };
  16. ~RDClient()
  17. {
  18. };
  19. RDClient& operator=(const RDClient &rval)
  20. {
  21. if (this!=&rval)
  22. {
  23. memset(Buf,0,RDCSIZE);
  24. memcpy(Buf,rval.Buf,rval.len);
  25. len = rval.len;
  26. }
  27. return *this;
  28. };
  29. int add(unsigned char *buf,int nlen)
  30. {
  31. try{
  32. memset(Buf+len,0,RDCSIZE-len);
  33. memcpy(Buf+len,buf,nlen);
  34. len += nlen;
  35. }catch(...)
  36. {
  37. printf("RDClient::add error \r\n");
  38. }
  39. return len;
  40. };
  41. unsigned char Buf[RDCSIZE];
  42. int len;
  43. };

        至此,整个服务端的tcp/socket通信接口MySocketPrivate类实现为,在srv_IO目录下,创建MySocketPrivate.h/cpp源文件,类设计如下(完整的MySocketPrivate.h/cpp见附录):

  1. //MySocketPrivate.h
  2. class MySocketPrivate
  3. {
  4. public:
  5. MySocketPrivate(unsigned int port)
  6. : m_Port(port)
  7. , m_OnListen(false)
  8. {
  9. m_SSocket = MY_SOCKET_NULL;
  10. //m_CSockets.clear();
  11. #ifdef WIN32
  12. /*
  13. * This function should be called once in each secondary thread
  14. * before the first socket is created in the new thread.
  15. */
  16. SocketThreadInit();
  17. #endif
  18. };
  19. ~MySocketPrivate(){
  20. disConnect();
  21. };
  22. public:
  23. int onConnect();
  24. void disConnect();
  25. int Read(std::map<KeyObj_Client,RDClient> &bufs);
  26. int Write(const char* buf, int size);
  27. int Write(unsigned long long ipInt,const char* buf, int size);
  28. bool Accept();
  29. bool get_ipInt_list(std::set<long> &ipintlist); //获取在线端的整型IP
  30. #ifdef WIN32
  31. private:
  32. void SocketThreadInit();
  33. #endif
  34. private:
  35. void deleteSSocket(); //删除服务端
  36. void deleteCSocket(); //删除所有客户端
  37. void deleteCSocket(MY_SOCKET m_CSocket);//删除指定客户端
  38. private:
  39. MY_SOCKET m_SSocket; //服务端
  40. unsigned int m_Port; //端口变量
  41. bool m_OnListen; //用于标注侦听
  42. PYMutex m_MyMutex;
  43. std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
  44. };

        类似地,实现客户端的socket句柄管理以及读取及写入函数,整个客户端的tcp/socket通信接口MySocketClient类实现为,在client_IO目录下创建MySocketClient.h/cpp源文件,类设计如下(完整的MySocketClient.h/cpp见附录)。

  1. //MySocketClient.h
  2. class MySocketClient
  3. {
  4. public:
  5. MySocketClient(std::string ip, UINT port);
  6. ~MySocketClient(void);
  7. int onConnect();
  8. void disConnect();
  9. bool isConnect(){ return m_OnConnect; };
  10. int reSetSocket();
  11. int Read(RDClient &bufs);
  12. int Read(char* buf, int size);
  13. int Write(const char* buf, int size);
  14. private:
  15. #ifdef WIN32
  16. void SocketThreadInit();
  17. #endif
  18. private:
  19. int sock_fd;
  20. //fd_set read_flags,write_flags; // you know what these are
  21. std::string m_IpAddress;
  22. UINT m_Port; //
  23. bool m_OnConnect; //
  24. /*
  25. *当前写入失败及读取线程都可重新建立链接,m_OnConnecting设置防止冲突
  26. */
  27. bool m_OnConnecting; //
  28. bool m_ConnectONLog; //防止链接错误日志反复记录,切换状态时必定记录
  29. unsigned int m_log_Interval; //防止链接错误日志长时间不记录,2018-10-08
  30. };

四、tcp/socket通信扩展

        按上述设计,MySocketPrivate.h/cpp接口可以实现tcp/socket服务端,MySocketClient.h/cpp接口可实现tcp/socket服务端,其调用伪代码如下所示。

  1. //server
  2. MySocketPrivate *my_PrivateData = new MySocketPrivate(port);
  3. if (my_PrivateData->onConnect() > 0)
  4. {
  5. myPDataPrt->Accept();
  6. //读写数据
  7. }
  8. //client
  9. MySocketClient *my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
  10. if (my_PrivateData->onConnect() > 0)
  11. {
  12. //读写数据,管理客户端
  13. }

        前面所述应基本实现了假定一、二,先再来看看假定三功能。MySocketPrivate.h/cpp接口和MySocketClient.h/cpp接口提供了tcp/socket功能,但是若我们按上述伪代码调用,免不了同时处理读写数据、客户端管理,在支撑多客户端通信的情况下,数据处理效率就会差,进而造成数据堵塞。因此将读取数据和写入数据采用专用线程来处理。

       4.1 线程类及消息队列类设计

         首先创建MyThread类,以此为基类,创建读取数据和写入数据类。由于windows-vs和linux-g++对于thread的调用API有所差异,因此本文分开设计win和linux的线程类。

        linux系统,在common文件加下创建myThread.h/cpp源文件,实现linux线程类MyThread。

  1. //myThread.h
  2. class MyThread
  3. {
  4. private:
  5. //current thread ID
  6. pthread_t tid;
  7. //thread status
  8. int threadStatus;
  9. //get manner pointer of execution
  10. static void* run0(void* pVoid);
  11. //manner of execution inside
  12. void* run1();
  13. public:
  14. //threadStatus-new create
  15. static const int THREAD_STATUS_NEW = 0;
  16. //threadStatus-running
  17. static const int THREAD_STATUS_RUNNING = 1;
  18. //threadStatus-end
  19. static const int THREAD_STATUS_EXIT = -1;
  20. // constructed function
  21. MyThread();
  22. ~MyThread();
  23. //the entity for thread running
  24. virtual int Run()=0;
  25. //start thread
  26. bool start();
  27. //gte thread ID
  28. pthread_t getThreadID();
  29. //get thread status
  30. int getState();
  31. //wait for thread end
  32. void join();
  33. //wait for thread end in limit time
  34. void join(unsigned long millisTime);
  35. };

        winows系统,在common文件加下创建win32Thread.h/cpp源文件,实现linux线程类MyThread。

  1. //win32Thread.h
  2. typedef void *HANDLE;
  3. class MyThread
  4. {
  5. public:
  6. MyThread();
  7. ~MyThread();
  8. void start();
  9. virtual int Run();
  10. HANDLE getThread();
  11. private:
  12. HANDLE hThread;
  13. static void agent(void *p);
  14. };

        在数据读取和数据写入时,通过设计缓存消息队列,实现读写数据和数据业务处理分离。因此采用std::deque来实现消息传递。在common文件夹创建queuedata.h源文件,设计消息队列类QueueData如下。

  1. //queuedata.h
  2. template <class T>
  3. class QueueData
  4. {
  5. public:
  6. QueueData(std::string desc = "thread_queue");
  7. ~QueueData();
  8. //
  9. /**
  10. * 获取队列大小
  11. * @return {int } 队列大小
  12. */
  13. int size();
  14. /**
  15. * 判定队列是否为空
  16. * @return {bool } 是否为空队列
  17. */
  18. bool isEmpty();
  19. /**
  20. * 获取队列头元素
  21. * @param it {T&} 头元素
  22. * @return {bool } 是否成功
  23. */
  24. bool getFirst(T &it);
  25. /**
  26. * 删除元素
  27. * @return {bool } 是否成功
  28. */
  29. bool removeFirst();
  30. /**
  31. * 获取队列头元素,并从队列终删除
  32. * @param it {T&} 头元素
  33. * @return {bool } 是否成功
  34. */
  35. bool pop(T &it);
  36. /**
  37. * 从队列头开始逐步获取多个元素,并剔除
  38. * @param its {queue<T>&} 获取到的元素集
  39. * @param sizel {int} 一次获取多少个
  40. * @return {bool } 至少获取一个元素以上则成功
  41. */
  42. bool getList(std::queue<T> &its,unsigned int sizel=5);
  43. /**
  44. * 从队列尾部添加元素
  45. * @param it {T} 被添加元素
  46. * @return {void } 无返回
  47. */
  48. void add(T it);
  49. /**
  50. * 从队列头部添加元素
  51. * @param it {T} 被添加元素
  52. * @return {void } 无返回
  53. */
  54. void add_front(T it);
  55. /**
  56. * 清空元素
  57. * @return {void }
  58. */
  59. void clear();
  60. private:
  61. void init();
  62. QueueData& operator=(const QueueData&) {return this;};
  63. protected:
  64. std::string queue_desc;
  65. private:
  66. /点集转发
  67. //协议解析结果缓存
  68. std::deque<T> datacache_queue; //队列容器
  69. PYMutex m_Mutex; //线程锁,或者如果更彻底采用acl库,采用acl::thread_mutex替代
  70. //
  71. static unsigned int QSize; //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据
  72. //
  73. int queue_overS; //队列溢出次数计数
  74. };

        线程类和消息队列类准备好后,就可以实现读取数据和写入数据类了。

        4.2 读取及写入数据处理类

        服务端部分,写入数据处理类MySocketWD,基于消息缓存队列QueueData存储需要发送的数据,和MySocketPrivate实例实现发送数据,即将MySocketPrivate类的实例传入MySocketWD,然后该类循环地从消息缓存队列中读取数据,在内部调用MySocketPrivate的write函数完成数据发送。对外提供AddData函数接收外部传递数据进入消息缓存队列。在srv_IO目录下,创建MySocketWD.h/cpp源文件,实现MySocketWD类。

  1. //MySocketWD.h
  2. class MySocketPrivate;
  3. class MySocketWD : public MyThread
  4. {
  5. public:
  6. MySocketWD(MySocketPrivate* myPDataPrt_,int netType_=1);
  7. virtual ~MySocketWD(void);
  8. int Run();
  9. int AddData(const char* buf, int len);
  10. int getBuffer(unsigned long long &_ipInt, unsigned char* _buf);
  11. private:
  12. bool running;
  13. int netType;
  14. MySocketPrivate *myPDataPrt;
  15. QueueData<WDS> WriteData;
  16. };

        写入缓存数据结构体WDS设计如下(在DataDef.h源文件实现):

  1. //DataDef.h
  2. struct WDS
  3. {
  4. WDS() : ipInt(0), data()
  5. {
  6. };
  7. WDS(unsigned long long _ipInt,TCP_Data _data)
  8. : ipInt(_ipInt), data(_data)
  9. {
  10. };
  11. WDS& operator=(const WDS &rval)
  12. {
  13. if (this == &rval) {
  14. return *this;
  15. }
  16. ipInt = rval.ipInt;
  17. data = rval.data;
  18. return *this;
  19. };
  20. unsigned long long ipInt;
  21. TCP_Data data;
  22. };

        其中WDS包含的TCP_Data结构体和RDClient结构体几乎一致,除了缓存区域大小,但暂时还是把他们区分开来,TCP_Data代表应用层面数据缓存结构体,RDClient代表socket通信逻辑层面数据结构体,在代码优化迭代阶段再合并也不迟。(备注,在开发中过程中,过早合并和优化代码,有时反而可能会失去真正设计过程及真实业务逻辑)

  1. //DataDef.h
  2. struct TCP_Data
  3. {
  4. TCP_Data() : len(0)
  5. {
  6. memset(Buf, 0, 512);
  7. };
  8. TCP_Data(unsigned char *buf, int nlen)
  9. {
  10. memset(Buf, 0, 512);
  11. memcpy(Buf, buf, nlen);
  12. len = nlen;
  13. };
  14. TCP_Data& operator=(const TCP_Data &rval)
  15. {
  16. if (this != &rval) {
  17. memset(Buf, 0, 512);
  18. if (rval.len < 512) {
  19. memcpy(Buf, rval.Buf, rval.len);
  20. len = rval.len;
  21. }
  22. else {
  23. memcpy(Buf, rval.Buf, 512);
  24. len = 512;
  25. }
  26. }
  27. return *this;
  28. };
  29. unsigned char Buf[512];
  30. int len;
  31. };

        该类最主要的就是循环体的实现

  1. //MySocketWD.cpp
  2. int MySocketWD::Run()
  3. {
  4. if (NULL == myPDataPrt)
  5. {
  6. //your code
  7. return 0;
  8. }
  9. while(running)
  10. {
  11. try {
  12. unsigned long long _ipInt = 0;
  13. unsigned char buf[512] = { 0 };
  14. int len = this->getBuffer(_ipInt, buf);//从缓存获取数据
  15. if (len > 0)
  16. {
  17. int ret = -1;
  18. ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);//发送数据
  19. if (ret <=0)
  20. {
  21. //your code
  22. }
  23. }
  24. }
  25. catch (const std::exception& e)
  26. {
  27. //your code
  28. }
  29. catch (...) {
  30. //your code
  31. }
  32. #ifdef WIN32
  33. Sleep(10);
  34. #else
  35. usleep(10000);
  36. #endif
  37. }
  38. return 0;
  39. };

        类似地,读取数据处理类MySocketRD如下,在srv_IO目录下,创建MySocketRD.h/cpp源文件。

  1. //MySocketRD.h
  2. class MySocketPrivate;
  3. class MySocketRD : public MyThread
  4. {
  5. public:
  6. MySocketRD(MySocketPrivate* myPDataPrt_, int netType_=1);
  7. virtual ~MySocketRD(void);
  8. int Run();
  9. //从缓存中读取帧数据处理,请按需自行处理该函数
  10. int AddFrame(const std::string link, const unsigned char *buf, int len);
  11. private:
  12. bool running;
  13. int netType;//数据读写处理类型
  14. MySocketPrivate *myPDataPrt;
  15. QueueData<RDS> ReadData;
  16. };

        缓存数据结构体RDS如下,其也包含TCP_Data结构体。

  1. //DataDef.h
  2. struct RDS
  3. {
  4. RDS() : data(),flag("")
  5. {
  6. };
  7. RDS(TCP_Data _data,std::string _f = "")
  8. : data(_data),flag(_f)
  9. {
  10. };
  11. RDS& operator=(const RDS &rval)
  12. {
  13. if (this == &rval) {
  14. return *this;
  15. }
  16. data = rval.data;
  17. flag = rval.flag;
  18. return *this;
  19. };
  20. TCP_Data data;
  21. std::string flag;
  22. };

        该类最主要的就是循环体的实现大致如下,在该版本实现中,假定每次读取到一个完整帧数据,

  1. //MySocketRD.cpp
  2. int MySocketRD::Run()
  3. {
  4. if (NULL == myPDataPrt )
  5. {
  6. //your code
  7. return 0;
  8. }
  9. std::map<KeyObj_Client, RDClient> bufs; //从socket读取出的缓存数据
  10. RDS rdataGx;
  11. while (running)
  12. {
  13. int re = myPDataPrt->Read(bufs);//从socket接口读取数据
  14. if (re <= 0)
  15. {
  16. //your code
  17. }else {
  18. try {
  19. std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
  20. while (it != bufs.end())
  21. {
  22. if (it->second.len > 0)
  23. {
  24. RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
  25. ReadData.add(rdata);
  26. }
  27. it++;
  28. }
  29. bufs.clear();
  30. }
  31. catch (const std::exception& e)
  32. {
  33. //your code
  34. }
  35. catch (...) {
  36. //your code
  37. }
  38. while (ReadData.getFirst(rdataGx))//处理缓存中的数据
  39. {
  40. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  41. ReadData.removeFirst();
  42. }
  43. }
  44. #ifdef WIN32
  45. Sleep(10);
  46. #else
  47. usleep(10000);
  48. #endif
  49. }
  50. return 0;
  51. };

        注意到本文直接在读取数据循环体内同时处理了缓存的数据(如下),也可以再采用一条独立线程来处理缓存数据,使得MySocketRD类就仅不间断循环从socket接口读取数据。当然这可视实际项目需要来设计。

  1. while (ReadData.getFirst(rdataGx))//处理缓存中的数据
  2. {
  3. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  4. ReadData.removeFirst();
  5. }

        客户端部分读取及写入数据处理类设计类似

        客户端的写入处理类先命名和服务端的一样,也是MySocketWD,因为基本大部分功能都一样,可以便于后面进行代码优化设计。当前先在client_IO目录下,创建MySocketWD.h/cpp源文件来实现。

  1. //client_IO/MySocketWD.h
  2. class MySocketClient;
  3. class MySocketWD : public MyThread
  4. {
  5. public:
  6. MySocketWD(void);
  7. virtual ~MySocketWD(void);
  8. void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
  9. int Run();
  10. int add_data(const char* buf, int len);
  11. int getBuffer(unsigned char * _buf);
  12. int getHeartBeatBuffer(unsigned char * buf);
  13. private:
  14. bool running;
  15. int netType;//数据读写处理类型
  16. unsigned int heartBeatWrite;
  17. MySocketClient *myPDataPrt;
  18. QueueData<TCP_Data> WriteData;
  19. };

        其处理循环函数如下,注意的是,在循环体内除了写入数据外,还同时处理链接状态判定及重新连接情况,当然该功能也在后面优化设计时可以独立出去。另外客户端的写入线程还负责发送心跳信息,即在长时间没有从消息缓存队列读取到数据时,生成心跳数据来发送

  1. //client_IO/MySocketWD.cpp
  2. int MySocketWD::Run()
  3. {
  4. if (NULL == myPDataPrt )
  5. {
  6. //
  7. return 0;
  8. }
  9. while(running)
  10. {
  11. if (!myPDataPrt->isConnect())
  12. {
  13. myPDataPrt->reSetSocket();//read or write thread do it
  14. if (!myPDataPrt->isConnect())
  15. {
  16. #ifdef WIN32
  17. Sleep(1000);
  18. #else
  19. usleep(1000000);
  20. #endif
  21. }
  22. }
  23. else {
  24. //由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
  25. unsigned char buf[512] = { 0 };
  26. int len = this->getBuffer(buf);
  27. if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
  28. {
  29. len = this->getHeartBeatBuffer(buf);//生成心跳数据
  30. }
  31. if (len > 0) {
  32. int ret = myPDataPrt->Write((const char*)buf, len);
  33. if (ret != len) {
  34. //
  35. }
  36. else {
  37. heartBeatWrite = static_cast<unsigned int>(time(NULL));
  38. }
  39. }
  40. }
  41. #ifdef WIN32
  42. Sleep(1);
  43. #else
  44. usleep(1000);
  45. #endif
  46. }
  47. return 0;
  48. }

        生成心跳数据函数实现如下,至于具体发送什么心跳数据,那就依据项目要求而定,或许从配置信息读取来生成是不错的选择,就不需调整代码:

  1. //cient_IO/MySocketWD.cpp
  2. int MySocketWD::getHeartBeatBuffer(unsigned char * buf)
  3. {
  4. if (NULL != buf)
  5. {
  6. int idx = 0;
  7. std::string cur_time_str = PFunc::getCurrentTime();
  8. char buf_[64]={0};
  9. sprintf(buf_,"HeartBeat:%s",cur_time_str.c_str());
  10. idx = (int)strlen(buf_);
  11. memcpy(buf,buf_,idx);
  12. return idx;
  13. }
  14. else
  15. {
  16. return 0;
  17. }
  18. };

        客户端读取数据处理类,在 client_IO目录下,创建MySocketRD.h/cpp源文件。

  1. //client_IO/MySocketRD.h
  2. class MySocketClient;
  3. class MySocketRD : public MyThread
  4. {
  5. public:
  6. MySocketRD(void);
  7. virtual ~MySocketRD(void);
  8. void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
  9. int Run();
  10. //从缓存中读取帧数据处理,请按需自行处理该函数
  11. int AddFrame(const unsigned char *buf, int len);
  12. private:
  13. bool running;
  14. int netType;//数据读写处理类型
  15. MySocketClient *myPDataPrt;
  16. QueueData<TCP_Data> ReadData;
  17. };

        其处理循环函数如下,同样处理链接状态判定及重新连接情况(哈哈,冗余设计),另外还同时处理缓存的数据。反正循环体内就做了三件事,连接服务端,从socket读取数据并写入缓存,从缓存读取数据来处理,至于是否放在其他地方实现,呵呵,就看项目需要咯。

  1. //client_IO/MySocketRD.cpp
  2. int MySocketRD::Run()
  3. {
  4. if (NULL == myPDataPrt )
  5. {
  6. //
  7. return 0;
  8. }
  9. RDClient rdc_data;
  10. TCP_Data rddata;
  11. while (running)
  12. {
  13. if (!myPDataPrt->isConnect())
  14. {
  15. myPDataPrt->reSetSocket();//read or write thread do it
  16. if (!myPDataPrt->isConnect())
  17. {
  18. #ifdef WIN32
  19. Sleep(1000);
  20. #else
  21. usleep(1000000);
  22. #endif
  23. }
  24. }
  25. else
  26. {
  27. //读取帧数据
  28. char buf[256] = { 0 };
  29. int len = myPDataPrt->Read(buf, 256);
  30. if (len > 0)
  31. {
  32. TCP_Data rdata((unsigned char*)buf, len);
  33. ReadData.add(rdata);
  34. }
  35. //数据帧解析
  36. if (ReadData.getFirst(rddata))
  37. {
  38. this->AddFrame(rddata.Buf, rddata.len);
  39. ReadData.removeFirst();
  40. }
  41. }
  42. #ifdef WIN32
  43. Sleep(10);
  44. #else
  45. usleep(10000);
  46. #endif
  47. }
  48. return 0;
  49. };

        4.3 更上层集成的Socket-API接口

        还需要不间断监听新客户端链接情况,为了更体现独立性,把该功能也独立出来,采用专用线程类MySocketSrv去侦听客户端的链接,类似读取、写入数据处理类那样,也将MySocketPrivate类的实例传入MySocketWD。在srv_IO目录下,创建MySocketSrv.h/cpp源文件来实现。

  1. //MySocketSrv.h
  2. class MySocketPrivate;
  3. class MySocketSrv : public MyThread
  4. {
  5. public:
  6. MySocketSrv();
  7. virtual ~MySocketSrv();
  8. void setPDataPtr(MySocketPrivate *myPData);
  9. int Run();
  10. private:
  11. MySocketPrivate *myPDataPrt;
  12. };

        其循环体处理逻辑很简单,

  1. //MySocketSrv.cpp
  2. int MySocketSrv::Run()
  3. {
  4. if (NULL == myPDataPrt)
  5. {
  6. //your code
  7. return 0;
  8. }
  9. while(1)
  10. {
  11. myPDataPrt->Accept();
  12. #ifdef WIN32
  13. Sleep(300);
  14. #else
  15. usleep(300000);
  16. #endif
  17. }
  18. return 0;
  19. }

        为了更体现是一个整体的 TCP/Socket业务接口,在srv_IO目录下创建一个MySocket.h/cpp源文件,用于从业务层面上提供一个Socket-IO接口(是否有点向外观模式,呵呵),它将socket接口,读取及写入线程,侦听线程集成进来,对外提供简单易用的socket功能:初始化及读写数据。

  1. //srv_IO/MySocket.h
  2. class MySocketPrivate;
  3. class MySocketSrv;
  4. class MySocketRD;
  5. class MySocketWD;
  6. class MySocket
  7. {
  8. public:
  9. MySocket(unsigned int port,int netType_=1);
  10. ~MySocket(void);
  11. public:
  12. int Read(){ return -1; };
  13. int Write(){ return -1; };
  14. //
  15. //int Read(char* buf, int size);
  16. int Write(const char* buf, int size);
  17. private:
  18. MySocketPrivate *my_PrivateData;
  19. MySocketSrv *my_SocketSrv;
  20. MySocketRD *m_MySocketRD;
  21. MySocketWD *m_MySocketWD;
  22. };

        MySocket类的初始化如下,初始化底层真正的Socket接口MySocketPrivate,然后依据是否listen成功而进一步将MySocketPrivate传入监测客户端线程类及读写线程类并初始化及启动线程。

  1. //srv_IO/MySocket.cpp
  2. MySocket::MySocket(unsigned int port,int netType_)
  3. {
  4. try {
  5. my_PrivateData = new MySocketPrivate(port);
  6. if (my_PrivateData->onConnect() > 0)
  7. {
  8. my_SocketSrv = new MySocketSrv();
  9. my_SocketSrv->setPDataPtr(my_PrivateData);
  10. my_SocketSrv->start();
  11. m_MySocketRD = new MySocketRD(my_PrivateData,netType_);
  12. m_MySocketRD->start();
  13. m_MySocketWD = new MySocketWD( my_PrivateData,netType_);
  14. m_MySocketWD->start();
  15. }
  16. else {
  17. my_SocketSrv = NULL;
  18. m_MySocketRD = NULL;
  19. m_MySocketWD = NULL;
  20. }
  21. }
  22. catch (...) {
  23. //以下处理时防止初始化半途而废(异常)
  24. delete my_SocketSrv;
  25. my_SocketSrv = NULL;
  26. delete m_MySocketRD;
  27. m_MySocketRD = NULL;
  28. delete m_MySocketWD;
  29. m_MySocketWD = NULL;
  30. delete my_PrivateData;
  31. my_PrivateData = NULL;
  32. }
  33. };

        同样地,在客户端层面,也创建一个MySocket类,集成socket-API接口及读写线程类,在client_IO目录,创建MySocket.h/cpp源文件来实现。

  1. //client_IO/MySocket.h
  2. class MySocketClient;
  3. class MySocketWD;
  4. class MySocketRD;
  5. class MySocket
  6. {
  7. public:
  8. MySocket(int _tranid, NetArg _netarg);
  9. virtual ~MySocket(void);
  10. public:
  11. virtual int Read(){ return -1; };
  12. virtual int Write(){ return -1; };
  13. int Write(const char* buf, int size);
  14. private:
  15. int tranid;
  16. NetArg netarg;
  17. MySocketClient *my_PrivateData;
  18. MySocketWD *m_MySocketWD;
  19. MySocketRD *m_MySocketRD;
  20. };

        客户端的MySocket类的初始化如下,初始化底层真正的Socket接口MySocketPrivate,然后将MySocketPrivate传入读写线程类并初始化及启动线程。

  1. //client_IO/MySocket.cpp
  2. MySocket::MySocket(int _tranid, NetArg _netarg)
  3. : tranid(_tranid)
  4. , netarg(_netarg)
  5. {
  6. try {//防止构造时异常出现内存泄漏
  7. //TCP/IP客户端,连接监控服务或其他平台
  8. my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
  9. if (my_PrivateData->onConnect() <= 0)
  10. {
  11. //do something
  12. }
  13. //数据协议编码解码 序列化及反序列化
  14. //数据发送线程
  15. m_MySocketWD = new MySocketWD();
  16. m_MySocketWD->setPrivateDataPtr(my_PrivateData, netarg.type);
  17. m_MySocketWD->start();
  18. //数据接收线程
  19. m_MySocketRD = new MySocketRD();
  20. m_MySocketRD->setPrivateDataPtr(my_PrivateData, netarg.type);
  21. m_MySocketRD->start();
  22. }
  23. catch (...)
  24. {
  25. delete m_MySocketRD;
  26. m_MySocketRD = NULL;
  27. delete m_MySocketWD;
  28. m_MySocketWD = NULL;
  29. delete my_PrivateData;
  30. my_PrivateData = NULL;
  31. }
  32. }

        通过集成MySocket类提供接口服务后,那么调用就更简单了,如下构造socket接口及发送数据:

  1. //srv_test/main.cpp
  2. MySocket server_test(70001,2);
  3. char buf[]="hello, this is server!";
  4. server_test.Write((const char*)buf,(int)strlen(buf));
  5. //client_test/main.cpp
  6. MySocket client_test(1,_netarg);
  7. char buf[]="hello, this is client 01!";
  8. client_test.Write((const char*)buf,(int)strlen(buf));

        至于读取数据,前面已经提过,在读取线程里进行类处理了,其处理如下(打印信息而已,哈哈):

  1. //client_IO/MySocketRD.cpp
  2. //srv_IO/MySocketRD.cpp
  3. int MySocketRD::AddFrame(const unsigned char *buf, int len)
  4. {
  5. if(NULL==buf)
  6. return 0;
  7. printf("rev:%s\r\n",(char*)buf);
  8. return 0;
  9. };

五、tcp/socket业务数据通信处理

        前面在读取数据时就提到,暂且当每次读取数据时是一个完整帧,但是我们回想一下,在实际使用过程中,尤其是服务端,同时读取多个客户端的数据时,可能会出现数据堆叠在一起,即有可能一次从socket句柄信道哪里读取到多帧数据,或者读取到不完整帧数据,那该如何处理呢。

        5.1  数据编解码设计(序列化及反序列化)

        那么就需要通信有识别帧的能力,在本文,通过一个简要设计来阐述这方面的应用。

        假设编码时:

        1)在编码时,逐步遍历消息中的每个字节的数据(8bit),如果有大于0XF0需要做转义处理,例如将0XF3转义为0XF0 0X03

        2)在转移好的消息前加上一个字节数据0XF1作为帧头标记,在消息尾加上一个字节数据0XFF作为帧尾标记。

        加上解码时

        1)在解码时,先查找到0XF1标记和0XFF标记,进行帧数据分割,取出它们之间的数据作为接收到的消息

        2)将接收到的消息进行转义,如果发现某字节等于0XF0,与其后续字节合并转义,例如0XF0 0X0A转义0XFA。

        因此给出编解码实现函数code、uncode,并将其声明及定义加入到common目录下的myFunc.h/cpp内。

  1. //common/myFunc.h
  2. namespace PFunc
  3. {
  4. //frame code
  5. int code(const unsigned char *buff, const int len, unsigned char *outbuf);
  6. //frame uncode
  7. int uncode(const unsigned char *buff, int len, unsigned char *outbuf);
  8. };
  9. //common/myFunc.cpp
  10. int PFunc::code(const unsigned char *buff, const int len, unsigned char *outbuf)
  11. {
  12. char ch = 0;
  13. int nLen = 0;
  14. unsigned char * buf = (unsigned char *)buff;
  15. *outbuf++ = 0XF1;//头字节
  16. nLen+=1;
  17. for (int i = 0; i < len; i++, nLen++)
  18. {
  19. ch = buf[i];
  20. if ((buf[i] | 0x0f) == 0xff && i > 0 && i < (len - 1))
  21. {
  22. *outbuf++ = 0xf0 & buf[i];
  23. *outbuf++ = 0x0f & buf[i];
  24. nLen += 1;
  25. }
  26. else {
  27. *outbuf++ = ch;
  28. }
  29. }
  30. *outbuf++ = 0XFF;//末字节
  31. nLen+=1;
  32. buf = NULL;
  33. return nLen;
  34. }
  35. int PFunc::uncode(const unsigned char *buff, int len, unsigned char *outbuf)
  36. {
  37. char ch = 0;
  38. int nLen = 0;
  39. unsigned char * buf = (unsigned char *)buff;
  40. //头、末尾字节判断
  41. if(len<=2&&0XF1!=buf[0]&&0XFF!=buf[len-1]){
  42. printf("uncode func, start bit or end bit Error!\r\n");
  43. return 0;
  44. }
  45. for (int i = 1; i < (len-1); i++, nLen++)
  46. {
  47. ch = buf[i];
  48. if (buf[i] == 0xf0)
  49. {
  50. #ifdef _DEBUG
  51. if (i > len - 2)
  52. printf("Error!\r\n");
  53. if (buf[i + 1] > 0x0f)
  54. printf("Error!\r\n");
  55. #endif
  56. ch = 0xf0 | buf[++i];
  57. }
  58. *outbuf++ = ch;
  59. }
  60. buf = NULL;
  61. return nLen;
  62. }

        那么在服务端和客户端的数据读取及写入线程就可以调用code和uncode来实现数据的编解码。

        在/srv_IO/MySocketWD.cpp内除了原来的直接数据写入外,增加新的除了方式,从缓存消息队列读取数据,并将数据进行编码(序列化)后得到的新数据写入socket内(发送)。如下文,netType是数据处理类型(1,直接写入;2,code后写入)

  1. int MySocketWD::Run()
  2. {
  3. if (NULL == myPDataPrt)
  4. {
  5. //
  6. return 0;
  7. }
  8. while(running)
  9. {
  10. try {
  11. unsigned long long _ipInt = 0;
  12. unsigned char buf[512] = { 0 };
  13. int len = this->getBuffer(_ipInt, buf);
  14. if (len > 0)
  15. {
  16. int ret = -1;
  17. switch (netType)
  18. {
  19. case 1:
  20. {
  21. ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);
  22. break;
  23. }
  24. case 2:
  25. {
  26. // printf("send data: %s\r\n",buf);
  27. unsigned char* _buf = new unsigned char[2 * len + 1];
  28. memset(_buf, 0, 2 * len + 1);
  29. len = PFunc::code(buf, len, _buf);//序列化处理
  30. printf("send data: %d\r\n",len);
  31. ret = myPDataPrt->Write(_ipInt, (const char*)_buf, len);//发送
  32. delete[] _buf;
  33. _buf = NULL;
  34. break;
  35. }
  36. default:
  37. {
  38. char warBuf[128] = { 0 };
  39. sprintf(warBuf, "MySocketWD::Run For Unkown NetType(%02X)", netType);
  40. #ifdef WIN32
  41. throw std::exception(warBuf);
  42. #else
  43. throw std::domain_error(warBuf);
  44. #endif
  45. break;
  46. }
  47. }
  48. if (ret <=0)
  49. {
  50. //printf("send data: %d, buf %d\n",len,ret);
  51. }
  52. //else{
  53. // printf("send data: %d, and real send %d\n",len,ret);
  54. //}
  55. }
  56. }
  57. catch (const std::exception& e)
  58. {
  59. //your code
  60. }
  61. catch (...) {
  62. //your code
  63. }
  64. #ifdef WIN32
  65. Sleep(10);
  66. #else
  67. usleep(10000);
  68. #endif
  69. }
  70. return 0;
  71. };

        修改/srv_IO/MySocketRD.cpp内的循环函数,相比写入处理要复杂些,如下文,netType是数据处理类型(1,直接写入;2,从socket读取数据后,进行数据分帧处理,将识别出的帧数据写入缓存中)。由于前面提到过可能会一次从某个socket句柄的读取多帧数据或不完整数据,因此在分帧处理时就查找到帧的开始及结束标记,才能算得到一帧完整的数据。如果只找到开始标记,没结束标记,这部分数据就需要移动到缓存区域前面,与后面继续读取的数据拼在一起来处理。

  1. int MySocketRD::Run()
  2. {
  3. if (NULL == myPDataPrt )
  4. {
  5. return 0;
  6. }
  7. std::map<KeyObj_Client, RDClient> bufs;
  8. RDS rdataGx;
  9. while (running)
  10. {
  11. int re = myPDataPrt->Read(bufs);
  12. if (re <= 0)
  13. {
  14. #ifdef _DEBUG
  15. printf_s("Read Data Failed or NULL\n!");
  16. #else
  17. ;
  18. #endif
  19. }else {
  20. switch (netType)
  21. {
  22. case 1:
  23. {
  24. try {
  25. std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
  26. while (it != bufs.end())
  27. {
  28. if (it->second.len > 0)
  29. {
  30. RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
  31. ReadData.add(rdata);
  32. }
  33. it++;
  34. }
  35. bufs.clear();
  36. }
  37. catch (const std::exception& e)
  38. {
  39. //your code
  40. }
  41. catch (...) {
  42. //your code
  43. }
  44. while (ReadData.getFirst(rdataGx))
  45. {
  46. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  47. ReadData.removeFirst();
  48. }
  49. break;
  50. }
  51. case 2:
  52. {
  53. try {
  54. std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
  55. while (it != bufs.end())
  56. {
  57. unsigned char * buff = it->second.Buf;
  58. int start_frame = 0;
  59. unsigned char ctype = 0;
  60. for (int i = 0; i < it->second.len; i++)
  61. {
  62. //printf_s("%02X ",buff[i]);
  63. if (buff[i] > 0xf0) {
  64. if (buff[i] == 0xff)
  65. {
  66. if (ctype)
  67. {
  68. ctype = 0;
  69. int re_len = i - start_frame + 1;
  70. // RDS rdata(TCP_Data(buff + start_frame, i - start_frame + 1), it->first.m_ipStr);
  71. unsigned char * pBuf = new unsigned char[re_len];
  72. //
  73. int nLen = PFunc::uncode(buff + start_frame, re_len, pBuf);
  74. RDS rdata(TCP_Data(pBuf, nLen), it->first.m_ipStr);
  75. // printf("rev01:%s\r\n",(char*)pBuf);
  76. printf("rev01:%d\r\n",nLen);
  77. ReadData.add(rdata);
  78. start_frame = i + 1;
  79. delete[] pBuf;
  80. pBuf = NULL;
  81. }
  82. }
  83. else {
  84. ctype = buff[i];
  85. start_frame = i;
  86. }
  87. }
  88. }
  89. buff = NULL;
  90. if (start_frame < it->second.len)
  91. {
  92. RDClient _newrd(it->second.Buf + start_frame, it->second.len - start_frame);
  93. it->second = _newrd;
  94. it++;
  95. }
  96. else {
  97. #ifdef WIN32
  98. it = bufs.erase(it);
  99. #else
  100. std::map<KeyObj_Client, RDClient>::iterator ittemp = it++;
  101. bufs.erase(ittemp);
  102. #endif
  103. }
  104. }
  105. }
  106. catch (const std::exception& e)
  107. {
  108. //your code
  109. }
  110. catch (...) {
  111. //your code
  112. }
  113. while (ReadData.getFirst(rdataGx))
  114. {
  115. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  116. ReadData.removeFirst();
  117. }
  118. break;
  119. }
  120. default:
  121. break;
  122. }
  123. }
  124. #ifdef WIN32
  125. Sleep(10);
  126. #else
  127. usleep(10000);
  128. #endif
  129. }
  130. return 0;
  131. };

        因此,在通过socket-API 读取到数据后,是调用缓存RDClient结构体的add函数在末尾追加写入的

  1. //common/DataDef.h
  2. int RDClient::add(unsigned char *buf,int nlen)
  3. {
  4. try{
  5. memset(Buf+len,0,RDCSIZE-len);
  6. memcpy(Buf+len,buf,nlen);
  7. len += nlen;
  8. }catch(...)
  9. {
  10. printf("RDClient::add error \r\n");
  11. }
  12. return len;
  13. };

        对于客户端来说,也是类似的

        client_IO/MySocketWD.cpp-Run函数修改如下:

  1. int MySocketWD::Run()
  2. {
  3. if (NULL == myPDataPrt )
  4. {
  5. return 0;
  6. }
  7. while(running)
  8. {
  9. if (!myPDataPrt->isConnect())
  10. {
  11. myPDataPrt->reSetSocket();//read or write thread do it
  12. if (!myPDataPrt->isConnect())
  13. {
  14. #ifdef WIN32
  15. Sleep(1000);
  16. #else
  17. usleep(1000000);
  18. #endif
  19. }
  20. }
  21. else {
  22. //由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
  23. unsigned char buf[512] = { 0 };
  24. int len = this->getBuffer(buf);
  25. if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
  26. {
  27. len = this->getHeartBeatBuffer(buf);
  28. }
  29. if (len > 0) {
  30. switch (netType)
  31. {
  32. case 1:
  33. {
  34. int ret = myPDataPrt->Write((const char*)buf, len);
  35. if (ret != len) {
  36. //printf("send data: %d, buf %d\n",len,ret);
  37. }
  38. else {
  39. heartBeatWrite = static_cast<unsigned int>(time(NULL));
  40. }
  41. }
  42. break;
  43. case 2:
  44. {
  45. int cacheLen = 2 * len + 1;
  46. unsigned char* _buf = new unsigned char[cacheLen];
  47. memset(_buf, 0, cacheLen);
  48. int nLen = PFunc::code(buf, len, _buf);//序列化处理
  49. int ret = myPDataPrt->Write((const char*)_buf, nLen);
  50. if (ret != nLen) {
  51. //printf("send data: %d, buf %d\n",len,ret);
  52. }
  53. else {
  54. heartBeatWrite = static_cast<unsigned int>(time(NULL));
  55. }
  56. delete[] _buf;
  57. _buf = NULL;
  58. }
  59. break;
  60. default:
  61. break;
  62. }
  63. }
  64. }
  65. #ifdef WIN32
  66. Sleep(1);
  67. #else
  68. usleep(1000);
  69. #endif
  70. }
  71. return 0;
  72. }

        client_IO/MySocketRD.cpp-Run函数修改如下:

  1. int MySocketRD::Run()
  2. {
  3. if (NULL == myPDataPrt )
  4. {
  5. return 0;
  6. }
  7. RDClient rdc_data;
  8. TCP_Data rddata;
  9. while (running)
  10. {
  11. if (!myPDataPrt->isConnect())
  12. {
  13. myPDataPrt->reSetSocket();//read or write thread do it
  14. if (!myPDataPrt->isConnect())
  15. {
  16. #ifdef WIN32
  17. Sleep(1000);
  18. #else
  19. usleep(1000000);
  20. #endif
  21. }
  22. }
  23. else
  24. {
  25. //读取帧数据
  26. switch (netType)
  27. {
  28. case 1:
  29. {
  30. //直接读取,不用做分帧处理,ACSII字段
  31. char buf[256] = { 0 };
  32. int len = myPDataPrt->Read(buf, 256);
  33. if (len > 0)
  34. {
  35. TCP_Data rdata((unsigned char*)buf, len);
  36. ReadData.add(rdata);
  37. }
  38. //数据帧解析
  39. if (ReadData.getFirst(rddata))
  40. {
  41. this->AddFrame(rddata.Buf, rddata.len);
  42. ReadData.removeFirst();
  43. }
  44. }
  45. break;
  46. case 2:
  47. {
  48. //数据有特定帧头和结尾,做分帧处理
  49. int ret = myPDataPrt->Read(rdc_data);
  50. if (ret > 0)
  51. {
  52. //printf("read(%d) from pcs_server\n",ret);
  53. unsigned char * buff = rdc_data.Buf;
  54. int frame_start = 0;
  55. unsigned char ctype = 0;
  56. for (int i = 0; i < rdc_data.len; ++i)
  57. {
  58. //printf("%02X ",buff[i]);
  59. if (buff[i] > 0xf0)
  60. {
  61. if (buff[i] == 0xff)
  62. {
  63. if (ctype)
  64. {
  65. ctype = 0;
  66. // TCP_Data rdata(buff + frame_start, i - frame_start + 1);
  67. unsigned char * pBuf = new unsigned char[i - frame_start + 1];
  68. int nLen = PFunc::uncode(buff + frame_start, i - frame_start + 1, pBuf);//反序列化处理
  69. TCP_Data rdata(pBuf, nLen);
  70. ReadData.add(rdata);
  71. frame_start = i + 1;
  72. delete[] pBuf;
  73. pBuf = NULL;
  74. }
  75. }
  76. else
  77. {
  78. ctype = buff[i];
  79. frame_start = i;
  80. }
  81. }
  82. }
  83. buff = NULL;
  84. if (frame_start < rdc_data.len)
  85. {
  86. RDClient _newrd(rdc_data.Buf + frame_start, rdc_data.len - frame_start);
  87. rdc_data = _newrd;
  88. }
  89. else
  90. {
  91. rdc_data.len = 0;
  92. }
  93. }
  94. //数据帧解析
  95. while (ReadData.getFirst(rddata))
  96. {
  97. this->AddFrame(rddata.Buf, rddata.len);
  98. ReadData.removeFirst();
  99. }
  100. }
  101. break;
  102. default:
  103. break;
  104. }
  105. }
  106. #ifdef WIN32
  107. Sleep(10);
  108. #else
  109. usleep(10000);
  110. #endif
  111. }
  112. return 0;
  113. };

        5.2 结构化数据传递

        在实现项目中,通过TCP数据传递除了发送字符串外,还需要将数据以包的形式直接发送及接收,而包数据有固定的格式,在c/c++领域,我们通常采用struct关键字定义包数据。例如定义一下结构体:

  1. //common/DataDef.h
  2. typedef struct TCP_Data_Model
  3. {
  4. unsigned type;
  5. int len;
  6. float val;
  7. char desc[32];
  8. }TcpSocket,*PTcpSocket;

        按前面所述的业务应用时数据发送及接收处理方式,那么发送时,如下,和发送字符串无多少区别:

  1. //main.cpp
  2. MySocket server_test(70001,2);
  3. char buf[]="hello, this is server!";
  4. TcpSocket ts;
  5. ts.len=(int)strlen(buf);
  6. memcpy(ts.desc,buf,ts.len);
  7. ts.type = 1;
  8. ts.val = 10.0;
  9. //server_test.Write((const char*)buf,(int)strlen(buf));
  10. server_test.Write((const char*)&ts,(int)sizeof(TcpSocket));

        接收数据时,处理如下,直接将数据进行块拷贝即可:

  1. //MySocketRD.cpp
  2. int MySocketRD::AddFrame(const std::string link, const unsigned char *buf, int len)
  3. {
  4. if(NULL == buf)
  5. return 0;
  6. // printf("rev:%s\r\n",(char*)buf);
  7. TcpSocket ts;
  8. memcpy(&ts,buf,len);
  9. printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
  10. return 0;
  11. };

六、项目最终呈现

        6.1 增加日志记录模块

        涉及到TCP/Socket通信,一般大多数都会被设计成后台服务长期运行,那么日志记录就是用户需求不提及也需要提供功能服务。日志服务对于长期运行中发现异常、bug、调优等都很有帮助,并也可以为用户提供日志运行信息。

        通常大多数公司都有自己应有的及长时间实践过的或大或小的日志系统,尤其是对于平台级的系统而言。但是对于项目项目,原型验证,就没必要给出一个系统级的日志系统。

        本文给出一个基类为MyThread的单体模式的日志类CLogger,在common目录下,创建Log.h/cpp源文件,实现CLogger类,如下文。在该类中,有一个日志信息缓存队列mylogs(类似前面的数据读取及写入消息队列),用来缓存各业务模块推送的日志。循环函数Run还不间断从日志缓存队列读取日志并写入日志文件中。

  1. //common/Log.h
  2. enum eLogType
  3. {
  4. eHardError = 1,
  5. eSoftError = 2,
  6. eConfigError = 3,
  7. eParameterError = 4,
  8. eReadError = 5,
  9. eWriteError = 6,
  10. eControlMessage = 7,
  11. eResponseMessage = 8,
  12. eTipMessage = 9
  13. };
  14. struct MyLogStruct
  15. {
  16. MyLogStruct():type(0)
  17. {
  18. memset(szBuffer, 0, 1024);
  19. };
  20. int type;
  21. char szBuffer[1024];
  22. };
  23. class CLogger : public MyThread
  24. {
  25. public:
  26. CLogger();
  27. ~CLogger();
  28. int Run();
  29. public:
  30. void Log(const eLogType type, const char* lpszFormat, ...);
  31. static CLogger* createInstance( void );
  32. private:
  33. bool getFirstLog(MyLogStruct &it);
  34. void addLog(MyLogStruct it);
  35. private:
  36. static CLogger* m_pLogInstance;
  37. bool running;
  38. //for cache
  39. std::queue<MyLogStruct> mylogs;
  40. PYMutex m_Mutex_Log;
  41. int i_log_over;
  42. };
  43. //common/Log.cpp
  44. int CLogger::Run()
  45. {
  46. MyLogStruct log_;
  47. while (running) {
  48. if (getFirstLog(log_))
  49. {
  50. WriteLog(log_.type, log_.szBuffer);
  51. #ifndef WIN32
  52. printf("Log::[%d]-->%s\n", getpid(), log_.szBuffer);
  53. #else
  54. printf("Log::-->%s\n", log_.szBuffer);
  55. #endif
  56. }
  57. #ifdef WIN32
  58. Sleep(10);
  59. #else
  60. usleep(10000);
  61. #endif
  62. }
  63. return 0;
  64. };

        6.2 项目完整目录结构

        至此,基本实现了前面提到的四个假设内容(业务需求),并增加了一些额外功能,构建一个实现基本功能点的程序开发。

  1. #
  2. TCP_StructData
  3. bin #编译输出结果
  4. client_test
  5. client_test.exe
  6. server_test
  7. server_test.exe
  8. client_IO #客户端Socket的API功能源码
  9. MySocket.h
  10. MySocket.cpp
  11. MySocketClient.h
  12. MySocketClient.cpp
  13. MySocketRD.h
  14. MySocketRD.cpp
  15. MySocketWD.h
  16. MySocketWD.cpp
  17. client_test #客户端业务功能源码
  18. build_win #客户端windows编辑中间文件输出目录
  19. build_linux #客户端linux编辑中间文件输出目录
  20. main.cpp
  21. CMakeLists.txt #cmake配置文件
  22. common #共同功能模块或数据结构源码
  23. DataDef.h #结构化数据
  24. hashmap.h #结构化数据作为容器Key实现
  25. hashmap.cpp
  26. Log.h #日志模块
  27. Log.cpp
  28. Mutex.h #互斥锁
  29. Mutex.cpp
  30. myThread.h #linux系统下线程类实现
  31. myThread.cpp
  32. queuedata.h
  33. win32Thread.h #windows系统下线程类实现
  34. win32Thread.cpp
  35. srv_IO #服务端Socket的API功能源码
  36. MySocket.h
  37. MySocket.cpp
  38. MySocketPrivate.h
  39. MySocketPrivate.cpp
  40. MySocketRD.h
  41. MySocketRD.cpp
  42. MySocketSrv.h
  43. MySocketSrv.cpp
  44. MySocketWD.h
  45. MySocketWD.cpp
  46. svr_test #服务端业务功能源码
  47. build_win #客户端windows编辑中间文件输出目录
  48. build_linux #客户端linux编辑中间文件输出目录
  49. main.cpp #
  50. CMakeLists.txt #cmake配置文件

                其中/srv_test/CMakeLists.txt 如下:

  1. # CMake 最低版本号要求
  2. cmake_minimum_required (VERSION 3.2)
  3. # 项目信息
  4. project (server_test)
  5. #
  6. if(WIN32)
  7. message(STATUS "windows compiling...")
  8. add_definitions(-D_PLATFORM_IS_WINDOWS_)
  9. set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
  10. set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
  11. set(WIN_OS true)
  12. else(WIN32)
  13. message(STATUS "linux compiling...")
  14. add_definitions( -D_PLATFORM_IS_LINUX_)
  15. set(UNIX_OS true)
  16. set(_DEBUG true)
  17. endif(WIN32)
  18. #
  19. set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
  20. # 指定源文件的目录,并将名称保存到变量
  21. SET(source_h
  22. #
  23. #${PROJECT_SOURCE_DIR}/../common/pfunc_print.h
  24. ${PROJECT_SOURCE_DIR}/../common/queuedata.h
  25. #${PROJECT_SOURCE_DIR}/../common/conf.h
  26. ${PROJECT_SOURCE_DIR}/../common/Mutex.h
  27. ${PROJECT_SOURCE_DIR}/../common/hashmap.h
  28. ${PROJECT_SOURCE_DIR}/../common/myFunc.h
  29. ${PROJECT_SOURCE_DIR}/../common/Log.h
  30. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocket.h
  31. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketRD.h
  32. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketWD.h
  33. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketPrivate.h
  34. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketSrv.h
  35. )
  36. SET(source_cpp
  37. #
  38. ${PROJECT_SOURCE_DIR}/../common/Mutex.cpp
  39. ${PROJECT_SOURCE_DIR}/../common/hashmap.cpp
  40. ${PROJECT_SOURCE_DIR}/../common/myFunc.cpp
  41. ${PROJECT_SOURCE_DIR}/../common/Log.cpp
  42. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocket.cpp
  43. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketRD.cpp
  44. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketWD.cpp
  45. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketPrivate.cpp
  46. ${PROJECT_SOURCE_DIR}/../srv_IO/MySocketSrv.cpp
  47. ${PROJECT_SOURCE_DIR}/main.cpp
  48. )
  49. #头文件目录
  50. include_directories(
  51. ${PROJECT_SOURCE_DIR}
  52. ${PROJECT_SOURCE_DIR}/../common
  53. ${PROJECT_SOURCE_DIR}/../srv_IO
  54. )
  55. if (${UNIX_OS})
  56. SET(source_h_linux
  57. ${PROJECT_SOURCE_DIR}/../common/myThread.h
  58. )
  59. SET(source_cpp_linux
  60. ${PROJECT_SOURCE_DIR}/../common/myThread.cpp
  61. )
  62. add_definitions(
  63. "-W"
  64. "-fPIC"
  65. "-Wall"
  66. # "-Wall -g"
  67. "-Werror"
  68. "-Wshadow"
  69. "-Wformat"
  70. "-Wpointer-arith"
  71. "-D_REENTRANT"
  72. "-D_USE_FAST_MACRO"
  73. "-Wno-long-long"
  74. "-Wuninitialized"
  75. "-D_POSIX_PTHREAD_SEMANTICS"
  76. "-DACL_PREPARE_COMPILE"
  77. "-Wno-unused-parameter"
  78. "-fexceptions"
  79. )
  80. set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
  81. link_directories()
  82. # 指定生成目标
  83. add_executable(server_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
  84. #link
  85. target_link_libraries(server_test
  86. -lpthread -pthread -lz -lrt -ldl
  87. )
  88. endif(${UNIX_OS})
  89. if (${WIN_OS})
  90. SET(source_h_win
  91. ${PROJECT_SOURCE_DIR}/../common/win32Thread.h
  92. )
  93. SET(source_cpp_win
  94. ${PROJECT_SOURCE_DIR}/../common/win32Thread.cpp
  95. )
  96. set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4819")
  97. add_definitions(
  98. "-D_CRT_SECURE_NO_WARNINGS"
  99. "-D_WINSOCK_DEPRECATED_NO_WARNINGS"
  100. "-DNO_WARN_MBCS_MFC_DEPRECATION"
  101. "-DWIN32_LEAN_AND_MEAN"
  102. )
  103. link_directories()
  104. if (CMAKE_BUILD_TYPE STREQUAL "Debug")
  105. set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${PROJECT_SOURCE_DIR}/../bin)
  106. # 指定生成目标
  107. add_executable(server_testd ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
  108. #target_link_libraries(server_testd *.lib)
  109. else(CMAKE_BUILD_TYPE)
  110. set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE ${PROJECT_SOURCE_DIR}/../bin)
  111. # 指定生成目标
  112. add_executable(server_test ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
  113. #target_link_libraries(server_test *.lib)
  114. endif (CMAKE_BUILD_TYPE)
  115. endif(${WIN_OS})

        /client_test/CMakeLists.txt 如下:

  1. # CMake 最低版本号要求
  2. cmake_minimum_required (VERSION 2.8)
  3. # 项目信息
  4. project (client_test)
  5. #
  6. if(WIN32)
  7. message(STATUS "windows compiling...")
  8. add_definitions(-D_PLATFORM_IS_WINDOWS_)
  9. set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
  10. set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
  11. set(WIN_OS true)
  12. else(WIN32)
  13. message(STATUS "linux compiling...")
  14. add_definitions( -D_PLATFORM_IS_LINUX_)
  15. add_definitions("-Wno-invalid-source-encoding")
  16. set(UNIX_OS true)
  17. set(_DEBUG true)
  18. endif(WIN32)
  19. #
  20. set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
  21. # 指定源文件的目录,并将名称保存到变量
  22. SET(source_h
  23. #
  24. ${PROJECT_SOURCE_DIR}/../common/DataDef.h
  25. ${PROJECT_SOURCE_DIR}/../common/queuedata.h
  26. ${PROJECT_SOURCE_DIR}/../common/Mutex.h
  27. ${PROJECT_SOURCE_DIR}/../common/myFunc.h
  28. ${PROJECT_SOURCE_DIR}/../common/Log.h
  29. ${PROJECT_SOURCE_DIR}/../client_IO/MySocket.h
  30. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketClient.h
  31. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketRD.h
  32. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketWD.h
  33. )
  34. SET(source_cpp
  35. #
  36. ${PROJECT_SOURCE_DIR}/../common/Mutex.cpp
  37. ${PROJECT_SOURCE_DIR}/../common/myFunc.cpp
  38. ${PROJECT_SOURCE_DIR}/../common/Log.cpp
  39. ${PROJECT_SOURCE_DIR}/../client_IO/MySocket.cpp
  40. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketClient.cpp
  41. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketRD.cpp
  42. ${PROJECT_SOURCE_DIR}/../client_IO/MySocketWD.cpp
  43. ${PROJECT_SOURCE_DIR}/main.cpp
  44. )
  45. #头文件目录
  46. include_directories(
  47. ${PROJECT_SOURCE_DIR}
  48. ${PROJECT_SOURCE_DIR}/../common
  49. ${PROJECT_SOURCE_DIR}/../client_IO
  50. )
  51. if (${UNIX_OS})
  52. SET(source_h_linux
  53. ${PROJECT_SOURCE_DIR}/../common/myThread.h
  54. )
  55. SET(source_cpp_linux
  56. ${PROJECT_SOURCE_DIR}/../common/myThread.cpp
  57. )
  58. add_definitions(
  59. "-W"
  60. "-fPIC"
  61. "-Wall"
  62. # "-Wall -g"
  63. "-Werror"
  64. "-Wshadow"
  65. "-Wformat"
  66. "-Wpointer-arith"
  67. "-D_REENTRANT"
  68. "-D_USE_FAST_MACRO"
  69. "-Wno-long-long"
  70. "-Wuninitialized"
  71. "-D_POSIX_PTHREAD_SEMANTICS"
  72. "-DACL_PREPARE_COMPILE"
  73. "-Wno-unused-parameter"
  74. "-fexceptions"
  75. )
  76. set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
  77. link_directories()
  78. # 指定生成目标
  79. add_executable(client_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
  80. #link
  81. target_link_libraries(client_test
  82. -lpthread -pthread -lz -lrt -ldl
  83. )
  84. endif(${UNIX_OS})
  85. if (${WIN_OS})
  86. SET(source_h_win
  87. ${PROJECT_SOURCE_DIR}/../common/win32Thread.h
  88. )

6.3 程序编译如下

        server:

  1. cd srv_test && mkdir build_win && cd build_win
  2. cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Release ..
  3. #vs 命令窗口,或者配置了msbuild路径的命令窗口
  4. msbuild server_test.sln /p:Configuration="Release" /p:Platform="x64"
  5. linux:
  6. cd srv_test && mkdir build_linux && cd build_linux
  7. cmake ..
  8. make

        编译:

         client

  1. win:
  2. cd client_test && mkdir build_win && cd build_win
  3. cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Release .. -Wno-dev
  4. #vs 命令窗口
  5. msbuild client_test.sln /p:Configuration="Release" /p:Platform="x64"
  6. linux:
  7. cd client_test && mkdir build_linux && cd build_linux
  8. cmake ..
  9. make

        编译:

 6.4 测试

       netType=2,win:server_test.exe client_test.exe 127.0.0.1 70001

          netType=2,Linux:./server_test   ./client_test 127.0.0.1 70001

         netType=1,在main.cpp内调整输入类型,注释掉模块化传递部分,以及在MySocketRD.cpp的AddFrame函数注释掉模块传递部分,win运行指令:server_test.exe   client_test.exe 127.0.0.1 70001,netType=1时,支持通用客户端连接,例如借助串口助手连接测试。

        6.4 结语

         基本功能达成,但是整入前面所述那样,项目还有很多需要调整的空间:

        1)客户端及服务端的读取、写入处理类以及集成类如此相似,可提炼优化

        2)DataDef.h内的结构体如此相似,也可以提炼优化

        3)win、linux源码还有很多可以进一步一致性优化的空间

        4)业务逻辑可以进行调整,例如TCP/Socket服务接口提供更多功能

        5)性能优化,读取间隔、读取长度、处理数据的先后次序等

        6)...

        如何优化呢,且看本专栏的下一篇博文:

c/c++开发,无可避免的代码重构实战(基于前文TCP/Socket通信开发案例)_py_free的博客-CSDN博客

        温馨提示:本文采用socket通信设计在管理客户端上采用缓存容器管理,是支持少量客户端链接的情况,若有大量客户端链接情况,就建议采用其他方式,例如采用epoll:

利用epoll创建自己的后台服务,实现对各个客户端网络通信(含示例代码)_epollpri 带外数据_py_free的博客-CSDN博客

七、源码附录

       7.1  commom文件夹

        DataDef.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _DATA_DEF_H_
  5. #define _DATA_DEF_H_
  6. #include <string>
  7. #ifdef __linux__
  8. #include <string.h>
  9. #include <stdio.h>
  10. #endif
  11. struct NetArg
  12. {
  13. NetArg()
  14. : ipStr("127.0.0.1"),port(70001),type(1)
  15. {
  16. };
  17. std::string ipStr; //
  18. int port; //
  19. int type; //Protocol
  20. };
  21. #define RDCSIZE 1024
  22. struct RDClient
  23. {
  24. RDClient()
  25. : len(0)
  26. {
  27. memset(Buf,0,RDCSIZE);
  28. };
  29. RDClient(unsigned char *buf,int nlen)
  30. {
  31. memset(Buf,0,RDCSIZE);
  32. memcpy(Buf,buf,nlen);
  33. len = nlen;
  34. };
  35. ~RDClient()
  36. {
  37. };
  38. RDClient& operator=(const RDClient &rval)
  39. {
  40. if (this!=&rval)
  41. {
  42. memset(Buf,0,RDCSIZE);
  43. memcpy(Buf,rval.Buf,rval.len);
  44. len = rval.len;
  45. }
  46. return *this;
  47. };
  48. int add(unsigned char *buf,int nlen)
  49. {
  50. try{
  51. memset(Buf+len,0,RDCSIZE-len);
  52. memcpy(Buf+len,buf,nlen);
  53. len += nlen;
  54. }catch(...)
  55. {
  56. printf("RDClient::add error \r\n");
  57. }
  58. return len;
  59. };
  60. unsigned char Buf[RDCSIZE];
  61. int len;
  62. };
  63. typedef struct TCP_Data_Model
  64. {
  65. unsigned type;
  66. int len;
  67. float val;
  68. char desc[32];
  69. }TcpSocket,*PTcpSocket;
  70. struct TCP_Data
  71. {
  72. TCP_Data() : len(0)
  73. {
  74. memset(Buf, 0, 512);
  75. };
  76. TCP_Data(unsigned char *buf, int nlen)
  77. {
  78. memset(Buf, 0, 512);
  79. memcpy(Buf, buf, nlen);
  80. len = nlen;
  81. };
  82. TCP_Data& operator=(const TCP_Data &rval)
  83. {
  84. if (this != &rval) {
  85. memset(Buf, 0, 512);
  86. if (rval.len < 512) {
  87. memcpy(Buf, rval.Buf, rval.len);
  88. len = rval.len;
  89. }
  90. else {
  91. memcpy(Buf, rval.Buf, 512);
  92. len = 512;
  93. }
  94. }
  95. return *this;
  96. };
  97. unsigned char Buf[512];
  98. int len;
  99. };
  100. struct RDS
  101. {
  102. RDS() : data(),flag("")
  103. {
  104. };
  105. RDS(TCP_Data _data,std::string _f = "")
  106. : data(_data),flag(_f)
  107. {
  108. };
  109. RDS& operator=(const RDS &rval)
  110. {
  111. if (this == &rval) {
  112. return *this;
  113. }
  114. data = rval.data;
  115. flag = rval.flag;
  116. return *this;
  117. };
  118. TCP_Data data;
  119. std::string flag;
  120. };
  121. /
  122. struct WDS
  123. {
  124. WDS() : ipInt(0), data()
  125. {
  126. };
  127. WDS(unsigned long long _ipInt,TCP_Data _data)
  128. : ipInt(_ipInt), data(_data)
  129. {
  130. };
  131. WDS& operator=(const WDS &rval)
  132. {
  133. if (this == &rval) {
  134. return *this;
  135. }
  136. ipInt = rval.ipInt;
  137. data = rval.data;
  138. return *this;
  139. };
  140. unsigned long long ipInt;
  141. TCP_Data data;
  142. };
  143. #endif

        hashmap.h

  1. #pragma once
  2. #ifndef HASH_MAP_H
  3. #define HASH_MAP_H
  4. /*
  5. *自定义map容器的Key
  6. */
  7. #include <map>
  8. #include <iostream>
  9. //
  10. class KeyObj_Client
  11. {
  12. public:
  13. KeyObj_Client(std::string _ipStr, int _port);
  14. //
  15. static long cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2);
  16. std::string m_ipStr;
  17. int m_port;
  18. int linkFlag;
  19. long m_ip; //网络地址整型表述
  20. private:
  21. };
  22. inline bool operator==(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) == 0; }
  23. inline bool operator!=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) != 0; }
  24. inline bool operator>=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) >= 0; }
  25. inline bool operator<=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) <= 0; }
  26. inline bool operator>(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) > 0; }
  27. inline bool operator<(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) < 0; }
  28. #endif //HASH_MAP_H

        hashmap.cpp

  1. #include "hashmap.h"
  2. #ifdef WIN32
  3. #include <Winsock2.h>
  4. #else
  5. #include <netinet/in.h>
  6. #include <arpa/inet.h>
  7. #endif
  8. #include "myFunc.h"
  9. #include "Log.h"
  10. KeyObj_Client::KeyObj_Client(std::string _ipStr, int _port)
  11. : m_ipStr(_ipStr), m_port(_port), linkFlag(0)
  12. {
  13. m_ip = PFunc::ipToInt(_ipStr);
  14. };
  15. //
  16. long KeyObj_Client::cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2)
  17. {
  18. long diff = obj1.m_ip - obj2.m_ip;
  19. if (diff != 0) return diff;
  20. diff = obj1.m_port - obj2.m_port;
  21. if (diff != 0) return diff;
  22. return 0;
  23. };

        Log.h

  1. #ifndef CHANNELLOG_H
  2. #define CHANNELLOG_H
  3. #include <stdio.h>
  4. #include <stdarg.h>
  5. #include <string>
  6. //#ifndef LOGUDPPORT
  7. //#define LOGUDPPORT 989
  8. //#endif
  9. //#define MAXDATASIZE 10240
  10. //extern bool m_bDebug;
  11. #ifdef WIN32
  12. #include "win32Thread.h"
  13. #endif
  14. #ifdef linux
  15. #include "myThread.h"
  16. #include <string.h>
  17. #endif
  18. #include <queue>
  19. #include "Mutex.h"
  20. #ifndef WIN32
  21. #include <string>
  22. #include <sstream>
  23. namespace std
  24. {
  25. template < typename T > std::string to_string(const T & n)
  26. {
  27. std::ostringstream stm;
  28. stm << n;
  29. return stm.str();
  30. }
  31. }
  32. int GetPrivateProfileString(const char *AppName, const char *key, const char *defaultvalue, char *lpReturnedString, const int nSize, const char *lpFileName);
  33. int GetPrivateProfileInt(const char *AppName,const char *key,const int defaultvalue,const char *lpFileName);
  34. #endif
  35. enum eLogType
  36. {
  37. eHardError = 1,
  38. eSoftError = 2,
  39. eConfigError = 3,
  40. eParameterError = 4,
  41. eReadError = 5,
  42. eWriteError = 6,
  43. eControlMessage = 7,
  44. eResponseMessage = 8,
  45. eTipMessage = 9
  46. };
  47. struct MyLogStruct
  48. {
  49. MyLogStruct():type(0)
  50. {
  51. memset(szBuffer, 0, 1024);
  52. };
  53. int type;
  54. char szBuffer[1024];
  55. };
  56. class CLogger : public MyThread
  57. {
  58. public:
  59. CLogger();
  60. ~CLogger();
  61. int Run();
  62. public:
  63. void Log(const eLogType type, const char* lpszFormat, ...);
  64. static CLogger* createInstance( void );
  65. private:
  66. bool getFirstLog(MyLogStruct &it);
  67. void addLog(MyLogStruct it);
  68. private:
  69. static CLogger* m_pLogInstance;
  70. bool running;
  71. //for cache
  72. std::queue<MyLogStruct> mylogs;
  73. PYMutex m_Mutex_Log;
  74. int i_log_over;
  75. };
  76. #endif

        Log.cpp

  1. #include "Log.h"
  2. #include <time.h>
  3. #include <sys/timeb.h>
  4. #ifdef __linux__
  5. #ifndef sprintf_s
  6. #define sprintf_s sprintf
  7. #endif
  8. #include <string.h>
  9. #include <unistd.h>
  10. #include <stdlib.h>
  11. #endif
  12. CLogger* CLogger::m_pLogInstance = NULL;
  13. //extern void WriteLog( const int iMsgType, const std::string strMsg);
  14. extern char LOG_FILE_NAME[128]; //LOGFILE Name, should be defined
  15. extern std::string logdir;
  16. void WriteLog1( const int iMsgType, const char * strMsg)
  17. {
  18. try {
  19. time_t tt;
  20. struct timeb tm0;
  21. struct tm tm1;
  22. char buf[512];
  23. FILE * fpLog = NULL;
  24. //time(&tt);
  25. ftime(&tm0);
  26. tt = tm0.time;
  27. #ifdef WIN32
  28. localtime_s(&tm1, &tt);
  29. #else
  30. localtime_r(&tt, &tm1);
  31. #endif
  32. sprintf_s(buf, "%04d-%02d-%02d %02d:%02d:%02d.%03d "
  33. , tm1.tm_year + 1900, tm1.tm_mon + 1, tm1.tm_mday
  34. , tm1.tm_hour, tm1.tm_min, tm1.tm_sec, tm0.millitm);
  35. std::string strTime = buf;
  36. buf[10] = '\0';
  37. //file name
  38. std::string strPath = logdir;
  39. #ifdef WIN32
  40. strPath += "\\";
  41. #else
  42. strPath += "/";
  43. #endif
  44. strPath += buf;
  45. strPath += "_";
  46. //
  47. switch (iMsgType)
  48. {
  49. case eHardError:
  50. strTime += "[HardErrorIMsg] ";
  51. break;
  52. case eSoftError:
  53. strTime += "[SoftErrorIMsg] ";
  54. break;
  55. case eConfigError:
  56. strTime += "[ConfErrorIMsg] ";
  57. break;
  58. case eParameterError:
  59. strTime += "[ParamErrorMsg] ";
  60. break;
  61. case eReadError:
  62. strTime += "[ReadErrorIMsg] ";
  63. break;
  64. case eWriteError:
  65. strTime += "[WriteErrorMsg] ";
  66. break;
  67. case eControlMessage:
  68. strTime += "[ControlExeMsg] ";
  69. //strPath += "chain_";
  70. break;
  71. case eResponseMessage:
  72. strTime += "[ResponseUpMsg] ";
  73. //strPath += "chain_";
  74. break;
  75. case eTipMessage:
  76. strTime += "[NoticeTipIMsg] ";
  77. break;
  78. default:
  79. strTime += "[PromptUnNoMsg] ";
  80. break;
  81. }
  82. strPath += LOG_FILE_NAME;
  83. //open
  84. #ifdef WIN32
  85. fopen_s(&fpLog, strPath.c_str(), "a+");
  86. #else
  87. fpLog = fopen(strPath.c_str(), "a+");
  88. #endif
  89. if (NULL != fpLog)
  90. {
  91. fseek(fpLog, 0, SEEK_END);
  92. fwrite(strTime.c_str(), strTime.length(), 1, fpLog);
  93. fwrite(strMsg, strlen(strMsg), 1, fpLog);
  94. fwrite("\n", 1, 1, fpLog);
  95. fclose(fpLog);
  96. }
  97. }
  98. catch (...) {
  99. printf("write log[%d]{%s}error\n", iMsgType, strMsg);
  100. }
  101. }
  102. #ifdef WIN32
  103. #include <windows.h>
  104. #include <atlcomtime.h>
  105. extern char SVCNAME[128];
  106. void WriteLog( const int iMsgType, const char * strMsg)
  107. {
  108. try {
  109. if (iMsgType < int(eConfigError))
  110. {
  111. HANDLE hEventSource;
  112. LPCTSTR lpszStrings[2];
  113. hEventSource = RegisterEventSource(NULL, SVCNAME);
  114. if (NULL != hEventSource)
  115. {
  116. lpszStrings[0] = SVCNAME;
  117. lpszStrings[1] = strMsg;
  118. ReportEvent(hEventSource, // event log handle
  119. EVENTLOG_ERROR_TYPE, // event type
  120. 0, // event category
  121. 0, // event identifier
  122. NULL, // no security identifier
  123. 2, // size of lpszStrings array
  124. 0, // no binary data
  125. lpszStrings, // array of strings
  126. NULL); // no binary data
  127. DeregisterEventSource(hEventSource);
  128. }
  129. }
  130. }
  131. catch (...) {
  132. printf("WriteLog[%d]{%s}for Evnet error\n", iMsgType, strMsg);
  133. }
  134. WriteLog1(iMsgType, strMsg);
  135. }
  136. #else
  137. void WriteLog( const int iMsgType, const char * strMsg)
  138. {
  139. WriteLog1(iMsgType, strMsg);
  140. }
  141. #endif
  142. CLogger::CLogger()
  143. : running(true)
  144. , i_log_over(0)
  145. {
  146. char buf[256] = {0};
  147. sprintf_s(buf,"mkdir %s",logdir.c_str());
  148. system(buf);
  149. this->start();
  150. };
  151. CLogger::~CLogger()
  152. {
  153. running = false;
  154. };
  155. CLogger* CLogger::createInstance( void )
  156. {
  157. if (m_pLogInstance == NULL)
  158. {
  159. m_pLogInstance = new CLogger();
  160. return m_pLogInstance;
  161. }
  162. else
  163. return m_pLogInstance;
  164. };
  165. int CLogger::Run()
  166. {
  167. MyLogStruct log_;
  168. while (running) {
  169. if (getFirstLog(log_))
  170. {
  171. WriteLog(log_.type, log_.szBuffer);
  172. #ifndef WIN32
  173. printf("Log::[%d]-->%s\n", getpid(), log_.szBuffer);
  174. #else
  175. printf("Log::-->%s\n", log_.szBuffer);
  176. #endif
  177. }
  178. #ifdef WIN32
  179. Sleep(10);
  180. #else
  181. usleep(10000);
  182. #endif
  183. }
  184. return 0;
  185. };
  186. void CLogger::Log(const eLogType type, const char* lpszFormat, ...)
  187. {
  188. va_list args;
  189. //char szBuffer[2048] = {0};
  190. MyLogStruct log_;
  191. log_.type = static_cast<int>(type);
  192. va_start(args, lpszFormat);
  193. #ifdef WIN32
  194. vsnprintf_s(log_.szBuffer,sizeof(log_.szBuffer), lpszFormat, args);
  195. #else
  196. vsnprintf(log_.szBuffer, sizeof(log_.szBuffer), lpszFormat, args);
  197. #endif
  198. va_end(args);
  199. addLog(log_);
  200. }
  201. bool CLogger::getFirstLog(MyLogStruct &it) {
  202. bool ret = false;
  203. m_Mutex_Log.Lock();
  204. if (!mylogs.empty()) {
  205. it = mylogs.front();
  206. mylogs.pop();
  207. ret = true;
  208. }
  209. m_Mutex_Log.Unlock();
  210. return ret;
  211. }
  212. void CLogger::addLog(MyLogStruct it) {
  213. m_Mutex_Log.Lock();
  214. if (mylogs.size() > 100) {
  215. i_log_over++;
  216. mylogs.pop();
  217. }
  218. mylogs.push(it);
  219. m_Mutex_Log.Unlock();
  220. if (i_log_over >= 100) {//每溢出100次,报告一次
  221. MyLogStruct log_;
  222. log_.type = static_cast<int>(eTipMessage);
  223. sprintf(log_.szBuffer,"the size of mylogs queue is up to limmit size[100],[%s %s %d]."
  224. , __FILE__, __FUNCTION__, __LINE__);
  225. m_Mutex_Log.Lock();
  226. mylogs.push(log_);
  227. m_Mutex_Log.Unlock();
  228. i_log_over = 0;
  229. }
  230. }
  231. #ifndef WIN32
  232. #include <sys/vfs.h>
  233. #include <mntent.h>
  234. //int GetPrivateProfileInt(const char * lpAppName, const char *lpKeyName, int nDefault, const char * lpFileName)
  235. //{
  236. // return nDefault;
  237. //}
  238. //int GetPrivateProfileString(const char * lpAppName, const char * lpKeyName, const char * szDefault, char * szOut, const int nLen ,const char * lpFileName)
  239. //{
  240. // strncpy(szOut, szDefault, nLen);
  241. // return 0;
  242. //}
  243. int GetPrivateProfileString(const char *AppName, const char *key, const char *defaultvalue, char *lpReturnedString, const int nSize, const char *lpFileName)
  244. {
  245. bool bFindAppName = false;
  246. char tem[1000];
  247. char *ptr;
  248. FILE *fp;
  249. char name[150];
  250. snprintf(name, sizeof(name), "[%s]", AppName);
  251. strncpy(lpReturnedString,defaultvalue, nSize);
  252. if( (lpFileName == NULL) || ((fp=fopen(lpFileName,"rt"))==NULL) )// || fgets(tem,len,fp) == NULL)
  253. {
  254. return strlen(lpReturnedString);
  255. }
  256. while(fgets(tem,sizeof(tem),fp))
  257. {
  258. if(tem[0] == '[')
  259. {
  260. bFindAppName = false;
  261. if(strstr(tem,name)==tem)
  262. bFindAppName = true;
  263. }
  264. else
  265. {
  266. if(bFindAppName == true)
  267. {
  268. unsigned int n =strcspn(tem,"=");
  269. if(static_cast<unsigned int>(strlen(key)) == n
  270. && static_cast<int>(strncasecmp(tem,key,n)) == 0)
  271. {
  272. strncpy(lpReturnedString, tem+n+1,nSize);
  273. if( (ptr = strchr(lpReturnedString, '\n')) != NULL)
  274. *ptr = '\0';
  275. if( (ptr = strchr(lpReturnedString, '\r')) != NULL)
  276. *ptr = '\0';
  277. break;
  278. }
  279. }
  280. }
  281. }
  282. fclose(fp);
  283. return strlen(lpReturnedString);
  284. }
  285. int GetPrivateProfileInt(const char *AppName,const char *key,const int defaultvalue,const char *lpFileName)
  286. {
  287. char str[20];
  288. int nRet = defaultvalue;
  289. if(GetPrivateProfileString(AppName, key, "", str, sizeof(str), lpFileName) > 0)
  290. {
  291. nRet = atoi(str);
  292. }
  293. return nRet;
  294. }
  295. #endif

        Mutex.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef PYMUTEX_H
  5. #define PYMUTEX_H
  6. #ifdef WIN32
  7. //#include <windows.h>
  8. #else
  9. #include <pthread.h>
  10. #endif
  11. typedef void *HANDLE;
  12. class IMutex
  13. {
  14. public:
  15. virtual ~IMutex() {}
  16. virtual void Lock() const = 0;
  17. virtual bool TryLock() const = 0;
  18. virtual void Unlock() const = 0;
  19. };
  20. class PYMutex : public IMutex
  21. {
  22. public:
  23. PYMutex();
  24. ~PYMutex();
  25. virtual void Lock() const;
  26. virtual bool TryLock() const;
  27. virtual void Unlock() const;
  28. private:
  29. #ifdef _WIN32
  30. HANDLE m_mutex;
  31. #else
  32. mutable pthread_mutex_t m_mutex;
  33. #endif
  34. };
  35. #endif //PYMUTEX_H

        Mutex.cpp

  1. #include "Mutex.h"
  2. #ifdef WIN32
  3. #include <windows.h>
  4. #endif
  5. //#include <iostream>
  6. #include <stdio.h>
  7. PYMutex::PYMutex()
  8. {
  9. #ifdef _WIN32
  10. m_mutex = ::CreateMutex(NULL, FALSE, NULL);
  11. #else
  12. pthread_mutex_init(&m_mutex, NULL);
  13. #endif
  14. }
  15. PYMutex::~PYMutex()
  16. {
  17. #ifdef _WIN32
  18. ::CloseHandle(m_mutex);
  19. #else
  20. pthread_mutex_destroy(&m_mutex);
  21. #endif
  22. }
  23. void PYMutex::Lock() const
  24. {
  25. #ifdef _WIN32
  26. //DWORD d = WaitForSingleObject(m_mutex, INFINITE);
  27. WaitForSingleObject(m_mutex, INFINITE);
  28. /// \todo check 'd' for result
  29. #else
  30. pthread_mutex_lock(&m_mutex);
  31. #endif
  32. }
  33. bool PYMutex::TryLock() const
  34. {
  35. #ifdef _WIN32
  36. DWORD dwWaitResult = WaitForSingleObject(m_mutex, 0);
  37. if (dwWaitResult != WAIT_OBJECT_0 && dwWaitResult != WAIT_TIMEOUT) {
  38. printf("thread WARNING: bad result from try-locking mutex\n");
  39. }
  40. return (dwWaitResult == WAIT_OBJECT_0) ? true : false;
  41. #else
  42. return (0==pthread_mutex_trylock(&m_mutex))?true:false;
  43. #endif
  44. };
  45. void PYMutex::Unlock() const
  46. {
  47. #ifdef _WIN32
  48. ::ReleaseMutex(m_mutex);
  49. #else
  50. pthread_mutex_unlock(&m_mutex);
  51. #endif
  52. }

        myFunc.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYFUNC_H_
  5. #define _MYFUNC_H_
  6. /*
  7. public function
  8. */
  9. #include <string>
  10. #include <vector>
  11. namespace PFunc
  12. {
  13. //字符串分割
  14. bool string_divide(std::vector<std::string> &_strlist, const std::string src, const std::string div);
  15. //hex to ascii
  16. int string2bytes(const char* pSrc, unsigned char* pDst, int nSrcLength);
  17. //ascii to hex
  18. int bytes2string(const unsigned char* pSrc, char* pDst, int nSrcLength);
  19. //frame code
  20. int code(const unsigned char *buff, const int len, unsigned char *outbuf);
  21. //frame uncode
  22. int uncode(const unsigned char *buff, int len, unsigned char *outbuf);
  23. //
  24. bool ipCheck(std::string ip_str);
  25. long ipToInt(std::string ip_str);
  26. std::string intToIp(long ip_int);
  27. //crc
  28. unsigned int crc16(unsigned char *ptr, unsigned int len);
  29. //年-月-日 时:分:秒
  30. std::string getCurrentTime();
  31. };
  32. #endif

        myFunc.cpp

  1. #include "myFunc.h"
  2. #include <stdlib.h>
  3. #ifdef WIN32
  4. #include <Winsock2.h>
  5. #else
  6. #include <netinet/in.h>
  7. #include <arpa/inet.h>
  8. #include <string.h>
  9. #endif
  10. #include <time.h>
  11. #include "Log.h"
  12. bool PFunc::string_divide(std::vector<std::string> &_strlist, const std::string src, const std::string div)
  13. {
  14. std::string _src = src;
  15. std::string::size_type _pos = _src.find(div);
  16. while (std::string::npos != _pos)
  17. {
  18. std::string _buf = "";
  19. _buf = _src.substr(0, _pos);
  20. _strlist.push_back(_buf);
  21. _src = _src.erase(0, _pos + div.size());
  22. _pos = _src.find(div.c_str());
  23. }
  24. if (!_src.empty())
  25. {
  26. _strlist.push_back(_src);
  27. }
  28. return true;
  29. };
  30. int PFunc::string2bytes(const char* pSrc, unsigned char* pDst, int nSrcLength)
  31. {
  32. for (int i = 0; i < nSrcLength; i += 2)
  33. {
  34. if (*pSrc >= '0' && *pSrc <= '9')
  35. *pDst = (*pSrc - '0') << 4;
  36. else
  37. *pDst = (*pSrc - 'A' + 10) << 4;
  38. pSrc++;
  39. // 输出低4位
  40. if (*pSrc >= '0' && *pSrc <= '9')
  41. *pDst |= *pSrc - '0';
  42. else
  43. *pDst |= *pSrc - 'A' + 10;
  44. pSrc++;
  45. pDst++;
  46. }
  47. return (nSrcLength / 2);
  48. };
  49. int PFunc::bytes2string(const unsigned char* pSrc, char* pDst, int nSrcLength)
  50. {
  51. const char tab[] = "0123456789ABCDEF"; // 0x0-0xf的字符查找表
  52. for (int i = 0; i < nSrcLength; i++)
  53. {
  54. *pDst++ = tab[*pSrc >> 4];
  55. *pDst++ = tab[*pSrc & 0x0f];
  56. pSrc++;
  57. }
  58. *pDst = '\0';
  59. return nSrcLength * 2;
  60. };
  61. int PFunc::code(const unsigned char *buff, const int len, unsigned char *outbuf)
  62. {
  63. char ch = 0;
  64. int nLen = 0;
  65. unsigned char * buf = (unsigned char *)buff;
  66. *outbuf++ = 0XF1;//头字节
  67. nLen+=1;
  68. for (int i = 0; i < len; i++, nLen++)
  69. {
  70. ch = buf[i];
  71. if ((buf[i] | 0x0f) == 0xff && i > 0 && i < (len - 1))
  72. {
  73. *outbuf++ = 0xf0 & buf[i];
  74. *outbuf++ = 0x0f & buf[i];
  75. nLen += 1;
  76. }
  77. else {
  78. *outbuf++ = ch;
  79. }
  80. }
  81. *outbuf++ = 0XFF;//末字节
  82. nLen+=1;
  83. buf = NULL;
  84. return nLen;
  85. }
  86. int PFunc::uncode(const unsigned char *buff, int len, unsigned char *outbuf)
  87. {
  88. char ch = 0;
  89. int nLen = 0;
  90. unsigned char * buf = (unsigned char *)buff;
  91. //头、末尾字节判断
  92. if(len<=2&&0XF1!=buf[0]&&0XFF!=buf[len-1]){
  93. printf("uncode func, start bit or end bit Error!\r\n");
  94. return 0;
  95. }
  96. for (int i = 1; i < (len-1); i++, nLen++)
  97. {
  98. ch = buf[i];
  99. if (buf[i] == 0xf0)
  100. {
  101. #ifdef _DEBUG
  102. if (i > len - 2)
  103. printf("Error!\r\n");
  104. if (buf[i + 1] > 0x0f)
  105. printf("Error!\r\n");
  106. #endif
  107. ch = 0xf0 | buf[++i];
  108. }
  109. *outbuf++ = ch;
  110. }
  111. buf = NULL;
  112. return nLen;
  113. }
  114. bool PFunc::ipCheck(std::string ip_str)
  115. {
  116. if (INADDR_NONE != inet_addr(ip_str.c_str()))
  117. {
  118. return true;
  119. }
  120. return false;
  121. };
  122. long PFunc::ipToInt(std::string ip_str)
  123. {
  124. if (INADDR_NONE != inet_addr(ip_str.c_str()))
  125. {
  126. return ntohl(inet_addr(ip_str.c_str()));
  127. }
  128. else {
  129. CLogger::createInstance()->Log(eConfigError
  130. , "ip format [%s] error: %s %s %d,please check the file format and code!"
  131. , ip_str.c_str(), __FILE__, __FUNCTION__, __LINE__);
  132. return 0;
  133. }
  134. };
  135. std::string PFunc::intToIp(long ip_int)
  136. {
  137. char ip[64] = { 0 };
  138. #ifdef WIN32
  139. strcpy_s(ip, inet_ntoa(*(in_addr*)&ip_int));
  140. #else
  141. strcpy(ip, inet_ntoa(*(in_addr*)&ip_int));
  142. #endif
  143. return std::string(ip);
  144. };
  145. //CRC校验
  146. unsigned int PFunc::crc16(unsigned char *ptr, unsigned int len)
  147. {
  148. unsigned int wcrc = 0XFFFF;//预置16位crc寄存器,初值全部为1
  149. unsigned char temp;//定义中间变量
  150. unsigned int i = 0, j = 0;//定义计数
  151. for (i = 0; i < len; i++)//循环计算每个数据
  152. {
  153. temp = *ptr & 0X00FF;//将八位数据与crc寄存器亦或
  154. ptr++;//指针地址增加,指向下个数据
  155. wcrc ^= temp;//将数据存入crc寄存器
  156. for (j = 0; j < 8; j++)//循环计算数据的
  157. {
  158. if (wcrc & 0X0001)//判断右移出的是不是1,如果是1则与多项式进行异或。
  159. {
  160. wcrc >>= 1;//先将数据右移一位
  161. wcrc ^= 0XA001;//与上面的多项式进行异或
  162. }
  163. else//如果不是1,则直接移出
  164. {
  165. wcrc >>= 1;//直接移出
  166. }
  167. }
  168. }
  169. temp = wcrc;//crc的值
  170. return wcrc;
  171. };
  172. std::string PFunc::getCurrentTime()
  173. {
  174. time_t _t = time(NULL);
  175. struct tm _tt;
  176. #ifdef WIN32
  177. localtime_s(&_tt, &_t);
  178. #else
  179. localtime_r(&_t, &_tt);
  180. #endif
  181. _tt.tm_year += 1900;
  182. _tt.tm_mon += 1;
  183. char buf[32] = { 0 };
  184. sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d"
  185. , _tt.tm_year, _tt.tm_mon, _tt.tm_mday, _tt.tm_hour, _tt.tm_min, _tt.tm_sec);
  186. return std::string(buf);
  187. }

        myThread.h

  1. #ifndef _MYTHREAD_H
  2. #define _MYTHREAD_H
  3. #include <pthread.h>
  4. #include <unistd.h>
  5. class MyThread
  6. {
  7. private:
  8. //current thread ID
  9. pthread_t tid;
  10. //thread status
  11. int threadStatus;
  12. //get manner pointer of execution
  13. static void* run0(void* pVoid);
  14. //manner of execution inside
  15. void* run1();
  16. public:
  17. //threadStatus-new create
  18. static const int THREAD_STATUS_NEW = 0;
  19. //threadStatus-running
  20. static const int THREAD_STATUS_RUNNING = 1;
  21. //threadStatus-end
  22. static const int THREAD_STATUS_EXIT = -1;
  23. // constructed function
  24. MyThread();
  25. ~MyThread();
  26. //the entity for thread running
  27. virtual int Run()=0;
  28. //start thread
  29. bool start();
  30. //gte thread ID
  31. pthread_t getThreadID();
  32. //get thread status
  33. int getState();
  34. //wait for thread end
  35. void join();
  36. //wait for thread end in limit time
  37. void join(unsigned long millisTime);
  38. };
  39. #endif /* _MYTHREAD_H */

        myThread.cpp

  1. #include "myThread.h"
  2. #include <stdio.h>
  3. void* MyThread::run0(void* pVoid)
  4. {
  5. MyThread* p = (MyThread*) pVoid;
  6. p->run1();
  7. return p;
  8. }
  9. void* MyThread::run1()
  10. {
  11. threadStatus = THREAD_STATUS_RUNNING;
  12. tid = pthread_self();
  13. Run();
  14. threadStatus = THREAD_STATUS_EXIT;
  15. tid = 0;
  16. pthread_exit(NULL);
  17. }
  18. MyThread::MyThread()
  19. {
  20. tid = 0;
  21. threadStatus = THREAD_STATUS_NEW;
  22. }
  23. MyThread::~MyThread()
  24. {
  25. join(10);
  26. }
  27. int MyThread::Run()
  28. {
  29. while(true){
  30. printf("thread is running!\n");
  31. sleep(100);
  32. }
  33. return 0;
  34. }
  35. bool MyThread::start()
  36. {
  37. return pthread_create(&tid, NULL, run0, this) == 0;
  38. }
  39. pthread_t MyThread::getThreadID()
  40. {
  41. return tid;
  42. }
  43. int MyThread::getState()
  44. {
  45. return threadStatus;
  46. }
  47. void MyThread::join()
  48. {
  49. if (tid > 0)
  50. {
  51. pthread_join(tid, NULL);
  52. }
  53. }
  54. void MyThread::join(unsigned long millisTime)
  55. {
  56. if (tid == 0)
  57. {
  58. return;
  59. }
  60. if (millisTime == 0)
  61. {
  62. join();
  63. }else
  64. {
  65. unsigned long k = 0;
  66. while (threadStatus != THREAD_STATUS_EXIT && k <= millisTime)
  67. {
  68. usleep(100);
  69. k++;
  70. }
  71. }
  72. }

        queuedata.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _QUEUE_DATA_H_
  5. #define _QUEUE_DATA_H_
  6. #include <deque>
  7. #include <stdio.h>
  8. #include <string.h>
  9. #include "Mutex.h"
  10. #include "Log.h"
  11. template <class T>
  12. class QueueData
  13. {
  14. public:
  15. QueueData(std::string desc = "thread_queue");
  16. ~QueueData();
  17. //
  18. /**
  19. * 获取队列大小
  20. * @return {int } 队列大小
  21. */
  22. int size();
  23. /**
  24. * 判定队列是否为空
  25. * @return {bool } 是否为空队列
  26. */
  27. bool isEmpty();
  28. /**
  29. * 获取队列头元素
  30. * @param it {T&} 头元素
  31. * @return {bool } 是否成功
  32. */
  33. bool getFirst(T &it);
  34. /**
  35. * 删除元素
  36. * @return {bool } 是否成功
  37. */
  38. bool removeFirst();
  39. /**
  40. * 获取队列头元素,并从队列终删除
  41. * @param it {T&} 头元素
  42. * @return {bool } 是否成功
  43. */
  44. bool pop(T &it);
  45. /**
  46. * 从队列头开始逐步获取多个元素,并剔除
  47. * @param its {queue<T>&} 获取到的元素集
  48. * @param sizel {int} 一次获取多少个
  49. * @return {bool } 至少获取一个元素以上则成功
  50. */
  51. bool getList(std::queue<T> &its,unsigned int sizel=5);
  52. /**
  53. * 从队列尾部添加元素
  54. * @param it {T} 被添加元素
  55. * @return {void } 无返回
  56. */
  57. void add(T it);
  58. /**
  59. * 从队列头部添加元素
  60. * @param it {T} 被添加元素
  61. * @return {void } 无返回
  62. */
  63. void add_front(T it);
  64. /**
  65. * 清空元素
  66. * @return {void }
  67. */
  68. void clear();
  69. private:
  70. void init();
  71. QueueData& operator=(const QueueData&) {return this;};
  72. protected:
  73. std::string queue_desc;
  74. private:
  75. /点集转发
  76. //协议解析结果缓存
  77. std::deque<T> datacache_queue; //队列容器
  78. PYMutex m_Mutex; //线程锁,或者如果更彻底采用acl库,采用acl::thread_mutex替代
  79. //
  80. static unsigned int QSize; //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据
  81. //
  82. int queue_overS; //队列溢出次数计数
  83. };
  84. template <class T>
  85. unsigned int QueueData<T>::QSize = 100;
  86. template <class T>
  87. QueueData<T>::QueueData(std::string desc)
  88. : queue_desc(desc)
  89. {
  90. init();
  91. };
  92. template <class T>
  93. void QueueData<T>::init()
  94. {
  95. queue_overS = 0;
  96. };
  97. template <class T>
  98. QueueData<T>::~QueueData()
  99. {
  100. }
  101. //
  102. template <class T>
  103. int QueueData<T>::size()
  104. {
  105. int ret = 0;
  106. m_Mutex.Lock();
  107. ret = static_cast<int>(datacache_queue.size());
  108. m_Mutex.Unlock();
  109. return ret;
  110. }
  111. template <class T>
  112. bool QueueData<T>::isEmpty()
  113. {
  114. bool ret = false;
  115. m_Mutex.Lock();
  116. ret = datacache_queue.empty();
  117. m_Mutex.Unlock();
  118. return ret;
  119. }
  120. template <class T>
  121. bool QueueData<T>::getFirst(T &it)
  122. {
  123. bool ret = false;
  124. m_Mutex.Lock();
  125. if (!datacache_queue.empty())
  126. {
  127. it = datacache_queue.front();
  128. ret = true;
  129. }
  130. m_Mutex.Unlock();
  131. return ret;
  132. }
  133. template <class T>
  134. bool QueueData<T>::removeFirst()
  135. {
  136. bool ret = false;
  137. m_Mutex.Lock();
  138. if (!datacache_queue.empty())
  139. {
  140. datacache_queue.pop_front();
  141. ret = true;
  142. }
  143. m_Mutex.Unlock();
  144. return ret;
  145. }
  146. template <class T>
  147. bool QueueData<T>::pop(T &it)
  148. {
  149. bool ret = false;
  150. m_Mutex.Lock();
  151. if (!datacache_queue.empty())
  152. {
  153. it = datacache_queue.front();
  154. datacache_queue.pop_front();
  155. ret = true;
  156. }
  157. m_Mutex.Unlock();
  158. return ret;
  159. };
  160. template <class T>
  161. bool QueueData<T>::getList(std::queue<T> &its,unsigned int sizel)
  162. {
  163. m_Mutex.Lock();
  164. while (!datacache_queue.empty())
  165. {
  166. its.push(datacache_queue.front());
  167. datacache_queue.pop_front();
  168. if (its.size() >= sizel)
  169. {
  170. break;
  171. }
  172. }
  173. m_Mutex.Unlock();
  174. return !its.empty();
  175. };
  176. template <class T>
  177. void QueueData<T>::add(T it)
  178. {
  179. m_Mutex.Lock();
  180. if (datacache_queue.size() > QSize)
  181. {
  182. queue_overS++;
  183. datacache_queue.pop_front();
  184. }
  185. datacache_queue.push_back(it);
  186. m_Mutex.Unlock();
  187. if (queue_overS >= 10)
  188. {
  189. //每溢出10次,报告一次
  190. CLogger::createInstance()->Log(eSoftError
  191. ,"add item to queue %s at end,but the size of QueueData is up to limmit size: %d.\n"
  192. , queue_desc.c_str(), QSize);
  193. queue_overS = 0;
  194. }
  195. }
  196. template <class T>
  197. void QueueData<T>::add_front(T it)
  198. {
  199. m_Mutex.Lock();
  200. if (datacache_queue.size() > QSize)
  201. {
  202. queue_overS++;
  203. datacache_queue.pop_front();
  204. }
  205. datacache_queue.push_front(it);
  206. m_Mutex.Unlock();
  207. if (queue_overS >= 10)
  208. {
  209. //每溢出10次,报告一次
  210. CLogger::createInstance()->Log(eSoftError,
  211. "add item to queue %s at first,but the size of QueueData is up to limmit size: %d.\n"
  212. , queue_desc.c_str(), QSize);
  213. queue_overS = 0;
  214. }
  215. }
  216. template <class T>
  217. void QueueData<T>::clear()
  218. {
  219. m_Mutex.Lock();
  220. datacache_queue.clear();
  221. m_Mutex.Unlock();
  222. queue_overS = 0;
  223. }
  224. #endif //_QUEUE_DATA_H_

        win32Thread.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef WIN32THREAD_H
  5. #define WIN32THREAD_H
  6. #include <process.h>
  7. #include <iostream>
  8. typedef void *HANDLE;
  9. class MyThread
  10. {
  11. public:
  12. MyThread();
  13. ~MyThread();
  14. void start();
  15. virtual int Run();
  16. HANDLE getThread();
  17. private:
  18. HANDLE hThread;
  19. static void agent(void *p);
  20. };
  21. #endif

        win32Thread.cpp

  1. #include "win32Thread.h"
  2. #include <windows.h>
  3. MyThread::MyThread()
  4. {
  5. }
  6. MyThread::~MyThread()
  7. {
  8. WaitForSingleObject(hThread, INFINITE);
  9. }
  10. void MyThread::start()
  11. {
  12. hThread =(HANDLE)_beginthread(agent, 0, (void *)this);
  13. }
  14. int MyThread::Run()
  15. {
  16. printf("Base Thread\n");
  17. return 0;
  18. }
  19. void MyThread::agent(void *p)
  20. {
  21. MyThread *agt = (MyThread *)p;
  22. agt->Run();
  23. }
  24. HANDLE MyThread::getThread()
  25. {
  26. return hThread;
  27. }

       7.2  srv_IO目录

        MySocket.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MySocket_H_
  5. #define _MySocket_H_
  6. /*
  7. *建立socket服务端
  8. */
  9. #include "DataDef.h"
  10. class MySocketPrivate;
  11. class MySocketSrv;
  12. class MySocketRD;
  13. class MySocketWD;
  14. class MySocket
  15. {
  16. public:
  17. MySocket(unsigned int port,int netType_=1);
  18. ~MySocket(void);
  19. public:
  20. int Read(){ return -1; };
  21. int Write(){ return -1; };
  22. //
  23. //int Read(char* buf, int size);
  24. int Write(const char* buf, int size);
  25. private:
  26. MySocketPrivate *my_PrivateData;
  27. MySocketSrv *my_SocketSrv;
  28. MySocketRD *m_MySocketRD;
  29. MySocketWD *m_MySocketWD;
  30. };
  31. #endif //_MYSOCKET_H_

        MySocket.cpp

  1. #include "MySocket.h"
  2. #include "MySocketPrivate.h"
  3. #include "MySocketSrv.h"
  4. #include "MySocketRD.h"
  5. #include "MySocketWD.h"
  6. #include "Log.h"
  7. *MySocketExx*///
  8. MySocket::MySocket(unsigned int port,int netType_)
  9. {
  10. try {
  11. my_PrivateData = new MySocketPrivate(port);
  12. if (my_PrivateData->onConnect() > 0)
  13. {
  14. my_SocketSrv = new MySocketSrv();
  15. my_SocketSrv->setPDataPtr(my_PrivateData);
  16. my_SocketSrv->start();
  17. m_MySocketRD = new MySocketRD(my_PrivateData,netType_);
  18. m_MySocketRD->start();
  19. m_MySocketWD = new MySocketWD( my_PrivateData,netType_);
  20. m_MySocketWD->start();
  21. }
  22. else {
  23. my_SocketSrv = NULL;
  24. m_MySocketRD = NULL;
  25. m_MySocketWD = NULL;
  26. CLogger::createInstance()->Log(eSoftError,
  27. "listen port(%u) error, [%s %s %d]"
  28. , port
  29. , __FILE__, __FUNCTION__, __LINE__);
  30. }
  31. }
  32. catch (...) {
  33. delete my_SocketSrv;
  34. my_SocketSrv = NULL;
  35. delete m_MySocketRD;
  36. m_MySocketRD = NULL;
  37. delete m_MySocketWD;
  38. m_MySocketWD = NULL;
  39. delete my_PrivateData;
  40. my_PrivateData = NULL;
  41. CLogger::createInstance()->Log(eSoftError,
  42. "MySocketGx init error, [%s %s %d]"
  43. , __FILE__, __FUNCTION__, __LINE__);
  44. }
  45. };
  46. MySocket::~MySocket(void)
  47. {
  48. if (NULL != my_SocketSrv)
  49. {
  50. delete my_SocketSrv;
  51. my_SocketSrv = NULL;
  52. }
  53. if (NULL != m_MySocketRD)
  54. {
  55. delete m_MySocketRD;
  56. m_MySocketRD = NULL;
  57. }
  58. if (NULL != m_MySocketWD)
  59. {
  60. delete m_MySocketWD;
  61. m_MySocketWD = NULL;
  62. }
  63. if(NULL!=my_PrivateData)
  64. {
  65. delete my_PrivateData;
  66. my_PrivateData = NULL;
  67. }
  68. };
  69. int MySocket::Write(const char* buf, int size)
  70. {
  71. if (NULL != m_MySocketWD&&NULL!=buf)
  72. {
  73. return m_MySocketWD->AddData(buf, size);
  74. }
  75. else {
  76. return -1;
  77. }
  78. }

        MySocketPrivate.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKETPRIVATE_H_
  5. #define _MYSOCKETPRIVATE_H_
  6. #include <map>
  7. #include <queue>
  8. #include <set>
  9. #ifdef WIN32
  10. #include "afxsock.h"
  11. #define MY_SOCKET SOCKET
  12. #define MY_SOCKET_NULL NULL
  13. #endif
  14. #ifdef __linux__
  15. #include <stdio.h>
  16. #include <stdlib.h>
  17. #include <string.h>
  18. #include <string>
  19. #include <sys/types.h>
  20. #include <sys/socket.h>
  21. #include <netinet/in.h>
  22. #define printf_s printf
  23. #define sprintf_s sprintf
  24. #define MY_SOCKET int
  25. #define MY_SOCKET_NULL (-1)
  26. #endif
  27. #include "Mutex.h"
  28. #include "hashmap.h"
  29. #include "DataDef.h"
  30. class MySocketPrivate
  31. {
  32. public:
  33. MySocketPrivate(unsigned int port)
  34. : m_Port(port)
  35. , m_OnListen(false)
  36. {
  37. m_SSocket = MY_SOCKET_NULL;
  38. m_CSockets.clear();
  39. #ifdef WIN32
  40. /*
  41. * This function should be called once in each secondary thread
  42. * before the first socket is created in the new thread.
  43. */
  44. SocketThreadInit();
  45. #endif
  46. };
  47. ~MySocketPrivate(){
  48. disConnect();
  49. };
  50. public:
  51. int onConnect();
  52. void disConnect();
  53. int Read(std::map<KeyObj_Client,RDClient> &bufs);
  54. int Write(const char* buf, int size);
  55. int Write(unsigned long long ipInt,const char* buf, int size);
  56. bool Accept();
  57. bool get_ipInt_list(std::set<long> &ipintlist); //获取在线端的整型IP
  58. #ifdef WIN32
  59. private:
  60. void SocketThreadInit();
  61. #endif
  62. private:
  63. void deleteSSocket(); //删除服务端
  64. void deleteCSocket(); //删除所有客户端
  65. void deleteCSocket(MY_SOCKET m_CSocket);//删除指定客户端
  66. private:
  67. MY_SOCKET m_SSocket; //服务端
  68. unsigned int m_Port; //端口变量
  69. bool m_OnListen; //用于标注侦听
  70. PYMutex m_MyMutex;
  71. std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
  72. };
  73. #endif //

        MySocketPrivate.cpp

  1. #include "MySocketPrivate.h"
  2. #include "myFunc.h"
  3. #include "Log.h"
  4. #ifdef __linux__
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <string.h>
  8. #include <sys/types.h>
  9. #include <sys/socket.h>
  10. #include <netinet/in.h>
  11. #include <arpa/inet.h>
  12. #include <unistd.h>
  13. #include <fcntl.h>
  14. #include <errno.h>
  15. #include <signal.h>
  16. #define printf_s printf
  17. #define sprintf_s sprintf
  18. #pragma message("def printf_s from printf And def sprintf_s from sprintf")
  19. #endif
  20. #ifdef WIN32
  21. void MySocketPrivate::SocketThreadInit()
  22. {
  23. WORD wVersionRequested;
  24. WSADATA wsaData;
  25. int err;
  26. wVersionRequested = MAKEWORD(2, 2);
  27. err = WSAStartup(wVersionRequested, &wsaData);
  28. if (err != 0)
  29. {
  30. //printf("WSAStartup failed with error: %d\n", err);
  31. CLogger::createInstance()->Log(eSoftError,
  32. "WSAStartup failed with error: %d, [%s %s %d]"
  33. , err
  34. , __FILE__, __FUNCTION__, __LINE__);
  35. return;
  36. }
  37. if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
  38. {
  39. /* Tell the user that we could not find a usable */
  40. /* WinSock DLL. */
  41. //printf("Could not find a usable version of Winsock.dll\n");
  42. CLogger::createInstance()->Log(eSoftError,
  43. "Could not find a usable version of Winsock.dll: [%s %s %d]"
  44. , __FILE__, __FUNCTION__, __LINE__);
  45. WSACleanup();
  46. return;
  47. }
  48. else {
  49. printf("The Winsock 2.2 dll was found okay\n");
  50. }
  51. }
  52. #endif
  53. int MySocketPrivate::onConnect()
  54. {
  55. if (m_OnListen) //服务器Socket是否已经创建
  56. {
  57. //printf_s("it's has been Listten! \r\n");
  58. CLogger::createInstance()->Log(eTipMessage,
  59. "it's has been Listten, [%s %s %d]!"
  60. , __FILE__, __FUNCTION__, __LINE__);
  61. return 1;
  62. }
  63. else {
  64. #ifdef WIN32
  65. m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
  66. SOCKADDR_IN addrServ;
  67. addrServ.sin_family = AF_INET;
  68. addrServ.sin_port = htons(m_Port);
  69. addrServ.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
  70. bind(m_SSocket, (SOCKADDR*)&addrServ, sizeof(SOCKADDR));
  71. /*
  72. *3.在send(),recv()过程中有时由于网络状况等原因,收发不能预期进行,可以设置收发时限:
  73. *int nNetTimeout = 1000; //1秒
  74. *发送时限
  75. *setsockopt( socket, SOL_SOCKET, SO_SNDTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
  76. *接收时限
  77. *setsockopt( socket, SOL_SOCKET, SO_RCVTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
  78. *4.在send()的时候,返回的是实际发送出去的字节(同步)或发送到socket缓冲区的字节(异步);系统默认的状态发送和接收一次为8688字节(约
  79. *为8.5K);在实际的过程中如果发送或是接收的数据量比较大,可以设置socket缓冲区,避免send(),recv()不断的循环收发:
  80. * 接收缓冲区
  81. *int nRecvBufLen = 32 * 1024; //设置为32K
  82. *setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( const char* )&nRecvBufLen, sizeof( int ) );
  83. *发送缓冲区
  84. *int nSendBufLen = 32*1024; //设置为32K
  85. *setsockopt( s, SOL_SOCKET, SO_SNDBUF, ( const char* )&nSendBufLen, sizeof( int ) );
  86. *5.在发送数据的时,不执行由系统缓冲区到socket缓冲区的拷贝,以提高程序的性能:
  87. *int nZero = 0;
  88. *setsockopt( socket, SOL_SOCKET, SO_SNDBUF, ( char * )&nZero, sizeof( nZero ) );
  89. *6.在接收数据时,不执行将socket缓冲区的内容拷贝到系统缓冲区:
  90. *int nZero = 0;
  91. *setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( char * )&nZero, sizeof( int ) );
  92. */
  93. //如果创建Socket失败则提示,成功则开始监听
  94. if (listen(m_SSocket, 20) == SOCKET_ERROR)
  95. {
  96. closesocket(m_SSocket);
  97. //printf_s("ServerSocket Create failed! error:%d \r\n",static_cast<int>(GetLastError()));
  98. CLogger::createInstance()->Log(eParameterError,
  99. "ServerSocket Create failed! error:%d, [%s %s %d]"
  100. , static_cast<int>(GetLastError())
  101. , __FILE__, __FUNCTION__, __LINE__);
  102. return -1;
  103. }
  104. else {
  105. CLogger::createInstance()->Log(eTipMessage, "on listen port(%d)", m_Port);
  106. m_OnListen = true;
  107. return 1;
  108. }
  109. #else
  110. m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
  111. if (MY_SOCKET_NULL == m_SSocket)
  112. {
  113. CLogger::createInstance()->Log(eSoftError,
  114. "socket create fail ![%s %s %d]!"
  115. , __FILE__, __FUNCTION__, __LINE__);
  116. return -1;
  117. }
  118. struct sockaddr_in s_add;
  119. bzero(&s_add, sizeof(struct sockaddr_in));
  120. s_add.sin_family = AF_INET;
  121. s_add.sin_addr.s_addr = htonl(INADDR_ANY);
  122. s_add.sin_port = htons(m_Port);
  123. if (-1 == bind(m_SSocket, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  124. {
  125. CLogger::createInstance()->Log(eSoftError,
  126. "socket bind fail, %d ![%s %s %d]",m_Port
  127. , __FILE__, __FUNCTION__, __LINE__);
  128. return -1;
  129. }
  130. else {
  131. CLogger::createInstance()->Log(eTipMessage,
  132. "bind success, %d! [%s %s %d]!",m_Port
  133. , __FILE__, __FUNCTION__, __LINE__);
  134. }
  135. if (-1 == listen(m_SSocket, 5))
  136. {
  137. CLogger::createInstance()->Log(eSoftError,
  138. "listen %d fail ![%s %s %d]",m_Port
  139. , __FILE__, __FUNCTION__, __LINE__);
  140. return -1;
  141. }
  142. else {
  143. CLogger::createInstance()->Log(eTipMessage,
  144. "listen success, %d ! [%s %s %d]!",m_Port
  145. , __FILE__, __FUNCTION__, __LINE__);
  146. }
  147. m_OnListen = true;
  148. return 1;
  149. #endif
  150. }
  151. }
  152. bool MySocketPrivate::Accept()
  153. {
  154. bool bRet = true;
  155. if (m_OnListen)
  156. {
  157. #ifdef WIN32
  158. SOCKADDR_IN cliAddr;
  159. int length = sizeof(SOCKADDR);
  160. SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
  161. if (INVALID_SOCKET == cliSock)
  162. {
  163. closesocket(cliSock);
  164. //printf_s("Connect Accept Failed: %d! \r\n",static_cast<int>(GetLastError()));
  165. CLogger::createInstance()->Log(eSoftError,
  166. "Connect Accept Failed: %d! , [%s %s %d]"
  167. , static_cast<int>(GetLastError())
  168. , __FILE__, __FUNCTION__, __LINE__);
  169. bRet = false;
  170. }
  171. else {
  172. //cliAddr.sin_addr.S_un.S_addr;
  173. char _ipport[64] = { 0 };
  174. //sprintf_s(_ipport,"%s",(char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))));
  175. sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
  176. //std::string _linkInfo = _ipport;
  177. KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
  178. int nNetTimeout = 100; //1秒
  179. //setsockopt(cliSock, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
  180. setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
  181. m_MyMutex.Lock();
  182. m_CSockets[_linkInfo] = cliSock;//添加客户端
  183. m_MyMutex.Unlock();
  184. //printf_s("Connect Accept Success: %s \r\n", _ipport);
  185. CLogger::createInstance()->Log(eTipMessage,
  186. "Connect Accept Success: %s,[%s %s %d]"
  187. , _ipport
  188. , __FILE__, __FUNCTION__, __LINE__);
  189. }
  190. #else
  191. int sin_size = sizeof(struct sockaddr_in);
  192. struct sockaddr_in c_add;
  193. // printf("MySocketPrivate::Accept 1\n");
  194. int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
  195. if (-1 == nfp)
  196. {
  197. //printf("accept fail !\r\n");
  198. CLogger::createInstance()->Log(eParameterError,
  199. "accept fail![%s %s %d]"
  200. , __FILE__, __FUNCTION__, __LINE__);
  201. bRet = false;
  202. }
  203. else {
  204. char _ipport[64] = { 0 };
  205. std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
  206. //std::string _ipStr = PFunc::intToIp(htonl(c_add.sin_addr.s_addr));
  207. int _port = static_cast<int>(htons(c_add.sin_port));
  208. sprintf(_ipport, "%s:%d", _ipStr.c_str(), _port);
  209. /*
  210. struct timeval timeout = {3,0};
  211. //setsockopt(nfp,SOL_SOCKET,SO_SNDTIMEO,(char *)&timeout,sizeof(struct timeval));
  212. setsockopt(nfp,SOL_SOCKET,SO_RCVTIMEO,(char *)&timeout,sizeof(struct timeval));
  213. KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(c_Addr.sin_addr))), cliAddr.sin_port);
  214. */
  215. int x = fcntl(nfp, F_GETFL, 0);
  216. fcntl(nfp, F_SETFL, x | O_NONBLOCK);
  217. KeyObj_Client _linkInfo(_ipStr, _port);
  218. m_MyMutex.Lock();
  219. //nfps.push_back(nfp);
  220. //nfps[KeyObj_Client(_ipStr, _port)] = nfp;
  221. m_CSockets[_linkInfo] = nfp;
  222. m_MyMutex.Unlock();
  223. //printf("accept ok!\r\nServer start get connect from %s\r\n", _ipport);
  224. CLogger::createInstance()->Log(eTipMessage,
  225. "accept ok!\r\nServer start get connect from %s.[%s %s %d]"
  226. , _ipport
  227. , __FILE__, __FUNCTION__, __LINE__);
  228. }
  229. #endif
  230. }
  231. else {
  232. bRet = false;
  233. //printf_s("m_OnListen is false, please check Listen state, Accept error \r\n");
  234. CLogger::createInstance()->Log(eTipMessage,
  235. "m_OnListen is false, please check Listen state, Accept error,[%s %s %d]"
  236. , __FILE__, __FUNCTION__, __LINE__);
  237. }
  238. return bRet;
  239. };
  240. bool MySocketPrivate::get_ipInt_list(std::set<long> &ipintlist)
  241. {
  242. for (std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();it!= m_CSockets.end();it++)
  243. {
  244. ipintlist.insert(it->first.m_ip);
  245. }
  246. return !ipintlist.empty();
  247. }
  248. void MySocketPrivate::disConnect()
  249. {
  250. deleteCSocket();//删除客户端
  251. deleteSSocket();//删除服务端
  252. #ifdef WIN32
  253. WSACleanup();
  254. #endif
  255. CLogger::createInstance()->Log(eTipMessage,
  256. "socket disConnect success and exit: [%s %s %d]!"
  257. , __FILE__, __FUNCTION__, __LINE__);
  258. }
  259. void MySocketPrivate::deleteCSocket()
  260. {
  261. m_MyMutex.Lock();
  262. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  263. while (it != m_CSockets.end())
  264. {
  265. //删除链接
  266. deleteCSocket(it->second);
  267. #ifdef WIN32
  268. it = m_CSockets.erase(it);
  269. #else
  270. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  271. m_CSockets.erase(ittemp);
  272. #endif
  273. }
  274. m_MyMutex.Unlock();
  275. }
  276. void MySocketPrivate::deleteCSocket(MY_SOCKET m_CSocket)
  277. {
  278. try {
  279. if (MY_SOCKET_NULL != m_CSocket)
  280. {
  281. #ifdef WIN32
  282. closesocket(m_CSocket);
  283. #else
  284. close(m_CSocket);
  285. #endif
  286. m_CSocket = MY_SOCKET_NULL;
  287. }
  288. }
  289. catch (...) {
  290. CLogger::createInstance()->Log(eSoftError,
  291. "socket deleteCSocket exception and failed! [%s %s %d]!"
  292. , __FILE__, __FUNCTION__, __LINE__);
  293. }
  294. }
  295. void MySocketPrivate::deleteSSocket()
  296. {
  297. m_OnListen = false;
  298. try {
  299. if (MY_SOCKET_NULL != m_SSocket)
  300. {
  301. #ifdef WIN32
  302. closesocket(m_SSocket);
  303. #else
  304. close(m_SSocket);
  305. #endif
  306. m_SSocket = MY_SOCKET_NULL;
  307. }
  308. }
  309. catch (...) {
  310. CLogger::createInstance()->Log(eSoftError,
  311. "socket deleteSSocket exception and failed! [%s %s %d]!"
  312. , __FILE__, __FUNCTION__, __LINE__);
  313. }
  314. };
  315. //return success read count
  316. int MySocketPrivate::Read(std::map<KeyObj_Client, RDClient> &bufs)
  317. {
  318. int ret = 0;
  319. m_MyMutex.Lock();
  320. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  321. while (it != m_CSockets.end())
  322. {
  323. char _buf[512] = { 0 };
  324. #ifdef WIN32
  325. int re_one = recv(it->second, _buf, 512, 0);
  326. if (re_one <= 0)
  327. {
  328. int _error = GetLastError();
  329. if (_error != 10060)
  330. {
  331. //printf_s("read data failed from %s! return val is %d. \r\n", it->first.c_str(), re);
  332. CLogger::createInstance()->Log(eReadError,
  333. "read data failed from %s! return val is %d,error(%d).[%s %s %d]"
  334. , it->first.m_ipStr.c_str(), re_one, _error
  335. , __FILE__, __FUNCTION__, __LINE__);
  336. //删除链接
  337. deleteCSocket(it->second);
  338. it = m_CSockets.erase(it);
  339. continue;
  340. }
  341. else {
  342. re_one = 0;
  343. }
  344. }
  345. #else
  346. int re_one = recv(it->second, _buf, 256, MSG_DONTWAIT);
  347. if (re_one <= 0)
  348. {
  349. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  350. {
  351. usleep(10);
  352. re_one = 0;
  353. }
  354. else {
  355. CLogger::createInstance()->Log(eReadError,
  356. "read data failed from %s! return val is %d.[%s %s %d]"
  357. , it->first.m_ipStr.c_str(), re_one
  358. , __FILE__, __FUNCTION__, __LINE__);
  359. //删除连接
  360. deleteCSocket(it->second);
  361. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  362. m_CSockets.erase(ittemp);
  363. continue;
  364. }
  365. }
  366. #endif
  367. if (re_one>0)
  368. {
  369. ret += 1;
  370. std::map<KeyObj_Client, RDClient>::iterator itrd = bufs.find(it->first);
  371. if (itrd != bufs.end())
  372. {
  373. itrd->second.add((unsigned char*)_buf, re_one);
  374. }
  375. else {
  376. bufs[it->first] = RDClient((unsigned char*)_buf, re_one);
  377. }
  378. }
  379. it++;
  380. }
  381. m_MyMutex.Unlock();
  382. return ret;
  383. };
  384. //return success count
  385. int MySocketPrivate::Write(const char* buf, int size)
  386. {
  387. int ret = 0;
  388. m_MyMutex.Lock();
  389. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  390. while (it != m_CSockets.end())
  391. {
  392. // printf_s("write data %d to client is started!\r\n",size);
  393. #ifdef WIN32
  394. int re = send(it->second, buf, size, 0);
  395. if (re <= 0)
  396. {
  397. int _error = GetLastError();
  398. if (_error != 10060)
  399. {
  400. CLogger::createInstance()->Log(eWriteError,
  401. "socket write data failed! return val is %d,%s.[%s %s %d]"
  402. , re, buf
  403. , __FILE__, __FUNCTION__, __LINE__);
  404. //删除连接
  405. deleteCSocket(it->second);
  406. it = m_CSockets.erase(it);
  407. continue;
  408. }
  409. else {
  410. re = 0;
  411. }
  412. }
  413. #else
  414. int re = send(it->second, buf, size, MSG_DONTWAIT);
  415. if (re <= 0)
  416. {
  417. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  418. {
  419. usleep(10);
  420. re = 0;
  421. }
  422. else {
  423. CLogger::createInstance()->Log(eWriteError,
  424. "Write Data Failed! error(%d,%s)! [%s %s %d]"
  425. , errno, strerror(errno)
  426. , __FILE__, __FUNCTION__, __LINE__);
  427. //删除连接
  428. deleteCSocket(it->second);
  429. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  430. m_CSockets.erase(ittemp);
  431. continue;
  432. }
  433. }
  434. #endif
  435. else{
  436. ret += 1;
  437. }
  438. it++;
  439. }
  440. m_MyMutex.Unlock();
  441. return ret;
  442. };
  443. //return success count
  444. int MySocketPrivate::Write(unsigned long long ipInt, const char* buf, int size)
  445. {
  446. int ret = 0;
  447. m_MyMutex.Lock();
  448. std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
  449. while (it != m_CSockets.end())
  450. {
  451. // printf_s("write data %d to client is started!\r\n",size);
  452. //当前版本只针对网络地址做判断,即一台电脑多个客户端连接,都会被发送数据
  453. if ((unsigned long long)it->first.m_ip == ipInt)
  454. {
  455. #ifdef WIN32
  456. int re = send(it->second, buf, size, 0);
  457. if (re < 0)
  458. {
  459. int _error = GetLastError();
  460. if (_error != 10060)
  461. {
  462. CLogger::createInstance()->Log(eWriteError,
  463. "socket write data failed! return val is %d,%s.[%s %s %d]"
  464. , re, buf
  465. , __FILE__, __FUNCTION__, __LINE__);
  466. //删除连接
  467. deleteCSocket(it->second);
  468. it = m_CSockets.erase(it);
  469. continue;
  470. }
  471. else {
  472. re = 0;
  473. }
  474. }
  475. #else
  476. int re = send(it->second, buf, size, MSG_DONTWAIT);
  477. if (re <= 0)
  478. {
  479. if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  480. {
  481. usleep(10);
  482. re = 0;
  483. }
  484. else {
  485. CLogger::createInstance()->Log(eWriteError,
  486. "Write Data Failed! error(%d,%s)! [%s %s %d]"
  487. , errno, strerror(errno)
  488. , __FILE__, __FUNCTION__, __LINE__);
  489. //删除连接
  490. deleteCSocket(it->second);
  491. std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
  492. m_CSockets.erase(ittemp);
  493. continue;
  494. }
  495. }
  496. #endif
  497. else {
  498. ret += 1;
  499. }
  500. }
  501. it++;
  502. }
  503. m_MyMutex.Unlock();
  504. return ret;
  505. }

        MySocketRD.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKETGXRD_H_
  5. #define _MYSOCKETGXRD_H_
  6. /*
  7. 该线程从各个客户端读取数据,并将数据分帧
  8. */
  9. #ifdef WIN32
  10. #include "win32Thread.h"
  11. #endif
  12. #ifdef linux
  13. #include "myThread.h"
  14. #endif
  15. #include "DataDef.h"
  16. #include "queuedata.h"
  17. class MySocketPrivate;
  18. class MySocketRD : public MyThread
  19. {
  20. public:
  21. MySocketRD(MySocketPrivate* myPDataPrt_, int netType_=1);
  22. virtual ~MySocketRD(void);
  23. int Run();
  24. //从缓存中读取帧数据处理,请按需自行处理该函数
  25. int AddFrame(const std::string link, const unsigned char *buf, int len);
  26. private:
  27. bool running;
  28. int netType;//数据读写处理类型
  29. MySocketPrivate *myPDataPrt;
  30. QueueData<RDS> ReadData;
  31. };
  32. #endif

        MySocketRD.cpp

  1. #include "MySocketRD.h"
  2. #include "MySocketPrivate.h"
  3. #include "myFunc.h"
  4. #include "Log.h"
  5. MySocketRD::MySocketRD( MySocketPrivate* myPDataPrt_, int netType_)
  6. : running(true)
  7. , netType(netType_)
  8. , myPDataPrt(myPDataPrt_)
  9. {
  10. }
  11. MySocketRD::~MySocketRD(void)
  12. {
  13. running = false;
  14. };
  15. int MySocketRD::Run()
  16. {
  17. if (NULL == myPDataPrt )
  18. {
  19. CLogger::createInstance()->Log(eSoftError,
  20. "MySocketRD start fail for myPDataPrt is NULL,[%s %s %d]!"
  21. , __FILE__, __FUNCTION__, __LINE__);
  22. return 0;
  23. }
  24. std::map<KeyObj_Client, RDClient> bufs;
  25. RDS rdataGx;
  26. while (running)
  27. {
  28. int re = myPDataPrt->Read(bufs);
  29. if (re <= 0)
  30. {
  31. #ifdef _DEBUG
  32. printf_s("Read Data Failed or NULL\n!");
  33. #else
  34. ;
  35. #endif
  36. }else {
  37. switch (netType)
  38. {
  39. case 1:
  40. {
  41. try {
  42. std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
  43. while (it != bufs.end())
  44. {
  45. if (it->second.len > 0)
  46. {
  47. RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
  48. ReadData.add(rdata);
  49. }
  50. it++;
  51. }
  52. bufs.clear();
  53. }
  54. catch (const std::exception& e)
  55. {
  56. CLogger::createInstance()->Log(eSoftError,
  57. "Exception for Reading and Parsing Error[%s],NetType(%d), [%s %s %d]"
  58. , e.what()
  59. , netType
  60. , __FILE__, __FUNCTION__, __LINE__);
  61. }
  62. catch (...) {
  63. CLogger::createInstance()->Log(eSoftError,
  64. "Exception for Reading and Parsing Error,NetType(%d),[%s %s %d]!"
  65. , netType
  66. , __FILE__, __FUNCTION__, __LINE__);
  67. }
  68. while (ReadData.getFirst(rdataGx))
  69. {
  70. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  71. ReadData.removeFirst();
  72. }
  73. break;
  74. }
  75. case 2:
  76. {
  77. try {
  78. std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
  79. while (it != bufs.end())
  80. {
  81. unsigned char * buff = it->second.Buf;
  82. int start_frame = 0;
  83. unsigned char ctype = 0;
  84. for (int i = 0; i < it->second.len; i++)
  85. {
  86. //printf_s("%02X ",buff[i]);
  87. if (buff[i] > 0xf0) {
  88. if (buff[i] == 0xff)
  89. {
  90. if (ctype)
  91. {
  92. ctype = 0;
  93. int re_len = i - start_frame + 1;
  94. // RDS rdata(TCP_Data(buff + start_frame, i - start_frame + 1), it->first.m_ipStr);
  95. unsigned char * pBuf = new unsigned char[re_len];
  96. //
  97. int nLen = PFunc::uncode(buff + start_frame, re_len, pBuf);
  98. RDS rdata(TCP_Data(pBuf, nLen), it->first.m_ipStr);
  99. // printf("rev01:%s\r\n",(char*)pBuf);
  100. printf("rev01:%d\r\n",nLen);
  101. ReadData.add(rdata);
  102. start_frame = i + 1;
  103. delete[] pBuf;
  104. pBuf = NULL;
  105. }
  106. }
  107. else {
  108. ctype = buff[i];
  109. start_frame = i;
  110. }
  111. }
  112. }
  113. buff = NULL;
  114. if (start_frame < it->second.len)
  115. {
  116. RDClient _newrd(it->second.Buf + start_frame, it->second.len - start_frame);
  117. it->second = _newrd;
  118. it++;
  119. }
  120. else {
  121. #ifdef WIN32
  122. it = bufs.erase(it);
  123. #else
  124. std::map<KeyObj_Client, RDClient>::iterator ittemp = it++;
  125. bufs.erase(ittemp);
  126. #endif
  127. }
  128. }
  129. }
  130. catch (const std::exception& e)
  131. {
  132. CLogger::createInstance()->Log(eSoftError,
  133. "Data Deserialize false[%s],[%s %s %d]"
  134. , e.what()
  135. , __FILE__, __FUNCTION__, __LINE__);
  136. }
  137. catch (...) {
  138. CLogger::createInstance()->Log(eSoftError,
  139. "Data Deserialize false,[%s %s %d]!"
  140. , __FILE__, __FUNCTION__, __LINE__);
  141. }
  142. while (ReadData.getFirst(rdataGx))
  143. {
  144. this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
  145. ReadData.removeFirst();
  146. }
  147. break;
  148. }
  149. default:
  150. break;
  151. }
  152. }
  153. #ifdef WIN32
  154. Sleep(10);
  155. #else
  156. usleep(10000);
  157. #endif
  158. }
  159. return 0;
  160. };
  161. int MySocketRD::AddFrame(const std::string link, const unsigned char *buf, int len)
  162. {
  163. if(NULL == buf)
  164. return 0;
  165. printf("rev:%s\r\n",(char*)buf);
  166. // TcpSocket ts;
  167. // memcpy(&ts,buf,len);
  168. // printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
  169. return 0;
  170. };

        MySocketSrv.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKETSRV_H_
  5. #define _MYSOCKETSRV_H_
  6. #ifdef WIN32
  7. #include "win32Thread.h"
  8. #endif
  9. #ifdef linux
  10. #include "myThread.h"
  11. #endif
  12. class MySocketPrivate;
  13. class MySocketSrv : public MyThread
  14. {
  15. public:
  16. MySocketSrv();
  17. virtual ~MySocketSrv();
  18. void setPDataPtr(MySocketPrivate *myPData);
  19. int Run();
  20. private:
  21. MySocketPrivate *myPDataPrt;
  22. };
  23. #endif

        MySocketSrv.cpp

  1. #include "MySocketSrv.h"
  2. #include "MySocketPrivate.h"
  3. #include "Log.h"
  4. MySocketSrv::MySocketSrv(void)
  5. : myPDataPrt(NULL)
  6. {
  7. }
  8. MySocketSrv::~MySocketSrv(void)
  9. {
  10. }
  11. void MySocketSrv::setPDataPtr(MySocketPrivate *myPData)
  12. {
  13. myPDataPrt=myPData;
  14. };
  15. int MySocketSrv::Run()
  16. {
  17. if (NULL == myPDataPrt)
  18. {
  19. CLogger::createInstance()->Log(eSoftError,
  20. "MySocketSrv start fail for myPDataPrt is NULL,[%s %s %d]!"
  21. , __FILE__, __FUNCTION__, __LINE__);
  22. return 0;
  23. }
  24. while(1)
  25. {
  26. myPDataPrt->Accept();
  27. #ifdef WIN32
  28. Sleep(300);
  29. #else
  30. usleep(300000);
  31. #endif
  32. }
  33. return 0;
  34. }

        MySocketWD.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MySocketWD_H_
  5. #define _MySocketWD_H_
  6. /*
  7. 该线程向指定客户端写入数据,将数据协议编码并序列化
  8. */
  9. #ifdef WIN32
  10. #include "win32Thread.h"
  11. #endif
  12. #ifdef linux
  13. #include "myThread.h"
  14. #endif
  15. #include "DataDef.h"
  16. #include "queuedata.h"
  17. class MySocketPrivate;
  18. class MySocketWD : public MyThread
  19. {
  20. public:
  21. MySocketWD(MySocketPrivate* myPDataPrt_,int netType_=1);
  22. virtual ~MySocketWD(void);
  23. int Run();
  24. int AddData(const char* buf, int len);
  25. int getBuffer(unsigned long long &_ipInt, unsigned char* _buf);
  26. private:
  27. bool running;
  28. int netType;
  29. MySocketPrivate *myPDataPrt;
  30. QueueData<WDS> WriteData;
  31. };
  32. #endif

        MySocketWD.cpp

  1. #include "MySocketWD.h"
  2. #include "MySocketPrivate.h"
  3. #include "myFunc.h"
  4. #include "Log.h"
  5. #include <set>
  6. #ifdef __linux__
  7. #include <stdexcept>
  8. #endif
  9. MySocketWD::MySocketWD( MySocketPrivate* myPDataPrt_,int netType_)
  10. : running(true)
  11. , netType(netType_)
  12. , myPDataPrt(myPDataPrt_)
  13. {
  14. }
  15. MySocketWD::~MySocketWD(void)
  16. {
  17. running = false;
  18. }
  19. int MySocketWD::Run()
  20. {
  21. if (NULL == myPDataPrt)
  22. {
  23. CLogger::createInstance()->Log(eSoftError,
  24. "MySocketWD start fail for myPDataPrt is NULL,[%s %s %d]!"
  25. , __FILE__, __FUNCTION__, __LINE__);
  26. return 0;
  27. }
  28. while(running)
  29. {
  30. try {
  31. unsigned long long _ipInt = 0;
  32. unsigned char buf[512] = { 0 };
  33. int len = this->getBuffer(_ipInt, buf);
  34. if (len > 0)
  35. {
  36. int ret = -1;
  37. switch (netType)
  38. {
  39. case 1:
  40. {
  41. ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);
  42. break;
  43. }
  44. case 2:
  45. {
  46. // printf("send data: %s\r\n",buf);
  47. unsigned char* _buf = new unsigned char[2 * len + 1];
  48. memset(_buf, 0, 2 * len + 1);
  49. len = PFunc::code(buf, len, _buf);//序列化处理
  50. printf("send data: %d\r\n",len);
  51. ret = myPDataPrt->Write(_ipInt, (const char*)_buf, len);
  52. delete[] _buf;
  53. _buf = NULL;
  54. break;
  55. }
  56. default:
  57. {
  58. char warBuf[128] = { 0 };
  59. sprintf(warBuf, "MySocketWD::Run For Unkown NetType(%02X)", netType);
  60. #ifdef WIN32
  61. throw std::exception(warBuf);
  62. #else
  63. throw std::domain_error(warBuf);
  64. #endif
  65. break;
  66. }
  67. }
  68. if (ret <=0)
  69. {
  70. //printf("send data: %d, buf %d\n",len,ret);
  71. CLogger::createInstance()->Log(eTipMessage,
  72. "MySocketWD send data(%s,%d) fail. [%s %s %d]"
  73. ,buf,len
  74. , __FILE__, __FUNCTION__, __LINE__);
  75. }
  76. //else{
  77. // printf("send data: %d, and real send %d\n",len,ret);
  78. //}
  79. }
  80. }
  81. catch (const std::exception& e)
  82. {
  83. CLogger::createInstance()->Log(eSoftError,
  84. "MySocketWD Run for data writing exception[%s],[%s %s %d]"
  85. , e.what()
  86. , __FILE__, __FUNCTION__, __LINE__);
  87. }
  88. catch (...) {
  89. CLogger::createInstance()->Log(eSoftError,
  90. "MySocketWD Run for data writing exception, [%s %s %d]!"
  91. , __FILE__, __FUNCTION__, __LINE__);
  92. }
  93. #ifdef WIN32
  94. Sleep(10);
  95. #else
  96. usleep(10000);
  97. #endif
  98. }
  99. return 0;
  100. };
  101. int MySocketWD::AddData(const char* buf, int len)
  102. {
  103. if(len>0&&NULL!=buf){
  104. if(len>=512){
  105. printf("buf len is >=512!\r\n");
  106. }else{
  107. std::set<long> ipintlist;
  108. if(!myPDataPrt->get_ipInt_list(ipintlist))
  109. {
  110. return 0;
  111. }
  112. std::set<long>::iterator it = ipintlist.begin();
  113. while (it != ipintlist.end())
  114. {
  115. /* code */
  116. WDS wdata(*it,TCP_Data((unsigned char*)buf, len));
  117. WriteData.add(wdata);
  118. it++;
  119. }
  120. return len;
  121. }
  122. }
  123. return 0;
  124. }
  125. int MySocketWD::getBuffer(unsigned long long &_ipInt, unsigned char* _buf)
  126. {
  127. if(NULL == _buf)
  128. return 0;
  129. int ret = 0;
  130. WDS wdata;
  131. if(WriteData.getFirst(wdata))
  132. {
  133. try{
  134. if (!WriteData.removeFirst())
  135. {
  136. #ifdef WIN32
  137. throw std::exception("removeFirst WData failed!");
  138. #else
  139. throw std::logic_error("removeFirst WData failed!");
  140. #endif
  141. }
  142. _ipInt = wdata.ipInt;
  143. memcpy(_buf,wdata.data.Buf,wdata.data.len);
  144. ret = wdata.data.len;
  145. }
  146. catch (const std::exception& e)
  147. {
  148. CLogger::createInstance()->Log(eSoftError,
  149. "write item info to socket failed! have error[%s]. [%s %s %d]"
  150. , e.what()
  151. , __FILE__, __FUNCTION__, __LINE__);
  152. ret = -1;
  153. }
  154. catch (...) {
  155. //printf_s("write item info to socket failed! have error. \r\n");
  156. CLogger::createInstance()->Log(eSoftError,
  157. "write item info to socket failed! have error. [%s %s %d]"
  158. , __FILE__, __FUNCTION__, __LINE__);
  159. ret = -2;
  160. }
  161. }
  162. return ret;
  163. }

       7.4  client_IO目录

        MySocket.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKET_H_
  5. #define _MYSOCKET_H_
  6. /*
  7. *建立socket客户端
  8. */
  9. #include <string>
  10. #include "DataDef.h"
  11. class MySocketClient;
  12. class MySocketWD;
  13. class MySocketRD;
  14. class MySocket
  15. {
  16. public:
  17. MySocket(int _tranid, NetArg _netarg);
  18. virtual ~MySocket(void);
  19. public:
  20. virtual int Read(){ return -1; };
  21. virtual int Write(){ return -1; };
  22. int Write(const char* buf, int size);
  23. private:
  24. int tranid;
  25. NetArg netarg;
  26. MySocketClient *my_PrivateData;
  27. MySocketWD *m_MySocketWD;
  28. MySocketRD *m_MySocketRD;
  29. };
  30. #endif //_MYSOCKET_H_

        MySocket.cpp

  1. #include "MySocket.h"
  2. #include "MySocketClient.h"
  3. #include "MySocketWD.h"
  4. #include "MySocketRD.h"
  5. #include "Log.h"
  6. *MySocket*///
  7. MySocket::MySocket(int _tranid, NetArg _netarg)
  8. : tranid(_tranid)
  9. , netarg(_netarg)
  10. {
  11. try {//防止构造时异常出现内存泄漏
  12. //TCP/IP客户端,连接监控服务或其他平台
  13. my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
  14. if (my_PrivateData->onConnect() <= 0)
  15. {
  16. CLogger::createInstance()->Log(eSoftError,
  17. "connect server[%s,%d] error,please check it,[%s %s %d]!"
  18. , netarg.ipStr.c_str(), netarg.port
  19. , __FILE__, __FUNCTION__, __LINE__);
  20. }
  21. //数据协议编码解码 序列化及反序列化
  22. //数据发送线程
  23. m_MySocketWD = new MySocketWD();
  24. m_MySocketWD->setPrivateDataPtr(my_PrivateData, netarg.type);
  25. m_MySocketWD->start();
  26. //数据接收线程
  27. m_MySocketRD = new MySocketRD();
  28. m_MySocketRD->setPrivateDataPtr(my_PrivateData, netarg.type);
  29. m_MySocketRD->start();
  30. }
  31. catch (...)
  32. {
  33. delete m_MySocketRD;
  34. m_MySocketRD = NULL;
  35. delete m_MySocketWD;
  36. m_MySocketWD = NULL;
  37. delete my_PrivateData;
  38. my_PrivateData = NULL;
  39. CLogger::createInstance()->Log(eSoftError,
  40. "MySocket init error,please check it,[%s %s %d]!"
  41. , __FILE__, __FUNCTION__, __LINE__);
  42. }
  43. }
  44. MySocket::~MySocket(void)
  45. {
  46. if(NULL!= m_MySocketRD)
  47. {
  48. delete m_MySocketRD;
  49. m_MySocketRD = NULL;
  50. }
  51. if (NULL != m_MySocketWD)
  52. {
  53. delete m_MySocketWD;
  54. m_MySocketWD = NULL;
  55. }
  56. if (NULL != my_PrivateData)
  57. {
  58. delete my_PrivateData;
  59. my_PrivateData = NULL;
  60. }
  61. };
  62. int MySocket::Write(const char* buf, int size)
  63. {
  64. if (size <= 0)
  65. {
  66. return -1;
  67. }
  68. if (NULL != m_MySocketWD && NULL != buf)
  69. {
  70. return m_MySocketWD->add_data(buf, size);
  71. }
  72. else
  73. {
  74. return -1;
  75. }
  76. };

        MySocketClient.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef MY_SOCKET_CLIENT_H
  5. #define MY_SOCKET_CLIENT_H
  6. #ifdef WIN32
  7. #include "afxsock.h"
  8. #else
  9. #define UINT unsigned int
  10. #endif
  11. #include <string>
  12. #ifdef __linux__
  13. #include <string.h>
  14. #include <stdio.h>
  15. #endif
  16. #include "DataDef.h"
  17. class MySocketClient
  18. {
  19. public:
  20. MySocketClient(std::string ip, UINT port);
  21. ~MySocketClient(void);
  22. int onConnect();
  23. void disConnect();
  24. bool isConnect(){ return m_OnConnect; };
  25. int reSetSocket();
  26. int Read(RDClient &bufs);
  27. int Read(char* buf, int size);
  28. int Write(const char* buf, int size);
  29. private:
  30. #ifdef WIN32
  31. void SocketThreadInit();
  32. #endif
  33. private:
  34. int sock_fd;
  35. //fd_set read_flags,write_flags; // you know what these are
  36. std::string m_IpAddress;
  37. UINT m_Port; //
  38. bool m_OnConnect; //
  39. /*
  40. *当前写入失败及读取线程都可重新建立链接,m_OnConnecting设置防止冲突
  41. */
  42. bool m_OnConnecting; //
  43. bool m_ConnectONLog; //防止链接错误日志反复记录,切换状态时必定记录
  44. unsigned int m_log_Interval; //防止链接错误日志长时间不记录,2018-10-08
  45. };
  46. #endif

        MySocketClient.cpp

  1. #include "MySocketClient.h"
  2. #ifdef __linux__
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include <sys/types.h>
  7. #include <sys/socket.h>
  8. #include <netinet/in.h>
  9. #include <arpa/inet.h>
  10. #include <unistd.h>
  11. #include <fcntl.h>
  12. #include <errno.h>
  13. #include <signal.h>
  14. #define printf_s printf
  15. #define sprintf_s sprintf
  16. #endif
  17. #include "Log.h"
  18. #ifdef WIN32
  19. #include <mstcpip.h>
  20. void MySocketClient::SocketThreadInit()
  21. {
  22. WORD wVersionRequested;
  23. WSADATA wsaData;
  24. int err;
  25. wVersionRequested = MAKEWORD(2, 2);
  26. err = WSAStartup(wVersionRequested, &wsaData);
  27. if (err != 0)
  28. {
  29. //printf("WSAStartup failed with error: %d\n", err);
  30. CLogger::createInstance()->Log(eSoftError,
  31. "WSAStartup failed with error: %d, [%s %s %d]"
  32. , err, __FILE__, __FUNCTION__, __LINE__);
  33. return;
  34. }
  35. if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
  36. {
  37. /* Tell the user that we could not find a usable */
  38. /* WinSock DLL. */
  39. //printf("Could not find a usable version of Winsock.dll\n");
  40. CLogger::createInstance()->Log(eSoftError,
  41. "Could not find a usable version of Winsock.dll: [%s %s %d]"
  42. , __FILE__, __FUNCTION__, __LINE__);
  43. WSACleanup();
  44. return;
  45. }
  46. else
  47. {
  48. printf("The Winsock 2.2 dll was found okay\n");
  49. }
  50. }
  51. #endif
  52. MySocketClient::MySocketClient(std::string ip, UINT port)
  53. : m_IpAddress(ip)
  54. , m_Port(port)
  55. {
  56. sock_fd = -1;
  57. #ifdef WIN32
  58. /*initf = true;
  59. if(!AfxWinInit(::GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0)){
  60. CLogger::createInstance()->Log(eTipMessage,"AfxWinInit initial failed!");
  61. initf = false;
  62. }
  63. if (!AfxSocketInit())
  64. {
  65. CLogger::createInstance()->Log(eTipMessage,"WindowSocket initial failed!");
  66. initf = false;
  67. }
  68. */
  69. SocketThreadInit();
  70. #endif
  71. m_OnConnect = false;
  72. m_OnConnecting = false;
  73. m_ConnectONLog = true;
  74. m_log_Interval = static_cast<unsigned int>(time(NULL));
  75. }
  76. MySocketClient::~MySocketClient(void)
  77. {
  78. disConnect();
  79. }
  80. int MySocketClient::onConnect()
  81. {
  82. if (m_OnConnect) //
  83. {
  84. CLogger::createInstance()->Log(eTipMessage,
  85. "it is on connecting,Please disconnect!, [%s %s %d]!"
  86. , __FILE__, __FUNCTION__, __LINE__);
  87. return 0;
  88. }
  89. //防止链接冲突调用
  90. if (m_OnConnecting)
  91. {
  92. return 0;
  93. }
  94. try {
  95. m_OnConnecting = true;
  96. #ifdef WIN32
  97. sock_fd = static_cast<int>(socket(AF_INET, SOCK_STREAM, 0));
  98. SOCKADDR_IN ser_addr;
  99. memset(&ser_addr, 0, sizeof(ser_addr));
  100. ser_addr.sin_family = AF_INET;
  101. ser_addr.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
  102. ser_addr.sin_port = htons(static_cast<unsigned short>(m_Port));
  103. if (connect(sock_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr)) < 0)
  104. {
  105. //printf("%s:%d, connect socket failed,%s:%d \r\n", __FILE__, __LINE__,m_IpAddress.c_str(),m_Port);
  106. if (m_ConnectONLog|| m_log_Interval < static_cast<unsigned int>(time(NULL)))
  107. {
  108. m_ConnectONLog = false;
  109. m_log_Interval = static_cast<unsigned int>(time(NULL)) + 3600;
  110. CLogger::createInstance()->Log(eConfigError,
  111. "connect socket failed,%s:%d, [%s %s %d]"
  112. , m_IpAddress.c_str(), m_Port
  113. , __FILE__, __FUNCTION__, __LINE__);
  114. }
  115. m_OnConnecting = false;
  116. return -1;
  117. }
  118. printf("connect socket %s:%d !\r\n", m_IpAddress.c_str(), m_Port);
  119. CLogger::createInstance()->Log(eTipMessage,
  120. "connect socket %s:%d, [%s %s %d]!"
  121. , m_IpAddress.c_str(), m_Port
  122. , __FILE__, __FUNCTION__, __LINE__);
  123. int nNetTimeout = 10; //10毫秒
  124. setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
  125. setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
  126. //KeepAlive
  127. bool bKeepAlive = true;
  128. int nRet = setsockopt(sock_fd, SOL_SOCKET, SO_KEEPALIVE,(char*)&bKeepAlive, sizeof(bKeepAlive));
  129. if (nRet == SOCKET_ERROR)
  130. {
  131. CLogger::createInstance()->Log(eTipMessage
  132. , "connect socket %s:%d and setsockopt(SO_KEEPALIVE=true) fail!"
  133. , m_IpAddress.c_str(), m_Port);
  134. }
  135. // set KeepAlive parameter
  136. tcp_keepalive alive_in;
  137. tcp_keepalive alive_out;
  138. alive_in.keepalivetime = 1000; // 1s
  139. alive_in.keepaliveinterval = 3000; //3s
  140. alive_in.onoff = true;
  141. unsigned long ulBytesReturn = 0;
  142. nRet = WSAIoctl(sock_fd, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
  143. &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
  144. if (nRet == SOCKET_ERROR)
  145. {
  146. CLogger::createInstance()->Log(eTipMessage
  147. , "connect socket %s:%d and setsockopt(tcp_keepalive) fail!"
  148. , m_IpAddress.c_str(), m_Port);
  149. }
  150. m_OnConnect = true;
  151. m_OnConnecting = false;
  152. m_ConnectONLog = true;
  153. return 1;
  154. #else
  155. sock_fd = socket(PF_INET, SOCK_STREAM, 0);
  156. //sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  157. if (-1 == sock_fd)
  158. {
  159. CLogger::createInstance()->Log(eTipMessage,
  160. "socket fail!, [%s %s %d]!"
  161. , __FILE__, __FUNCTION__, __LINE__);
  162. m_OnConnecting = false;
  163. return -1;
  164. }
  165. //printf("socket ok !\r\n");
  166. CLogger::createInstance()->Log(eTipMessage,
  167. "socket ok !, [%s %s %d]!"
  168. , __FILE__, __FUNCTION__, __LINE__);
  169. struct sockaddr_in s_add;
  170. bzero(&s_add, sizeof(struct sockaddr_in));
  171. s_add.sin_family = PF_INET;
  172. s_add.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
  173. s_add.sin_port = htons(m_Port);
  174. printf("s_addr = %#x ,port : %#x\r\n", s_add.sin_addr.s_addr, s_add.sin_port);
  175. CLogger::createInstance()->Log(eTipMessage,
  176. "s_addr = %#x ,port : %#x, [%s %s %d]"
  177. , s_add.sin_addr.s_addr, s_add.sin_port
  178. , __FILE__, __FUNCTION__, __LINE__);
  179. //int x=fcntl(sock_fd,F_GETFL,0);
  180. //fcntl(sock_fd,F_SETFL,x | O_NONBLOCK);
  181. if (-1 == connect(sock_fd, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  182. {
  183. if (m_ConnectONLog)
  184. {
  185. m_ConnectONLog = false;
  186. CLogger::createInstance()->Log(eConfigError,
  187. "connect fail !, [%s %s %d]!"
  188. , __FILE__, __FUNCTION__, __LINE__);
  189. }
  190. m_OnConnecting = false;
  191. return -1;
  192. }
  193. int x = fcntl(sock_fd, F_GETFL, 0);
  194. fcntl(sock_fd, F_SETFL, x | O_NONBLOCK);
  195. //signal(SIGCHLD, SIG_IGN);
  196. //FD_ZERO(&read_flags); // Zero the flags ready for using
  197. //FD_ZERO(&write_flags);
  198. //FD_SET(sock_fd, &read_flags);
  199. //FD_SET(sock_fd, &write_flags);
  200. /*
  201. struct timeval timeout = {0,100};
  202. setsockopt(sock_fd,SOL_SOCKET,SO_SNDTIMEO,(char *)&timeout,sizeof(struct timeval));
  203. int tSet = setsockopt(sock_fd,SOL_SOCKET,SO_RCVTIMEO,(char *)&timeout,sizeof(struct timeval));
  204. socklen_t len=sizeof(timeout);
  205. getsockopt(sock_fd,SOL_SOCKET,SO_RCVTIMEO,&timeout,&len);
  206. printf("setsockopt(%d),socklen_t(%d)!\r\n",tSet,len);
  207. KeepAlive实现,单位秒
  208. //下面代码要求有ACE,如果没有包含ACE,则请把用到的ACE函数改成linux相应的接口
  209. int keepAlive = 1;//设定KeepAlive
  210. int keepIdle = 5;//开始首次KeepAlive探测前的TCP空闭时间
  211. int keepInterval = 5;//两次KeepAlive探测间的时间间隔
  212. int keepCount = 3;//判定断开前的KeepAlive探测次数
  213. if(setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,(void*)&keepAlive,sizeof(keepAlive)) == -1)
  214. {
  215. CLogger::createInstance()->Log(eTipMessage,"setsockopt SO_KEEPALIVE error!");
  216. }
  217. if(setsockopt(s,SOL_TCP,TCP_KEEPIDLE,(void *)&keepIdle,sizeof(keepIdle)) == -1)
  218. {
  219. CLogger::createInstance()->Log(eTipMessage,"setsockopt TCP_KEEPIDLE error!");
  220. }
  221. if(setsockopt(s,SOL_TCP,TCP_KEEPINTVL,(void *)&keepInterval,sizeof(keepInterval)) == -1)
  222. {
  223. CLogger::createInstance()->Log(eTipMessage,setsockopt TCP_KEEPINTVL error!");
  224. }
  225. if(setsockopt(s,SOL_TCP,TCP_KEEPCNT,(void *)&keepCount,sizeof(keepCount)) == -1)
  226. {
  227. CLogger::createInstance()->Log(eTipMessage,setsockopt TCP_KEEPCNT error!");
  228. }
  229. */
  230. CLogger::createInstance()->Log(eTipMessage,
  231. "connect ok !, [%s %s %d]!"
  232. , __FILE__, __FUNCTION__, __LINE__);
  233. m_OnConnect = true;
  234. m_OnConnecting = false;
  235. return 1;
  236. #endif
  237. }
  238. catch (...) {
  239. #ifdef WIN32
  240. CLogger::createInstance()->Log(eSoftError,
  241. "ClientSocket::onConnect error: %d.[%s %s %d]", static_cast<int>(GetLastError())
  242. , __FILE__, __FUNCTION__, __LINE__);
  243. #else
  244. CLogger::createInstance()->Log(eSoftError,
  245. "ClientSocket::onConnect error: %s. [%s %s %d]", strerror(errno)
  246. , __FILE__, __FUNCTION__, __LINE__);
  247. #endif
  248. m_OnConnecting = false;
  249. return -2;
  250. }
  251. }
  252. void MySocketClient::disConnect()
  253. {
  254. m_OnConnect = false;
  255. if (-1 != sock_fd)
  256. {
  257. #ifdef WIN32
  258. closesocket(sock_fd);
  259. #else
  260. close(sock_fd);
  261. #endif
  262. sock_fd = -1;
  263. }
  264. }
  265. int MySocketClient::reSetSocket()
  266. {
  267. disConnect();
  268. return onConnect();
  269. }
  270. int MySocketClient::Read(RDClient &bufs)
  271. {
  272. try {
  273. if (m_OnConnect)
  274. {
  275. char buf[256] = { 0 };
  276. #ifdef WIN32
  277. int re = recv(sock_fd, buf, 256, 0);
  278. if (re <= 0)
  279. {
  280. int _error = GetLastError();
  281. if (_error != 10060 && 0 != _error)
  282. //if (_error != 10060)
  283. {
  284. CLogger::createInstance()->Log(eReadError,
  285. "Read Datas Failed! ret(%d) error(%d)! [%s %s %d]"
  286. , re, _error
  287. , __FILE__, __FUNCTION__, __LINE__);
  288. disConnect();
  289. }
  290. else
  291. {
  292. re = 0;
  293. }
  294. }
  295. #else
  296. //int re = read(sock_fd, buf, 256);
  297. int re = recv(sock_fd, buf, 256, MSG_DONTWAIT);
  298. if (re <= 0)
  299. {
  300. if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  301. {
  302. usleep(10);
  303. re = 0;
  304. }
  305. else
  306. {
  307. CLogger::createInstance()->Log(eReadError,
  308. "Read Data Failed! error(%d,%d,%s)! [%s %s %d]"
  309. , re, errno, strerror(errno)
  310. , __FILE__, __FUNCTION__, __LINE__);
  311. disConnect();
  312. }
  313. //static int index = 0;
  314. //printf("..%d..%d,%s\n",index++,errno,strerror(errno));
  315. }
  316. #endif
  317. if (re > 0)
  318. {
  319. /*
  320. for(int j=0; j<re; j++){
  321. printf("%02X ",buf[j]);
  322. }
  323. printf("\n");
  324. */
  325. bufs.add((unsigned char*)buf, re);
  326. return bufs.len;
  327. }
  328. return re;
  329. }
  330. else
  331. {
  332. printf("Read Data Failed!unconnect!");
  333. return -2;
  334. }
  335. }
  336. catch (...)
  337. {
  338. disConnect();
  339. #ifdef WIN32
  340. CLogger::createInstance()->Log(eSoftError,
  341. "Read Data Failed!unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
  342. , __FILE__, __FUNCTION__, __LINE__);
  343. #else
  344. CLogger::createInstance()->Log(eSoftError,
  345. "Read Data Failed!unknown error: %s! [%s %s %d]", strerror(errno)
  346. , __FILE__, __FUNCTION__, __LINE__);
  347. #endif
  348. return -3;
  349. }
  350. };
  351. int MySocketClient::Read(char* buf, int size)
  352. {
  353. try {
  354. if (m_OnConnect)
  355. {
  356. #ifdef WIN32
  357. //int re = m_CSocket->Receive(buf, size);
  358. int re = recv(sock_fd, buf, size, 0);
  359. if (re <= 0)
  360. {
  361. int _error = GetLastError();
  362. if (_error != 10060 && 0 != _error)
  363. //if (_error != 10060)
  364. {
  365. CLogger::createInstance()->Log(eReadError,
  366. "Read Data Failed!ret(%d),error(%d)! [%s %s %d]"
  367. , re, _error
  368. , __FILE__, __FUNCTION__, __LINE__);
  369. disConnect();
  370. }
  371. else
  372. {
  373. re = 0;
  374. }
  375. }
  376. #else
  377. //int re = read(sock_fd, buf, size);
  378. int re = recv(sock_fd, buf, size, MSG_DONTWAIT);
  379. if (re <= 0)
  380. {
  381. if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  382. {
  383. usleep(10);
  384. re = 0;
  385. }
  386. else
  387. {
  388. CLogger::createInstance()->Log(eReadError,
  389. "Read Data Failed! error(%d,%d,%s)! [%s %s %d]"
  390. ,re , errno, strerror(errno)
  391. , __FILE__, __FUNCTION__, __LINE__);
  392. disConnect();
  393. }
  394. }
  395. #endif
  396. return re;
  397. }
  398. else
  399. {
  400. printf("Read Data Failed! unconnect! \n");
  401. return -2;
  402. }
  403. }
  404. catch (...)
  405. {
  406. disConnect();
  407. #ifdef WIN32
  408. CLogger::createInstance()->Log(eSoftError,
  409. "Read Data Failed!unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
  410. , __FILE__, __FUNCTION__, __LINE__);
  411. #else
  412. CLogger::createInstance()->Log(eSoftError,
  413. "Read Data Failed!unknown error: %s! [%s %s %d]", strerror(errno)
  414. , __FILE__, __FUNCTION__, __LINE__);
  415. #endif
  416. return -3;
  417. }
  418. }
  419. int MySocketClient::Write(const char* buf, int size)
  420. {
  421. try {
  422. if (m_OnConnect)
  423. {
  424. #ifdef WIN32
  425. int re = send(sock_fd, buf, size, 0);
  426. if (re <= 0)
  427. {
  428. int _error = GetLastError();
  429. if (_error != 10060 && 0 != _error)
  430. {
  431. CLogger::createInstance()->Log(eWriteError,
  432. "Write Data Failed! ret(%d)! error(%d)! [%s %s %d]"
  433. , re, _error
  434. , __FILE__, __FUNCTION__, __LINE__);
  435. disConnect();
  436. }
  437. }
  438. #else
  439. //int re = write(sock_fd, buf, size);
  440. int re = send(sock_fd, buf, size, MSG_DONTWAIT);
  441. if (re <= 0)
  442. {
  443. if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
  444. {
  445. usleep(10);
  446. re = 0;
  447. }
  448. else
  449. {
  450. CLogger::createInstance()->Log(eWriteError,
  451. "Write Data Failed! error(%d,%d,%s)! [%s %s %d]"
  452. , re, errno, strerror(errno)
  453. , __FILE__, __FUNCTION__, __LINE__);
  454. disConnect();
  455. }
  456. }
  457. #endif
  458. return re;
  459. }
  460. else {
  461. //CLogger::createInstance()->Log(eWriteError,
  462. // "Write Data Failed! unconnect! [%s %s %d]\r\n"
  463. // , __FILE__, __FUNCTION__, __LINE__);
  464. printf("Write Data Failed! unconnect!");
  465. if (!m_OnConnecting)
  466. {
  467. reSetSocket();
  468. }
  469. return -2;
  470. }
  471. }
  472. catch (...)
  473. {
  474. disConnect();
  475. #ifdef WIN32
  476. CLogger::createInstance()->Log(eSoftError,
  477. "Write Data Failed! unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
  478. , __FILE__, __FUNCTION__, __LINE__);
  479. #else
  480. CLogger::createInstance()->Log(eSoftError,
  481. "Write Data Failed! unknown error: %d! [%s %s %d]", strerror(errno)
  482. , __FILE__, __FUNCTION__, __LINE__);
  483. #endif
  484. return -3;
  485. }
  486. }

        MySocketRD.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKETRD_H_
  5. #define _MYSOCKETRD_H_
  6. /*
  7. *从服务端获取数据
  8. */
  9. #ifdef WIN32
  10. #include "win32Thread.h"
  11. #endif
  12. #ifdef linux
  13. #include "myThread.h"
  14. #endif
  15. #include "DataDef.h"
  16. #include "queuedata.h"
  17. class MySocketClient;
  18. class MySocketRD : public MyThread
  19. {
  20. public:
  21. MySocketRD(void);
  22. virtual ~MySocketRD(void);
  23. void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
  24. int Run();
  25. //从缓存中读取帧数据处理,请按需自行处理该函数
  26. int AddFrame(const unsigned char *buf, int len);
  27. private:
  28. bool running;
  29. int netType;//数据读写处理类型
  30. MySocketClient *myPDataPrt;
  31. QueueData<TCP_Data> ReadData;
  32. };
  33. #endif

        MySocketRD.cpp

  1. #include "MySocketRD.h"
  2. #include "MySocketClient.h"
  3. #include "myFunc.h"
  4. #include "Log.h"
  5. MySocketRD::MySocketRD(void)
  6. : running(true)
  7. , netType(1)
  8. , myPDataPrt(NULL)
  9. {
  10. }
  11. MySocketRD::~MySocketRD(void)
  12. {
  13. running = false;
  14. }
  15. void MySocketRD::setPrivateDataPtr(MySocketClient *myPData, int _netType)
  16. {
  17. netType = _netType;
  18. if (NULL != myPData)
  19. {
  20. myPDataPrt = myPData;
  21. }
  22. }
  23. int MySocketRD::Run()
  24. {
  25. if (NULL == myPDataPrt )
  26. {
  27. CLogger::createInstance()->Log(eSoftError,
  28. "MySocketRD start fail for myPDataPrt is NULL"
  29. ",[%s %s %d]!"
  30. , __FILE__, __FUNCTION__, __LINE__);
  31. return 0;
  32. }
  33. RDClient rdc_data;
  34. TCP_Data rddata;
  35. while (running)
  36. {
  37. if (!myPDataPrt->isConnect())
  38. {
  39. myPDataPrt->reSetSocket();//read or write thread do it
  40. if (!myPDataPrt->isConnect())
  41. {
  42. #ifdef WIN32
  43. Sleep(1000);
  44. #else
  45. usleep(1000000);
  46. #endif
  47. }
  48. }
  49. else
  50. {
  51. //读取帧数据
  52. switch (netType)
  53. {
  54. case 1:
  55. {
  56. //直接读取,不用做分帧处理,ACSII字段
  57. char buf[256] = { 0 };
  58. int len = myPDataPrt->Read(buf, 256);
  59. if (len > 0)
  60. {
  61. TCP_Data rdata((unsigned char*)buf, len);
  62. ReadData.add(rdata);
  63. }
  64. //数据帧解析
  65. if (ReadData.getFirst(rddata))
  66. {
  67. this->AddFrame(rddata.Buf, rddata.len);
  68. ReadData.removeFirst();
  69. }
  70. }
  71. break;
  72. case 2:
  73. {
  74. //数据有特定帧头和结尾,做分帧处理
  75. int ret = myPDataPrt->Read(rdc_data);
  76. if (ret > 0)
  77. {
  78. //printf("read(%d) from pcs_server\n",ret);
  79. unsigned char * buff = rdc_data.Buf;
  80. int frame_start = 0;
  81. unsigned char ctype = 0;
  82. for (int i = 0; i < rdc_data.len; ++i)
  83. {
  84. //printf("%02X ",buff[i]);
  85. if (buff[i] > 0xf0)
  86. {
  87. if (buff[i] == 0xff)
  88. {
  89. if (ctype)
  90. {
  91. ctype = 0;
  92. // TCP_Data rdata(buff + frame_start, i - frame_start + 1);
  93. unsigned char * pBuf = new unsigned char[i - frame_start + 1];
  94. int nLen = PFunc::uncode(buff + frame_start, i - frame_start + 1, pBuf);//反序列化处理
  95. TCP_Data rdata(pBuf, nLen);
  96. ReadData.add(rdata);
  97. frame_start = i + 1;
  98. delete[] pBuf;
  99. pBuf = NULL;
  100. }
  101. }
  102. else
  103. {
  104. ctype = buff[i];
  105. frame_start = i;
  106. }
  107. }
  108. }
  109. buff = NULL;
  110. if (frame_start < rdc_data.len)
  111. {
  112. RDClient _newrd(rdc_data.Buf + frame_start, rdc_data.len - frame_start);
  113. rdc_data = _newrd;
  114. }
  115. else
  116. {
  117. rdc_data.len = 0;
  118. }
  119. }
  120. //数据帧解析
  121. while (ReadData.getFirst(rddata))
  122. {
  123. this->AddFrame(rddata.Buf, rddata.len);
  124. ReadData.removeFirst();
  125. }
  126. }
  127. break;
  128. default:
  129. CLogger::createInstance()->Log(eSoftError,
  130. "Exception for Reading and Parsing is undef NetType(%d),[%s %s %d]!"
  131. , netType
  132. , __FILE__, __FUNCTION__, __LINE__);
  133. break;
  134. }
  135. }
  136. #ifdef WIN32
  137. Sleep(10);
  138. #else
  139. usleep(10000);
  140. #endif
  141. }
  142. return 0;
  143. };
  144. int MySocketRD::AddFrame(const unsigned char *buf, int len)
  145. {
  146. if(NULL==buf)
  147. return 0;
  148. printf("rev:%s\r\n",(char*)buf);
  149. // TcpSocket ts;
  150. // memcpy(&ts,buf,len);
  151. // printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
  152. return 0;
  153. };

        MySocketWD.h

  1. #if _MSC_VER > 1000
  2. #pragma once
  3. #endif // _MSC_VER > 1000
  4. #ifndef _MYSOCKETWD_H_
  5. #define _MYSOCKETWD_H_
  6. /*
  7. *从缓存采集数据,向服务端发送数据
  8. */
  9. #ifdef WIN32
  10. #include "win32Thread.h"
  11. #endif
  12. #ifdef linux
  13. #include "myThread.h"
  14. #endif
  15. #include "DataDef.h"
  16. #include "queuedata.h"
  17. class MySocketClient;
  18. class MySocketWD : public MyThread
  19. {
  20. public:
  21. MySocketWD(void);
  22. virtual ~MySocketWD(void);
  23. void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
  24. int Run();
  25. int add_data(const char* buf, int len);
  26. int getBuffer(unsigned char * _buf);
  27. int getHeartBeatBuffer(unsigned char * buf);
  28. private:
  29. bool running;
  30. int netType;//数据读写处理类型
  31. unsigned int heartBeatWrite;
  32. MySocketClient *myPDataPrt;
  33. QueueData<TCP_Data> WriteData;
  34. };
  35. #endif

        MySocketWD.cpp

  1. #include "MySocketWD.h"
  2. #include "MySocketClient.h"
  3. #include "myFunc.h"
  4. #include "Log.h"
  5. #ifdef __linux__
  6. #include <stdexcept>
  7. #endif
  8. #define heartBeat_interval 10
  9. MySocketWD::MySocketWD(void)
  10. : running(true)
  11. , netType(1)
  12. , heartBeatWrite(static_cast<unsigned int>(time(NULL)))
  13. , myPDataPrt(NULL)
  14. {
  15. }
  16. MySocketWD::~MySocketWD(void)
  17. {
  18. running = false;
  19. }
  20. void MySocketWD::setPrivateDataPtr(MySocketClient *myPData, int _netType)
  21. {
  22. netType = _netType;
  23. if (NULL != myPData)
  24. {
  25. myPDataPrt = myPData;
  26. }
  27. }
  28. int MySocketWD::Run()
  29. {
  30. if (NULL == myPDataPrt )
  31. {
  32. CLogger::createInstance()->Log(eSoftError,
  33. "MySocketWD start fail for myPDataPrt or m_MonitorData is NULL"
  34. ",[%s %s %d]!"
  35. , __FILE__, __FUNCTION__, __LINE__);
  36. return 0;
  37. }
  38. while(running)
  39. {
  40. if (!myPDataPrt->isConnect())
  41. {
  42. myPDataPrt->reSetSocket();//read or write thread do it
  43. if (!myPDataPrt->isConnect())
  44. {
  45. #ifdef WIN32
  46. Sleep(1000);
  47. #else
  48. usleep(1000000);
  49. #endif
  50. }
  51. }
  52. else {
  53. //由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
  54. unsigned char buf[512] = { 0 };
  55. int len = this->getBuffer(buf);
  56. if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
  57. {
  58. len = this->getHeartBeatBuffer(buf);
  59. }
  60. if (len > 0) {
  61. switch (netType)
  62. {
  63. case 1:
  64. {
  65. int ret = myPDataPrt->Write((const char*)buf, len);
  66. if (ret != len) {
  67. //printf("send data: %d, buf %d\n",len,ret);
  68. CLogger::createInstance()->Log(eTipMessage,
  69. "send point data: %d, buf %d. [%s %s %d]"
  70. , len, ret
  71. , __FILE__, __FUNCTION__, __LINE__);
  72. }
  73. else {
  74. heartBeatWrite = static_cast<unsigned int>(time(NULL));
  75. }
  76. }
  77. break;
  78. case 2:
  79. {
  80. int cacheLen = 2 * len + 1;
  81. unsigned char* _buf = new unsigned char[cacheLen];
  82. memset(_buf, 0, cacheLen);
  83. int nLen = PFunc::code(buf, len, _buf);//序列化处理
  84. int ret = myPDataPrt->Write((const char*)_buf, nLen);
  85. if (ret != nLen) {
  86. //printf("send data: %d, buf %d\n",len,ret);
  87. CLogger::createInstance()->Log(eTipMessage,
  88. "send point data: %d, buf %d. [%s %s %d]"
  89. , nLen, ret
  90. , __FILE__, __FUNCTION__, __LINE__);
  91. }
  92. else {
  93. heartBeatWrite = static_cast<unsigned int>(time(NULL));
  94. }
  95. delete[] _buf;
  96. _buf = NULL;
  97. }
  98. break;
  99. default:
  100. CLogger::createInstance()->Log(eConfigError,
  101. "Exception for Write data and unkown NetType(%d),[%s %s %d]!"
  102. , netType
  103. , __FILE__, __FUNCTION__, __LINE__);
  104. break;
  105. }
  106. }
  107. }
  108. #ifdef WIN32
  109. Sleep(1);
  110. #else
  111. usleep(1000);
  112. #endif
  113. }
  114. return 0;
  115. }
  116. int MySocketWD::add_data(const char* buf, int len)
  117. {
  118. if(len>0&&NULL!=buf){
  119. if(len>=512){
  120. printf("buf len is >=512!\r\n");
  121. }
  122. else
  123. {
  124. TCP_Data rdata((unsigned char*)buf, len);
  125. WriteData.add(rdata);
  126. return len;
  127. }
  128. }
  129. return 0;
  130. }
  131. int MySocketWD::getBuffer(unsigned char * _buf)
  132. {
  133. int ret = 0;
  134. TCP_Data data;
  135. if(WriteData.getFirst(data))
  136. {
  137. try{
  138. if (!WriteData.removeFirst())
  139. {
  140. #ifdef WIN32
  141. throw std::exception("removeFirst WData failed!");
  142. #else
  143. throw std::logic_error("removeFirst WData failed!");
  144. #endif
  145. }
  146. memcpy(_buf,data.Buf,data.len);
  147. ret = data.len;
  148. printf("send:%s\r\n",_buf);
  149. }
  150. catch (const std::exception& e)
  151. {
  152. CLogger::createInstance()->Log(eSoftError,
  153. "write item info to socket failed! have error[%s]. [%s %s %d]"
  154. , e.what()
  155. , __FILE__, __FUNCTION__, __LINE__);
  156. ret = -1;
  157. }
  158. catch (...)
  159. {
  160. //printf_s("write item info to socket failed! have error. \r\n");
  161. CLogger::createInstance()->Log(eSoftError,
  162. "write item info to socket failed! have error. [%s %s %d]"
  163. , __FILE__, __FUNCTION__, __LINE__);
  164. ret = -2;
  165. }
  166. }
  167. return ret;
  168. };
  169. int MySocketWD::getHeartBeatBuffer(unsigned char * buf)
  170. {
  171. if (NULL != buf)
  172. {
  173. int idx = 0;
  174. std::string cur_time_str = PFunc::getCurrentTime();
  175. char buf_[64]={0};
  176. sprintf(buf_,"HeartBeat:%s",cur_time_str.c_str());
  177. idx = (int)strlen(buf_);
  178. memcpy(buf,buf_,idx);
  179. return idx;
  180. }
  181. else
  182. {
  183. return 0;
  184. }
  185. };

        7.4 main.cpp

        srv_test/main.cpp

  1. #include "MySocket.h"
  2. #ifdef WIN32
  3. #include <windows.h>
  4. #else
  5. #include <unistd.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #endif
  9. const unsigned int PORT = 60008;
  10. //log conf
  11. char LOG_FILE_NAME[128] = "server_test.log";
  12. std::string logdir = "log";
  13. char SVCNAME[128] = "TCPServer_Srv";
  14. int main(int argc, char *argv[])
  15. {
  16. MySocket server_test(70001,1);
  17. char buf[]="hello, this is server!";
  18. // TcpSocket ts;
  19. // ts.len=(int)strlen(buf);
  20. // memcpy(ts.desc,buf,ts.len);
  21. // ts.type = 1;
  22. // ts.val = 10.0;
  23. while(1)
  24. {
  25. server_test.Write((const char*)buf,(int)strlen(buf));
  26. // server_test.Write((const char*)&ts,(int)sizeof(TcpSocket));
  27. #ifdef WIN32
  28. Sleep(10000);
  29. #else
  30. usleep(10000000);
  31. #endif
  32. }
  33. return 0;
  34. }

        /client_test/main.cpp

  1. #include "MySocket.h"
  2. #ifdef WIN32
  3. #include <windows.h>
  4. #else
  5. #include <unistd.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #endif
  9. //log conf
  10. char LOG_FILE_NAME[128] = "server_test.log";
  11. std::string logdir = "log";
  12. char SVCNAME[128] = "TCPServer_Srv";
  13. int main(int argc, char *argv[])
  14. {
  15. if(3!=argc)
  16. {
  17. printf("CMD prompt: client_test ip_addr port\r\n");
  18. }
  19. NetArg _netarg;
  20. _netarg.ipStr = std::string(argv[1]);
  21. _netarg.port = (int)atoi(argv[2]);
  22. _netarg.type = 1;
  23. MySocket client_test(1,_netarg);
  24. char buf[]="hello, this is client 01!";
  25. // TcpSocket ts;
  26. // ts.len=(int)strlen(buf);
  27. // memcpy(ts.desc,buf,ts.len);
  28. // ts.type = 1;
  29. // ts.val = 10.0;
  30. while(1)
  31. {
  32. client_test.Write((const char*)buf,(int)strlen(buf));
  33. // client_test.Write((const char*)&ts,(int)sizeof(TcpSocket));
  34. #ifdef WIN32
  35. Sleep(10000);
  36. #else
  37. usleep(10000000);
  38. #endif
  39. }
  40. return 0;
  41. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/678772
推荐阅读
相关标签
  

闽ICP备14008679号