live555MediaServer.cpp服务端源码讲解
// Entry point of the media server.
//
// Builds the task scheduler and usage environment, creates the RTSP server
// (first on port 554, then on port 8554 if that fails) and finally hands
// control to the scheduler's event loop.
//
// Returns -1 if the RTSP server could not be created on either port; the
// trailing "return 0" is never expected to be reached.
int main(int argc, char** argv) {
  // Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

  // No authentication database is supplied to the server.
  UserAuthenticationDatabase* authDB = NULL;

  // Create the RTSP server. Try first with the default port number (554),
  // and then with the alternative port number (8554):
  RTSPServer* rtspServer;
  portNumBits rtspServerPortNum = 554;
  rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
  if (rtspServer == NULL) {
    // The default port could not be used; fall back to 8554.
    rtspServerPortNum = 8554;
    rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
  }
  if (rtspServer == NULL) {
    // Terminate the message with a real newline (the backslash had been lost).
    *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    // exit(1);
    return -1;
  }

  env->taskScheduler().doEventLoop(); // does not return
  return 0; // only to prevent compiler warning
}
跟踪进入DynamicRTSPServer::createNew函数:
// Factory for DynamicRTSPServer.
//
// Sets up the server socket for "ourPort" via setUpOurSocket() and, if that
// succeeds, constructs the server around it.
//
// Returns NULL when setUpOurSocket() reports failure (-1); otherwise a newly
// allocated DynamicRTSPServer.
DynamicRTSPServer*
DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort,
                             UserAuthenticationDatabase* authDatabase,
                             unsigned reclamationTestSeconds) {
  int ourSocket = setUpOurSocket(env, ourPort); // set up the server socket
  if (ourSocket == -1) return NULL;

  return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}
// Constructor: does no work of its own; every argument is forwarded
// unchanged to the RTSPServerSupportingHTTPStreaming base class.
DynamicRTSPServer::DynamicRTSPServer(UsageEnvironment& env, int ourSocket,
                                     Port ourPort,
                                     UserAuthenticationDatabase* authDatabase,
                                     unsigned reclamationTestSeconds)
  : RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds) {
}
首先建立socket,然后再调用DynamicRTSPServer的构造函数;DynamicRTSPServer继承RTSPServerSupportingHTTPStreaming类,RTSPServerSupportingHTTPStreaming类又继承RTSPServer类;
RTSPServerSupportingHTTPStreaming类的主要作用是支持Http;
setUpOurSocket函数在前面已经讲过,作用就是建立socket,这里不再重复;最后我们跟踪进入RTSPServer类的构造函数:
// Constructor for the RTSP server.
//
// Parameters:
//   env                    - usage environment, passed to the Medium base class
//   ourSocket              - already-created server socket; stored in fRTSPServerSocket
//   ourPort                - port of that socket; stored in fRTSPServerPort
//   authDatabase           - stored in fAuthDB
//   reclamationTestSeconds - stored in fReclamationTestSeconds
//
// The HTTP server socket/port start out as -1/0, and the lookup tables for
// media sessions, client connections, client sessions and pending register
// requests are created here. The HTTP-tunneling table is left NULL.
RTSPServer::RTSPServer(UsageEnvironment& env,
int ourSocket, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds)
: Medium(env),
fRTSPServerPort(ourPort), fRTSPServerSocket(ourSocket), fHTTPServerSocket(-1), fHTTPServerPort(0),
fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
fClientConnectionsForHTTPTunneling(NULL), // will get created if needed
fClientSessions(HashTable::create(STRING_HASH_KEYS)),
fPendingRegisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds) {
ignoreSigPipeOnSocket(ourSocket); // so that clients on the same host that are killed don't also kill us
// Arrange to handle connections from others:
// registers incomingConnectionHandlerRTSP on the server socket, with "this"
// passed along as the handler's instance argument.
env.taskScheduler().turnOnBackgroundReadHandling(fRTSPServerSocket,
(TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandlerRTSP,this);
}
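当fRTSPServerSocket上有新的连接请求到达(socket变为可读)时,调度器会调用incomingConnectionHandlerRTSP回调函数。该回调只是把instance还原成RTSPServer对象,再转调成员函数incomingConnectionHandlerRTSP1();incomingConnectionHandlerRTSP1的源码此处没有列出,从下文的讲解看,它最终进入的是incomingConnectionHandler函数。源码如下: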
void RTSPServer::incomingConnectionHandlerRTSP(void* instance,int/*mask*/) {
RTSPServer* server = (RTSPServer*)instance;
server->incomingConnectionHandlerRTSP1();
}
// Accepts one pending connection on "serverSocket" and creates a client
// connection object for it.
//
// If accept() fails, nothing is created; for any error other than
// EWOULDBLOCK the environment's result message is set to describe it.
// On success the new socket is made non-blocking, its send buffer is
// increased to 50 KiB, and it is handed to createNewClientConnection().
void RTSPServer::incomingConnectionHandler(int serverSocket) {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen = sizeof clientAddr;
  int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  if (clientSocket < 0) {
    int err = envir().getErrno();
    if (err != EWOULDBLOCK) {
      envir().setResultErrMsg("accept() failed: ");
    }
    return;
  }

  // The accepted socket needs its own options set here.
  makeSocketNonBlocking(clientSocket);
  increaseSendBufferTo(envir(), clientSocket, 50*1024);

#ifdef DEBUG
  // Terminate the log line with a real newline (the backslash had been lost).
  envir() << "accept()ed connection from " << AddressString(clientAddr).val() << "\n";
#endif

  // Create a new object for handling this RTSP connection:
  (void)createNewClientConnection(clientSocket, clientAddr);
}
当收到客户的连接时需保存下代表客户端的新socket,以后用这个socket与这个客户通讯。每个客户将来会对应一个rtp会话,而且各客户的RTSP请求只控制自己的rtp会话;
incomingConnectionHandler函数的作用是accept接受客户端的socket连接,然后设置clientSocket的属性;这里需要注意,我们在建立服务端socket时已经对服务端socket设置了非阻塞属性,这个地方还要再对accept返回的clientSocket单独设置属性(非阻塞、发送缓冲区大小);
incomingConnectionHandler函数最后调用createNewClientConnection函数,源码如下:
// Allocates the RTSPClientConnection object that will manage the connection
// accepted on "clientSocket" from "clientAddr", and returns it.
RTSPServer::RTSPClientConnection*
RTSPServer::createNewClientConnection(int clientSocket, struct sockaddr_in clientAddr) {
  RTSPClientConnection* const connection =
      new RTSPClientConnection(*this, clientSocket, clientAddr);
  return connection;
}
对于每个新建立的客户端连接请求,new RTSPClientConnection的对象进行管理;
// Constructor for the per-connection object.
//
// Parameters:
//   ourServer    - the owning server; kept in fOurServer
//   clientSocket - the accepted socket; used for both input and output
//   clientAddr   - the peer address; kept in fClientAddr
//
// The new object adds itself to the server's client-connection table, resets
// its request buffer, and registers incomingRequestHandler for readable and
// exception events on the client socket.
RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fIsActive(True),
    fClientInputSocket(clientSocket), fClientOutputSocket(clientSocket), fClientAddr(clientAddr),
    fRecursionCount(0), fOurSessionCookie(NULL) {
  // Add ourself to our 'client connections' table (keyed by our own address):
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().setBackgroundHandling(fClientInputSocket, SOCKET_READABLE|SOCKET_EXCEPTION,
      (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
}
在该构造函数中首先对RTSPClientConnection的成员变量进行赋值:
fOurServer= ourServer;
fClientInputSocket= clientSocket;
fClientOutputSocket= clientSocket;
fClientAddr= clientAddr;
setBackgroundHandling函数用来注册回调:当fClientInputSocket上有数据可读或出现异常时,调用incomingRequestHandler回调函数;
下面再跟进到incomingRequestHandler函数:
void RTSPServer::RTSPClientConnection::incomingRequestHandler(void* instance,int/*mask*/) {
RTSPClientConnection* session = (RTSPClientConnection*)instance;
session->incomingRequestHandler1();
}
Session 为刚才new的RTSPClientConnection 对象,这个地方需要调试验证下;调用成员函数incomingRequestHandler1;跟进到该成员函数的代码:
void RTSPServer::RTSPClientConnection::incomingRequestHandler1() {
struct sockaddr_in dummy; // 'from' address, meaningless in this case
int bytesRead = readSocket(envir(), fClientInputSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
handleRequestBytes(bytesRead);
}
该函数调用readSocket从fClientInputSocket上读取数据;读到的数据保存在fRequestBuffer中,readSocket的返回值为实际读到的数据的长度;源码如下:
// Reads from "socket" into "buffer" (at most "bufferSize" bytes) using
// recvfrom(), storing the sender's address in "fromAddress".
//
// Return value:
//   > 0  number of bytes read
//     0  recvfrom() failed with one of the error codes treated as harmless
//        below; fromAddress.sin_addr.s_addr is zeroed in that case
//    -1  recvfrom() returned 0 (remote end closed the connection)
//   < 0  any other recvfrom() failure: reported via socketErr(), then the
//        negative recvfrom() result is returned as-is
int readSocket(UsageEnvironment& env,
int socket, unsigned char* buffer, unsigned bufferSize,
struct sockaddr_in& fromAddress) {
SOCKLEN_T addressSize = sizeof fromAddress;
int bytesRead = recvfrom(socket, (char*)buffer, bufferSize, 0,
(struct sockaddr*)&fromAddress,
&addressSize);
if (bytesRead < 0) {
//##### HACK to work around bugs in Linux and Windows:
int err = env.getErrno();
// The condition below is assembled by the preprocessor: the two numeric
// Linux codes are always checked; the middle part depends on the platform.
if (err == 111 /*ECONNREFUSED (Linux)*/
#if defined(__WIN32__) ||defined(_WIN32)
// What a piece of crap Windows is. Sometimes
// recvfrom() returns -1, but with an 'errno' of 0.
// This appears not to be a real error; just treat
// it as if it were a read of zero bytes, and hope
// we don't have to do anything else to 'reset'
// this alleged error:
|| err == 0 || err == EWOULDBLOCK
#else
|| err == EAGAIN
#endif
|| err == 113 /*EHOSTUNREACH (Linux)*/) {// Why does Linux return this for datagram sock?
// Treat as "nothing read": clear the sender address and report 0 bytes.
fromAddress.sin_addr.s_addr = 0;
return 0;
}
//##### END HACK
// Any other error: record it; the negative bytesRead is returned at the end.
socketErr(env, "recvfrom() error: ");
} else if (bytesRead == 0) {
// "recvfrom()" on a stream socket can return 0 if the remote end has closed the connection. Treat this as an error:
return -1;
}
return bytesRead;
}
从socket中读到数据后必须对数据进行解析,解析的源码如下:
void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
int numBytesRemaining = 0;
++fRecursionCount;
do {
RTSPServer::RTSPClientSession* clientSession = NULL;
if (newBytesRead < 0 || (unsigned)newBytesRead >= fRequestBufferBytesLeft) {
// Either the client socket has died, or the request was too big for us.
// Terminate this connection:
#ifdef DEBUG
fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() read %d new bytes (of %d); terminating connection!n", this, newBytesRead, fRequestBufferBytesLeft);
#endif
fIsActive = False;
break;
}
Boolean endOfMsg = False;
unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];
#ifdef DEBUG
ptr[newBytesRead] = '