赞
踩
事件包括:(自己网络编程的时候耦合太高)
1.连接的建立 3次握手
2.连接断开 4次挥手
3.消息的到达 read()
4.消息发送完毕 write()
1.注册感兴趣的事件-event_add(我们需要写的)
2.事件管理器检测事件的种类-
3.同步的分派异步的请求处理-callback(我们需要写的)
1.平台无关
2.将网络io转化为事件的处理
3.忽略具体的参数细节,io函数的细节,errno的返回值等
4.对具体事件的封装(事件举例:网络io事件、定时事件、信号事件)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <pthread.h> #include <errno.h> #include <sys/epoll.h> #define BUFFER_LENGTH 1024 #define LISTEN_PORT 100 struct sockitem { // int sockfd; int (*callback)(int fd, int events, void *arg); char recvbuffer[BUFFER_LENGTH]; // char sendbuffer[BUFFER_LENGTH]; int rlength; int slength; }; // mainloop / eventloop --> epoll --> struct reactor { int epfd; struct epoll_event events[512]; }; struct reactor *eventloop = NULL; int recv_cb(int fd, int events, void *arg); int send_cb(int fd, int events, void *arg) { struct sockitem *si = (struct sockitem*)arg; send(fd, si->sendbuffer, si->slength, 0); // struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; //ev.data.fd = clientfd; si->sockfd = fd; si->callback = recv_cb; ev.data.ptr = si; epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev); } // ./epoll 8080 //接受数据 int recv_cb(int fd, int events, void *arg) { //int clientfd = events[i].data.fd; struct sockitem *si = (struct sockitem*)arg; struct epoll_event ev; //char buffer[1024] = {0}; int ret = recv(fd, si->recvbuffer, BUFFER_LENGTH, 0); if (ret < 0) { //若readbuffer满了,就会返回EAGAIN if (errno == EAGAIN || errno == EWOULDBLOCK) { // return -1; } else { } ev.events = EPOLLIN; //ev.data.fd = fd; epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev); close(fd); free(si); } else if (ret == 0) { //若等于0表示连接断开了 // printf("disconnect %d\n", fd); ev.events = EPOLLIN; //ev.data.fd = fd; epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev); close(fd); free(si); } else { printf("Recv: %s, %d Bytes\n", si->recvbuffer, ret); si->rlength = ret; memcpy(si->sendbuffer, si->recvbuffer, si->rlength); si->slength = si->rlength; struct epoll_event ev; ev.events = EPOLLOUT | EPOLLET; //ev.data.fd = clientfd; si->sockfd = fd; si->callback = send_cb; ev.data.ptr = si; epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev); } } int accept_cb(int fd, int events, void *arg) { struct sockaddr_in client_addr; memset(&client_addr, 0, sizeof(struct sockaddr_in)); socklen_t client_len = sizeof(client_addr); int clientfd = accept(fd, (struct sockaddr*)&client_addr, &client_len); if (clientfd <= 0) return -1; char str[INET_ADDRSTRLEN] = {0}; printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port)); struct epoll_event ev; ev.events = EPOLLIN | EPOLLET;//连接建立完之后就打开响应的读事件 //ev.data.fd = clientfd; struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem)); si->sockfd = clientfd; si->callback = recv_cb;//如果客户端给服务器发数据,我们就调用这个回调函数 ev.data.ptr = si; epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev); return clientfd; } int init_sock(short port) { //1.创建socket int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { return -1; } //2.绑定具体的端口 struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { return -2; } //3.监听套接字 if (listen(sockfd, 5) < 0) { return -3; } printf ("listen port : %d\n", port); return sockfd; } //EPOLLIN 读事件,EPOLLOUT写事件、EPOLLERR网络出错事件,EPOLLHUP:写端和读端都关闭了,也就是连接关闭了 int main(int argc, char *argv[]) { //1.获取端口参数并创建eventloop,创建监听套接字 if (argc < 2) { return -1; } int port = atoi(argv[1]); eventloop = (struct reactor*)malloc(sizeof(struct reactor)); // epoll opera eventloop->epfd = epoll_create(1); int i = 0; for (i = 0;i < LISTEN_PORT;i ++) { int sockfd = init_sock(port+i); struct epoll_event ev; ev.events = EPOLLIN; struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem)); si->sockfd = sockfd; si->callback = accept_cb;//监听套接字的回调函数是accepte_cb,监听套接字有消息过来就调用accept_cb函数 ev.data.ptr = si; //把listenfd交给epoll管理 epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev); } while (1) { int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1); if (nready < -1) { break; } int i = 0; for (i = 0;i < nready;i ++) { if (eventloop->events[i].events & EPOLLIN) { //printf("sockitem\n"); struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr; si->callback(si->sockfd, eventloop->events[i].events, si); } if (eventloop->events[i].events & EPOLLOUT) { struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr; si->callback(si->sockfd, eventloop->events[i].events, si); } } } }
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <pthread.h> #include <errno.h> #include <sys/epoll.h> #include <event.h> #include <time.h> #include <event2/listener.h> #include <event2/bufferevent.h> void socket_read_callback(struct bufferevent *bev,void* arg) { char msg[4096]; //解耦合之后,不在这里处理连接的断开 size_t len = bufferevent_read(bev,msg,sizeof(msg)-1); msg[len] = '\0'; printf("server read the data %s\n",msg); char reply[4096] = "recvieced msg: "; strcat(reply + strlen(reply),msg); bufferevent_write(bev,reply,strlen(reply)); } void socket_event_callback(struct bufferevent *bev,short events,void* arg) { if(events & BEV_EVENT_EOF) //BEV_EVENT_EOF: 1.read=0,2.write=-1&errno=EPIPE,3.epoll的EPOLLHUP printf("connection closed\n"); else if(events & BEV_EVENT_ERROR) //网络错误 printf("some other error\n"); bufferevent_free(bev); //操作相当于close对应的fd } //连接的建立 void listener_callback(struct evconnlistener *listener,evutil_socket_t fd, struct sockaddr sock,int socklen,void* arg ) { printf("accept a client fd:%d\n",fd); struct event_base *base = (struct event_base*)arg;//上下文已经取出了相应的event //创建用户态的读写缓冲区对象bev(意义:由于读一次可能读不完) struct bufferevent *bev = bufferevent_socket_new(base,fd,BEV_OPT_CLOSE_ON_FREE); //设置的响应的读事件回调函数,socket_read_callback //socket_event_callback是连接的断开回调函数 bufferevent_setcb(bev,socket_read_callback,NULL,socket_event_callback,NULL); bufferevent_enable(bev,EV_READ|EV_PERSIST); //作用:注册读事件,跟epoll_ctl的作用一样;EV_PERSIST表示注册后不会被移除,不加注册一次后就会被移除 } //定时事件的回调函数 static void do_timer(int fd,short events,void* arg) { struct event* timer = (struct event*)arg; timer_t now = time(NULL); printf("do_timer %s",(char*)ctime(&now)); struct timeval tv = {1,0}; event_add(timer,&tv); } //编译指令 //gcc evmain.c -o evmain -levent int main(int argc,char*argv[]) { //1、创建结构体 struct sockaddr_in sin; memset(&sin,0,sizeof(struct sockaddr_in)); sin.sin_family = AF_INET; sin.sin_port = htons(8989); //2.初始化事件管理器 struct event_bash *base = event_bash_new(); //首先初始化一个事件管理器 //3.连接的建立,accept流程的处理封装成一个对象 //参数:第一个:选择那个事件管理器;第二个:提供一个回调函数,这个相当于accept_cb处理连接的建立; //第三个:LEV_OPT_REUSEABLE,如果不设置的化,端口连接不上(设置端口在服务器重启后可重用);LEV_OPT_CLOSE_ON_FREE:当客户端断开的时候,程序自动帮我们把fd给close掉 struct evconnlistener *listener = evconnlistener_new_bind(base,listener_callback,base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, 10,(struct sockaddr*)&sin, sizeof(struct sockaddr_in)); // //对于服务端而言有三种事件:1.网络io事件,2.定时事件(比如有些事件需要延迟处理,或是每两秒执行一次),3.信号事件(例如KILL -9) //信号举例:log系统,写日志文件写不进去了,但是调试fd是可用的,原因:日志被重定向了,解决原理:由于日志被重定向了,出现异常的时候,内核当中以信号的方式通知应用程序,应用程序需要捕获这个信号,fd按重定向重新打开一下 struct event evtimer; //定时事件 struct timeval tv = {1,0}; //参数:第一个是秒,第二个是微秒;总体意思是每秒执行一次函数 event_set(&evtimer,-1,0,do_timer,&evtimer); //把任务设置为一个定时事件,do_timer是定时事件的回调函数,第四个参数为上下文参数 event_bash_set(base,&evtimer); //再将具体的event绑定到事件处理器base上面 event_add(&evtimer,&tv); //注册事件管理器 //总结:1秒之后事件管理器会以同步的方式派发出去,调用回调函数 //事件循环 event_base_dispatch(base); //检测事件+事件派发也就是调用响应的callback evconnlistener_free(listener); event_bash_free(base); return 0; }
//略
//memcached.c
//thread.c
初始化 libevent;对应理解 epoll_create
创建事件,初始化event和相应的回调函数
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)
(evutil_socket_t, short, void *), void *arg)
--arg是传给回调函数的参数
events:监听的事件
#define EV_TIMEOUT 0x01 //超时事件
#define EV_READ 0x02 //读事件
#define EV_WRITE 0x04 //写事件
#define EV_SIGNAL 0x08 //信号事件
#define EV_PERSIST 0x10 //周期性触发,不加上这个,表示只能触发一次
#define EV_ET //边缘触发模式
cb 回调函数,原型如下:
typedef void (*event_callback_fn)(evutil_socket_t fd,short events,void *arg);
void
event_set(struct event *ev, evutil_socket_t fd, short events,
void (*callback)(evutil_socket_t, short, void *), void *arg)
建立 event 与 event_base 的映射关系;
int event_base_set(struct event_base *eb, struct event *ev)
int
event_add(struct event *ev, const struct timeval *tv)
补充:tv可填NULL,表示永久监听,或填写固定的时间来限时等待
int
event_del(struct event *ev)
int
event_base_loop(struct event_base *base, int flags)
flags的取值
#define EVLOOP_ONCE 0x01
只触发一次,如果事件没有呗触发,阻塞等待
#defien EVLOOP_NONBLOCK 0x02
非阻塞方式检测事件是否被触发,不管事件触发与否,都会立即返回
(大多数情况下都会使用另外一个api:int event_base_dispatch)
补充
int event_base_loopexit(struct event_base* base, const struct timeval* tv);
等待一段事件后退出
int event_base_loopbreak(struct event_base* base);
立即退出
- int bufferevent_write(struct bufferevent* bufev, const void* data, size_t size)
将data数据写到bufferevent的写缓冲区- int bufferevent_write_buffer(struct bufferevent buffev , struct evbuffer buf);
将数据写到缓冲区的另外一个写法,实际上bufferevent的内部的两个缓冲区结构就是struct evbuffer- int bufferevent_read(struct bufferevent* bufev, void* data, size_t size);
将bufferevent的读缓冲区数据读到data中,同时将独到的1数据从bufferevent的读缓冲区清除- int bufferevent_read_buffer(struct bufferevent* bufev , struct evbuffer* buf)
跟上个函数一样
struct evconnlistener* evconnlistener_new_bind(
struct event_base* base, //base根节点
evconnlistener_cb cb, //提取cfd后调用的回调
void* ptr, //传给回调的函数
unsigned flags, //
LEV_OPT_CLOSE_ON_FREE 关闭时自动释放
LEV_OPT_REUSEABLE 端口重用
LEV_OPT_THREADSAFE 分配锁,线程安全
LEV_OPT_LEAVE_SOCKETS_BLOCKING 文件描述符为阻塞的的int backlog,
const struct sockaddr* sa,
int socklen
)
#define evsignal_new(b , x , cb ,arg)
event_new( (b) , (x) ——》x的放的就是具体的信号EV_SIGNAL|EV_PERSIST, (cb) ,(arg) )
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
用户可以在调用bufferevent_socket_new函数时,传一个-1作为socket的文件描述符,然后调用bufferevent_socket_connect函数连接服务器,无需自己写代码调用connect函数连接服务器。
bufferevent_socket_connect函数会调用socket函数申请一个套接字fd,然后把这个fd设置成非阻塞。接着就connect服务器,因为该socket fd是非阻塞的,所以不会等待,而是马上返回,连接这工作交给内核来完成。所以,返回后这个socket还没有真正连接上服务器。那么什么时候连接上呢?内核又是怎么通知通知用户呢?
一般来说,当可以往socket fd可写,那就说明已经连接上了。也就是说这个socket fd变成可写状态,就连接上了。
所以,对于“非阻塞connect”比较流行的做法是:用select或者poll这类多路IO复用函数监听该socket的可写事件。当这个socket触发了可写事件,然后再对这个socket调用getsockopt函数,做进一步的判断即可。
/* 直接连接 */ int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *sa, int socklen) { struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bev, struct bufferevent_private, bev); evutil_socket_t fd; int r = 0; int result=-1; int ownfd = 0; _bufferevent_incref_and_lock(bev); if (!bufev_p) goto done; fd = bufferevent_getfd(bev);//初始化设定fd小于0 if (fd < 0) {//该bufferevent还没有设置fd if (!sa) goto done; fd = socket(sa->sa_family, SOCK_STREAM, 0);//创建套接字,并将其设定为非阻塞 if (fd < 0) goto done; if (evutil_make_socket_nonblocking(fd)<0)//设置为非阻塞 goto done; ownfd = 1; } if (sa) { r = evutil_socket_connect(&fd, sa, socklen);//非阻塞调用connect连接服务器,不会等待,而是马上返回,连接工作交给内核来完成。 if (r < 0)//小于则错误 /* 0-EINPROGRESS或EINTR 正在连接 1-都在本机,连接成功 2-refuse拒绝连接 <0 错误 */ goto freesock; } //删除旧事件,并将事件和新的fd对应,并加入到epoll监听可写。 bufferevent_setfd(bev, fd); if (r == 0) {//返回值等于0,则握手开始,但是还没有连接上,必须监听可写 //此时需要监听可写事件,当可写了,并且没有错误的话,就成功连接上了 if (! be_socket_enable(bev, EV_WRITE)) {//将可写加入epoll监听 bufev_p->connecting = 1;//epoll返回即可写了 result = 0; goto done; } } else if (r == 1) {//当服务器和客户机处于同一个主机,connect直接返回可能发生。 /* The connect succeeded already. How very BSD of it. */ result = 0; bufev_p->connecting = 1; event_active(&bev->ev_write, EV_WRITE, 1); } else { /* The connect failed already. How very BSD of it. */ bufev_p->connection_refused = 1; bufev_p->connecting = 1; result = 0; event_active(&bev->ev_write, EV_WRITE, 1); } goto done; freesock: _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); if (ownfd) evutil_closesocket(fd); /* do something about the error? */ done: _bufferevent_decref_and_unlock(bev); return result; } /* XXX we should use an enum here. */ /* 2 for connection refused, 1 for connected, 0 for not yet, -1 for error. */ int evutil_socket_connect(evutil_socket_t *fd_ptr, struct sockaddr *sa, int socklen) { int made_fd = 0; if (*fd_ptr < 0) { if ((*fd_ptr = socket(sa->sa_family, SOCK_STREAM, 0)) < 0) goto err; made_fd = 1; if (evutil_make_socket_nonblocking(*fd_ptr) < 0) { goto err; } } if (connect(*fd_ptr, sa, socklen) < 0) {//调用系统connect函数 int e = evutil_socket_geterror(*fd_ptr);//获取socket错误 if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//EINTR或EINPROGRESS则正在握手 return 0; if (EVUTIL_ERR_CONNECT_REFUSED(e)) return 2; goto err; } else { return 1;//返回 >=0 则表示连接成功,UNPv1有详细说明。客户和服务器位于本机则可能立即返回成功。 } err: if (made_fd) { evutil_closesocket(*fd_ptr); *fd_ptr = -1; } return -1; }
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg)
eg. bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
函数说明
启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的
函数原型
int bufferevent_enable(struct bufferevent *bufev, short event)
eg. bufferevent_enable(bev, EV_READ|EV_WRITE);
①创建base:event_base_new()
②创建节点event_new
③节点注册event_add
④注销事件event_del
⑤释放节点event_free
①创建套接字:
②绑定
③监听
④创建event_base根节点(从这里开始调用libevent接口)
⑤初始化根节点ifd
⑥注册事件
⑦循环监听
⑧收尾 eventcb
①创建event_base管理器
struct event_base* event_base_new(void);
②创建新的节点:对已经存在的socket创建bufferevent事件,可用于后面讲到的链接监听器的回调函数中
函数:struct bufferevent* bufferevent_socket_new(struct event_base* base, evutil_socket_t fd, int options);
参数说明:
base:对应根节点
fd: 文件描述符
options ->bufferevent的选项
EV_OPT_CLOSE_ON_FREE 释放bufferevent自动关闭底层接口
BEV_OPT_THREADSAFE 能够在多线程下是安全的③设置节点对应文件描述符事件触发的回调函数,设置完就相当于注册读、写事件,但是还需要另外一个API把注册的读写回调函数生效
函数:void bufferevent_setcb(struct bufferevent* bufev,
bufferevent_data_cb readcb,
bufferevent_data_cb writecb,
bufferevent_event_cb eventcb,
void* cbarg
);
参数说明:
eventcb-》异常回调函数,比如说对端断开了
事件:
BEV_EVENT_EOF 对方关闭连接
BEV_EVENT_ERROR 出错
BEV_EVENT_TIMEOUT 超时
BEV_EVENT_CONNECTED 建立连接
void* cbarg -》传给回调函数的参数④使注册的读写回调函数生效
bufferevent_enable(struct bufferevent* bufev, short events);
bufferevent_disable(struct bufferevent* bufev,short events);
events包括:
EV_READ
EV_WRITE
⑤封装底层socket的connect接口,通过调用此函数,将bufferevent事件与通信的socket进行绑定
函数:int bufferevent_socket_connect(struct bufferevent* bev, struct sockaddr* serv ,int socklen);
参数:
bev 需要提前初始化的bufferevent事件
serv 对端的IP地址、端口、协议的结构指针
sockLen 描述serv的长度⑥开始事件循环
event_base_dispatch(base);
/*aasasdasdasdfsdfsd56165156561asdasdasd This example program provides a trivial server program that listens for TCP connections on port 9995. When they arrive, it writes a short message to each client connection, and closes each connection once it is flushed. Where possible, it exits cleanly in response to a SIGINT (ctrl-c). */ #include <string.h> #include <errno.h> #include <stdio.h> #include <signal.h> #ifndef _WIN32 #include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #endif #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> //编译指令 //gcc hello_world.c -levent static const char MESSAGE[] = "Hello, World!\n"; static const unsigned short PORT = 9995; static void listener_cb(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *); static void conn_writecb(struct bufferevent *, void *); static void conn_eventcb(struct bufferevent *, short, void *); static void signal_cb(evutil_socket_t, short, void *); static void conn_readcb(struct bufferevent* , void*); int main(int argc, char **argv) { //0.初始化一些变量 struct event_base *base; struct evconnlistener *listener; struct event *signal_event; struct sockaddr_in sin = {0}; //1.检测环境 #ifdef _WIN32 WSADATA wsa_data; WSAStartup(0x0201, &wsa_data); #endif //2.创建event_base管理器 base = event_base_new(); if (!base) { fprintf(stderr, "Could not initialize libevent!\n"); return 1; } //3.初始化一些监听套接字的东西 memset(&sin,0,sizeof(sin)); //这里没设置s_addr,因为都设置为0了,0是通配地址。所以就不用设置s_addr了 sin.sin_family = AF_INET; sin.sin_port = htons(PORT); //4.定义链接监听器 //有用户连接上来,执行listener_cb函数 listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { fprintf(stderr, "Could not create a listener!\n"); return 1; } //5.设置信号回调函数,回调函数用来中断循环 signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); //6.将信号处理事件注册到管理器上面 if (!signal_event || event_add(signal_event, NULL)<0) { fprintf(stderr, "Could not create/add a signal event!\n"); return 1; } //7.开启事件循环 event_base_dispatch(base); //8.回收资源 evconnlistener_free(listener); event_free(signal_event); event_base_free(base); printf("done\n"); return 0; } // static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { //1.初始化数据 struct event_base *base = user_data; struct bufferevent *bev; //2.创建一个连接套接字,并将连接套接字放在epoll的红黑树 bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return; } //3.设置连接套接字的读、写、错误处理事件的处理函数,并使能一些函数 bufferevent_setcb(bev, conn_readcb , conn_writecb, conn_eventcb, NULL); bufferevent_enable(bev, EV_WRITE | EV_READ);//使读\写事件都使能 //bufferevent_disable(bev, EV_READ); //4.给对端发送消息 helloworld bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); } //连接套接字写事件回调函数 static void conn_writecb(struct bufferevent *bev, void *user_data) { //获取缓冲区类型也就是获取缓冲区地址 struct evbuffer *output = bufferevent_get_output(bev); //判断缓存区是否还有数据,若hello world已经发送出去,那么数据长度等于0,那就调用bufferevent_free释放节点 if (evbuffer_get_length(output) == 0) { printf("flushed answer\n"); //bufferevent_free(bev); } } static void conn_readcb(struct bufferevent* bev,void* user_data) { char buf[1500] = ""; int n = bufferevent_read(bev,buf,sizeof(buf)); printf("buf:%s\n"); //读了数据再给对端发送过去 bufferevent_write(bev,buf,n);//给对端发送数据 } //断开连接触发事件回调函数 static void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { if (events & BEV_EVENT_EOF) { printf("Connection closed.\n"); } else if (events & BEV_EVENT_ERROR) { printf("Got an error on the connection: %s\n", strerror(errno));/*XXX win32*/ } /* None of the other events can happen here, since we haven't enabled * timeouts */ bufferevent_free(bev); } static void signal_cb(evutil_socket_t sig, short events, void *user_data) { struct event_base *base = user_data; struct timeval delay = { 2, 0 }; printf("Caught an interrupt signal; exiting cleanly in two seconds.\n"); event_base_loopexit(base, &delay); }
#include <string.h> #include <errno.h> #include <stdio.h> #include <signal.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> struct rA { uint8_t type; uint8_t len; uint16_t calccrc; //crc32用来校验包体长度是否改变 }; struct db2gs { uint32_t toatalSize; uint8_t nodeNum; //表示有多少个结构体 };
#include <iostream> #include <string.h> #include <errno.h> #include <stdio.h> #include <signal.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> #define redis_reply_string 1 #define redis_reply_array 2 #define redis_reply_integer 3 #define redis_reply_nul 4 #define redis_reply_status 5 #define redis_reply_error 6 #define _MAX_CLIENT_ 1024 //监听套接字结构体数组 typedef struct FdEventMap { int fd; //文件描述符 struct event* ev; //对应事件 }; struct rA { uint8_t type; uint8_t len; uint16_t calccrc; //crc32用来校验包体长度是否改变 }; struct db2gs { uint32_t toatalSize; //表示数据总长度 uint8_t nodeNum; //表示有多少个结构体 char data[0]; //柔性数组的灵魂 }; //----------------------------函数参数 static const int PORT = 8888; static char g_szWriteMsg[256] = {0}; static char g_szReadMsg[3000] = {0}; static int g_iCnt = 0; static void conn_ //---------------------------- void initEventArray() { int i; for(i = 0;i<_MAX_CLIENT_;++i) { mFdEvents[i].fd = -1; mFdEvents[i].ev = NULL; } } int addEvent(int fd, struct event* ev) { int i; for(i = 0;i < _MAX_CLIENT_;++i) { if(0 > mFdEvents[i].fd) { break; //return -1; } } if(i == _MAX_CLIENT_) { printf("too many clients...\n"); return -1; } mFdEvent[i].fd = fd; mFdEvent[i].ev = ev; return 0; } struct event* ghetEventByFd(int fd) { int i; for( i = 0; i< _MAX_CLIENT_;++i) { if(mFdEvents[i].fd == fd) { return mFdEvents[i].ev; } } return NULL; } void readcb(evutil_socket_t fd,short events,void* arg) { CCRC32* p_crc32 = CCRC32::GETInstance(); char buf[256] = {0}; int ret = recv(fd,buf, sizeof(buf),0); if( 0>=ret) { close(fd); event_del(getEventByFd(fd)); } else { int i; for(i = 0;i < ret:++i) { buf[i] = toupper(buf[i]); } printf("客户端已经链接,接受对端字符串string:%s\n",buf); //1.先算占多少内存 char tmp_str[50] = "strcpy123456"; size_t totalSize = sizeof(db2gs) + 3*strlen(tmp_str) + 3*sizeof(rA); db2gs* rp = (db2gs*)malloc(totalStruct); rp->nodeNUm = 3; char* ptmp = (char*)(rp->data); char* tmp = (char*)(rp->data); size_t p = 0; rA* ra = (rA*)ptmp; for(int16_t nIndex = 0;nIndex < rp->nodeNum;++nIndex) { ra = (rA*)ptmp; ra->type = 3; ra->len = strlen(tmp_str); ra->calccrc = p_crc32->Get_CRC((unsigned char* )tmp_str,ra->len); memccpy((void*)ptmp,(void*)ra,sizeof(rA)); printf("len:%d,tmp_str:%s,str:%s\n",ra->len,tmp_str,ptmp); ptmp += ra->len;//跳过字符串复制 } //2.打印测试 rA* rb = (rA*)tmp; uint8_t type_2 = rb->type; uint8_t len_2 = rb->len; uint16_t calccrc_2 = rb->calccrc; printf("type = %d,len=%d,calccrc=%3\n",type_2,len_2,calccrc_2); tmp = tmp +len_2; rb = (rA*)tmp; type_2 = rb->type; len_2 = rb->len; calccrc_2 = rb->calccrc; //3.发送数据 send(fd,(void*)rp,toatalStuct,0); free(rp); } } void conncb(evutil_socket_t fd,short events,void* arg) { printf("客户度已经连接上!\n"); struct event_base* base = (struct event_base*)arg; stuct sockaddr_in client; socklen_t lth = sizeof(client); int cfd = accept(fd,(struct sockaddr*)&client,&ith); if( 0 < cfd) { //创建事件 struct event* readev = event_new(base,cfd,EV_READ|EV_PERSIST,readcb,base); //注册事件 event_add(readev,NULL); //添加到数组 addEvent(cfd,readev); } } //编译指令 //g++ **.cpp -event int main() { printf("hello world!\n"); //初始化CRC32单例类 struct event_base* base = event_base_new(); //创建套接字 int lfd = socket(AF_INET,SOCK_STREAM,0); struct sockaddr_in recv; bzero(&serv,sizeof(serv)); serv.sin_family = AF_INET; serv.sin_addr.s_addr = htonl(INADDR_ANY); serv.sin_port = htons(8888); int opt = 1; //设置套接字套用 setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,*opt,sizeof(opt)); if(0 > bind(lfd,(struct sockaddr*)&serv,sizeof(serv))) { perror("bind err"; return -1;) } //监听 listen(lfd,128); //创建事件设置回调 initEventArray();//初始化事件数组 //创建事件 struct event* connev = event_new(base,lfd,EV_READ|EV_PERSIST,conncb,base); //注册事件 event_add(connev,NULL); //循环监听 event_base_dispatch(base); //回收内存资源 close(lfd); event_base(base); return 0; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。