赞
踩
- #pragma once
#include <pthread.h>
#include <sys/prctl.h>

#include <atomic>
#include <cassert>
#include <iostream>
#include <queue>
#include <utility>

#include "Log.hpp"
#include "Lock.hpp"
#include "Task.hpp"
-
- using namespace std;
-
- const int gThreadNum = 5;
-
- template <class T>
- class threadPool
- {
- private:
- threadPool(int num = gThreadNum) : threadNums(num), isStart(false)
- {
- assert(threadNums > 0);
- // 初始化互斥锁和条件变量
- pthread_mutex_init(&mutex, nullptr);
- pthread_cond_init(&cond, nullptr);
- }
- // 单例模式 禁止拷贝构造和赋值操作
- threadPool(const threadPool<T> &t) = delete;
- threadPool<T> operator=(const threadPool<T> &t) = delete;
-
- public:
- // 懒汉模式
- static threadPool<T> *getInstance()
- {
- // 初始化锁
- static Mutex m;
- if (nullptr == instance) // 双重判定空指针, 降低锁冲突的概率, 提高性能。
- {
- // RAII思想
- GuardLock guard(&m);
- if (nullptr == instance)
- instance = new threadPool<T>();
- }
- return instance;
- }
-
- ~threadPool()
- {
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
- }
-
- // 线程启动后执行的函数 传入了this指针
- static void *taskRoutine(void *args)
- {
- pthread_detach(pthread_self()); // 分离线程 自动回收
- threadPool<T> *tp = static_cast<threadPool<T> *>(args);
- // prctl(PR_SET_NAME, "follower");//修改线程名称
- while (1)
- {
- // 队列中拿取任务
- tp->lockQueue();
- while (!tp->haveTask())
- {
- // 等待条件信号量
- tp->waitForTask();
- }
- T t = tp->pop();
- tp->unlockQueue();
- t(); // 让指定的线程执行任务
- }
- }
-
- void start()
- {
- assert(!isStart);
- // 根据线程池中的线程数量创建线程
- for (size_t i = 0; i < threadNums; i++)
- {
- pthread_t temp; // 输出型参数 线程ID
- pthread_create(&temp, nullptr, taskRoutine, this);
- }
- isStart = true;
- }
-
- // 添加任务
- void push(const T &in)
- {
- lockQueue();
- taskQueue.push(in);
- // 唤醒条件信号量 任务队列里面来任务了,线程可以开始处理任务了
- choiceThreadForHandler();
- unlockQueue();
- }
-
- int getThreadNum()
- {
- return threadNums;
- }
-
- private:
- void lockQueue() { pthread_mutex_lock(&mutex); }
- void unlockQueue() { pthread_mutex_unlock(&mutex); }
- bool haveTask() { return !taskQueue.empty(); }
- void waitForTask() { pthread_cond_wait(&cond, &mutex); }
- void choiceThreadForHandler() { pthread_cond_signal(&cond); }
-
- // 获取任务
- T pop()
- {
- T temp = taskQueue.front();
- taskQueue.pop();
- return temp;
- }
-
- private:
- bool isStart;
- int threadNums;
- queue<T> taskQueue;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
-
- static threadPool<T> *instance;
- };
-
- template <class T>
- threadPool<T> *threadPool<T>::instance;
- #pragma once
- #include <iostream>
- #include <string>
- #include <functional>
- #include "Log.hpp"
-
- using namespace std;
-
- class task
- {
- public:
- // 创建函数对象
- using called_func = function<void(int, string, uint16_t)>;
-
- task(int s, string i, int p, called_func f) : sock(s), ip(i), port(p), func(f) {}
- ~task() {}
-
- // 重载operator()
- void operator()()
- {
- LogMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...", pthread_self(), ip.c_str(), port);
- func(sock, ip, port);
- LogMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...", pthread_self(), ip.c_str(), port);
- }
-
- private:
- string ip;
- uint16_t port;
- int sock;
- called_func func; // 回调函数
- };
- #include <iostream>
- #include <pthread.h>
-
- using namespace std;
-
// Thin wrapper around a pthread mutex: initialized on construction,
// destroyed on destruction.
class Mutex
{
public:
    Mutex()
    {
        pthread_mutex_init(&_lock, nullptr);
    }

    // Non-copyable: POSIX leaves the use of a copied pthread_mutex_t
    // undefined, and two owners would both destroy the same mutex.
    Mutex(const Mutex &) = delete;
    Mutex &operator=(const Mutex &) = delete;

    void lock()
    {
        pthread_mutex_lock(&_lock);
    }

    void unlock()
    {
        pthread_mutex_unlock(&_lock);
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&_lock);
    }

private:
    pthread_mutex_t _lock;
};
-
- class GuardLock
- {
- public:
- GuardLock(Mutex *mutex) : _mutex(mutex)
- {
- _mutex->lock();
- cout << "加锁成功..." << endl;
- }
-
- ~GuardLock()
- {
- _mutex->unlock();
- cout << "解锁成功..." << endl;
- }
-
- private:
- Mutex *_mutex;
- };
- #pragma once
-
- #include <cassert>
- #include <cstdarg>
- #include <cstring>
- #include <ctime>
- #include <cerrno>
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
- #include <unistd.h>
-
- #define DEBUG 0
- #define NOTICE 1
- #define WARINING 2
- #define FATAL 3
-
- #define LOGFILE "serverTcp.log"
-
// Printable names indexed by the DEBUG..FATAL level macros. Both the strings
// and the pointers are const, which gives the array internal linkage so this
// header can be included from more than one translation unit.
const char *const log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};
-
- // 创建serverTcp.log日志文件 将控制台输出全部重定向到文件中
- // 在Log类对象被销毁时,进行刷盘操作
- class Log
- {
- public:
- Log() : logFd(-1)
- {
- }
- void enable()
- {
- umask(0);
- logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
- assert(logFd > 0);
- dup2(logFd, 1);
- dup2(logFd, 2);
- }
- ~Log()
- {
- if (logFd > 0)
- {
- fsync(logFd);
- close(logFd);
- }
- }
-
- private:
- int logFd;
- };
-
- // 打印日志信息
- void LogMessage(int level, const char *format, ...)
- {
- assert(level >= DEBUG);
- assert(level <= FATAL);
-
- char *name = getenv("USER");
-
- char buff[1024];
- va_list v;
- va_start(v, format);
-
- vsnprintf(buff, sizeof(buff) - 1, format, v);
-
- va_end(v);
-
- FILE *out = (level == FATAL) ? stderr : stdout;
-
- // umask(0);
- // int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
- // assert(fd > 0);
-
- fprintf(out, "%s | %u | %s | %s\n",
- log_level[level],
- (unsigned int)time(nullptr),
- name == nullptr ? "unknow" : name,
- buff);
-
- // dup2(fd, 1);
- // dup2(fd, 2);
-
- fflush(out);
-
- // fsync(fd);
-
- // close(fd);
- }
- #include <signal.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
-
// Turns the calling process into a background daemon:
//   1. ignore SIGPIPE,
//   2. fork; the parent exits successfully and the child carries on,
//   3. the child starts a new session with setsid(),
//   4. stdin/stdout/stderr are redirected to /dev/null (if it could be opened).
//
// If fork() fails the process writes a message to stderr and exits with
// EXIT_FAILURE. Declared inline because it is defined in a header.
inline void daemonize()
{
    // Opened read-write because the same descriptor backs stdin as well as
    // stdout and stderr.
    int fd = open("/dev/null", O_RDWR);

    signal(SIGPIPE, SIG_IGN);

    const pid_t pid = fork();
    if (pid < 0)
    {
        static const char msg[] = "daemonize: fork failed\n";
        ssize_t ignored = write(STDERR_FILENO, msg, sizeof(msg) - 1);
        (void)ignored;
        exit(EXIT_FAILURE);
    }
    if (pid > 0)
    {
        // Parent: daemonizing succeeded, so report success to the launcher.
        exit(EXIT_SUCCESS);
    }

    // Child: the child of a fork is not a process-group leader, which is the
    // precondition for setsid().
    setsid();

    if (fd != -1)
    {
        dup2(fd, 0);
        dup2(fd, 1);
        dup2(fd, 2);
        if (fd > 2)
            close(fd); // the standard descriptors now hold their own copies
    }
}
- #pragma once
-
- #include <iostream>
- #include <string>
- #include <cstring>
- #include <cstdlib>
- #include <cassert>
- #include <ctype.h>
- #include <unistd.h>
- #include <strings.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
-
- using namespace std;
-
- #define SOCKET_ERR 1
- #define BIND_ERR 2
- #define LISTEN_ERR 3
- #define USAGE_ERR 4
- #define CONN_ERR 5
-
- #define BUFFER_SIZE 1024
TCP server 端
- #include "util.hpp"
- #include "Log.hpp"
- #include "ThreadPool.hpp"
- #include "daemonize.hpp"
- #include <signal.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- #include <pthread.h>
-
class Server;

// Plain bundle of per-connection values: the client's port and IP, the
// connected socket, and a pointer to the Server that accepted it.
class ThreadData
{
public:
    ThreadData(uint16_t port, std::string ip, int sock, Server *ts)
        : clientPort_{port},
          clinetIp_(ip),
          sock_{sock},
          this_{ts}
    {
    }

    uint16_t clientPort_;  // client port
    std::string clinetIp_; // client IP as text
    int sock_;             // connected socket descriptor
    Server *this_;         // accepting server (not owned)
};
-
- // 创建task时的传入函数
- void transService(int sock, const string &ip, int port)
- {
- assert(sock >= 0);
- assert(!ip.empty());
- assert(port >= 1024);
- char buffer[BUFFER_SIZE];
- while (true)
- {
- ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
- if (s > 0)
- {
- buffer[s] = '\0';
- if (strcasecmp(buffer, "quit") == 0)
- {
- LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
- break;
- }
- LogMessage(DEBUG, "trans before: %s[%d]>>> %s", ip.c_str(), port, buffer);
- for (int i = 0; i < s; i++)
- {
- if (isalpha(buffer[i]) && islower(buffer[i]))
- buffer[i] = toupper(buffer[i]);
- }
- LogMessage(DEBUG, "trans after: %s[%d]>>> %s", ip.c_str(), port, buffer);
- write(sock, buffer, strlen(buffer));
- }
- else if (s == 0)
- {
- LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
- break;
- }
- else
- {
- LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
- break;
- }
- }
- close(sock);
- LogMessage(DEBUG, "server close %d done", sock);
- }
- // 创建task时的传入函数
- void commandService(int sock, const string &ip, int port)
- {
- assert(sock >= 0);
- assert(!ip.empty());
- assert(port >= 1024);
- char buffer[BUFFER_SIZE];
- while (true)
- {
- ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
- if (s > 0)
- {
- buffer[s] = 0;
- LogMessage(DEBUG, "[%s:%d] exec [%s]", ip.c_str(), port, buffer);
- FILE *f = popen(buffer, "r");
- if (f == nullptr)
- {
- LogMessage(WARINING, "exec %s failed, beacuse: %s", buffer, strerror(errno));
- }
- char line[1024];
- while (fgets(line, sizeof(line), f) != nullptr)
- {
- write(sock, line, sizeof(line));
- }
- pclose(f);
- LogMessage(DEBUG, "[%s:%d] exec [%s] ... done", ip.c_str(), port, buffer);
- }
- else if (s == 0)
- {
- LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
- break;
- }
- else
- {
- LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
- break;
- }
- }
- close(sock);
- LogMessage(DEBUG, "server close %d done", sock);
- }
-
- class Server
- {
- public:
- Server(int port, string ip = "") : port_(port), ip_(ip), listenSock_(-1) {}
- ~Server() {}
-
- void init()
- {
- // 创建TCP套接字
- listenSock_ = socket(AF_INET, SOCK_STREAM, 0);
- if (listenSock_ < 0)
- {
- LogMessage(FATAL, "socket: %s", strerror(errno));
- exit(SOCKET_ERR);
- }
- LogMessage(DEBUG, "socket success: %s , %d", strerror(errno), listenSock_);
-
- struct sockaddr_in local;
- socklen_t len = sizeof(local);
- memset(&local, 0, sizeof(local));
- local.sin_port = htons(port_);
- local.sin_family = PF_INET;
- ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));
- // 绑定套接字
- if (bind(listenSock_, (sockaddr *)&local, len) < 0)
- {
- LogMessage(FATAL, "bind: %s", strerror(errno));
- exit(BIND_ERR);
- }
- LogMessage(DEBUG, "bind success : %s %d", strerror(errno), listenSock_);
- // 监听套接字 最大连接数5
- if (listen(listenSock_, 5) < 0)
- {
- LogMessage(FATAL, "listen: %s", strerror(errno));
- exit(LISTEN_ERR);
- }
- LogMessage(DEBUG, "listen succcess: %s , %d", strerror(errno), listenSock_);
-
- // 加载线程池
- tp = threadPool<task>::getInstance();
- }
-
- // static void *thradRoutine(void *args)
- // {
- // pthread_detach(pthread_self());
- // ThreadData *td = (ThreadData *)args;
- // td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_);
- // delete td;
- // return nullptr;
- // }
-
- void loop()
- {
- // signal(SIGCHLD, SIG_IGN);
- // 初始化线程池 创建线程
- tp->start();
- LogMessage(DEBUG, "thread pool start success, thread num: %d", tp->getThreadNum());
-
- while (true)
- {
- struct sockaddr_in peer;
- socklen_t len = sizeof(peer);
- // 接收请求连接 serviceSock服务套接字
- int serviceSock = accept(listenSock_, (sockaddr *)&peer, &len);
- if (serviceSock < 0)
- {
- LogMessage(WARINING, "accept: %s , %d", strerror(errno), serviceSock);
- continue;
- }
- uint16_t peerPort = ntohs(peer.sin_port);
- string peerIp = inet_ntoa(peer.sin_addr);
- LogMessage(DEBUG, "accept success: %s | %s[%d], socket fd: %d",
- strerror(errno), peerIp.c_str(), peerPort, serviceSock);
-
- // demo 5 线程池
- // 5.1
- // task t(serviceSock, peerIp, peerPort, std::bind(&Server::transService, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- // tp->push(t);
- // 5.2
- // task t(serviceSock, peerIp, peerPort,transService);
- // tp->push(t);
- // 5.3
- // 创建任务
- task t(serviceSock, peerIp, peerPort, commandService);
- // 将任务放入任务队列中
- tp->push(t);
-
- // demo 4 多线程
- // ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this);
- // pthread_t pt;
- // pthread_create(&pt, nullptr, thradRoutine, (void *)td);
-
- // demo3 多进程
- // pid_t id = fork();
- // if (id == 0)
- // {
- // close(listenSock_);
- // if (fork() > 0)
- // exit(0);
- // transService(serviceSock, peerIp, peerPort);
- // exit(0);
- // }
- // close(serviceSock);
- // pid_t ret = waitpid(id, nullptr, 0);
- // assert(ret > 0);
- // (void)ret;
-
- // demo2 多进程
- // pid_t id = fork();
- // if(id == 0){
- // close(listenSock_);
- // transService(serviceSock,peerIp,peerPort);
- // exit(0);
- // }
- // close(serviceSock);
- // 通过signal(SIGCHLD,SIG_IGN)来处理僵尸子进程
-
- // demo 1
- // transService(serviceSock, peerIp, peerPort);
-
- // LogMessage(DEBUG, "server 提供 service start ...");
- // sleep(1);
- }
- }
-
- // 5.1
- // void transService(int sock, const string &ip, int port)
- // {
- // assert(sock >= 0);
- // assert(!ip.empty());
- // assert(port >= 1024);
- // char buffer[BUFFER_SIZE];
- // while (true)
- // {
- // ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
- // if (s > 0)
- // {
- // buffer[s] = '\0';
- // if (strcasecmp(buffer, "quit") == 0)
- // {
- // LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
- // break;
- // }
- // LogMessage(DEBUG, "trans before: %s[%d]>>> %s", ip.c_str(), port, buffer);
- // for (int i = 0; i < s; i++)
- // {
- // if (isalpha(buffer[i]) && islower(buffer[i]))
- // buffer[i] = toupper(buffer[i]);
- // }
- // LogMessage(DEBUG, "trans after: %s[%d]>>> %s", ip.c_str(), port, buffer);
- // write(sock, buffer, strlen(buffer));
- // }
- // else if (s == 0)
- // {
- // LogMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);
- // break;
- // }
- // else
- // {
- // LogMessage(DEBUG, "%s[%d] - read: %s", ip.c_str(), port, strerror(errno));
- // break;
- // }
- // }
- // close(sock);
- // LogMessage(DEBUG, "server close %d done", sock);
- // }
-
- private:
- uint16_t port_;
- string ip_;
- int listenSock_;
- threadPool<task> *tp;
- };
-
// Prints the server's command-line synopsis to stdout.
// `s` is the program name (argv[0]); the ip argument is optional.
static void Usage(std::string s)
{
    // The leading space keeps the program name apart from its arguments.
    std::cout << "Usage: \n\t" << s << " port [ip]" << std::endl;
    std::cout << "ex: \n\t" << s << " 8080 127.0.0.1" << std::endl;
}
-
- int main(int args, char *argv[])
- {
- if (args != 2 && args != 3)
- {
- Usage(argv[0]);
- exit(USAGE_ERR);
- }
- uint16_t p = atoi(argv[1]);
- string i;
- if (args == 3)
- i = argv[2];
-
- daemonize();
-
- Log log;
- log.enable();
-
- Server s(p, i);
- s.init();
- s.loop();
-
- return 0;
- }
TCP client 端
- #include "util.hpp"
- #include "Log.hpp"
-
// Prints the client's command-line synopsis to stdout.
// `s` is the program name (argv[0]).
static void Usage(std::string s)
{
    // The leading space keeps the program name apart from its arguments.
    std::cout << "Usage: \n\t" << s << " ip port" << std::endl;
    std::cout << "ex: \n\t" << s << " 127.0.0.1 8080" << std::endl;
}
-
- volatile bool flag = false;
-
- int main(int args, char *argv[])
- {
- if (args != 3)
- {
- Usage(argv[0]);
- exit(USAGE_ERR);
- }
- string ip = argv[1];
- uint16_t port = atoi(argv[2]);
-
- // 创建套接字
- int clientSock = socket(AF_INET, SOCK_STREAM, 0);
- if (clientSock < 0)
- {
- cerr << "socket: " << strerror(errno) << endl;
- exit(SOCKET_ERR);
- }
- struct sockaddr_in server;
- memset(&server, 0, sizeof(server));
- socklen_t len = sizeof(server);
- server.sin_port = htons(port);
- server.sin_family = AF_INET;
- inet_aton(ip.c_str(), &server.sin_addr);
-
- // 请求连接
- if (connect(clientSock, (const sockaddr *)&server, len) == -1)
- {
- cerr << "connect: " << strerror(errno) << endl;
- exit(SOCKET_ERR);
- }
- cout << "info : connect success: " << clientSock << endl;
-
- // 与服务器开始通信
- string message;
- while (!flag)
- {
- message.clear();
- cout << "Please Enter# ";
- getline(cin, message);
- if (strcasecmp(message.c_str(), "quit") == 0)
- flag = true;
- // 发送信息给服务器
- ssize_t s = write(clientSock, message.c_str(), message.size());
- // 发送成功
- if (s > 0)
- {
- message.resize(1024);
- // 读取服务器处理后发送过来的信息
- ssize_t s = read(clientSock, (char *)(message.c_str()), 1024);
- if (s > 0)
- message[s] = 0;
- cout << "Server Echo>>> " << message << endl;
- }
- // 发送失败
- else if (s <= 0)
- {
- break;
- }
- }
- // 关闭套接字
- close(clientSock);
-
- return 0;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。