当前位置:   article > 正文

一个简易的基于线程池的TCP的任务处理

一个简易的基于线程池的TCP的任务处理
  • 基于队列的线程池
  1. #pragma once
  2. #include <iostream>
  3. #include <cassert>
  4. #include <queue>
  5. #include <pthread.h>
  6. #include <sys/prctl.h>
  7. #include "Log.hpp"
  8. #include "Lock.hpp"
  9. #include "Task.hpp"
  10. using namespace std;
  11. const int gThreadNum = 5;
  12. template <class T>
  13. class threadPool
  14. {
  15. private:
  16. threadPool(int num = gThreadNum) : threadNums(num), isStart(false)
  17. {
  18. assert(threadNums > 0);
  19. // 初始化互斥锁和条件变量
  20. pthread_mutex_init(&mutex, nullptr);
  21. pthread_cond_init(&cond, nullptr);
  22. }
  23. // 单例模式 禁止拷贝构造和赋值操作
  24. threadPool(const threadPool<T> &t) = delete;
  25. threadPool<T> operator=(const threadPool<T> &t) = delete;
  26. public:
  27. // 懒汉模式
  28. static threadPool<T> *getInstance()
  29. {
  30. // 初始化锁
  31. static Mutex m;
  32. if (nullptr == instance) // 双重判定空指针, 降低锁冲突的概率, 提高性能。
  33. {
  34. // RAII思想
  35. GuardLock guard(&m);
  36. if (nullptr == instance)
  37. instance = new threadPool<T>();
  38. }
  39. return instance;
  40. }
  41. ~threadPool()
  42. {
  43. pthread_mutex_destroy(&mutex);
  44. pthread_cond_destroy(&cond);
  45. }
  46. // 线程启动后执行的函数 传入了this指针
  47. static void *taskRoutine(void *args)
  48. {
  49. pthread_detach(pthread_self()); // 分离线程 自动回收
  50. threadPool<T> *tp = static_cast<threadPool<T> *>(args);
  51. // prctl(PR_SET_NAME, "follower");//修改线程名称
  52. while (1)
  53. {
  54. // 队列中拿取任务
  55. tp->lockQueue();
  56. while (!tp->haveTask())
  57. {
  58. // 等待条件信号量
  59. tp->waitForTask();
  60. }
  61. T t = tp->pop();
  62. tp->unlockQueue();
  63. t(); // 让指定的线程执行任务
  64. }
  65. }
  66. void start()
  67. {
  68. assert(!isStart);
  69. // 根据线程池中的线程数量创建线程
  70. for (size_t i = 0; i < threadNums; i++)
  71. {
  72. pthread_t temp; // 输出型参数 线程ID
  73. pthread_create(&temp, nullptr, taskRoutine, this);
  74. }
  75. isStart = true;
  76. }
  77. // 添加任务
  78. void push(const T &in)
  79. {
  80. lockQueue();
  81. taskQueue.push(in);
  82. // 唤醒条件信号量 任务队列里面来任务了,线程可以开始处理任务了
  83. choiceThreadForHandler();
  84. unlockQueue();
  85. }
  86. int getThreadNum()
  87. {
  88. return threadNums;
  89. }
  90. private:
  91. void lockQueue() { pthread_mutex_lock(&mutex); }
  92. void unlockQueue() { pthread_mutex_unlock(&mutex); }
  93. bool haveTask() { return !taskQueue.empty(); }
  94. void waitForTask() { pthread_cond_wait(&cond, &mutex); }
  95. void choiceThreadForHandler() { pthread_cond_signal(&cond); }
  96. // 获取任务
  97. T pop()
  98. {
  99. T temp = taskQueue.front();
  100. taskQueue.pop();
  101. return temp;
  102. }
  103. private:
  104. bool isStart;
  105. int threadNums;
  106. queue<T> taskQueue;
  107. pthread_mutex_t mutex;
  108. pthread_cond_t cond;
  109. static threadPool<T> *instance;
  110. };
  111. template <class T>
  112. threadPool<T> *threadPool<T>::instance;
  • 简单的任务类(通过回调函数创建并处理任务)
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. #include "Log.hpp"
  6. using namespace std;
  7. class task
  8. {
  9. public:
  10. // 创建函数对象
  11. using called_func = function<void(int, string, uint16_t)>;
  12. task(int s, string i, int p, called_func f) : sock(s), ip(i), port(p), func(f) {}
  13. ~task() {}
  14. // 重载operator()
  15. void operator()()
  16. {
  17. LogMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...", pthread_self(), ip.c_str(), port);
  18. func(sock, ip, port);
  19. LogMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...", pthread_self(), ip.c_str(), port);
  20. }
  21. private:
  22. string ip;
  23. uint16_t port;
  24. int sock;
  25. called_func func; // 回调函数
  26. };
  • 工具类(互斥锁类、打印日志文件类、创建守护进程类)
    1. #include <iostream>
    2. #include <pthread.h>
    3. using namespace std;
    4. class Mutex
    5. {
    6. public:
    7. Mutex()
    8. {
    9. pthread_mutex_init(&_lock, nullptr);
    10. }
    11. void lock()
    12. {
    13. pthread_mutex_lock(&_lock);
    14. }
    15. void unlock()
    16. {
    17. pthread_mutex_unlock(&_lock);
    18. }
    19. ~Mutex()
    20. {
    21. pthread_mutex_destroy(&_lock);
    22. }
    23. private:
    24. pthread_mutex_t _lock;
    25. };
    26. class GuardLock
    27. {
    28. public:
    29. GuardLock(Mutex *mutex) : _mutex(mutex)
    30. {
    31. _mutex->lock();
    32. cout << "加锁成功..." << endl;
    33. }
    34. ~GuardLock()
    35. {
    36. _mutex->unlock();
    37. cout << "解锁成功..." << endl;
    38. }
    39. private:
    40. Mutex *_mutex;
    41. };
    1. #pragma once
    2. #include <cassert>
    3. #include <cstdarg>
    4. #include <cstring>
    5. #include <ctime>
    6. #include <cerrno>
    7. #include <stdio.h>
    8. #include <stdlib.h>
    9. #include <sys/types.h>
    10. #include <sys/stat.h>
    11. #include <fcntl.h>
    12. #include <unistd.h>
    13. #define DEBUG 0
    14. #define NOTICE 1
    15. #define WARINING 2
    16. #define FATAL 3
    17. #define LOGFILE "serverTcp.log"
    18. const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};
    19. // 创建serverTcp.log日志文件 将控制台输出全部重定向到文件中
    20. // 在Log类对象被销毁时,进行刷盘操作
    21. class Log
    22. {
    23. public:
    24. Log() : logFd(-1)
    25. {
    26. }
    27. void enable()
    28. {
    29. umask(0);
    30. logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
    31. assert(logFd > 0);
    32. dup2(logFd, 1);
    33. dup2(logFd, 2);
    34. }
    35. ~Log()
    36. {
    37. if (logFd > 0)
    38. {
    39. fsync(logFd);
    40. close(logFd);
    41. }
    42. }
    43. private:
    44. int logFd;
    45. };
    46. // 打印日志信息
    47. void LogMessage(int level, const char *format, ...)
    48. {
    49. assert(level >= DEBUG);
    50. assert(level <= FATAL);
    51. char *name = getenv("USER");
    52. char buff[1024];
    53. va_list v;
    54. va_start(v, format);
    55. vsnprintf(buff, sizeof(buff) - 1, format, v);
    56. va_end(v);
    57. FILE *out = (level == FATAL) ? stderr : stdout;
    58. // umask(0);
    59. // int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
    60. // assert(fd > 0);
    61. fprintf(out, "%s | %u | %s | %s\n",
    62. log_level[level],
    63. (unsigned int)time(nullptr),
    64. name == nullptr ? "unknow" : name,
    65. buff);
    66. // dup2(fd, 1);
    67. // dup2(fd, 2);
    68. fflush(out);
    69. // fsync(fd);
    70. // close(fd);
    71. }
    1. #include <signal.h>
    2. #include <unistd.h>
    3. #include <stdlib.h>
    4. #include <sys/types.h>
    5. #include <sys/stat.h>
    6. #include <fcntl.h>
    7. void daemonize()
    8. {
    9. // 打开linux的垃圾桶
    10. int fd = open("/dev/null", O_WRONLY);
    11. // 忽略SIGPIPE
    12. signal(SIGPIPE, SIG_IGN);
    13. // 创建子进程,然后关闭其父进程
    14. if (fork() > 0)
    15. exit(1);
    16. //将由BASH 1号进程接管的子进程设置成为独立会话
    17. setsid();
    18. if (fd != -1)
    19. {
    20. dup2(fd, 0);
    21. dup2(fd, 1);
    22. dup2(fd, 2);
    23. if (fd > 2)
    24. close(fd);
    25. }
    26. }
    1. #pragma once
    2. #include <iostream>
    3. #include <string>
    4. #include <cstring>
    5. #include <cstdlib>
    6. #include <cassert>
    7. #include <ctype.h>
    8. #include <unistd.h>
    9. #include <strings.h>
    10. #include <sys/types.h>
    11. #include <sys/socket.h>
    12. #include <netinet/in.h>
    13. #include <arpa/inet.h>
    14. using namespace std;
    15. #define SOCKET_ERR 1
    16. #define BIND_ERR 2
    17. #define LISTEN_ERR 3
    18. #define USAGE_ERR 4
    19. #define CONN_ERR 5
    20. #define BUFFER_SIZE 1024

    TCPserver端

    1. #include "util.hpp"
    2. #include "Log.hpp"
    3. #include "ThreadPool.hpp"
    4. #include "daemonize.hpp"
    5. #include <signal.h>
    6. #include <sys/types.h>
    7. #include <sys/wait.h>
    8. #include <pthread.h>
    9. class Server;
    10. class ThreadData
    11. {
    12. public:
    13. uint16_t clientPort_;
    14. std::string clinetIp_;
    15. int sock_;
    16. Server *this_;
    17. public:
    18. ThreadData(uint16_t port, std::string ip, int sock, Server *ts)
    19. : clientPort_(port), clinetIp_(ip), sock_(sock), this_(ts)
    20. {
    21. }
    22. };
    23. // 创建task时的传入函数
    24. void transService(int sock, const string &ip, int port)
    25. {
    26. assert(sock >= 0);
    27. assert(!ip.empty());
    28. assert(port >= 1024);
    29. char buffer[BUFFER_SIZE];
    30. while (true)
    31. {
    32. ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
    33. if (s > 0)
    34. {
    35. buffer[s] = '\0';
    36. if (strcasecmp(buffer, "quit") == 0)
    37. {
    38. LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
    39. break;
    40. }
    41. LogMessage(DEBUG, "trans before: %s[%d]>>> %s", ip.c_str(), port, buffer);
    42. for (int i = 0; i < s; i++)
    43. {
    44. if (isalpha(buffer[i]) && islower(buffer[i]))
    45. buffer[i] = toupper(buffer[i]);
    46. }
    47. LogMessage(DEBUG, "trans after: %s[%d]>>> %s", ip.c_str(), port, buffer);
    48. write(sock, buffer, strlen(buffer));
    49. }
    50. else if (s == 0)
    51. {
    52. LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
    53. break;
    54. }
    55. else
    56. {
    57. LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
    58. break;
    59. }
    60. }
    61. close(sock);
    62. LogMessage(DEBUG, "server close %d done", sock);
    63. }
    64. // 创建task时的传入函数
    65. void commandService(int sock, const string &ip, int port)
    66. {
    67. assert(sock >= 0);
    68. assert(!ip.empty());
    69. assert(port >= 1024);
    70. char buffer[BUFFER_SIZE];
    71. while (true)
    72. {
    73. ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
    74. if (s > 0)
    75. {
    76. buffer[s] = 0;
    77. LogMessage(DEBUG, "[%s:%d] exec [%s]", ip.c_str(), port, buffer);
    78. FILE *f = popen(buffer, "r");
    79. if (f == nullptr)
    80. {
    81. LogMessage(WARINING, "exec %s failed, beacuse: %s", buffer, strerror(errno));
    82. }
    83. char line[1024];
    84. while (fgets(line, sizeof(line), f) != nullptr)
    85. {
    86. write(sock, line, sizeof(line));
    87. }
    88. pclose(f);
    89. LogMessage(DEBUG, "[%s:%d] exec [%s] ... done", ip.c_str(), port, buffer);
    90. }
    91. else if (s == 0)
    92. {
    93. LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
    94. break;
    95. }
    96. else
    97. {
    98. LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
    99. break;
    100. }
    101. }
    102. close(sock);
    103. LogMessage(DEBUG, "server close %d done", sock);
    104. }
    105. class Server
    106. {
    107. public:
    108. Server(int port, string ip = "") : port_(port), ip_(ip), listenSock_(-1) {}
    109. ~Server() {}
    110. void init()
    111. {
    112. // 创建TCP套接字
    113. listenSock_ = socket(AF_INET, SOCK_STREAM, 0);
    114. if (listenSock_ < 0)
    115. {
    116. LogMessage(FATAL, "socket: %s", strerror(errno));
    117. exit(SOCKET_ERR);
    118. }
    119. LogMessage(DEBUG, "socket success: %s , %d", strerror(errno), listenSock_);
    120. struct sockaddr_in local;
    121. socklen_t len = sizeof(local);
    122. memset(&local, 0, sizeof(local));
    123. local.sin_port = htons(port_);
    124. local.sin_family = PF_INET;
    125. ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));
    126. // 绑定套接字
    127. if (bind(listenSock_, (sockaddr *)&local, len) < 0)
    128. {
    129. LogMessage(FATAL, "bind: %s", strerror(errno));
    130. exit(BIND_ERR);
    131. }
    132. LogMessage(DEBUG, "bind success : %s %d", strerror(errno), listenSock_);
    133. // 监听套接字 最大连接数5
    134. if (listen(listenSock_, 5) < 0)
    135. {
    136. LogMessage(FATAL, "listen: %s", strerror(errno));
    137. exit(LISTEN_ERR);
    138. }
    139. LogMessage(DEBUG, "listen succcess: %s , %d", strerror(errno), listenSock_);
    140. // 加载线程池
    141. tp = threadPool<task>::getInstance();
    142. }
    143. // static void *thradRoutine(void *args)
    144. // {
    145. // pthread_detach(pthread_self());
    146. // ThreadData *td = (ThreadData *)args;
    147. // td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_);
    148. // delete td;
    149. // return nullptr;
    150. // }
    151. void loop()
    152. {
    153. // signal(SIGCHLD, SIG_IGN);
    154. // 初始化线程池 创建线程
    155. tp->start();
    156. LogMessage(DEBUG, "thread pool start success, thread num: %d", tp->getThreadNum());
    157. while (true)
    158. {
    159. struct sockaddr_in peer;
    160. socklen_t len = sizeof(peer);
    161. // 接收请求连接 serviceSock服务套接字
    162. int serviceSock = accept(listenSock_, (sockaddr *)&peer, &len);
    163. if (serviceSock < 0)
    164. {
    165. LogMessage(WARINING, "accept: %s , %d", strerror(errno), serviceSock);
    166. continue;
    167. }
    168. uint16_t peerPort = ntohs(peer.sin_port);
    169. string peerIp = inet_ntoa(peer.sin_addr);
    170. LogMessage(DEBUG, "accept success: %s | %s[%d], socket fd: %d",
    171. strerror(errno), peerIp.c_str(), peerPort, serviceSock);
    172. // demo 5 线程池
    173. // 5.1
    174. // task t(serviceSock, peerIp, peerPort, std::bind(&Server::transService, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
    175. // tp->push(t);
    176. // 5.2
    177. // task t(serviceSock, peerIp, peerPort,transService);
    178. // tp->push(t);
    179. // 5.3
    180. // 创建任务
    181. task t(serviceSock, peerIp, peerPort, commandService);
    182. // 将任务放入任务队列中
    183. tp->push(t);
    184. // demo 4 多线程
    185. // ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this);
    186. // pthread_t pt;
    187. // pthread_create(&pt, nullptr, thradRoutine, (void *)td);
    188. // demo3 多进程
    189. // pid_t id = fork();
    190. // if (id == 0)
    191. // {
    192. // close(listenSock_);
    193. // if (fork() > 0)
    194. // exit(0);
    195. // transService(serviceSock, peerIp, peerPort);
    196. // exit(0);
    197. // }
    198. // close(serviceSock);
    199. // pid_t ret = waitpid(id, nullptr, 0);
    200. // assert(ret > 0);
    201. // (void)ret;
    202. // demo2 多进程
    203. // pid_t id = fork();
    204. // if(id == 0){
    205. // close(listenSock_);
    206. // transService(serviceSock,peerIp,peerPort);
    207. // exit(0);
    208. // }
    209. // close(serviceSock);
    210. // 通过signal(SIGCHLD,SIG_IGN)来处理僵尸子进程
    211. // demo 1
    212. // transService(serviceSock, peerIp, peerPort);
    213. // LogMessage(DEBUG, "server 提供 service start ...");
    214. // sleep(1);
    215. }
    216. }
    217. // 5.1
    218. // void transService(int sock, const string &ip, int port)
    219. // {
    220. // assert(sock >= 0);
    221. // assert(!ip.empty());
    222. // assert(port >= 1024);
    223. // char buffer[BUFFER_SIZE];
    224. // while (true)
    225. // {
    226. // ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
    227. // if (s > 0)
    228. // {
    229. // buffer[s] = '\0';
    230. // if (strcasecmp(buffer, "quit") == 0)
    231. // {
    232. // LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
    233. // break;
    234. // }
    235. // LogMessage(DEBUG, "trans before: %s[%d]>>> %s", ip.c_str(), port, buffer);
    236. // for (int i = 0; i < s; i++)
    237. // {
    238. // if (isalpha(buffer[i]) && islower(buffer[i]))
    239. // buffer[i] = toupper(buffer[i]);
    240. // }
    241. // LogMessage(DEBUG, "trans after: %s[%d]>>> %s", ip.c_str(), port, buffer);
    242. // write(sock, buffer, strlen(buffer));
    243. // }
    244. // else if (s == 0)
    245. // {
    246. // LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
    247. // break;
    248. // }
    249. // else
    250. // {
    251. // LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
    252. // break;
    253. // }
    254. // }
    255. // close(sock);
    256. // LogMessage(DEBUG, "server close %d done", sock);
    257. // }
    258. private:
    259. uint16_t port_;
    260. string ip_;
    261. int listenSock_;
    262. threadPool<task> *tp;
    263. };
    264. static void Usage(string s)
    265. {
    266. cout << "Usage: \n\t" << s << "port ip" << endl;
    267. cout << "ex: \n\t" << s << "8080 127.0.0.1" << endl;
    268. }
    269. int main(int args, char *argv[])
    270. {
    271. if (args != 2 && args != 3)
    272. {
    273. Usage(argv[0]);
    274. exit(USAGE_ERR);
    275. }
    276. uint16_t p = atoi(argv[1]);
    277. string i;
    278. if (args == 3)
    279. i = argv[2];
    280. daemonize();
    281. Log log;
    282. log.enable();
    283. Server s(p, i);
    284. s.init();
    285. s.loop();
    286. return 0;
    287. }

    TCPclient端

    1. #include "util.hpp"
    2. #include "Log.hpp"
    3. static void Usage(string s)
    4. {
    5. cout << "Usage: \n\t" << s << "ip port" << endl;
    6. cout << "ex: \n\t" << s << "127.0.0.1 8080" << endl;
    7. }
    8. volatile bool flag = false;
    9. int main(int args, char *argv[])
    10. {
    11. if (args != 3)
    12. {
    13. Usage(argv[0]);
    14. exit(USAGE_ERR);
    15. }
    16. string ip = argv[1];
    17. uint16_t port = atoi(argv[2]);
    18. // 创建套接字
    19. int clientSock = socket(AF_INET, SOCK_STREAM, 0);
    20. if (clientSock < 0)
    21. {
    22. cerr << "socket: " << strerror(errno) << endl;
    23. exit(SOCKET_ERR);
    24. }
    25. struct sockaddr_in server;
    26. memset(&server, 0, sizeof(server));
    27. socklen_t len = sizeof(server);
    28. server.sin_port = htons(port);
    29. server.sin_family = AF_INET;
    30. inet_aton(ip.c_str(), &server.sin_addr);
    31. // 请求连接
    32. if (connect(clientSock, (const sockaddr *)&server, len) == -1)
    33. {
    34. cerr << "connect: " << strerror(errno) << endl;
    35. exit(SOCKET_ERR);
    36. }
    37. cout << "info : connect success: " << clientSock << endl;
    38. // 与服务器开始通信
    39. string message;
    40. while (!flag)
    41. {
    42. message.clear();
    43. cout << "Please Enter# ";
    44. getline(cin, message);
    45. if (strcasecmp(message.c_str(), "quit") == 0)
    46. flag = true;
    47. // 发送信息给服务器
    48. ssize_t s = write(clientSock, message.c_str(), message.size());
    49. // 发送成功
    50. if (s > 0)
    51. {
    52. message.resize(1024);
    53. // 读取服务器处理后发送过来的信息
    54. ssize_t s = read(clientSock, (char *)(message.c_str()), 1024);
    55. if (s > 0)
    56. message[s] = 0;
    57. cout << "Server Echo>>> " << message << endl;
    58. }
    59. // 发送失败
    60. else if (s <= 0)
    61. {
    62. break;
    63. }
    64. }
    65. // 关闭套接字
    66. close(clientSock);
    67. return 0;
    68. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/293959
推荐阅读
相关标签
  

闽ICP备14008679号