赞
踩
废话不说, 直接上代码:
头文件:
#if !defined(_WEBSOCKET_H_) #define _WEBSOCKET_H_ // 引入 APR 相关头文件 #include <apr-1/apr.h> #include <apr-1/apr_version.h> #include <apr-1/apr_env.h> #include <apr-1/apr_general.h> #include <apr-1/apr_atomic.h> #include <apr-1/apr_thread_proc.h> // 线程支持 #include <apr-1/apr_thread_mutex.h> // 锁 #include <apr-1/apr_thread_rwlock.h> // 读写锁 #include <apr-1/apr_thread_pool.h> // 线程池 #include <apr-1/apr_thread_cond.h> // 条件 #include <apr-1/apr_time.h> // 时间 #include <apr-1/apr_network_io.h> // 网络 #include <apr-1/apr_portable.h> // 端口 #include <apr-1/apr_pools.h> // 内存池 #include <apr-1/apr_signal.h> // 信号 #include <apr-1/apr_hash.h> // 哈希表 #include <apr-1/apr_strings.h> // 字符串 #include <apr-1/apr_queue.h> // 队列 #include <apr-1/apr_fnmatch.h> // 字符串匹配 #include <apr-1/apr_tables.h> #include <apr-1/apr_getopt.h> #include <apr-1/apr_sha1.h> #include <apr-1/apr_poll.h> #include <apr-1/apr_base64.h> #include <apr-1/apr_sha1.h> #include <apr-1/apr_strmatch.h> typedef struct websocket_t websocket_t; typedef struct websocket_client_t websocket_client_t; typedef void (*ws_handler)(websocket_client_t*, const char*, int); websocket_t* websocket_create(int port); void websocket_destroy(websocket_t* ws); int websocket_send(websocket_client_t* client, struct evbuffer* evb); int websocket_send2(websocket_client_t* client, const char* data, size_t size); // 需要设置处理函数 void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler); #endif //!defined(WEBSOCKET_H_)
C文件:
#include "websocket.h"

#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * NOTE(review): this file also uses dzlog_*(), evbuffer_*(), socket_create()
 * and av_strncasecmp(), none of which are declared by anything visible here.
 * They are presumably supplied by project headers - confirm.
 */

#define WS_KEY_NAME "Sec-WebSocket-Key"
#define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// Frame opcodes.
enum
{
    OPCODE_SLICE = 0,        ///< continuation frame (one fragment of a message)
    OPCODE_TEXT_FRAME = 1,   ///< text
    OPCODE_BINARY_FRAME = 2, ///< binary data
    OPCODE_DISCONNECT = 8,   ///< connection close
    OPCODE_PING = 9,
    OPCODE_PONG = 10,
};

/*
 * Frame header layout. The first two bytes are decoded with masks rather than
 * a bit-field struct, because bit-field ordering is implementation-defined.
 */
enum
{
    WS_OPCODE_BITS = 0x0f,  ///< byte 0: opcode
    WS_MASK_BIT = 0x80,     ///< byte 1: payload is masked
    WS_LENGTH_BITS = 0x7f,  ///< byte 1: 7-bit length, or 126 / 127 escape
    WS_LEN_16BIT = 126,     ///< a 2-byte big-endian length follows
    WS_LEN_64BIT = 127,     ///< an 8-byte big-endian length follows
    WS_MASK_KEY_SIZE = 4,
    WS_BUFFER_SIZE = 4096,
};

// A connected client.
struct websocket_client_t
{
    apr_pool_t* pool;
    apr_socket_t* socket;
    apr_pollfd_t pollfd;
    apr_byte_t handshark;    // TRUE once the HTTP upgrade has been answered
    char* ip;
    char* url;
    char buffer[WS_BUFFER_SIZE]; // bytes of an incomplete frame carried to the next read
    char zero;
    int buffer_size;
    ws_handler handler;
    websocket_client_t* next;
    websocket_client_t* prev;
};

// URL pattern -> handler mapping.
struct websocket_handler_t
{
    const char* pattern;
    size_t size;
    ws_handler handler;
    struct websocket_handler_t* next;
};

// Server state.
struct websocket_t
{
    apr_pool_t* pool;
    apr_socket_t* socket;
    apr_pollset_t* pollset;
    apr_thread_t* thread;
    apr_pollfd_t server;
    volatile apr_uint32_t running; // written by the owner, read by the worker thread
    apr_thread_rwlock_t* lock;
    struct websocket_handler_t* handlers;
    websocket_client_t* clients;
};

static int websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf);
static void websocket_remove(websocket_t* ws, websocket_client_t* client);
static void websocket_receiver(websocket_t* ws, websocket_client_t* client);
static void websocket_accept(websocket_t* ws);
static int websocket_transmission(websocket_client_t* client, char* buf, int len);
static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param);

/**
 * Create the server: pool, lock, pollset, listening socket and worker thread.
 * Returns NULL (after releasing everything) if any step fails.
 */
websocket_t* websocket_create(int port)
{
    apr_pool_t* pool = NULL;
    websocket_t* ws;

    if (port < 0 || port > 65535)
        return NULL;
    if (APR_SUCCESS != apr_pool_create(&pool, NULL))
        return NULL;

    ws = apr_pcalloc(pool, sizeof(*ws));
    ws->pool = pool;

    if (APR_SUCCESS != apr_thread_rwlock_create(&ws->lock, pool))
        goto fail;

    // APR_POLLSET_EPOLL is a method selector, not a flag bit, so it is passed
    // only as the method argument.
    if (APR_SUCCESS != apr_pollset_create_ex(&ws->pollset, 100, pool,
                                             APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY,
                                             APR_POLLSET_EPOLL))
        goto fail;

    ws->socket = socket_create((apr_port_t)port, pool);
    if (NULL == ws->socket)
        goto fail;

    if (APR_SUCCESS != apr_socket_listen(ws->socket, 5))
        goto fail;

    // client_data == NULL is how the worker tells the listener from clients.
    ws->server.desc.s = ws->socket;
    ws->server.desc_type = APR_POLL_SOCKET;
    ws->server.reqevents = APR_POLLIN;
    ws->server.client_data = NULL;
    if (APR_SUCCESS != apr_pollset_add(ws->pollset, &ws->server))
        goto fail;

    dzlog_info("websocket listen on port %u", (unsigned int)port);

    apr_atomic_set32(&ws->running, 1);
    if (APR_SUCCESS != apr_thread_create(&ws->thread, NULL, &websocket_thread, ws, pool))
        goto fail;

    return ws;

fail:
    // Destroying the pool also releases the lock, pollset and socket made from it.
    apr_pool_destroy(pool);
    return NULL;
}

/**
 * Stop the worker thread, close every connection and free the server.
 * Must not be called from a message handler: handlers run on the worker
 * thread, which this function joins.
 */
void websocket_destroy(websocket_t* ws)
{
    apr_status_t thread_rv;

    if (NULL == ws)
        return;

    dzlog_info("destroy websocket");

    // Stop the worker before freeing anything it uses; it re-checks the flag
    // at least once per poll timeout.
    apr_atomic_set32(&ws->running, 0);
    if (ws->thread)
        apr_thread_join(&thread_rv, ws->thread);

    if (ws->socket)
        apr_socket_close(ws->socket);

    apr_thread_rwlock_wrlock(ws->lock);
    if (ws->clients)
    {
        websocket_client_t* ptr, * next;
        ptr = ws->clients;
        while (ptr)
        {
            next = ptr->next; // read before the pool that owns ptr is destroyed
            apr_pollset_remove(ws->pollset, &ptr->pollfd);
            apr_socket_close(ptr->socket);
            apr_pool_destroy(ptr->pool);
            ptr = next;
        }
        ws->clients = NULL;
    }
    apr_thread_rwlock_unlock(ws->lock);

    if (ws->pollset)
        apr_pollset_destroy(ws->pollset);

    apr_pool_destroy(ws->pool);
}

/**
 * Register a handler for URLs matching `path`. The newest registration is
 * checked first. NULL arguments are ignored.
 */
void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler)
{
    struct websocket_handler_t* ptr;

    if (NULL == ws || NULL == path)
        return;

    ptr = apr_pcalloc(ws->pool, sizeof(*ptr));
    ptr->pattern = apr_pstrdup(ws->pool, path);
    ptr->size = strlen(ptr->pattern);
    ptr->handler = handler;

    apr_thread_rwlock_wrlock(ws->lock);
    ptr->next = ws->handlers;
    ws->handlers = ptr;
    apr_thread_rwlock_unlock(ws->lock);
}

/** Accept one pending connection and start polling it. */
static void websocket_accept(websocket_t* ws)
{
    apr_pool_t* pool = NULL;
    apr_sockaddr_t* addr;
    websocket_client_t* client;

    if (APR_SUCCESS != apr_pool_create(&pool, NULL))
    {
        dzlog_error("accept failed");
        return;
    }

    client = apr_pcalloc(pool, sizeof(*client));
    client->pool = pool;
    client->handshark = FALSE;

    if (APR_SUCCESS != apr_socket_accept(&client->socket, ws->socket, pool))
    {
        dzlog_error("accept failed");
        apr_pool_destroy(pool);
        return;
    }

    apr_socket_opt_set(client->socket, APR_SO_NONBLOCK, 1);
    apr_socket_opt_set(client->socket, APR_SO_LINGER, 1);

    // Resolve the peer address first so client->ip is never NULL on any path
    // that logs it (including the failure path below).
    if (APR_SUCCESS == apr_socket_addr_get(&addr, APR_REMOTE, client->socket))
    {
        char buf[64];
        if (APR_SUCCESS == apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr))
        {
            dzlog_info("accept a connection, from: %s", buf);
            client->ip = apr_pstrdup(pool, buf);
        }
    }
    if (NULL == client->ip)
        client->ip = "unknown";

    client->pollfd.client_data = client;
    client->pollfd.desc.s = client->socket;
    client->pollfd.desc_type = APR_POLL_SOCKET;
    client->pollfd.reqevents = APR_POLLIN;

    apr_thread_rwlock_wrlock(ws->lock);
    client->next = ws->clients;
    if (ws->clients)
    {
        ws->clients->prev = client;
    }
    ws->clients = client;
    apr_thread_rwlock_unlock(ws->lock);

    if (APR_SUCCESS != apr_pollset_add(ws->pollset, &client->pollfd))
    {
        websocket_remove(ws, client);
        return;
    }
}

/**
 * Answer the HTTP upgrade request in `buf` (NUL-terminated) and pick the
 * handler for the requested URL.
 * Returns 0 on success, -1 if the request is not a usable WebSocket upgrade
 * or the response could not be sent; the caller then drops the connection.
 */
static int websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf)
{
    char data[512];
    char sec_data[128] = { 0 };
    unsigned char digest[APR_SHA1_DIGESTSIZE] = { 0 };
    apr_sha1_ctx_t ctx;
    const char* url;
    const char* str;
    const char* end;
    apr_size_t key_len;
    apr_size_t len;
    apr_status_t rv;
    struct websocket_handler_t* ptr;
    int n;

    // Find the URL first.
    if (0 != strncmp(buf, "GET ", 4))
        return -1;
    url = buf + 4;
    str = strchr(url, ' ');
    if (NULL == str)
        return -1;
    client->url = apr_pstrndup(client->pool, url, (apr_size_t)(str - url));

    // Extract the value of the Sec-WebSocket-Key header.
    str = strstr(buf, WS_KEY_NAME);
    if (NULL == str)
        return -1;
    str = strchr(str, ':');
    if (NULL == str)
        return -1;
    str++;
    while (' ' == *str)
        str++;
    end = strchr(str, '\r');
    if (NULL == end)
        return -1;

    key_len = (apr_size_t)(end - str);
    if (key_len + sizeof(WS_GUID) >= sizeof(data))
        return -1;

    // key + GUID; sizeof(WS_GUID) includes the terminating NUL.
    memcpy(data, str, key_len);
    memcpy(data + key_len, WS_GUID, sizeof(WS_GUID));

    // Accept token = base64(sha1(key + GUID)).
    apr_sha1_init(&ctx);
    apr_sha1_update(&ctx, data, (unsigned int)(key_len + sizeof(WS_GUID) - 1));
    apr_sha1_final(digest, &ctx);
    apr_base64_encode(sec_data, (const char*)digest, APR_SHA1_DIGESTSIZE);

    n = snprintf(data, sizeof(data),
        "HTTP/1.1 101 Switching Protocols\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        "Sec-WebSocket-Accept: %s\r\n\r\n",
        sec_data);
    if (n < 0 || (size_t)n >= sizeof(data))
        return -1;

    len = (apr_size_t)n;
    rv = apr_socket_send(client->socket, data, &len);
    if (APR_SUCCESS != rv)
    {
        apr_strerror(rv, data, sizeof(data));
        dzlog_error("failed to send: %s", data);
        return -1; // do not treat the connection as upgraded
    }

    // Match the URL against the registered patterns; with no match the
    // client keeps a NULL handler and its messages are not delivered.
    apr_thread_rwlock_rdlock(ws->lock);
    for (ptr = ws->handlers; ptr; ptr = ptr->next)
    {
        if (!av_strncasecmp(client->url, ptr->pattern, ptr->size) ||
            APR_SUCCESS == apr_fnmatch(ptr->pattern, client->url, 0))
        {
            client->handler = ptr->handler;
            break;
        }
    }
    apr_thread_rwlock_unlock(ws->lock);

    client->handshark = TRUE;
    dzlog_info("handshark success, from: %s", client->ip);
    return 0;
}

/** XOR `length` payload bytes in place with the 4-byte masking key. */
static inline void websocket_umask(uint8_t* payload, int length, uint8_t* mask_key)
{
    int i;
    for (i = 0; i < length; i++)
    {
        payload[i] ^= mask_key[i % WS_MASK_KEY_SIZE];
    }
}

/** Send the whole content of `evb` as one text frame; -1 on failure. */
int websocket_send(websocket_client_t* client, struct evbuffer* evb)
{
    size_t len;
    const char* data;

    if (NULL == evb)
        return -1;

    len = evbuffer_get_length(evb);
    data = (const char*)evbuffer_pullup(evb, (ssize_t)len);
    if (NULL == data && len > 0)
        return -1;

    return websocket_send2(client, data, len);
}

/**
 * Send `size` bytes as a single unmasked, final text frame.
 * Returns the number of payload bytes handed to the socket, or -1 on error.
 * The count can be smaller than `size` if the socket accepted only part of
 * the frame.
 */
int websocket_send2(websocket_client_t* client, const char* data, size_t size)
{
    struct iovec v[2];
    unsigned char header[10];
    size_t header_len;
    apr_size_t bytes = 0;

    // The return type is int, so a larger payload could not be reported.
    if (NULL == client || (NULL == data && size > 0) || size > (size_t)INT_MAX)
        return -1;

    header[0] = 0x81; // FIN + text opcode
    if (size < WS_LEN_16BIT)
    {
        header[1] = (unsigned char)size; // top bit 0: not masked
        header_len = 2;
    }
    else if (size <= 0xffff)
    {
        header[1] = WS_LEN_16BIT;
        header[2] = (unsigned char)((size >> 8) & 0xff);
        header[3] = (unsigned char)(size & 0xff);
        header_len = 4;
    }
    else
    {
        // 64-bit big-endian length: the value belongs in the LOW-order bytes
        // (header[6..9] for anything that fits in 32 bits).
        uint64_t wide = (uint64_t)size;
        int i;
        header[1] = WS_LEN_64BIT;
        for (i = 0; i < 8; i++)
        {
            header[2 + i] = (unsigned char)((wide >> (56 - 8 * i)) & 0xff);
        }
        header_len = 10;
    }

    v[0].iov_base = header;
    v[0].iov_len = header_len;
    v[1].iov_base = (void*)data;
    v[1].iov_len = size;

    if (APR_SUCCESS != apr_socket_sendv(client->socket, v, 2, &bytes))
        return -1;
    if (bytes < header_len)
        return -1; // not even the header went out

    return (int)(bytes - header_len);
}

/**
 * Decode the frames in `buf` and pass each text payload to the client's
 * handler. An incomplete trailing frame is kept in client->buffer and
 * completed by the next call.
 *
 * `buf` must have at least one writable byte after `len`, because the payload
 * is NUL-terminated in place while the handler runs.
 * Returns 0, or -1 if the data cannot fit the reassembly buffer.
 */
static int websocket_transmission(websocket_client_t* client, char* buf, int len)
{
    const int capacity = (int)sizeof(client->buffer) - 1;

    // Append to whatever was left over from the previous read.
    if (client->buffer_size > 0)
    {
        if (client->buffer_size + len >= (int)sizeof(client->buffer))
        {
            // Too long.
            client->buffer_size = 0;
            return -1;
        }
        memcpy(client->buffer + client->buffer_size, buf, (size_t)len);
        buf = client->buffer;
        len += client->buffer_size;
    }

    while (len > 0)
    {
        const unsigned char* h = (const unsigned char*)buf;
        uint8_t* payload;
        int plen, hlen, masked;
        char saved;

        if ((h[0] & WS_OPCODE_BITS) != OPCODE_TEXT_FRAME)
        {
            // Only text frames are handled; discard the rest of the data.
            len = 0;
            break;
        }
        if (len < 2)
            break; // header incomplete, wait for more

        masked = (h[1] & WS_MASK_BIT) != 0;
        plen = h[1] & WS_LENGTH_BITS;
        hlen = 2;
        if (plen == WS_LEN_16BIT)
        {
            if (len < 4)
                break;
            plen = (h[2] << 8) | h[3];
            hlen = 4;
        }
        else if (plen == WS_LEN_64BIT)
        {
            if (len < 10)
                break;
            // The length is in the low-order bytes; anything using the high
            // bytes cannot fit the buffer anyway.
            if (h[2] || h[3] || h[4] || h[5] || (h[6] & 0x80))
            {
                client->buffer_size = 0;
                return -1;
            }
            plen = (int)(((uint32_t)h[6] << 24) | ((uint32_t)h[7] << 16) |
                         ((uint32_t)h[8] << 8) | (uint32_t)h[9]);
            hlen = 10;
        }

        if (plen > capacity - hlen - (masked ? WS_MASK_KEY_SIZE : 0))
        {
            // This frame can never be reassembled in client->buffer.
            client->buffer_size = 0;
            return -1;
        }

        if (masked)
        {
            if (plen + hlen + WS_MASK_KEY_SIZE > len)
                break; // not enough data yet
            payload = (uint8_t*)(buf + hlen + WS_MASK_KEY_SIZE);
            websocket_umask(payload, plen, (uint8_t*)(buf + hlen));
            hlen += WS_MASK_KEY_SIZE;
        }
        else
        {
            if (plen + hlen > len)
                break;
            payload = (uint8_t*)(buf + hlen);
        }

        // Terminate the payload for the handler, then restore the byte: it
        // may be the first header byte of the next frame in the same buffer.
        saved = (char)payload[plen];
        payload[plen] = 0;
        if (client->handler)
        {
            client->handler(client, (const char*)payload, plen);
        }
        payload[plen] = (uint8_t)saved;

        buf += plen + hlen;
        len -= plen + hlen;
    }

    if (len > 0)
    {
        // buf may point inside client->buffer, so the ranges can overlap.
        memmove(client->buffer, buf, (size_t)len);
        client->buffer_size = len;
    }
    else
    {
        client->buffer_size = 0;
    }

    return 0;
}

/** Stop polling a client, unlink it, close its socket and free its pool. */
static void websocket_remove(websocket_t* ws, websocket_client_t* client)
{
    websocket_client_t* next, * prev;

    apr_pollset_remove(ws->pollset, &client->pollfd);

    apr_thread_rwlock_wrlock(ws->lock);
    next = client->next;
    prev = client->prev;
    if (prev)
        prev->next = next;
    else
        ws->clients = next;
    if (next)
        next->prev = prev;
    apr_thread_rwlock_unlock(ws->lock);

    dzlog_info("remove websocket, from:%s", client->ip ? client->ip : "unknown");

    apr_socket_close(client->socket);
    apr_pool_destroy(client->pool); // frees `client` itself
}

/** Read what is available from a client and route it by handshake state. */
static void websocket_receiver(websocket_t* ws, websocket_client_t* client)
{
    char buf[WS_BUFFER_SIZE];
    apr_size_t len = sizeof(buf) - 1; // keep one byte for the terminator
    apr_status_t rv;

    rv = apr_socket_recv(client->socket, buf, &len);
    if (APR_STATUS_IS_EAGAIN(rv))
        return; // nothing to read right now; not an error on a non-blocking socket
    if (APR_SUCCESS != rv)
    {
        // Error or peer closed: drop the connection.
        websocket_remove(ws, client);
        return;
    }
    if (0 == len)
        return;

    buf[len] = '\0';
    if (client->handshark)
    {
        websocket_transmission(client, buf, (int)len);
    }
    else if (0 != websocket_handshark(ws, client, buf))
    {
        websocket_remove(ws, client);
    }
}

/**
 * Worker thread: polls the listener and all clients until ws->running is
 * cleared. It is deliberately not detached so websocket_destroy() can join it.
 * (An idle-timeout sweep was sketched in the original but never enabled.)
 */
static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param)
{
    // Peak performance is not the goal: one thread serves every connection.
    websocket_t* ws = param;
    apr_int32_t i, num;
    const apr_pollfd_t* descriptors;

    (void)thread;

#if defined(_DEBUG)
    apr_thread_rwlock_wrlock(ws->lock);
    dzlog_debug("start websocket ");
    apr_thread_rwlock_unlock(ws->lock);
#endif

    while (apr_atomic_read32(&ws->running))
    {
        // 100 ms timeout bounds how long shutdown has to wait.
        if (APR_SUCCESS != apr_pollset_poll(ws->pollset, 100 * 1000L, &num, &descriptors))
            continue;

        for (i = num - 1; i >= 0; i--)
        {
            if (descriptors[i].client_data)
            {
                // Client socket.
                websocket_receiver(ws, (websocket_client_t*)descriptors[i].client_data);
            }
            else
            {
                // Listening socket.
                websocket_accept(ws);
            }
        }
    }

    return NULL;
}
#include "websocket.h"
// Start a server on port 80; websocket_create() returns NULL on failure.
ws = websocket_create(80);
// Deliver messages from clients whose request URL matches "/api" to websocket_handler.
websocket_set_handler(ws, "/api", &websocket_handler);
// Example message handler: receives one text message (`body`, `size` bytes)
// from `client` and sends a reply.
// NOTE(review): illustrative sketch only - `data` is not defined in the
// visible code and `root` is never released here; confirm in the real handler.
static void websocket_handler(websocket_client_t* client, const char* body, int size)
{
cJSON * root = cJSON_Parse(body);
// Handle the WebSocket request
...
// Send the response
websocket_send2(client, data, size);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。