基于live555实现流媒体代理服务器(5)-多线程
基于live555实现多线程代理服务器或点播服务器,其实是一个比较复杂的流程,为了方便后续的讲述,这里首先给出架构设计,或者说是思路吧!
1、首先需要找到创建多线程的点,或说时机;一般来说,至少有两种思路:一种是,单线程处理所有的rtsp消息或信令;使用多线程处理rtp/rtcp媒体流,类似信令和媒体分开处理,这种方法对于rtp over udp应该没有问题,但是rtp over rtsp的话,就有些复杂了,或者是否能够实现,我还不太确定;另一种就相对简单了,使用线程池的思路,在live555初始化时候,创建线程池,每一个线程完全独立运行,处理整个rtsp会话,只需要将公用资源加锁就可以了,为了节省端口资源可以将accept函数放在独立线程处理;基本上就是,accept函数在独立的主线程中,负责接收tcp连接;然后将接收的连接分配给负载的子线程处理rtsp会话,每个rtsp会话都在独立的一个负载子线程中处理;也就是一个主线程处理tcp连接,一个多线程的线程池,负载均衡的处理rtsp会话;我们实现的就是第二种相对简单的方法。
2、基于live555单线程事件调度的原理,以及上面的描述,所以我们需要确保每一个rtsp会话在同一个线程中进行处理;如何理解这句话,按照1中描述的思路,实际在tcp链接完成后,分配的单个线程来处理所有的rtsp消息,实际上已经初步确保了整个会话在独立的同一个线程中处理了;那这里单独拎出来讲是什么意思呢?因为,在rtsp会话过程中,一旦发生了tcp重连,就会导致重连的tcp连接进入accept主线程,会被重新分配线程处理,此时可能分配给了不同的线程,我们就需要进行线程切换了,需要确保出现tcp重连后,我们还能将该会话调度到原始的线程继续进行处理;什么时候会出现tcp重连,rtsp会话建立过程中,以及会话建立后,都可能会出现,所以这个线程切换需要在任何可能的时候执行,这也是多线程实现的主要难点;
3、完成了上述两步的处理实际就可以了,剩下只需要确保在一些公共资源的处理时,不要出现不同步的问题就可以了,也就是适当的对公共资源加锁即可;
废话讲的有些多,直接上代码了:
//多线程创建的时机,在服务器构造函数中创建线程池
GenericMediaServer::GenericMediaServer(UsageEnvironment &env, int ourSocket, Port ourPort,
unsigned reclamationSeconds, unsigned subThreadNum)
: Medium(env), fSubThreadNum(subThreadNum), fSubThreadList(NULL), fCapsControlTask(NULL),
fServerSocket(ourSocket), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
fClientSessions(HashTable::create(STRING_HASH_KEYS))
{
ignoreSigPipeOnSocket(fServerSocket); // so that clients on the same host that are killed don't also kill us
fSmssMutex = new Lock();//对应fServerMediaSessions
fCcsMutex = new Lock();//对应fClientConnections
fCssMutex = new Lock();//对应fClientSessions
if(fSubThreadNum) {
RTSP_CRIT("Create %d Extra Threads, Live555 Total Run On %d Threads!\n", fSubThreadNum, fSubThreadNum + 1);
fSubThreadList = new Live555SubThreadInfo[fSubThreadNum];//负载的线程池,不包含主线程
memset(fSubThreadList, 0, fSubThreadNum * sizeof(Live555SubThreadInfo));
}
for(int i = 0; i < fSubThreadNum; ++i) {//服务器多线程的线程池初始化
fSubThreadList[i].fSchedule = BasicTaskScheduler::createNew();
fSubThreadList[i].fEnv = BasicUsageEnvironment::createNew(*fSubThreadList[i].fSchedule);
IFLY_CreateThread(GenericMediaServer::doSubEventLoop, fSubThreadList[i].fEnv);
}
// Arrange to handle connections from others://主线程处理tcp连接,或accept函数的独立主线程
env.taskScheduler().setBackgroundHandling(fServerSocket, SOCKET_READABLE, incomingConnectionHandler, this);
}
关键点就是上面我补充的红色注释了。
THREAD_RTN GenericMediaServer::doSubEventLoop(void *lpParameter)
{
char multiThreadFlag = 0;
UsageEnvironment *env = (UsageEnvironment *)lpParameter;
env->taskScheduler().setId(IFLY_GetThreadId());//记录所有线程的线程ID方便判断是否需要进行线程切换
RTSP_CRIT("Live555 Create Thread: %ld\n", env->taskScheduler().getId());
env->taskScheduler().doEventLoop(&multiThreadFlag);
return 0;
}
// GenericMediaServer::ClientConnection implementation //
GenericMediaServer::ClientConnection::ClientConnection(GenericMediaServer &ourServer, int clientSocket, struct sockaddr_in clientAddr)
: fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr), fIsCheck(True), fIsActive(True),
fRecursionCount(0), fRefereceCheck(NULL), fSeqNum(0)
{
// Add ourself to our 'client connections' table:
fOurServer.fCcsMutex->lock();
fOurServer.fClientConnections->Add((char const *)this, this);
fOurServer.fCcsMutex->unlock();
// Arrange to handle incoming requests:
resetRequestBuffer();
//给连接的客户端分配初始的线程ID,实际上就是给tcp连接分配处理线程,处理rtsp会话
int i = fOurServer.getSubThreadIndex();//线程池的负载线程分配算法
if(i >= 0) {
fOurEnv = fOurServer.fSubThreadList[i].fEnv;
RTSP_INFO("ClientSocket[%d] Alloc SubThread[%ld] Handle\n", fOurSocket, envir().taskScheduler().getId());
} else {
fOurEnv = &ourServer.envir();//如果没有配置多线程,那就给主线程处理了,也就是还是要兼容单线程的。
}
noteLiveness(envir());//这个定时器,是空连接判断定时器,如果一段时间没有收到任何rtsp消息,就把他释放了,否则空连接太多,把我的fd用完了,select的FD_SET就溢出了,当然,改epoll后,就无所谓了,不过总之不要浪费是吧!
}
int GenericMediaServer::getSubThreadIndex()
{
static volatile int index = 0;
//单线程模式
if(!fSubThreadNum) {
return -1;
}
//多线程模式下,分配活动的子线程进行处理,这个算法比较简单,就是顺序分配了,如果你有想法的话,在这里根据统计的子线程中的会话数量,做一个负载均衡的分配就更合理了。
int i = ++index;
bool getFlag = false;
int count = fSubThreadNum;
while(count) {
if(i >= fSubThreadNum) {
i = 0;
}
if(!!fSubThreadList[i].fEnv->taskScheduler().getId()) {
index = i;
getFlag = true;
break;
}
++i;
--count;
}
//子线程分配失败,返回错误,然后交给主线程处理
if(!getFlag) {
RTSP_WARN("getSubThreadIndex failed, main thread: %ld\n", IFLY_GetThreadId());
index = 0;
return -1;
}
RTSP_INFO("getSubThreadIndex: %d, main thread: %ld\n", index, IFLY_GetThreadId());
return index;
}
完成上面的处理,实际上多线程改造就已经可用的。下面再给出部分需要线程切换的地方,也就是上面第二点数的内容,最主要的地方就是int RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)中
if(requestIncludedSessionId) {
clientSession = (RTSPServer::RTSPClientSession *)(fOurRTSPServer.lookupClientSession(sessionIdStr));
//在通信过程中,网络发生重连,我们还需要再次切换线程
if(clientSession && clientSession->fOurServerMediaSession &&
IFLY_GetThreadId() != clientSession->fOurServerMediaSession->envir().taskScheduler().getId()) {
//对于所有相同或共享fOurServerMediaSession的流媒体,都需要切换到同一线程进行处理,本次切换下次生效
RTSP_INFO("ClientSocket[%d] Change SubThread[%ld->%ld] Handle\n",
fOurSocket, IFLY_GetThreadId(), clientSession->fOurServerMediaSession->envir().taskScheduler().getId());
envir().taskScheduler().disableBackgroundHandling(fOurSocket);
clientSession->envir().taskScheduler().setBackgroundHandling(fOurSocket, SOCKET_READABLE | SOCKET_EXCEPTION,
incomingRequestHandler, this);
noteLiveness(clientSession->fOurServerMediaSession->envir());//tcp空连接判断定时器
updateEnvir(clientSession->fOurServerMediaSession->envir());
} else {
noteLiveness(envir());//tcp空连接判断定时器
}
if(clientSession) {
clientSession->noteLiveness();//rtsp服务端会话定时器
}
}
上面这个地方是线程切换最重要的地方,在setup消息后,生成了媒体sessionID的媒体会话后,出现了tcp重连,就都在这里完成了线程切换;包括视频传输过程中出现的重连等(实际包含play、get_parameters的线程切换)。
还有在describe、setup消息处理的时候,也需要做线程切换的,代码我就不给出了(为了优化处理性能,实际上我们在option的时候也进行了线程切换的)。
如果你不进行线程切换的话,同一个rtsp任务运行在多个线程里面,就会导致上面设置的定时器同时在多个线程中运行,一旦超时,就重复释放资源导致进程挂掉;
最后补充一下,通常情况下,我们认为rtsp会话建立过程是比较快的,一般很少出现需要进行线程切换的情况,但是在会话建立完成后,媒体传输阶段,出现tcp重连导致线程切换的情况会比较多。但是呢,现实情况就是那么的打脸,使用vlc调试,或者与某些厂商对接时候,很多rtsp播放器,他会在任何你想不到的情况下,断开tcp,然后重连,然后继续前面的操作;例如,某个播放器,就会在setup建立udp流失败后,关掉tcp连接,然后重新建立tcp连接,接着发送setup建立rtp over rtsp的流,这样就会导致处理线程不一致,需要线程切换。类似的场景,居然还很常见,所以,我们需要将所有可能的情况都进行线程切换。