当前位置:   article > 正文

C++中如何使用HP-Socket_hpsocket

hpsocket

简介

HP-Socket 是一套通用的高性能 TCP/UDP /HTTP 通信 框架 ,包含服务端组件、客户端组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP /HTTP 通信系统,提供 C/C++ 、 C# 、 Delphi 、 E (易语言)、 Java 、 Python 等编程语言接口。

HP-Socket是一套国产的开源通讯库,使用C++语言实现,提供多种编程语言的接口,支持 Windows 和 Linux 平台:

HP-Socket包含30多个组件 ,可根据通信角色Client/Server)、通信协议TCP/UDP/HTTP)和接收模型PUSH/PULL/PACK)进行归类,这里只简单介绍一下:

  • Server组件:基于IOCP/EPOLL通信模型 ,并结合缓存池 、私有堆等技术实现高效内存管理,支持超大规模、高并发通信场景。
  • Agent组件:实质上是Multi-Client组件,与Server组件采用相同的技术架构,可同时建立和高效处理大规模Socket连接 。
  • Client组件:基于Event Select/POLL通信模型,每个组件对象创建一个通信线程并管理一个Socket连接, 适用于小规模客户端场景。
  • Thread Pool组件:HP-Socket实现的高效易用的线程池组件,当成普通的第三方线程池库使用即可。

HP-Socket的TCP组件支持PUSH、PULL和PACK三种接收模型:

  • PUSH模型:组件接收到数据时会触发监听器对象的OnReceive(pSender,dwConnID,pData,iLength)事件,把数据“推”给应用程序,这种模型使用起来是最自由的。
  • PULL模型:组件接收到数据时会触发监听器对象的OnReceive(pSender,dwConnID,iTotalLength)事件 ,告诉应用程序当前已经接收到多少数据,应用程序检查数据的长度,如果满足需要则调用组件的**Fetch(dwConnID,pData,iDataLength)方法把需
    要的数据“拉”出来。
  • PACK模型:PACK模型系列组件是PUSH和PULL模型的结合体,应用程序不必处理分包与数据抓取,组件保证每个OnReceive事件都向应用程序提供一个完整数据包。

注:PACK模型组件会对应用程序发送的每个数据包自动加上 4 字节(32位的包头),前10位为用于数据包校验的包头标识位,后22位为记录包体长度的长度位。

使用方式

HP-Socket支持MBCSUnicode字符集,支持32位和64位应用程序。可以通过源代码、 DLL或LIB方式使用HP-Socket。 HP-Socket发行包中已经提供了HPSocket DLL和HPSocket4C DLL。
HP-Socket提供了各种情况下的dll文件,不需要我们重新编译,dll文件按编程接口分为两大类:

  • HPSocket DLL:导出C++编程接口 ,C++程序的首选方式,使用时需要把SocketInterface.h(及其依赖文件HPTypeDef.h) 、HPSocket.h以及 DLL 对应的 *.lib 文件加入到工程项目,用到SSL组件还需要HPSocket-SSL.h文件。
  • HPSocket4C DLL:导出C编程接口,提供给C语言或其它编程语言使用,使用时需要把HPSocket4C.h以及 DLL 对应的 *.lib 文件加入到工程项目,用到SSL组件还需要HPSocket4C-SSL.h文件。

实现简单线程池

使用HP-Socket的线程池组件可以在程序中实现一个简单的、公用的线程池,TCP通讯的断线重连、发送心跳都会用到线程池。线程池组件的主要函数如下:

  • Start:启动线程池,具体的使用可以参考源代码的注释。
  • Submit:提交任务,主要使用BOOL Submit(fnTaskProc,pvArg,dwMaxWait=INFINITE),另一个函数重载是使用一个特殊的数据类型(把Socket任务参数和任务函数封装成一个数据结构)作为参数。
  • Stop:关闭线程池,参数dwMaxWait代表最大等待时间(毫秒,默认: INFINITE ,一直等待)。

先实现线程池的CHPThreadPoolListener接口,然后构造IHPThreadPool智能指针,后面线程池的操作都通过智能指针操作,代码如下:

  1. class CHPThreadPoolListenerImpl : public CHPThreadPoolListener
  2. {
  3. private:
  4. void LogInfo(string logStr)
  5. {
  6. cout <<"ThreadPool " <<logStr << endl;
  7. }
  8. public:
  9. virtual void OnStartup(IHPThreadPool* pThreadPool)
  10. {
  11. LogInfo("线程池启动");
  12. }
  13. virtual void OnShutdown(IHPThreadPool* pThreadPool)
  14. {
  15. LogInfo("线程池启动关闭");
  16. }
  17. virtual void OnWorkerThreadStart(IHPThreadPool* pThreadPool, THR_ID dwThreadID)
  18. {
  19. LogInfo("[" + to_string(dwThreadID) + "] " + "工作线程启动");
  20. }
  21. virtual void OnWorkerThreadEnd(IHPThreadPool* pThreadPool, THR_ID dwThreadID)
  22. {
  23. LogInfo("[" + to_string(dwThreadID) + "] " + "工作线程退出");
  24. }
  25. };
  26. CHPThreadPoolListenerImpl ThreadPoolListener;
  27. //全局共享变量使用extern关键字修饰
  28. extern CHPThreadPoolPtr ThreadPool(&ThreadPoolListener);

实现TCP客户端

先实现一个打印函数,显示客户端相关的信息,代码如下:

  1. void PrintInfo(ITcpClient* pSender, CONNID dwConnID)
  2. {
  3. char buffer[20];
  4. TCHAR* ipAddr = buffer;
  5. int ipLen;
  6. USHORT port;
  7. pSender->GetLocalAddress(ipAddr, ipLen, port);
  8. cout << string(ipAddr,0,ipLen) << ":" << port << " " << " [" << dwConnID << "] -> ";
  9. pSender->GetRemoteHost(ipAddr, ipLen, port);
  10. cout << string(ipAddr, 0, ipLen) << ":" << port << " ";
  11. }

实现CTcpClientListener监听接口,客户端断线后自动重连,以换行符分割接收到的字符串,代码如下:

  1. bool SysExit = false;
  2. void ReConnect(ITcpClient* pSender)
  3. {
  4. while (pSender->GetState() != SS_STOPPED)
  5. {
  6. Sleep(10);
  7. }
  8. pSender->Start("127.0.0.1", 60000);
  9. }
  10. class CClientListenerImpl : public CTcpClientListener
  11. {
  12. public:
  13. virtual EnHandleResult OnConnect(ITcpClient* pSender, CONNID dwConnID)
  14. {
  15. PrintInfo(pSender, dwConnID);
  16. cout << "连接成功" << endl;
  17. return HR_OK;
  18. }
  19. string resStr = "";
  20. string commStr="";
  21. virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
  22. {
  23. string str((char*)pData,0, iLength);
  24. resStr.append(str);
  25. int index;
  26. while (true)
  27. {
  28. index = resStr.find("\r\n");
  29. if (index == -1)break;
  30. commStr = resStr.substr(0, index);
  31. resStr = resStr.substr(index +2, resStr.length() - (index +2));
  32. if (commStr!="")
  33. {
  34. PrintInfo(pSender, dwConnID);
  35. cout << "收到分割字符串 " << commStr << endl;
  36. }
  37. }
  38. PrintInfo(pSender, dwConnID);
  39. cout << "数据接受 " << str << endl;
  40. return HR_OK;
  41. }
  42. virtual EnHandleResult OnClose(ITcpClient* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
  43. {
  44. resStr = "";
  45. PrintInfo(pSender, dwConnID);
  46. cout << "连接断开,"<< enOperation <<"操作导致错误,错误码 " << iErrorCode<< endl;
  47. if (!SysExit)
  48. {
  49. ThreadPool->Submit((Fn_TaskProc)(&ReConnect), (PVOID)pSender);
  50. }
  51. return HR_OK;
  52. }
  53. };

循环输入字符串发送服务端,代码如下:

  1. int main()
  2. {
  3. //启动线程池
  4. ThreadPool->Start();
  5. CClientListenerImpl listener;
  6. CTcpClientPtr client(&listener);
  7. if (!client->Start("127.0.0.1", 60000))
  8. {
  9. cout << "连接错误:" << client->GetLastError() << "-" << client->GetLastErrorDesc();
  10. }
  11. string sendMsg;
  12. while (!SysExit)
  13. {
  14. cin >> sendMsg;
  15. if (sendMsg == "esc")
  16. {
  17. SysExit = true;
  18. break;
  19. }
  20. if (client->GetState() == SS_STARTED)
  21. {
  22. const BYTE* data = (BYTE*)(sendMsg.c_str());
  23. if (client->Send(data, sizeof(data)))
  24. {
  25. PrintInfo(client, client->GetConnectionID());
  26. cout << "发送成功 "<<sendMsg<<endl;
  27. }
  28. else
  29. {
  30. PrintInfo(client, client->GetConnectionID());
  31. cout << "发送失败,错误描述 " << client->GetLastError() << "-" << client->GetLastErrorDesc() << endl;
  32. }
  33. }
  34. else
  35. {
  36. PrintInfo(client, client->GetConnectionID());
  37. cout << "无法发送,当前状态 " <<client->GetState()<< endl;
  38. }
  39. }
  40. client->Stop();
  41. //关闭线程池
  42. ThreadPool->Stop();
  43. return 0;
  44. }

实现TCP服务端

先实现一个打印函数,基本上和客户端的相同,只有获取本地IP的地方不同,代码如下:

  1. void PrintInfo(ITcpServer* pSender, CONNID dwConnID)
  2. {
  3. char buffer[20];
  4. TCHAR* ipAddr = buffer;
  5. int ipLen;
  6. USHORT port;
  7. pSender->GetListenAddress(ipAddr, ipLen, port);
  8. cout << string(ipAddr, 0, ipLen) << ":" << port << " " << "<- [" << dwConnID << "] ";
  9. pSender->GetRemoteAddress(dwConnID, ipAddr, ipLen, port);
  10. cout << string(ipAddr, 0, ipLen) << ":" << port << " ";
  11. }

为了演示客户端和应用数据的绑定,定义一个用户数据类型并创建一个队列,代码如下:

  1. class UserData
  2. {
  3. public:
  4. UserData(string name="")
  5. {
  6. Name = name;
  7. }
  8. string Name;
  9. };
  10. queue<UserData*> qName; //创建队列对象

实现CTcpServerListener监听接口,收到字符串后加上用户名再发送回去,代码如下:

  1. class CTcpServerListenerImpl : public CTcpServerListener
  2. {
  3. public:
  4. virtual EnHandleResult OnAccept(ITcpServer* pSender, CONNID dwConnID, UINT_PTR soClient)
  5. {
  6. pSender->SetConnectionExtra(dwConnID,qName.front());
  7. qName.pop();
  8. PrintInfo(pSender, dwConnID);
  9. cout << "连接成功" << endl;
  10. return HR_OK;
  11. }
  12. virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
  13. {
  14. string str((char*)pData, 0, iLength);
  15. PrintInfo(pSender, dwConnID);
  16. cout << "数据接受 " << str<<endl;
  17. PVOID pInfo = nullptr;
  18. pSender->GetConnectionExtra(dwConnID, &pInfo);
  19. str = "reply-" + ((UserData*)pInfo)->Name + str;
  20. const BYTE* data = (BYTE*)(str.c_str());
  21. pSender->Send(dwConnID, data,str.size());
  22. return HR_OK;
  23. }
  24. virtual EnHandleResult OnClose(ITcpServer* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
  25. {
  26. PVOID pInfo = nullptr;
  27. pSender->GetConnectionExtra(dwConnID, &pInfo);
  28. qName.push((UserData*)pInfo);
  29. PrintInfo(pSender, dwConnID);
  30. cout << "断开连接"<< endl;
  31. pSender->SetConnectionExtra(dwConnID, NULL);
  32. return HR_OK;
  33. }
  34. };

循环输入字符串发送到客户端,自动回复客户端发送的消息,代码如下:

  1. bool SysExit = false;
  2. int main()
  3. {
  4. UserData user1("NO1-User");
  5. UserData user2("NO2-User");
  6. UserData user3("NO3-User");
  7. UserData user4("NO4-User");
  8. qName.push(&user1);
  9. qName.push(&user2);
  10. qName.push(&user3);
  11. qName.push(&user4);
  12. CTcpServerListenerImpl listener;
  13. CTcpServerPtr server(&listener);
  14. if (!server->Start("127.0.0.1", 60000))
  15. {
  16. cout << "启动错误:" << server->GetLastError() << "-" << server->GetLastErrorDesc();
  17. }
  18. string sendMsg;
  19. while (!SysExit)
  20. {
  21. cin >> sendMsg;
  22. if (sendMsg == "esc")
  23. {
  24. SysExit = true;
  25. break;
  26. }
  27. //如果数组长度小于当前连接数量,则获取失败
  28. DWORD count= 1000;
  29. CONNID pIDs[1000];
  30. ZeroMemory(pIDs, 1000);;
  31. if (server->GetAllConnectionIDs(pIDs, count)&& count >0)
  32. {
  33. for (size_t i = 0; i < count; i++)
  34. {
  35. const BYTE* data = (BYTE*)(sendMsg.c_str());
  36. if (server->Send(*(pIDs+i),data, sendMsg.size()))
  37. {
  38. PrintInfo(server, pIDs[i]);
  39. cout << "发送成功 " << sendMsg << endl;
  40. }
  41. else
  42. {
  43. PrintInfo(server, pIDs[i]);
  44. cout << "发送失败,错误描述 " << server->GetLastError() << "-" << server->GetLastErrorDesc() << endl;
  45. }
  46. }
  47. }
  48. else
  49. {
  50. cout << "无法发送,当前连接数 " << count << endl;
  51. }
  52. }
  53. server->Stop();
  54. }

注:获取连接时指针数组的长度一定要大于当前连接数量,否则会失败。

实现Http客户端

HP-Socket的Http客户端有同步、异步两种,同步客户端不需要绑定监听器,这里使用同步客户端演示。

Sync Client:同步HTTP客户端组件(CHttpSyncClient和CHttpsSyncClient)内部会处理所有事件,因此,它们不需要绑定监听器(构造方法的监听器参数传入null); 如果绑定了监听器则可以跟踪组件的通信过程。

测试客户端可以使用上面的测试示例,当前的测试示例为:

http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json

直接开始测试,代码如下:

  1. int main()
  2. {
  3. CHttpSyncClientPtr SyncClient;
  4. THeader type;
  5. type.name = "Content-Type";
  6. type.value = "text/html;charset=UTF-8";
  7. if (SyncClient->OpenUrl("GET", "http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json",&type))
  8. {
  9. LPCBYTE pData = nullptr;
  10. int iLength = 0;
  11. SyncClient->GetResponseBody(&pData, &iLength);
  12. string body((char*)pData, iLength);
  13. //返回的有中文,需要转化编码格式
  14. cout << body << endl;
  15. cout << endl;
  16. cout << StringToUtf(body) << endl;
  17. cout << endl;
  18. cout << UtfToString(StringToUtf(body)) << endl;
  19. }
  20. else
  21. {
  22. cout << "打开失败:"<<SyncClient->GetLastError()<<"-"<< SyncClient->GetLastErrorDesc()<<endl;
  23. }
  24. }

上面的StringToUtfUtfToString函数是转载至,该函数实现UTF-8和ANSI编码格式的转化,代码如下:

  1. string UtfToString(string strValue)
  2. {
  3. int nwLen = ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), -1, NULL, 0);
  4. wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0'
  5. ZeroMemory(pwBuf, nwLen * 2 + 2);
  6. ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen);
  7. int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
  8. char* pBuf = new char[nLen + 1];
  9. ZeroMemory(pBuf, nLen + 1);
  10. ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
  11. std::string retStr(pBuf);
  12. delete[]pwBuf;
  13. delete[]pBuf;
  14. pwBuf = NULL;
  15. pBuf = NULL;
  16. return retStr;
  17. }
  18. string StringToUtf(string strValue)
  19. {
  20. int nwLen = MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), -1, NULL, 0);
  21. wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0'
  22. memset(pwBuf, 0, nwLen * 2 + 2);
  23. MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen);
  24. int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
  25. char* pBuf = new char[nLen + 1];
  26. memset(pBuf, 0, nLen + 1);
  27. WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
  28. std::string retStr = pBuf;
  29. delete[]pBuf;
  30. delete[]pwBuf;
  31. return retStr;
  32. }

注:函数实现需放在main函数之前。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/678793
推荐阅读
相关标签
  

闽ICP备14008679号