赞
踩
在Linux系统中,使用C语言编程设计一个基于TCP/IP的多线程echo服务器程序,从而实现对客户端发送内容的响应。主要实现为一个面向连接的、多线程并发服务器,并由客户端程序发送信息。
修改多线程的echo服务器程序,其中客户端能将相应信息传输给服务器,服务器从客户端接收信息,并用将请求的信息内容应答客户端。同时要求一个服务器能够为多个客户并发提供服务,使之能够满足:
同下四-2
服务器:
//多线程服务器代码 //createby:jty /*------------------------------------------------------------------------ * 头文件 *------------------------------------------------------------------------ */ //TCPmetechod.c的头文件 #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <pthread.h> #include <sys/types.h> #include <sys/signal.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/resource.h> #include <sys/wait.h> #include <sys/errno.h> #include <netinet/in.h> //errexit.c的头文件 #include <stdarg.h> //passivesock.c的头文件 #include <netdb.h> #include <errno.h> /*------------------------------------------------------------------------ * 宏定义 *------------------------------------------------------------------------ */ // #define WORKERS 2 //TCPmetechod.c的宏定义 #define QLEN 32 /* maximum connection queue length--最多连接的队列数量 */ #define BUFSIZE 4096 #define INTERVAL 5 /* secs--单独显示记录线程的等待时长 */ //线程池函数宏定义: //尾插法添加一个结点入队列 #define LL_ADD(item, list) do { \ if (list == NULL){ \ list=item; \ } \ else{ \ item->prev = list; \ while(list->next != NULL){ \ list = list->next; \ } \ list->next = item; \ item->next = item->prev; \ item->prev = list; \ list = item->next; \ item->next = NULL; \ } \ } while(0) //将一个结点从队列中删除 #define LL_REMOVE(item, list) do { \ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (item == list) list = item->next; \ item->prev = item->next = NULL; \ } while(0) /*------------------------------------------------------------------------ * 全局变量定义--copy from passivesock.c *------------------------------------------------------------------------ */ extern int errno; unsigned short portbase = 0; /* port base, for non-root servers */ /*------------------------------------------------------------------------ * 结构体定义--包含状态、线程池 *------------------------------------------------------------------------ */ //状态输出结构体 struct { pthread_mutex_t st_mutex; unsigned int st_concount; unsigned int st_contotal; unsigned long st_contime; unsigned long st_bytecount; } stats; typedef struct MANAGER ThreadPool; typedef struct JOB JOB; //执行队列结构体 struct WORKER { pthread_t thread; ThreadPool *pool; int terminate; //停止工作 struct WORKER *next; struct WORKER *prev; }; //任务队列结构体 struct JOB { void * (*func)(void *arg); //任务函数 void *user_data; //任务函数的参数 struct JOB *next; struct JOB *prev; }; //管理组件结构体 struct MANAGER { struct WORKER *workers; //执行队列 struct JOB *jobs; //任务队列 int num; pthread_cond_t jobs_cond; //任务队列条件等待变量 pthread_mutex_t jobs_mutex; //任务队列加锁 }; /*------------------------------------------------------------------------ * 函数的声明 *------------------------------------------------------------------------ */ //线程池引入函数 int threadPoolCreate(ThreadPool *pool, int numWorkers); int threadPoolDestory(ThreadPool *pool); void threadPoolPush(ThreadPool *pool, void *(*func)(void *arg), void *arg); //TCPmtechod.c中函数 void prstats(void); int TCPechod(int fd); int errexit(const char *format, ...); int passiveTCP(const char *service, int qlen); //passiveTCP.c中函数 int passivesock(const char *service, const char *transport, int qlen); /*------------------------------------------------------------------------ * main - Concurrent TCP server for ECHO service--main函数 *------------------------------------------------------------------------ */ int main(int argc, char *argv[]) { char *service = "echo"; /* service name or port number */ struct sockaddr_in fsin; /* the address of a client */ unsigned int alen; /* length of client's address */ int msock; /* master server socket */ int ssock; /* slave server socket */ //判断输入合法 switch (argc) { case 1: break; case 2: service = argv[1]; break; default: errexit("usage: TCPechod [port]\n"); } msock = passiveTCP(service, QLEN); //创建线程池 ThreadPool pool; int workers = WORKERS; int ret = threadPoolCreate(&pool,workers); if(ret < 0){ fprintf(stdout, "threadpool_create failed!\n"); return ret; } while (1) { alen = sizeof(fsin); ssock = accept(msock, (struct sockaddr *)&fsin, &alen); if(ssock < 0){ if(errno == EINTR) continue; errexit("accept: %s\n", strerror(errno)); } threadPoolPush(&pool,TCPechod, ssock); } threadPoolDestory(&pool); } /*------------------------------------------------------------------------ * threadCallback --线程的初始化--入口函数 *------------------------------------------------------------------------ */ static void* threadCallback(void *args) { struct WORKER *worker = (struct WORKER*)args; //worker有两种状态:执行或是等待 while (1) { //条件等待加锁 pthread_mutex_lock(&worker->pool->jobs_mutex); //若任务队列为空 while(worker->pool->jobs == NULL){ //终止信号 if(worker->terminate) break; //条件等待 pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex); } //终止信号退出 if(worker->terminate){ //条件等待解锁 pthread_mutex_unlock(&worker->pool->jobs_mutex); break; } //打印提示信息 printf("\n~~~~~~~~~~~~~~~~Thread %lu is awake!~~~~~~~~~~~~~~~~\n\n",worker->thread); struct JOB *job = worker->pool->jobs; LL_REMOVE(job, worker->pool->jobs); //条件等待解锁 pthread_mutex_unlock(&worker->pool->jobs_mutex); //执行响应函数 job->func(job->user_data); worker->pool->num--; } free(worker); pthread_exit(NULL); //线程退出 } /*------------------------------------------------------------------------ * threadPoolCreate --线程池的初始化 *------------------------------------------------------------------------ */ int threadPoolCreate(ThreadPool *pool, int numWorkers) { pool->num = 0; pthread_t th; pthread_attr_t ta; //线程属性 (void) pthread_attr_init(&ta); (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED); (void) pthread_mutex_init(&stats.st_mutex, 0); //输出显示线程创建 if (pthread_create(&th, &ta, (void * (*)(void *))prstats, 0) < 0) errexit("pthread_create(prstats): %s\n", strerror(errno)); if(numWorkers < 1) numWorkers = 1; if(pool == NULL) return -1; memset(pool, 0, sizeof(ThreadPool)); pool->workers=NULL; pool->jobs=NULL; //初始化条件变量和互斥锁 //结构体赋值--内存拷贝 pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t)); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t)); fprintf(stdout, "\n--------------------------------------------------------------------------------\n"); //初始化执行队列 int i = 0; for (i = 0; i < numWorkers; i++) { struct WORKER *worker = (struct WORKER *)malloc(sizeof(struct WORKER)); if (worker == NULL) { perror("malloc"); return -2; } memset(worker, 0, sizeof(struct WORKER)); worker->next=NULL; worker->prev=NULL; worker->pool = pool; //线程指针变量,属性设置为空,线程执行函数,传入执行函数的参数 int ret = pthread_create(&worker->thread, &ta, threadCallback, worker); if (ret) { perror("pthread_create"); free(worker); return -3; } fprintf(stdout, "worker thread: %lu\n", worker->thread); //形成一个执行队列 LL_ADD(worker, pool->workers); } fprintf(stdout, "--------------------------------------------------------------------------------\n"); return 0; } /*------------------------------------------------------------------------ * threadPoolPush --创建任务队列--往线程池里加入一个任务 *------------------------------------------------------------------------ */ void threadPoolPush(ThreadPool *pool, void *(*func)(void *arg), void *arg) { //创建一个任务 JOB *job=(JOB*)malloc(sizeof(JOB)); if(job==NULL){ perror("malloc"); exit(1); } memset(job, 0, sizeof(JOB)); job->func = func; job->user_data = arg; job->next=NULL; job->prev=NULL; //将任务加入到线程池中 pthread_mutex_lock(&pool->jobs_mutex); if(job!=NULL) LL_ADD(job, pool->jobs); pool->num++; if(pool->num > WORKERS){ printf("\n~~~~~~~~~~~~~~~~Waiting client number is %d.~~~~~~~~~~~~~~~~\n\n",(pool->num - WORKERS)); fflush(stdout); } else{ pthread_cond_signal(&pool->jobs_cond); //唤醒一个线程 } pthread_mutex_unlock(&pool->jobs_mutex); } /*------------------------------------------------------------------------ * threadPoolDestory --将所有线程都删除 *------------------------------------------------------------------------ */ int threadPoolDestory(ThreadPool *pool) { struct WORKER *worker = NULL; for (worker = pool->workers; worker != NULL; worker = worker->next){ worker->terminate = 1; } //唤醒所有线程 pthread_mutex_lock(&pool->jobs_mutex); int ret = pthread_cond_broadcast(&pool->jobs_cond); pthread_mutex_unlock(&pool->jobs_mutex); return ret; } /*------------------------------------------------------------------------ * TCPechod - echo data until end of file--线程运行函数 *------------------------------------------------------------------------ */ int TCPechod(int fd) { time_t start; char buf[BUFSIZ]; int cc; start = time(0); (void) pthread_mutex_lock(&stats.st_mutex); stats.st_concount++; (void) pthread_mutex_unlock(&stats.st_mutex); memset(buf,0,sizeof(buf)); while (cc = read(fd, buf, sizeof buf)) { if (cc < 0) errexit("echo read: %s\n", strerror(errno)); if (write(fd, buf, cc) <= 0) errexit("echo write: %s\n", strerror(errno)); (void) pthread_mutex_lock(&stats.st_mutex); stats.st_bytecount += cc; (void) pthread_mutex_unlock(&stats.st_mutex); memset(buf,0,sizeof(buf)); } (void) close(fd); (void) pthread_mutex_lock(&stats.st_mutex); stats.st_contime += time(0) - start; stats.st_concount--; stats.st_contotal++; (void) pthread_mutex_unlock(&stats.st_mutex); return 0; } /*------------------------------------------------------------------------ * prstats - print server statistical data--在服务器端每隔5秒显示一下历史数据 *------------------------------------------------------------------------ */ void prstats(void) { time_t now; while (1) { (void) sleep(INTERVAL); (void) pthread_mutex_lock(&stats.st_mutex); now = time(0); (void) printf("--- %s", ctime(&now)); (void) printf("%-32s: %u\n", "Current connections", stats.st_concount); (void) printf("%-32s: %u\n", "Completed connections", stats.st_contotal); if (stats.st_contotal) { (void) printf("%-32s: %.2f (secs)\n", "Average complete connection time", (float)stats.st_contime / (float)stats.st_contotal); (void) printf("%-32s: %.2f\n", "Average byte count", (float)stats.st_bytecount / (float)(stats.st_contotal + stats.st_concount)); } (void) printf("%-32s: %lu\n\n", "Total byte count", stats.st_bytecount); (void) pthread_mutex_unlock(&stats.st_mutex); } } /*------------------------------------------------------------------------ * errexit - print an error message and exit--错误展示函数 *------------------------------------------------------------------------ */ int errexit(const char *format, ...) { va_list args; va_start(args, format); vfprintf(stderr, format, args); va_end(args); exit(1); } /*------------------------------------------------------------------------ * passivesock - allocate & bind a server socket using TCP or UDP--Socket *------------------------------------------------------------------------ */ int passivesock(const char *service, const char *transport, int qlen) /* * Arguments: * service - service associated with the desired port * transport - transport protocol to use ("tcp" or "udp") * qlen - maximum server request queue length */ { struct servent *pse; /* pointer to service information entry */ struct protoent *ppe; /* pointer to protocol information entry*/ struct sockaddr_in sin; /* an Internet endpoint address */ int s, type; /* socket descriptor and socket type */ memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; /* Map service name to port number */ if ( pse = getservbyname(service, transport) ) sin.sin_port = htons(ntohs((unsigned short)pse->s_port) + portbase); else if ((sin.sin_port=htons((unsigned short)atoi(service))) == 0) errexit("can't get \"%s\" service entry\n", service); /* Map protocol name to protocol number */ if ( (ppe = getprotobyname(transport)) == 0) errexit("can't get \"%s\" protocol entry\n", transport); /* Use protocol to choose a socket type */ if (strcmp(transport, "udp") == 0) type = SOCK_DGRAM; else type = SOCK_STREAM; /* Allocate a socket */ s = socket(PF_INET, type, ppe->p_proto); if (s < 0) errexit("can't create socket: %s\n", strerror(errno)); /* Bind the socket */ if (bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) errexit("can't bind to %s port: %s\n", service, strerror(errno)); if (type == SOCK_STREAM && listen(s, qlen) < 0) errexit("can't listen on %s port: %s\n", service, strerror(errno)); return s; } /*------------------------------------------------------------------------ * passiveTCP - create a passive socket for use in a TCP server--TCP传输 *------------------------------------------------------------------------ */ int passiveTCP(const char *service, int qlen) /* * Arguments: * service - service associated with the desired port * qlen - maximum server request queue length */ { return passivesock(service, "tcp", qlen); }
客户端:
#include <stdio.h> #include <string.h> #include<stdlib.h> #include<sys/socket.h> #include<sys/types.h> #include<netinet/in.h> #include<unistd.h> #include<arpa/inet.h> #include<netdb.h> int main(int argc,char *argv[]) { int sockfd; int confd; struct hostent *he; //找到IP地址 struct sockaddr_in addr_ser; //服务器的地址 struct sockaddr_in addr_file; //服务器发来的文件地址 char sendMsg[1024],recvMsg[1024]; if(argc!=3){ //如果没有写服务器的连接 perror("Please input the server port!\n"); exit(1); } he=gethostbyname((char *)argv[1]); //通过域名获取IP地址 if(he==NULL){ //如果获取IP地址失败 perror("Cannot get host by name!\n"); exit(1); } sockfd=socket(AF_INET,SOCK_STREAM,0); //创建socket套接字 if(sockfd==-1){ //如果创建套接字失败 perror("Create socketfd failed!\n"); exit(1); } memset(&addr_ser,0,sizeof(addr_ser)); addr_ser.sin_family = AF_INET; addr_ser.sin_port = htons((unsigned short)atoi(argv[2])); addr_ser.sin_addr = *((struct in_addr *) he->h_addr); confd = connect(sockfd, (struct sockaddr *)&addr_ser, sizeof(addr_ser)); //客户端连接服务器 if(confd == -1){ //如果连接失败 perror("Connectfd error!\n"); exit(1); } while(1){ //循环使一个客户端不停的发送消息直到退出,同时建立新的连接 printf("--------------------------------------------------\n"); memset(sendMsg,0,sizeof(sendMsg)); printf("Please input what you want to send:\0"); scanf("%s",sendMsg); if(strncmp(sendMsg,"EXIT",4)==0){ break; } int n1; n1=write(sockfd,sendMsg,strlen(sendMsg)); if(n1==0){ perror("client send wrong:"); break; } memset(recvMsg,0,sizeof(recvMsg)); int n2; n2=read(sockfd,recvMsg,1024); if(n2==-1){ perror("client recv wrong:"); break; } printf("Receive from server:%s\n",recvMsg); fflush(stdout); } close(sockfd); return 0; }
此实验中,设计最大的线程数量为2,开启一个服务器和四个客户端进行测试。
编译服务器server,打开服务器,输入端口号——输出创建的两个线程的线程号
等待5秒钟,prstats进程打印相关的服务器连接和echo信息的数据——由于没有客户端连接,所以结果为0,0,0
打开一个客户端,输入要连接的IP地址和端口号——服务器端显示分配给该响应的线程号
输入响应信息——服务器端输出已连接的客户端个数,客户端收到服务器的回声信息
打开第二个客户端,输入要连接的IP地址和端口号——服务器端显示分配给该响应的线程号
在第二个客户端输入响应信息,并启动第三个客户端——客户端2收到响应信息,服务器端显示两个连接和等待连接的客户端个数1
在第三个客户端中输入信息——服务器无响应
启动第四个客户端——服务器端显示两个连接和等待连接的客户端个数2
在第四个客户端中输入信息——服务器无响应
退出第一个客户端——第三个客户端马上收到回应,服务器端显示被分配的线程号,并展示有2个客户端正在连接,1个已经结束连接,平均连接时长、传输比特数等等。
在第三个客户端中测试输入信息——可以成功从服务器端收获回应信息
退出第二个客户端——第四个客户端马上收到回应,服务器端显示被分配的线程号,并展示有2个客户端正在连接,2个已经结束连接,平均连接时长、传输比特数等等。
在第四个客户端中测试输入信息——可以成功从服务器端收获回应信息
喜欢这篇文章的话【点赞】、【收藏】下呗!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。