当前位置:   article > 正文

稳定、高效、可靠、多平台支持的RTSP-Server组件RTSP-Server组件解决多线程效率问题_rtspserver

rtspserver

背景分析

RTSP(Real Time Streaming Protocol)是由Real Network和Netscape共同提出的如何有效地在IP网络上传输流媒体数据的应用层协议。RTSP对流媒体提供了诸如暂停,快进等控制,而它本身并不传输数据,RTSP的作用相当于流媒体服务器的远程控制。服务器端可以自行选择使用TCP或UDP来传送串流内容,它的语法和运作跟HTTP 1.1类似,但并不特别强调时间同步,所以比较能容忍网络延迟。而且允许同时多个串流需求控制(Multicast),除了可以降低服务器端的网络用量,还可以支持多方视频会议(Video onference)。

关于EasyRTSPServer

EasyRTSPServer是一套稳定、高效、可靠、多平台支持的RTSP-Server组件, 接口调用非常简单成熟,无需关注RTSPServer中关于客户端监听接入、音视频多路复用、RTSP具体流程、RTP打包与发送等相关问题,支持多种音视频格式,再也不用去处理整个RTSP OPTIONS/DESCRIBE/SETUP/PLAY/RTP/RTCP的复杂流程和担心内存释放的问题了,非常适合于安防领域、教育领域、互联网直播领域等。

功能特点.png

EasyRTSPServer解决多线程效率问题

提出问题

EasyRTSPServer基于live555改造而来,前面已说过怎样将单线程改造为多线程, 现就多线程的效率问题再补充一点说明。

解决问题

在GenericMediaServer.h中声明MultiThread_CORE_T结构体,如下:

  1. #define MAX_BATCH_CLIENT_NUM 5
  2. typedef struct __LIVE_THREAD_TASK_T
  3. {
  4. int id;
  5. TaskScheduler *pSubScheduler;
  6. UsageEnvironment *pSubEnv;
  7. char liveURLSuffix[512];
  8. int releaseChannel; //释放标记
  9. int handleDescribe;
  10. OSTHREAD_OBJ_T *osThread; //线程对象
  11. int clientNum;
  12. void *pClientConnectionPtr[MAX_BATCH_CLIENT_NUM];
  13. void *procPtr;
  14. void *extPtr;
  15. }LIVE_THREAD_TASK_T;
  16. #define MAX_DEFAULT_MULTI_THREAD_NUM 256 //最大支持通道数
  17. typedef struct __MultiThread_CORE_T
  18. {
  19. int threadNum;
  20. LIVE_THREAD_TASK_T *threadTask;
  21. }MultiThread_CORE_T;

在GenericMediaServer构造函数中,仅创建256个MultiThread_CORE_T, 实际的线程并不在此创建;

在处理客户端的DESCRIBE请求时, 先验证请求的资源是否在已有列表中, 如没有,这时才开始创建相应的工作线程, 如下:

  1. //如果当前是主线程,则进入到查找通道流程
  2. if (pEnv->GetEnvirId() == MAIN_THREAD_ID)
  3. {
  4. UsageEnvironment *pChEnv = fOurServer.GetEnvBySuffix(pEnv, urlTotalSuffix, this, pThreadTask, True);
  5. if (NULL == pChEnv)
  6. {
  7. handleCmdRet = -1;
  8. this->pClientConnectionEnv = NULL;
  9. handleCmd_notFound();
  10. break;
  11. }
  12. else
  13. {
  14. _TRACE(TRACE_LOG_DEBUG, (char*)"[%s]Set socket[%d] Assign to [%d:%s]\n", pEnv->GetEnvirName(), this->fOurSocket, pChEnv->GetEnvirId(), pChEnv->GetEnvirName());
  15. //将socket从主线程移到工作线程中
  16. pEnv->taskScheduler().disableBackgroundHandling(fOurSocket);
  17. return MAIN_THREAD_ID;
  18. }
  19. break;
  20. }

主线程中主要调用了GenericMediaServer的GetEnvBySuffix函数,该函数实现了主线程中任务的分配,如下:

  1. UsageEnvironment *GenericMediaServer::GetEnvBySuffix(UsageEnvironment *pMainThreadEnv, const char *urlSuffix, void *pClientConnection,
  2. LIVE_THREAD_TASK_T **pThreadTask, Boolean bLockServerMediaSession)
  3. {
  4. GenericMediaServer::ClientConnection *pClient = (GenericMediaServer::ClientConnection *)pClientConnection;
  5. int iFreeIdx = -1;
  6. UsageEnvironment *pEnv = NULL;
  7. if ( (int)strlen(urlSuffix) < 1)
  8. {
  9. return NULL;
  10. }
  11. char streamName[512] = {0};
  12. int iProcRet = 0;
  13. Boolean bRequestTooMany = False;
  14. if (bLockServerMediaSession) LockServerMediaSession(pMainThreadEnv->GetEnvirName(), (char*)"GenericMediaServer::GetEnvBySuffix", (unsigned long long)this);
  15. do
  16. {
  17. for (int i=0; i<multiThreadCore.threadNum; i++)
  18. {
  19. if ( (iFreeIdx<0) && (((int)strlen(multiThreadCore.threadTask[i].liveURLSuffix) < 1 )) && (multiThreadCore.threadTask[i].releaseChannel==0x00) )
  20. {
  21. iFreeIdx = i;
  22. }
  23. if ( 0 == strcmp(urlSuffix, multiThreadCore.threadTask[i].liveURLSuffix))
  24. {
  25. if (multiThreadCore.threadTask[i].releaseChannel>0x00)
  26. {
  27. iProcRet = -1;
  28. _TRACE(TRACE_LOG_DEBUG, (char *)"[%s] 当前通道正在被删除. 请稍候访问: %s\n", multiThreadCore.threadTask[i].pSubEnv->GetEnvirName(), urlSuffix);
  29. break;
  30. }
  31. if (NULL == multiThreadCore.threadTask[i].pSubEnv)
  32. {
  33. iProcRet = -2;
  34. break;
  35. }
  36. if (multiThreadCore.threadTask[i].pSubEnv->GetStreamStatus() == 0x00)
  37. {
  38. iProcRet = -3;
  39. break;
  40. }
  41. multiThreadCore.threadTask[i].pSubEnv->LockEnvir("GenericMediaServer::GetEnvBySuffix", (unsigned long long)this);
  42. if (multiThreadCore.threadTask[i].pSubEnv->GetLockFlag() != 0x00)
  43. {
  44. iProcRet = -4;
  45. multiThreadCore.threadTask[i].pSubEnv->UnlockEnvir("GenericMediaServer::GetEnvBySuffix", (unsigned long long)this);
  46. break;
  47. }
  48. bool assignEnv = false;
  49. for (int k=0; k<MAX_BATCH_CLIENT_NUM; k++)
  50. {
  51. if (NULL == multiThreadCore.threadTask[i].pClientConnectionPtr[k])
  52. {
  53. assignEnv = true;
  54. multiThreadCore.threadTask[i].pClientConnectionPtr[k] = pClient;
  55. _TRACE(TRACE_LOG_INFO, (char*)"GenericMediaServer::GetEnvBySuffix [%s] set [%d] to Index[%d]\n", urlSuffix, pClient->fOurSocket, k);
  56. strcpy(streamName, urlSuffix);
  57. break;
  58. }
  59. }
  60. if (assignEnv)
  61. {
  62. pEnv = multiThreadCore.threadTask[i].pSubEnv;
  63. //multiThreadCore.threadTask[i].subSocket = pClient->fOurSocket;
  64. pClient->pClientConnectionEnv = multiThreadCore.threadTask[i].pSubEnv;
  65. //multiThreadCore.threadTask[i].handleDescribe = 0x01;
  66. //*handleDescribe = &multiThreadCore.threadTask[i].handleDescribe;
  67. if (NULL != pThreadTask) *pThreadTask = &multiThreadCore.threadTask[i];
  68. multiThreadCore.threadTask[i].clientNum ++;
  69. pEnv->IncrementReferenceCount(); //增加引用计数
  70. iProcRet = 0;
  71. _TRACE(TRACE_LOG_INFO, (char*)"共用通道GenericMediaServer::GetEnvBySuffix:: Channel already exist. New Connection[%d] [%s][%s] ClientNum[%d]\n",
  72. pClient->fOurSocket, pClient->pClientConnectionEnv->GetEnvirName(), urlSuffix,
  73. multiThreadCore.threadTask[i].clientNum);
  74. }
  75. else
  76. {
  77. //没有找到有效的Env, 说明客户端列表已满
  78. iProcRet = -10;
  79. _TRACE(TRACE_LOG_ERROR, (char*)"GenericMediaServer::GetEnvBySuffix 当前通道客户端已满[%s]\n", urlSuffix);
  80. }
  81. multiThreadCore.threadTask[i].pSubEnv->UnlockEnvir("GenericMediaServer::GetEnvBySuffix", (unsigned long long)this);
  82. break;
  83. }
  84. }
  85. if (pEnv) break;
  86. if (iFreeIdx<0) break;
  87. if (iProcRet < 0) break;
  88. if (NULL == multiThreadCore.threadTask[iFreeIdx].osThread)
  89. {
  90. CreateOSThread( &multiThreadCore.threadTask[iFreeIdx].osThread, __WorkerThread_Proc, (void *)&multiThreadCore.threadTask[iFreeIdx] );
  91. }
  92. multiThreadCore.threadTask[iFreeIdx].pClientConnectionPtr[0] = pClient;
  93. pClient->pClientConnectionEnv = multiThreadCore.threadTask[iFreeIdx].pSubEnv;
  94. #ifdef _DEBUG
  95. for (int i=0; i<multiThreadCore.threadNum; i++)
  96. {
  97. if ( (int)strlen(multiThreadCore.threadTask[i].liveURLSuffix) > 0)
  98. {
  99. _TRACE(TRACE_LOG_DEBUG, (char *)"通道列表[%d:%s]: %s\n", i, multiThreadCore.threadTask[i].pSubEnv->GetEnvirName(), multiThreadCore.threadTask[i].liveURLSuffix);
  100. if ( (0 == strcmp(urlSuffix, multiThreadCore.threadTask[i].liveURLSuffix)) )
  101. {
  102. multiThreadCore.threadTask[i].releaseChannel = multiThreadCore.threadTask[i].releaseChannel;
  103. }
  104. }
  105. }
  106. #endif
  107. pEnv = pClient->pClientConnectionEnv;
  108. strcpy(multiThreadCore.threadTask[iFreeIdx].liveURLSuffix, urlSuffix);
  109. strcpy(streamName, multiThreadCore.threadTask[iFreeIdx].liveURLSuffix);
  110. pEnv->IncrementReferenceCount(); //增加引用计数
  111. if (NULL != pThreadTask) *pThreadTask = &multiThreadCore.threadTask[iFreeIdx];
  112. multiThreadCore.threadTask[iFreeIdx].clientNum ++;
  113. _TRACE(TRACE_LOG_INFO, (char*)"新建通道 GenericMediaServer::GetEnvBySuffix New Connection[%d] [%s][%s] ClientNum[%d]\n",
  114. pClient->fOurSocket, pClient->pClientConnectionEnv->GetEnvirName(),
  115. multiThreadCore.threadTask[iFreeIdx].liveURLSuffix,
  116. multiThreadCore.threadTask[iFreeIdx].clientNum);
  117. }while (0);
  118. if (bLockServerMediaSession) UnlockServerMediaSession(pMainThreadEnv->GetEnvirName(), "GenericMediaServer::GetEnvBySuffix", (unsigned long long)this);
  119. //UnlockClientConnection();
  120. if (NULL != pEnv)
  121. {
  122. if ( (int)strlen(streamName) < 1)
  123. {
  124. _TRACE(TRACE_LOG_DEBUG, (char *)"#### ERROR\n");
  125. }
  126. }
  127. return pEnv;
  128. }

到这里为止,针对当前客户端,主线程的工作已全部结束, 剩下的就是工作线程来工作了,工作线程被创建成功后,一直检测是否有需要处理的任务, 如果有新的客户端被分配到该线程,则又从handleCmd_DESCRIBE开始处理。

总结

工作线程和主线程的衔接点是在RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE,即主线程仅处理到DESCRIBE命令,后面的处理全部由工作线程完成。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/611160
推荐阅读
相关标签
  

闽ICP备14008679号