我是靠谱客的博主 轻松黄豆,最近开发中收集的这篇文章主要介绍流媒体学习之路(mediasoup)——信令传输(3)流媒体学习之路(mediasoup)——信令传输(3)一、Node.js部分二、C++部分三、总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

流媒体学习之路(mediasoup)——信令传输(3)

文章目录

  • 流媒体学习之路(mediasoup)——信令传输(3)
  • 一、Node.js部分
  • 二、C++部分
    • 2.1 UnixStreamSocket创建
    • 2.2 UnixStreamSocket
    • 2.3 OnRead函数:
    • 2.4 OnUvRead函数:
    • 2.5 UserOnUnixStreamRead:
    • 2.6 OnConsumerSocketMessage
    • 2.7 Request
    • 2.8 OnChannelRequest
  • 三、总结


一、Node.js部分

  mediasoup中Node.js部分与c++部分的信令交互主要通过管道(pipe)来进行:Node.js在创建worker子进程时,通过stdio额外打开若干个管道fd,两端就在本机上通过这些管道收发消息,实现交互。
前面两篇提到,Node.js部分worker建立后需要建立channel来进行通信,我们再来回顾一下channel.js部分通信的代码。下面是Worker的构造函数:

 constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {
        super();
        // Closed flag.
        this._closed = false;
        // Routers set.
        this._routers = new Set();
        // Observer instance.
        this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
        logger.debug('constructor()');
        let spawnBin = workerBin;
        let spawnArgs = [];
        if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
            spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
            if (process.env.MEDIASOUP_VALGRIND_OPTIONS)
                spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/s+/));
            spawnArgs.push(workerBin);
        }
        if (typeof logLevel === 'string' && logLevel)
            spawnArgs.push(`--logLevel=${logLevel}`);
        for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
            if (typeof logTag === 'string' && logTag)
                spawnArgs.push(`--logTag=${logTag}`);
        }
        if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
            spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
        if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
            spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
        if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)
            spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
        if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)
            spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
        logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
        this._child = child_process_1.spawn(
        // command
        spawnBin, 
        // args
        spawnArgs, 
        // options
        {
            env: {
                MEDIASOUP_VERSION: '3.6.13'
            },
            detached: false,
            // fd 0 (stdin)   : Just ignore it.
            // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
            // fd 2 (stderr)  : Same as stdout.
            // fd 3 (channel) : Producer Channel fd.
            // fd 4 (channel) : Consumer Channel fd.
            // fd 5 (channel) : Producer PayloadChannel fd.
            // fd 6 (channel) : Consumer PayloadChannel fd.
            stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
            windowsHide: true
        });
        this._pid = this._child.pid;
        this._channel = new Channel_1.Channel({
            producerSocket: this._child.stdio[3],
            consumerSocket: this._child.stdio[4],
            pid: this._pid
        });
        this._payloadChannel = new PayloadChannel_1.PayloadChannel({
            // NOTE: TypeScript does not like more than 5 fds.
            // @ts-ignore
            producerSocket: this._child.stdio[5],
            // @ts-ignore
            consumerSocket: this._child.stdio[6]
        });
        this._appData = appData;
        let spawnDone = false;
        // Listen for 'running' notification.
        this._channel.once(String(this._pid), (event) => {
            if (!spawnDone && event === 'running') {
                spawnDone = true;
                logger.debug('worker process running [pid:%s]', this._pid);
                this.emit('@success');
            }
        });
        this._child.on('exit', (code, signal) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                if (code === 42) {
                    logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
                    this.emit('@failure', new TypeError('wrong settings'));
                }
                else {
                    logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                    this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                }
            }
            else {
                logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
            }
        });
        this._child.on('error', (error) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
                this.emit('@failure', error);
            }
            else {
                logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
                this.safeEmit('died', error);
            }
        });
        // Be ready for 3rd party worker libraries logging to stdout.
        this._child.stdout.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('n')) {
                if (line)
                    workerLogger.debug(`(stdout) ${line}`);
            }
        });
        // In case of a worker bug, mediasoup will log to stderr.
        this._child.stderr.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('n')) {
                if (line)
                    workerLogger.error(`(stderr) ${line}`);
            }
        });
    }

  可见创建子进程(worker)后,创建了两个频道channel、payloadchannel。这两个对应cpp代码中的两个频道。创建channel时传入的三个参数——producerSocket, consumerSocket, pid 分别代表:子进程的生产者Socket、子进程消费者Socket以及子进程(worker-cpp)的进程id。

 constructor({ producerSocket, consumerSocket, pid }) {
        super();
        // Closed flag.
        this._closed = false;
        // Next id for messages sent to the worker process.
        this._nextId = 0;
        // Map of pending sent requests.
        this._sents = new Map();
        logger.debug('constructor()');
        this._producerSocket = producerSocket;
        this._consumerSocket = consumerSocket;
        // Read Channel responses/notifications from the worker.
        this._consumerSocket.on('data', (buffer) => {
            if (!this._recvBuffer) {
                this._recvBuffer = buffer;
            }
            else {
                this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
            }
            if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
                logger.error('receiving buffer is full, discarding all data into it');
                // Reset the buffer and exit.
                this._recvBuffer = undefined;
                return;
            }
            while (true) // eslint-disable-line no-constant-condition
             {
                let nsPayload;
                try {
                    nsPayload = netstring.nsPayload(this._recvBuffer);
                }
                catch (error) {
                    logger.error('invalid netstring data received from the worker process: %s', String(error));
                    // Reset the buffer and exit.
                    this._recvBuffer = undefined;
                    return;
                }
                // Incomplete netstring message.
                if (nsPayload === -1)
                    return;
                try {
                    // We can receive JSON messages (Channel messages) or log strings.
                    switch (nsPayload[0]) {
                        // 123 = '{' (a Channel JSON messsage).
                        case 123:
                            this._processMessage(JSON.parse(nsPayload.toString('utf8')));
                            break;
                        // 68 = 'D' (a debug log).
                        case 68:
                            logger.debug(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
                            break;
                        // 87 = 'W' (a warn log).
                        case 87:
                            logger.warn(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
                            break;
                        // 69 = 'E' (an error log).
                        case 69:
                            logger.error(`[pid:${pid} ${nsPayload.toString('utf8', 1)}`);
                            break;
                        // 88 = 'X' (a dump log).
                        case 88:
                            // eslint-disable-next-line no-console
                            console.log(nsPayload.toString('utf8', 1));
                            break;
                        default:
                            // eslint-disable-next-line no-console
                            console.warn(`worker[pid:${pid}] unexpected data: %s`, nsPayload.toString('utf8', 1));
                    }
                }
                catch (error) {
                    logger.error('received invalid message from the worker process: %s', String(error));
                }
                // Remove the read payload from the buffer.
                this._recvBuffer =
                    this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
                if (!this._recvBuffer.length) {
                    this._recvBuffer = undefined;
                    return;
                }
            }
        });
        this._consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
        this._consumerSocket.on('error', (error) => (logger.error('Consumer Channel error: %s', String(error))));
        this._producerSocket.on('end', () => (logger.debug('Producer Channel ended by the worker process')));
        this._producerSocket.on('error', (error) => (logger.error('Producer Channel error: %s', String(error))));
    }

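  在channel的'data'回调中执行了一个while(true)循环:每当consumerSocket收到数据,就把数据追加到接收缓冲区,然后循环地从缓冲区中解析出每一条完整的netstring消息并进行处理(JSON消息或日志),直到缓冲区被读空或只剩下不完整的消息为止。向worker发送数据则通过producerSocket进行。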

二、C++部分

  为了方便理解,我将cpp代码信令部分按语义进行了分层。分层图大致如下:
在这里插入图片描述

2.1 UnixStreamSocket创建

  这里有一个很容易让人头晕的类名,就是UnixStreamSocket:在命名空间Channel中和全局空间中各有一个同名的类。
  a. 全局空间的类是生产者/消费者类的真正基类,它将会保存Socket的fd、最大buf大小、角色类型(CONSUMER 或者 PRODUCER)。
  b. Channel内部的类则为主函数中调用的类,该类负责初始化生产者、消费者,同时该类也继承了生产者消费者类内的监听类。这样就可以在外层实现内部类函数的调用。
  下面列一下伪代码:

// Pseudo-code sketch of the global-namespace UnixStreamSocket base class.
// Its constructor receives the pipe file descriptor, the size of the read
// buffer and the role of this endpoint.
// NOTE(review): the nested Role type is not declared in this sketch; the
// constructor implementation shown later compares it against Role::CONSUMER.
class UnixStreamSocket
{
public:
	UnixStreamSocket(int fd, size_t bufferSize, UnixStreamSocket::Role role);
};

// Pseudo-code sketch of the Channel namespace classes.
namespace Channel {
    // Reading endpoint: derives from the global-namespace ::UnixStreamSocket
    // and is constructed with a pointer to a Listener.
    class ConsumerSocket : public ::UnixStreamSocket{
	public:
		// Callback interface for ConsumerSocket users (members omitted in
		// this sketch).
		class Listener{
        };
    	ConsumerSocket(int fd, size_t bufferSize, Listener* listener);
    };     
    
	// Channel-level class that shares its name with the global base class.
	// It implements ConsumerSocket::Listener and is built from the consumer
	// and producer file descriptors.
	class UnixStreamSocket : public ConsumerSocket::Listener {
	public:
		explicit UnixStreamSocket(int consumerFd, int producerFd);
    };
};

2.2 UnixStreamSocket

  在C++代码的main函数中,最初就构造了两个channel。其中,Channel::UnixStreamSocket的构造中首先创建了一个名为uvHandle的uv_pipe_t私有对象。该对象将会开启libUV,这里做了一下libUV的基础简介:https://blog.csdn.net/qw225967/article/details/119319970
  libUV中,初始化了uv_pipe,随后调用uv_pipe_open然后开始uv_read_start。这样libUV就进入了loop。
  打开之后进入uv_read_start,这个函数是用来启动读的操作,它同样也有三个参数:
  a. uvHandle,里面存放的是这个对象本身
  b. onAlloc,libUV在每次读取数据之前回调这个函数,由它提供一块用于存放读入数据的buffer
  c. onRead,接收pipe的另一端发送的数据
  此处引用熠熠微光作者的文章:https://blog.csdn.net/Frederick_Fung/article/details/107063392

// Wraps the given file descriptor in a libuv pipe handle registered on the
// DepLibUV loop. A CONSUMER endpoint also starts reading right away. Each
// failing libuv call throws via MS_THROW_ERROR_STD after releasing the handle.
UnixStreamSocket::UnixStreamSocket(int fd, size_t bufferSize, UnixStreamSocket::Role role)
  : bufferSize(bufferSize), role(role)
{
	MS_TRACE_STD();

	// The handle keeps a back-pointer to this object so that the static libuv
	// callbacks can find it again.
	this->uvHandle       = new uv_pipe_t;
	this->uvHandle->data = static_cast<void*>(this);

	const int initStatus = uv_pipe_init(DepLibUV::GetLoop(), this->uvHandle, 0);

	if (initStatus != 0)
	{
		// The handle was never initialized, so it is freed directly.
		delete this->uvHandle;
		this->uvHandle = nullptr;

		MS_THROW_ERROR_STD("uv_pipe_init() failed: %s", uv_strerror(initStatus));
	}

	// From here on the handle is initialized and has to go through uv_close().
	const auto closeHandle = [this]() {
		uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));
	};

	const int openStatus = uv_pipe_open(this->uvHandle, fd);

	if (openStatus != 0)
	{
		closeHandle();

		MS_THROW_ERROR_STD("uv_pipe_open() failed: %s", uv_strerror(openStatus));
	}

	// Only the consumer side reads from the pipe.
	if (this->role != UnixStreamSocket::Role::CONSUMER)
		return;

	const int readStatus = uv_read_start(
	  reinterpret_cast<uv_stream_t*>(this->uvHandle),
	  static_cast<uv_alloc_cb>(onAlloc),
	  static_cast<uv_read_cb>(onRead));

	if (readStatus != 0)
	{
		closeHandle();

		MS_THROW_ERROR_STD("uv_read_start() failed: %s", uv_strerror(readStatus));
	}

	// NOTE: The read buffer is not allocated here; that is left to the first
	// uv_alloc_cb() invocation.
}

2.3 OnRead函数:

// libuv read callback: recovers the UnixStreamSocket stored in handle->data
// and forwards the read result to it. Does nothing when no socket is attached.
inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
	auto* const owner = static_cast<UnixStreamSocket*>(handle->data);

	if (owner == nullptr)
		return;

	owner->OnUvRead(nread, buf);
}

  libUV回调时传入了stream句柄(handle)、本次读取到的字节数(nread,出错时为负的错误码)以及buf。
  这里把handle->data强转后取出socket,然后使用socket调用OnUvRead函数。为什么强转后可以调用呢?是因为在2.2的构造函数中已经把this存入了uvHandle->data,而data成员的类型是void*,所以可以用static_cast把它转回原来的UnixStreamSocket*指针。

2.4 OnUvRead函数:

  在函数里只是判断了读到数据的大小,判断是否需要调用子类的读函数。

// Handles the outcome of a libuv read.
//   nread  > 0 : bytes were appended to the buffer; notify the subclass.
//   nread == 0 : nothing to do.
//   nread  < 0 : peer disconnection (UV_EOF / UV_ECONNRESET) or a read error;
//                either way the pipe is closed and the subclass is notified.
inline void UnixStreamSocket::OnUvRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
	MS_TRACE_STD();

	if (nread > 0)
	{
		// Account for the freshly received bytes, then let the subclass parse.
		this->bufferDataLen += static_cast<size_t>(nread);

		UserOnUnixStreamRead();

		return;
	}

	if (nread == 0)
		return;

	const bool closedByPeer = (nread == UV_EOF || nread == UV_ECONNRESET);

	if (closedByPeer)
	{
		this->isClosedByPeer = true;
	}
	else
	{
		MS_ERROR_STD("read error, closing the pipe: %s", uv_strerror(nread));

		this->hasError = true;
	}

	// Both failure flavours end the same way: close the local side of the pipe
	// and tell the subclass about it.
	Close();

	UserOnUnixStreamSocketClosed();
}

2.5 UserOnUnixStreamRead:

// Extracts every complete netstring currently held in the read buffer and
// hands each payload to the listener. An incomplete trailing message is kept
// (and moved to the front when the buffer is full); a malformed one resets
// the whole buffer.
void ConsumerSocket::UserOnUnixStreamRead()
	{
		MS_TRACE();

		// A single chunk may carry several messages, so keep parsing while
		// unread bytes remain after the last extracted one.
		do
		{
			if (IsClosed())
				return;

			// Start of the not yet parsed data and number of bytes it holds.
			auto* const frameBegin = this->buffer + this->msgStart;
			size_t pendingLen      = this->bufferDataLen - this->msgStart;
			char* payload          = nullptr;
			size_t payloadLen;
			const int parseStatus =
			  netstring_read(reinterpret_cast<char*>(frameBegin), pendingLen, &payload, &payloadLen);

			if (parseStatus != 0)
			{
				if (parseStatus == NETSTRING_ERROR_TOO_SHORT)
				{
					// Incomplete message. Unless the buffer is full there is
					// nothing to do but wait for more data.
					if (this->bufferDataLen == this->bufferSize)
					{
						if (this->msgStart == 0)
						{
							// The message already starts at position 0 and still
							// does not fit: it is too big, so drop it.
							MS_ERROR(
							  "no more space in the buffer for the unfinished message being parsed, "
							  "discarding it");

							this->msgStart      = 0;
							this->bufferDataLen = 0;
						}
						else
						{
							// Make room by moving the partial message to the
							// beginning of the buffer.
							std::memmove(this->buffer, frameBegin, pendingLen);
							this->msgStart      = 0;
							this->bufferDataLen = pendingLen;
						}
					}

					return;
				}

				// Any other parsing error: log it by name when known.
				switch (parseStatus)
				{
					case NETSTRING_ERROR_TOO_LONG:
						MS_ERROR("NETSTRING_ERROR_TOO_LONG");
						break;

					case NETSTRING_ERROR_NO_COLON:
						MS_ERROR("NETSTRING_ERROR_NO_COLON");
						break;

					case NETSTRING_ERROR_NO_COMMA:
						MS_ERROR("NETSTRING_ERROR_NO_COMMA");
						break;

					case NETSTRING_ERROR_LEADING_ZERO:
						MS_ERROR("NETSTRING_ERROR_LEADING_ZERO");
						break;

					case NETSTRING_ERROR_NO_LENGTH:
						MS_ERROR("NETSTRING_ERROR_NO_LENGTH");
						break;
				}

				// Discard everything buffered and leave the parsing loop.
				this->msgStart      = 0;
				this->bufferDataLen = 0;

				return;
			}

			// payload now points at the message body, payloadLen bytes long.
			// Recompute pendingLen as the full netstring size: the length
			// prefix before the payload, the payload, and one trailing byte.
			pendingLen = reinterpret_cast<const uint8_t*>(payload) - frameBegin + payloadLen + 1;

			this->listener->OnConsumerSocketMessage(this, payload, payloadLen);

			if ((this->msgStart + pendingLen) == this->bufferSize)
			{
				// The message just parsed ended exactly at the end of the
				// buffer, so the buffer can be emptied.
				this->msgStart      = 0;
				this->bufferDataLen = 0;
			}
			else
			{
				// Next parsing starts right after the message just handled.
				this->msgStart += pendingLen;
			}
		} while (this->bufferDataLen > this->msgStart);
	}

  子类的读函数负责解析信息:一次读到的数据块中可能包含多条netstring消息,因此会循环解析,直到缓冲区中没有完整的消息为止(不完整的消息留在缓冲区中等待后续数据)。每解析出一条完整消息并通过错误判断后,就会调用监听者,向上回调到“业务处理层”。

2.6 OnConsumerSocketMessage

// Called by the ConsumerSocket for every complete message it extracts.
//
// Parses the msgLen bytes at msg as JSON, wraps the result in a
// Channel::Request and hands it to the listener. MediaSoupTypeError and
// MediaSoupError thrown by the listener are answered on the request through
// TypeError() / Error(). A JSON parse error, or a MediaSoupError thrown while
// constructing the Request, is logged and the message is discarded.
void UnixStreamSocket::OnConsumerSocketMessage(
	  ConsumerSocket* /*consumerSocket*/, char* msg, size_t msgLen)
	{
		MS_TRACE_STD();

		try
		{
			json jsonMessage = json::parse(msg, msg + msgLen);

			// The Request lives only for the duration of this call. Automatic
			// storage guarantees it is destroyed even if the listener throws an
			// exception that is not handled below (a heap-allocated Request
			// would leak in that case).
			Channel::Request request(this, jsonMessage);

			// Notify the listener.
			try
			{
				this->listener->OnChannelRequest(this, &request);
			}
			catch (const MediaSoupTypeError& error)
			{
				request.TypeError(error.what());
			}
			catch (const MediaSoupError& error)
			{
				request.Error(error.what());
			}
		}
		catch (const json::parse_error& error)
		{
			MS_ERROR_STD("JSON parsing error: %s", error.what());
		}
		catch (const MediaSoupError& /*error*/)
		{
			MS_ERROR_STD("discarding wrong Channel request");
		}
	}

  该函数将接收到的信息转换成json格式向上传递。
  而json的类型则是 Request类 的形式:

// A single request received over the Channel. It is built from the parsed JSON
// message and carries the request id, the method (both as text and as a
// MethodId) and the "internal" / "data" JSON objects, plus the methods used to
// answer it.
class Request
	{
	public:
		// Identifier of every supported request method. Values are consecutive
		// starting at 1, so the order of the enumerators matters.
		enum class MethodId
		{
			WORKER_DUMP = 1,
			WORKER_GET_RESOURCE_USAGE,
			WORKER_UPDATE_SETTINGS,
			WORKER_CREATE_ROUTER,
			ROUTER_CLOSE,
			ROUTER_DUMP,
			ROUTER_CREATE_WEBRTC_TRANSPORT,
			ROUTER_CREATE_PLAIN_TRANSPORT,
			ROUTER_CREATE_PIPE_TRANSPORT,
			ROUTER_CREATE_DIRECT_TRANSPORT,
			ROUTER_CREATE_AUDIO_LEVEL_OBSERVER,
			TRANSPORT_CLOSE,
			TRANSPORT_DUMP,
			TRANSPORT_GET_STATS,
			TRANSPORT_CONNECT,
			TRANSPORT_SET_MAX_INCOMING_BITRATE,
			TRANSPORT_RESTART_ICE,
			TRANSPORT_PRODUCE,
			TRANSPORT_CONSUME,
			TRANSPORT_PRODUCE_DATA,
			TRANSPORT_CONSUME_DATA,
			TRANSPORT_ENABLE_TRACE_EVENT,
			PRODUCER_CLOSE,
			PRODUCER_DUMP,
			PRODUCER_GET_STATS,
			PRODUCER_PAUSE,
			PRODUCER_RESUME,
			PRODUCER_ENABLE_TRACE_EVENT,
			CONSUMER_CLOSE,
			CONSUMER_DUMP,
			CONSUMER_GET_STATS,
			CONSUMER_PAUSE,
			CONSUMER_RESUME,
			CONSUMER_SET_PREFERRED_LAYERS,
			CONSUMER_SET_PRIORITY,
			CONSUMER_REQUEST_KEY_FRAME,
			CONSUMER_ENABLE_TRACE_EVENT,
			DATA_PRODUCER_CLOSE,
			DATA_PRODUCER_DUMP,
			DATA_PRODUCER_GET_STATS,
			DATA_CONSUMER_CLOSE,
			DATA_CONSUMER_DUMP,
			DATA_CONSUMER_GET_STATS,
			RTP_OBSERVER_CLOSE,
			RTP_OBSERVER_PAUSE,
			RTP_OBSERVER_RESUME,
			RTP_OBSERVER_ADD_PRODUCER,
			RTP_OBSERVER_REMOVE_PRODUCER
		};

	private:
		// Lookup table from the textual method name to its MethodId; the
		// constructor uses it to resolve the "method" field.
		static std::unordered_map<std::string, MethodId> string2MethodId;

	public:
		// Builds the request from the parsed JSON message; the constructor
		// throws when "id" or "method" is missing or the method is unknown.
		Request(Channel::UnixStreamSocket* channel, json& jsonRequest);
		virtual ~Request();

		// Reply helpers: accept the request (optionally with response data) or
		// reject it with an error / type error and an optional reason.
		void Accept();
		void Accept(json& data);
		void Error(const char* reason = nullptr);
		void TypeError(const char* reason = nullptr);

	public:
		// Passed by argument.
		Channel::UnixStreamSocket* channel{ nullptr };
		// Fields read from the JSON message by the constructor.
		uint32_t id{ 0u };
		std::string method;
		MethodId methodId;
		json internal;
		json data;
		// Others.
		// NOTE(review): presumably set once a reply has been sent; the code
		// that writes it is not shown here.
		bool replied{ false };
	};

2.7 Request

  Request的构造函数中,将json信息初始化。

// Builds a Request from a parsed Channel JSON message.
//
// "id" must be a positive integer and "method" a string naming a known method,
// otherwise the constructor throws through MS_THROW_ERROR (an unknown method is
// additionally answered with Error() first). "internal" and "data" are copied
// when they are JSON objects and default to an empty object otherwise.
Request::Request(Channel::UnixStreamSocket* channel, json& jsonRequest) : channel(channel)
	{
		MS_TRACE();

		// Request id.
		const auto idIt      = jsonRequest.find("id");
		const bool idIsValid = idIt != jsonRequest.end() && Utils::Json::IsPositiveInteger(*idIt);

		if (!idIsValid)
		{
			MS_THROW_ERROR("missing id");
		}

		this->id = idIt->get<uint32_t>();

		// Method name.
		const auto methodIt      = jsonRequest.find("method");
		const bool methodIsValid = methodIt != jsonRequest.end() && methodIt->is_string();

		if (!methodIsValid)
		{
			MS_THROW_ERROR("missing method");
		}

		this->method = methodIt->get<std::string>();

		// Translate the method name into its MethodId.
		const auto knownMethodIt = Request::string2MethodId.find(this->method);

		if (knownMethodIt == Request::string2MethodId.end())
		{
			// Answer the request before giving up on it.
			Error("unknown method");

			MS_THROW_ERROR("unknown method '%s'", this->method.c_str());
		}

		this->methodId = knownMethodIt->second;

		// Optional "internal" object.
		const auto internalIt = jsonRequest.find("internal");
		const bool hasInternal = internalIt != jsonRequest.end() && internalIt->is_object();

		this->internal = hasInternal ? *internalIt : json::object();

		// Optional "data" object.
		const auto dataIt  = jsonRequest.find("data");
		const bool hasData = dataIt != jsonRequest.end() && dataIt->is_object();

		this->data = hasData ? *dataIt : json::object();
	}

  将json内容转换完成后把Request对象继续向上传递。

2.8 OnChannelRequest

  当Worker接收到Request的数据后,便能做出相应的处理了,接下来进入Worker.cpp的OnChannelRequest()

  switch里就会根据methodId做出相应的处理,当它为:
  a. WORKER_DUMP,表示把Worker的信息(FillJson)填入json,并通过request->Accept(data)返回给请求方
  b. WORKER_GET_RESOURCE_USAGE,表示把资源占用情况(resource usage)填入json并返回给请求方
  c. WORKER_UPDATE_SETTINGS,表示更新设置
  d. WORKER_CREATE_ROUTER,表示创建Router
  e. ROUTER_CLOSE,表示关闭并删除对应的Router
  其余的请求则交给对应的Router处理。

// Dispatches a Channel request according to its methodId.
//
// Worker-level methods (dump, resource usage, settings update, Router
// creation and Router closure) are handled here; every other method is
// forwarded to the Router resolved from request->internal. The helpers that
// resolve Router ids / Routers may throw, as noted at each call.
inline void Worker::OnChannelRequest(Channel::UnixStreamSocket* /*channel*/, Channel::Request* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

	switch (request->methodId)
	{
		case Channel::Request::MethodId::WORKER_DUMP:
		{
			// Reply with the JSON produced by FillJson().
			json data = json::object();

			FillJson(data);

			request->Accept(data);

			break;
		}

		case Channel::Request::MethodId::WORKER_GET_RESOURCE_USAGE:
		{
			// Reply with the JSON produced by FillJsonResourceUsage().
			json data = json::object();

			FillJsonResourceUsage(data);

			request->Accept(data);

			break;
		}

		case Channel::Request::MethodId::WORKER_UPDATE_SETTINGS:
		{
			Settings::HandleRequest(request);

			break;
		}

		case Channel::Request::MethodId::WORKER_CREATE_ROUTER:
		{
			std::string routerId;

			// This may throw.
			SetNewRouterIdFromInternal(request->internal, routerId);

			auto* router = new RTC::Router(routerId);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::Request::MethodId::ROUTER_CLOSE:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			// Log while the Router is still alive: reading router->id after
			// the delete below would be a use-after-free.
			MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

			// Remove it from the map and delete it.
			this->mapRouters.erase(router->id);
			delete router;

			request->Accept();

			break;
		}

		// Any other request must be delivered to the corresponding Router.
		default:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			router->HandleRequest(request);

			break;
		}
	}
}

三、总结

  本章主要捋了一下Channel这部分的信令传输流程,可以看到主要回调形式是通过观察者向上传递。分层处理了信令数据,后续我们会对各个信令的意义进行更详细的分析。

最后

以上就是轻松黄豆为你收集整理的流媒体学习之路(mediasoup)——信令传输(3)流媒体学习之路(mediasoup)——信令传输(3)一、Node.js部分二、C++部分三、总结的全部内容,希望文章能够帮你解决流媒体学习之路(mediasoup)——信令传输(3)流媒体学习之路(mediasoup)——信令传输(3)一、Node.js部分二、C++部分三、总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部