基于live555实现流媒体代理服务器(5)-多线程

基于live555实现多线程代理服务器或点播服务器,其实是一个比较复杂的流程,为了方便后续的讲述,这里首先给出架构设计,或者说是思路吧!

1、首先需要找到创建多线程的点,或说时机;一般来说,至少有两种思路:一种是,单线程处理所有的rtsp消息或信令;使用多线程处理rtp/rtcp媒体流,类似信令和媒体分开处理,这种方法对于rtp over udp应该没有问题,但是rtp over rtsp的话,就有些复杂了,或者是否能够实现,我还不太确定;另一种就相对简单了,使用线程池的思路,在live555初始化时候,创建线程池,每一个线程完全独立运行,处理整个rtsp会话,只需要将公用资源加锁就可以了,为了节省端口资源可以将accept函数放在独立线程处理;基本上就是,accept函数在独立的主线程中,负责接收tcp连接;然后将接收的连接分配给负载的子线程处理rtsp会话,每个rtsp会话都在独立的一个负载子线程中处理;也就是一个主线程处理tcp连接,一个多线程的线程池,负载均衡的处理rtsp会话;我们实现的就是第二种相对简单的方法。

2、基于live555单线程事件调度的原理,以及上面的描述,所以我们需要确保每一个rtsp会话在同一个线程中进行处理;如何理解这句话,按照1中描述的思路,实际在tcp链接完成后,分配的单个线程来处理所有的rtsp消息,实际上已经初步确保了整个会话在独立的同一个线程中处理了;那这里单独拎出来讲是什么意思呢?因为,在rtsp会话过程中,一旦发生了tcp重连,就会导致重连的tcp连接进入accept主线程,会被重新分配线程处理,此时可能分配给了不同的线程,我们就需要进行线程切换了,需要确保出现tcp重连后,我们还能将该会话调度到原始的线程继续进行处理;什么时候会出现tcp重连,rtsp会话建立过程中,以及会话建立后,都可能会出现,所以这个线程切换需要在任何可能的时候执行,这也是多线程实现的主要难点;

3、完成了上述两步的处理实际就可以了,剩下只需要确保在一些公共资源的处理时,不要出现不同步的问题就可以了,也就是适当的对公共资源加锁即可;

废话讲的有些多,直接上代码了:

//多线程创建的时机,在服务器构造函数中创建线程池

GenericMediaServer::GenericMediaServer(UsageEnvironment &env, int ourSocket, Port ourPort,
                                       unsigned reclamationSeconds, unsigned subThreadNum)
  : Medium(env), fSubThreadNum(subThreadNum), fSubThreadList(NULL), fCapsControlTask(NULL),
    fServerSocket(ourSocket), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS))

{
  ignoreSigPipeOnSocket(fServerSocket);  // so that clients on the same host that are killed don't also kill us
  fSmssMutex = new Lock();//对应fServerMediaSessions
  fCcsMutex = new Lock();//对应fClientConnections
  fCssMutex = new Lock();//对应fClientSessions

  if(fSubThreadNum) {
    RTSP_CRIT("Create %d Extra Threads, Live555 Total Run On %d Threads!\n", fSubThreadNum, fSubThreadNum + 1);
    fSubThreadList = new Live555SubThreadInfo[fSubThreadNum];//负载的线程池,不包含主线程
    memset(fSubThreadList, 0, fSubThreadNum * sizeof(Live555SubThreadInfo));
  }

  for(int i = 0; i < fSubThreadNum; ++i) {//服务器多线程的线程池初始化
    fSubThreadList[i].fSchedule = BasicTaskScheduler::createNew();
    fSubThreadList[i].fEnv = BasicUsageEnvironment::createNew(*fSubThreadList[i].fSchedule);
    IFLY_CreateThread(GenericMediaServer::doSubEventLoop, fSubThreadList[i].fEnv);
  }

  // Arrange to handle connections from others://主线程处理tcp连接,或accept函数的独立主线程
  env.taskScheduler().setBackgroundHandling(fServerSocket, SOCKET_READABLE, incomingConnectionHandler, this);
}

关键点就是上面我补充的红色注释了。

THREAD_RTN GenericMediaServer::doSubEventLoop(void *lpParameter)
{
  char multiThreadFlag = 0;
  UsageEnvironment *env = (UsageEnvironment *)lpParameter;

  env->taskScheduler().setId(IFLY_GetThreadId());//记录所有线程的线程ID方便判断是否需要进行线程切换
  RTSP_CRIT("Live555 Create Thread: %ld\n", env->taskScheduler().getId());
  env->taskScheduler().doEventLoop(&multiThreadFlag);

  return 0;
}

// GenericMediaServer::ClientConnection implementation //
GenericMediaServer::ClientConnection::ClientConnection(GenericMediaServer &ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr), fIsCheck(True), fIsActive(True),
    fRecursionCount(0), fRefereceCheck(NULL), fSeqNum(0)
{
  // Add ourself to our 'client connections' table:
  fOurServer.fCcsMutex->lock();
  fOurServer.fClientConnections->Add((char const *)this, this);
  fOurServer.fCcsMutex->unlock();
  // Arrange to handle incoming requests:
  resetRequestBuffer();

  //给连接的客户端分配初始的线程ID,实际上就是给tcp连接分配处理线程,处理rtsp会话
  int i = fOurServer.getSubThreadIndex();//线程池的负载线程分配算法
  if(i >= 0) {
    fOurEnv = fOurServer.fSubThreadList[i].fEnv;
    RTSP_INFO("ClientSocket[%d] Alloc SubThread[%ld] Handle\n", fOurSocket, envir().taskScheduler().getId());
  } else {
    fOurEnv = &ourServer.envir();//如果没有配置多线程,那就给主线程处理了,也就是还是要兼容单线程的。
  }

  noteLiveness(envir());//这个定时器,是空连接判断定时器,如果一段时间没有收到任何rtsp消息,就把他释放了,否则空连接太多,把我的fd用完了,select的FD_SET就溢出了,当然,改epoll后,就无所谓了,不过总之不要浪费是吧!
}

int GenericMediaServer::getSubThreadIndex()
{
  static volatile int index = 0;

  //单线程模式
  if(!fSubThreadNum) {
    return -1;
  }

  //多线程模式下,分配活动的子线程进行处理,这个算法比较简单,就是顺序分配了,如果你有想法的话,在这里根据统计的子线程中的会话数量,做一个负载均衡的分配就更合理了。
  int i = ++index;
  bool getFlag = false;
  int count = fSubThreadNum;
  while(count) {
    if(i >= fSubThreadNum) {
      i = 0;
    }
    if(!!fSubThreadList[i].fEnv->taskScheduler().getId()) {
      index = i;
      getFlag = true;
      break;
    }
    ++i;
    --count;
  }
  //子线程分配失败,返回错误,然后交给主线程处理
  if(!getFlag) {
    RTSP_WARN("getSubThreadIndex failed, main thread: %ld\n", IFLY_GetThreadId());
    index = 0;
    return -1;
  }

  RTSP_INFO("getSubThreadIndex: %d, main thread: %ld\n", index, IFLY_GetThreadId());
  return index;
}

完成上面的处理,实际上多线程改造就已经可用的。下面再给出部分需要线程切换的地方,也就是上面第二点数的内容,最主要的地方就是int RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)中

      if(requestIncludedSessionId) {
        clientSession = (RTSPServer::RTSPClientSession *)(fOurRTSPServer.lookupClientSession(sessionIdStr));

        //在通信过程中,网络发生重连,我们还需要再次切换线程
        if(clientSession && clientSession->fOurServerMediaSession &&
            IFLY_GetThreadId() != clientSession->fOurServerMediaSession->envir().taskScheduler().getId()) {
          //对于所有相同或共享fOurServerMediaSession的流媒体,都需要切换到同一线程进行处理,本次切换下次生效
          RTSP_INFO("ClientSocket[%d] Change SubThread[%ld->%ld] Handle\n",
                    fOurSocket, IFLY_GetThreadId(), clientSession->fOurServerMediaSession->envir().taskScheduler().getId());
          envir().taskScheduler().disableBackgroundHandling(fOurSocket);
          clientSession->envir().taskScheduler().setBackgroundHandling(fOurSocket, SOCKET_READABLE | SOCKET_EXCEPTION,
                                                                       incomingRequestHandler, this);
          noteLiveness(clientSession->fOurServerMediaSession->envir());//tcp空连接判断定时器
          updateEnvir(clientSession->fOurServerMediaSession->envir());
        } else {
          noteLiveness(envir());//tcp空连接判断定时器
        }

        if(clientSession) {
          clientSession->noteLiveness();//rtsp服务端会话定时器
        }
      }

上面这个地方是线程切换最重要的地方,在setup消息后,生成了媒体sessionID的媒体会话后,出现了tcp重连,就都在这里完成了线程切换;包括视频传输过程中出现的重连等(实际包含play、get_parameters的线程切换)。

还有在describe、setup消息处理的时候,也需要做线程切换的,代码我就不给出了(为了优化处理性能,实际上我们在option的时候也进行了线程切换的)。

如果你不进行线程切换的话,同一个rtsp任务运行在多个线程里面,就会导致上面设置的定时器同时在多个线程中运行,一旦超时,就重复释放资源导致进程挂掉;

最后补充一下,通常情况下,我们认为rtsp会话建立过程是比较快的,一般很少出现需要进行线程切换的情况,但是在会话建立完成后,媒体传输阶段,出现tcp重连导致线程切换的情况会比较多。但是呢,现实情况就是那么的打脸,使用vlc调试,或者与某些厂商对接时候,很多rtsp播放器,他会在任何你想不到的情况下,断开tcp,然后重连,然后继续前面的操作;例如,某个播放器,就会在setup建立udp流失败后,关掉tcp连接,然后重新建立tcp连接,接着发送setup建立rtp over rtsp的流,这样就会导致处理线程不一致,需要线程切换。类似的场景,居然还很常见,所以,我们需要将所有可能的情况都进行线程切换。