赞
踩
最近在做一个项目,项目中需要服务端后端和web端进行websocket通信,需要后端给web端不断地发送数据,之前程序已经写好,功能测试没有问题,代码如下:
#include "web_socket_server.h"

#include <cstring>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Pending outbound messages and the mutex this file takes before touching them.
// Both are defined in another translation unit.
extern std::mutex radar_obj_mutex_;
extern std::map<std::string, std::string> g_map_obj_list_;

#define SERVICE_RADARWEBSOCKET "radarwebsocket"

// Per-connection storage for the placeholder HTTP protocol.
struct per_session_data__http {
    int fd;
};

#define MAX_PAYLOAD_SIZE (10 * 1024)

// Per-connection storage lws allocates for the websocket protocol.
struct session_data {
    int msg_count;
    unsigned char buf[LWS_PRE + MAX_PAYLOAD_SIZE];
    int len;
};

/**
 * Placeholder HTTP handler: lws expects the first protocol entry to be the
 * HTTP one. Every event is accepted and ignored.
 */
static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
                         void *user, void *in, size_t len)
{
    (void)wsi;
    (void)reason;
    (void)user;
    (void)in;
    (void)len;
    return 0;
}

/**
 * Websocket protocol callback for the "radarwebsocket" protocol.
 *
 * On every SERVER_WRITEABLE event one queued message (the first entry of
 * g_map_obj_list_) is removed from the queue and sent as a text frame to the
 * connection that became writable.
 *
 * @param wsi    connection the event belongs to
 * @param reason event being reported by lws
 * @param user   per-session storage (unused here)
 * @param in     event payload (unused here)
 * @param len    length of the event payload (unused here)
 * @return 0 to keep the connection, -1 to ask lws to close it after a fatal
 *         write error.
 */
int CallBackFunc(struct lws *wsi, enum lws_callback_reasons reason,
                 void *user, void *in, size_t len)
{
    (void)user;
    (void)in;
    (void)len;

    switch (reason)
    {
    // Server-side connections report LWS_CALLBACK_ESTABLISHED; the
    // LWS_CALLBACK_CLIENT_* reasons are only delivered to client connections.
    case LWS_CALLBACK_ESTABLISHED:
        std::cout << "client connected to server" << std::endl;
        break;

    case LWS_CALLBACK_CLOSED:
        break;

    case LWS_CALLBACK_SERVER_WRITEABLE:
    {
        std::string body;
        {
            // The emptiness test and the pop must happen under the same lock,
            // otherwise another thread can change the map in between.
            std::lock_guard<std::mutex> lock(radar_obj_mutex_);
            if (g_map_obj_list_.empty())
            {
                break;
            }
            auto first = g_map_obj_list_.begin();
            body = std::move(first->second);
            g_map_obj_list_.erase(first);
        }

        // lws_write() builds the frame header in the LWS_PRE bytes *before*
        // the pointer it is given, so the payload has to start LWS_PRE bytes
        // into the allocation. The buffer is sized from the message, so no
        // truncation or overrun is possible.
        std::vector<unsigned char> frame(LWS_PRE + body.size());
        std::memcpy(frame.data() + LWS_PRE, body.data(), body.size());

        const int written = lws_write(wsi, frame.data() + LWS_PRE, body.size(), LWS_WRITE_TEXT);
        if (written < 0)
        {
            HKLOG_write(HKLOG_LEVEL_ERROR, " LWS_CALLBACK_SERVER_WRITEABLE error %d!", written);
            // A negative result is fatal for this connection; let lws close it.
            return -1;
        }
        break;
    }

    case LWS_CALLBACK_RECEIVE:
        // Data received from the client; currently only logged.
        lwsl_notice("LWS_CALLBACK_RECEIVE\n");
        break;

    default:
        break;
    }
    return 0;
}

static struct lws_protocols protocols[] = {
    /* first protocol must always be HTTP handler */
    {
        "http-only",                            /* name */
        callback_http,                          /* callback */
        sizeof(struct per_session_data__http),  /* per_session_data_size */
        0,                                      /* max frame size / rx buffer */
    },
    {
        SERVICE_RADARWEBSOCKET,
        CallBackFunc,
        sizeof(struct session_data),            /* per_session_data_size */
        0,
    },
    { NULL, NULL, 0, 0 } /* terminator */
};

/**
 * Thread entry: creates the lws context listening on port 8081 and runs the
 * service loop, requesting a writable callback on every websocket connection
 * after each service pass. Returns when context creation fails or when
 * lws_service() reports an error.
 */
void websocketProc()
{
    prctl(PR_SET_NAME, "websocketProc");

    struct lws_context_creation_info info;
    memset(&info, 0, sizeof(info));
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.port = 8081;
    // Optional websocket keepalive settings:
    // info.ka_time = 10;     // 10 s
    // info.ka_probes = 10;
    // info.ka_interval = 3;

    struct lws_context *pContext = lws_create_context(&info);
    if (pContext == NULL)
    {
        HKLOG_write(HKLOG_LEVEL_ERROR, "libwebsocket init failed!");
        return;
    }
    HKLOG_write(HKLOG_LEVEL_INFO, "libwebsocket create finish %p\n!", static_cast<void *>(pContext));

    // Leave the loop when lws_service() fails so the context is destroyed
    // instead of spinning forever on a broken context.
    while (lws_service(pContext, 10) >= 0)
    {
        // Ask lws for a SERVER_WRITEABLE callback on each connection of the
        // websocket protocol; lws only visits connections it still tracks.
        lws_callback_on_writable_all_protocol(pContext, &protocols[1]);
    }

    HKLOG_write(HKLOG_LEVEL_INFO, "libwebsocket destroy %p\n!", static_cast<void *>(pContext));
    lws_context_destroy(pContext);
}
但是当web界面刷新的时候,程序会在 lws_service 处崩溃。怀疑是在调用 lws_callback_on_writable_all_protocol(pContext, &protocols[1]); 的时候没有对已连接客户端的有效性做校验,于是后来在代码中对连接的客户端进行了 sessionid 的管理。
如下图:
int CWebSocketProtocol::callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int m = 0; bool bRet = false; Json::Reader reader; Json::Value value = Json::Value::null; //for write msg to client InfoItem_t info; std::shared_ptr<MsgItem_t> item(nullptr); std::string str; std::string send; Json::FastWriter writer; char *pBuffer = NULL; //uint clienttype = 0; //char szUserName[32] = {0}; //char szPassword[32] = {0}; //buffer for close char closebuffer[LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING] = {0}; //for get the sessionID from client uint64_t sessionID = 0; uint64_t delSessionID = 0; //for fragment struct per_session_data *psd = (struct per_session_data *)user; size_t ran = 0; int write_mode = LWS_WRITE_TEXT; //struct lws *last_wsi = nullptr; switch (reason) { case LWS_CALLBACK_ESTABLISHED: info.session = 0; info.timestamp = getCurElapseMSecLinux(); psd->left_message = 0; psd->total_message = 0; psd->pBuffer = nullptr; psd->state = FRAGSTATE_START_MESSAGE; psd->last_flag = 0; if(!m_pWebSockSessionManager->checkIfArriveMaxSession()) { printf("%s : %p LWS_CALLBACK_ESTABLISHED\n", m_ProtocolName.c_str(), wsi); AddClient2Queue(wsi, info); } else { printf("%s: %p Reach Max Session, LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi); lws_close_reason(wsi,LWS_CLOSE_STATUS_INVALID_PAYLOAD, (unsigned char*)closebuffer, 0); } break; case LWS_CALLBACK_CLOSED: printf("%s : %p LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi); if(psd->pBuffer) { delete[] psd->pBuffer; psd->pBuffer = nullptr; } delSessionID = RemoveClientFromQueue(wsi); RemoveWebClientFromProtocol(wsi); m_pWebSockSessionManager->deleteSessionById(delSessionID); break; case LWS_CALLBACK_SERVER_WRITEABLE: break; case LWS_CALLBACK_RECEIVE: sessionID = 0; bRet = false; str.assign((const char *)in, len); printf("%s : %p %s, LWS_CALLBACK_RECEIVE\n", m_ProtocolName.c_str(), wsi, str.c_str()); /*{ "method" : { "service" : "Subscribe", "protocol" : "radarwebsocket" } 
}*/ if(reader.parse(str, value)) { // if (value.isMember("method") && value["method"].isMember("service") && value["method"]["service"].isString()) { std::string method = value["method"]["service"].asString(); if("Subscribe" == method && value["method"]["protocol"].isString() && m_ProtocolName == value["method"]["protocol"].asString()) { if (1)//verifyUser(userName, passWordDigestBase, nonce,created)) { sessionID = m_pWebSockSessionManager->getSessionIdBysessionFd(lws_get_socket_fd(wsi)); if(0 == sessionID) { sessionID = m_pWebSockSessionManager->createNewSession(lws_get_socket_fd(wsi)); printf("new sessionId:%llu\n",(unsigned long long)sessionID); } else { printf("sdk websocket had sessionId:%llu\n",(unsigned long long)sessionID); } bRet = true; } if (sessionID != 0) { bRet = parseProtocol(value, wsi); value["session"] = (Json::UInt64)sessionID; } } } } if(!bRet) { lws_close_reason(wsi,LWS_CLOSE_STATUS_INVALID_PAYLOAD, (unsigned char*)closebuffer, 0); RemoveClientFromQueue(wsi); RemoveWebClientFromProtocol(wsi); m_pWebSockSessionManager->deleteSessionById(sessionID); printf("%s : %p LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi); } else { SetClientSessionID(wsi, sessionID, WS_CLIENT_TYPE_SDK); printf("%s : %p client new in\n",m_ProtocolName.c_str(), wsi); } //here you can receive data from client break; /* * this just demonstrates how to use the protocol filter. If you won't * study and reject connections based on header content, you don't need * to handle this callback */ case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: dump_handshake_info(wsi); /* you could return non-zero here and kill the connection */ break; default: break; } return 0; }
实现的客户端的代码如下:
#include "libwebsockets.h"
#include <signal.h>
#include "json/json.h"

#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

using namespace std;

// Set to non-zero when the main loop should stop (signal, connection error
// or connection closed).
static volatile sig_atomic_t exit_sig = 0;
// Number of the signal that requested the stop, 0 if none.
static volatile sig_atomic_t caught_sig = 0;

#define MAX_PAYLOAD_SIZE (10 * 1024)

uint8_t g_curLogLevel = HKLOG_LEVEL_INFO;

/**
 * Signal handler. Only records the signal in sig_atomic_t flags; logging is
 * done from main() because logging calls are not async-signal-safe.
 */
void sighdl(int sig)
{
    caught_sig = sig;
    exit_sig = 1;
}

/**
 * Per-session context object; lws allocates one per connection.
 */
struct session_data {
    int msg_count;
    unsigned char buf[LWS_PRE + MAX_PAYLOAD_SIZE];
    int len;
};

/**
 * Callback executed when an event occurs on a connection of this protocol.
 *
 * @param wsi    the websocket instance
 * @param reason the event that triggered the callback
 * @param user   per-session memory allocated by the library (session_data)
 * @param in     event payload, used by some events (not NUL-terminated)
 * @param len    number of valid bytes at `in`
 * @return 0 to keep the connection, -1 to ask lws to close it.
 */
int callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct session_data *data = (struct session_data *)user;

    switch (reason)
    {
    case LWS_CALLBACK_CLIENT_ESTABLISHED:
        // Connected to the server: ask for one WRITEABLE callback so the
        // subscribe request can be sent. Requesting it here (rather than from
        // the main loop) guarantees `wsi` is alive when it is used.
        lwsl_notice("Connected to server\n");
        lws_callback_on_writable(wsi);
        break;

    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        // The connection attempt failed; wsi is about to be destroyed.
        lwsl_err("Connection error: %.*s\n", in ? (int)len : 0, in ? (const char *)in : "");
        exit_sig = 1;
        break;

    case LWS_CALLBACK_CLIENT_CLOSED:
        lwsl_notice("Connection closed\n");
        exit_sig = 1;
        break;

    case LWS_CALLBACK_CLIENT_RECEIVE:
    {
        // Data received from the server. Only `len` bytes at `in` are valid,
        // so copy them into a std::string before printing or parsing.
        std::string strBody;
        if (in != nullptr && len > 0)
        {
            strBody.assign(static_cast<const char *>(in), len);
        }
        std::cout << "client Received: " << strBody.c_str() << std::endl;
        std::cout << "strrecv: " << strBody.c_str() << std::endl;
        printf("Read bodyinfo: [ %s ]\n", strBody.c_str());

        strBody = HKCodingConv::AnsiToUtf8(strBody);
        HKLOG_write(HKLOG_LEVEL_INFO, " LWS_CALLBACK_SERVER_WRITEABLE body %s!", strBody.c_str());

        // newCharReader() returns an owning pointer; hold it in a unique_ptr
        // so it is released after every message.
        Json::CharReaderBuilder readerBuilder;
        const std::unique_ptr<Json::CharReader> reader(readerBuilder.newCharReader());
        Json::Value root;
        std::string errors;
        const char *begin = strBody.data();
        const char *end = begin + strBody.size();
        if (!reader->parse(begin, end, &root, &errors))
        {
            // Parsing failed: print the parser's error description.
            std::cout << "Error parsing JSON: " << errors << std::endl;
        }
        break;
    }

    case LWS_CALLBACK_CLIENT_WRITEABLE:
    {
        // The client may send now. Only the first WRITEABLE event sends the
        // subscribe request.
        if (data == nullptr || data->msg_count >= 1)
        {
            break;
        }

        // The first LWS_PRE bytes of the buffer are reserved for lws.
        // Request: "method" : { "service" : "Subscribe", "protocol" : "radarwebsocket" }
        memset(data->buf, 0, sizeof(data->buf));
        char *msg = reinterpret_cast<char *>(&data->buf[LWS_PRE]);

        Json::Value eventInfo;
        eventInfo["service"] = "Subscribe";
        eventInfo["protocol"] = "radarwebsocket";
        Json::Value parent;
        parent["method"] = eventInfo;
        Json::FastWriter writer;
        const std::string strresponse = writer.write(parent);

        // Bounded formatting: never write past the payload area of buf.
        const int formatted = snprintf(msg, MAX_PAYLOAD_SIZE, "%s %d", strresponse.c_str(), data->msg_count + 1);
        if (formatted < 0 || formatted >= MAX_PAYLOAD_SIZE)
        {
            lwsl_err("subscribe request does not fit into the send buffer\n");
            return -1;
        }
        ++data->msg_count;
        data->len = formatted;
        lwsl_notice("Tx: %s\n", msg);

        // Send the text message over the websocket.
        if (lws_write(wsi, &data->buf[LWS_PRE], data->len, LWS_WRITE_TEXT) < 0)
        {
            lwsl_err("lws_write failed\n");
            return -1;
        }
        break;
    }

    default:
        break;
    }
    return 0;
}

/**
 * Supported websocket sub-protocols. A sub-protocol is an element of the
 * second argument of the JavaScript WebSocket(url, protocols) constructor;
 * each one needs its own callback.
 */
struct lws_protocols protocols[] = {
    {
        // protocol name, callback, per-session data size, receive buffer size
        "radarwebsocket", callback, sizeof(struct session_data), 4 * 1024 * 1024,
    },
    { NULL, NULL, 0, 0 } // terminator
};

/**
 * Connects to the server, sends one subscribe request and prints every
 * message received until SIGTERM arrives or the connection ends.
 */
int main()
{
    HKLOG_init("TEST");
    HKLOG_set_level(g_curLogLevel);

    // Install the signal handler.
    signal(SIGTERM, sighdl);

    // Parameters used to create the context.
    struct lws_context_creation_info ctx_info;
    memset(&ctx_info, 0, sizeof(ctx_info));
    ctx_info.port = CONTEXT_PORT_NO_LISTEN;
    ctx_info.protocols = protocols;
    ctx_info.gid = -1;
    ctx_info.uid = -1;

    // Create the websocket context.
    struct lws_context *context = lws_create_context(&ctx_info);
    if (context == nullptr)
    {
        lwsl_err("lws_create_context failed\n");
        return 1;
    }

    // NOTE(review): 172.0.0.1:8085 does not match the server snippet above
    // (port 8081) - confirm the intended address and port.
    const char *address = "172.0.0.1";
    const int port = 8085;
    char addr_port[256] = { 0 };
    snprintf(addr_port, sizeof(addr_port), "%s:%u", address, static_cast<unsigned>(port) & 65535u);

    // Client connection parameters.
    struct lws_client_connect_info conn_info;
    memset(&conn_info, 0, sizeof(conn_info));
    conn_info.context = context;
    conn_info.address = address;
    conn_info.port = port;
    conn_info.ssl_connection = 0;
    conn_info.path = "/";
    conn_info.host = addr_port;
    conn_info.origin = addr_port;
    conn_info.protocol = protocols[0].name;

    // Start the client connection.
    struct lws *wsi = lws_client_connect_via_info(&conn_info);
    if (wsi == nullptr)
    {
        lwsl_err("lws_client_connect_via_info failed\n");
        lws_context_destroy(context);
        return 1;
    }

    // Run the event loop (20 ms timeout is passed per pass) until a stop is
    // requested or lws_service() fails. The WRITEABLE request is issued from
    // the ESTABLISHED callback, so `wsi` is never touched here after the
    // connection may have gone away.
    while (!exit_sig)
    {
        if (lws_service(context, 20) < 0)
        {
            break;
        }
    }

    if (caught_sig != 0)
    {
        lwsl_notice("%d traped", (int)caught_sig);
    }

    // Destroy the context.
    lws_context_destroy(context);
    return 0;
}
亲测服务端和客户端是可以通信的,完整的代码链接如下:
https://blog.csdn.net/weixin_46543392/article/details/140500490?spm=1001.2014.3001.5502
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。