我是靠谱客的博主 满意帅哥,最近开发中收集的这篇文章主要介绍mediasoup服务器接收流媒体数据及接收拥塞控制,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

   webrtc服务器mediasoup即是媒体流的接收方也是媒体流的发送方,还是控制消息的转发方,作为接收方要接收音视频数据,码率统计,接收发送方发来的RTCP(SR,SDES等)包,给发送方发送RTPFB包及时反馈接收情况;作为发送方,要给接收方发送RTCP报告,处理接收方发来的RTPFB包,发送拥塞控制;作为控制拥塞消息转发方,需要流媒体数据的接收方发来的消息请求(如PSFB反馈请求I帧)然后处理转发给流媒体数据的发送方。这里讨论mediasoup作为接收方如何处理。
一、接收音视频数据
   接收到的音视频数据是SRTP,因此需要先解密成RTP,解密密钥就是在DTLS相互交换的密钥。
1.1、发送RTPFB反馈包(TRANSPORT_CC)
   将RTP包先输入到传输拥塞控制服务模块处理,mediasoup拥塞控制有两种BweType类型:TRANSPORT_CC和REMB,根据客户端rtp参数种rtcpfeedback参数对应。RTFB包结构示意图如下图所示

 0               1               2               3
   0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |V=2|P|  FMT=15 |    PT=205     |           length              |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                     SSRC of packet sender                     |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                      SSRC of media source                     |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |      base sequence number     |      packet status count      |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                 reference time                | fb pkt. count |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |          packet chunk         |         packet chunk          |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  .                                                               .
  .                                                               .
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |         packet chunk          |  recv delta   |  recv delta   |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  .                                                               .
  .                                                               .
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |           recv delta          |  recv delta   | zero padding  |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 */

FMT对应值如下:
            NACK = 1,
            TMMBR = 3,
            TMMBN = 4,
            SR_REQ = 5,
            RAMS = 6,
            TLLEI = 7,
            ECN = 8,
            PS = 9,
            TCC = 15,
            EXT = 31
SSRC of packet sender : 表示RTFB发送包所在所对应的流ssrc,ssrc=0,不单独创建一个会话来发送RTFB。
SSRC of media source :表示RTFB接收方对应的流ssrc,不是所有的流都可以用这个RTFB的,音频1是音频1接收,音频2是音频2接收,视频是视频接收,互不影响
base sequence number:基础序列号,上一个RTPFB包发送后接收的第一个包的序列号+1,作为下一个RTPFB包中的基础序列号;
packet status count :拥塞控制方对于接收数据包状态产生的状态包计数,后面讨论什么是状态包;
reference time :参考时间,为构造当前rtfp时第一个数据包的到达时间;
fb pkt. count:已经发送到指定发送方的RTPFB个数;
packet chunk:状态包;
recv delta:接收相邻包的时间间隔;
zero padding:填充0,确保4字节对齐;

   首先提取RTP包中的wideSequnencNumer(由rtp扩展头中携带)和ssrc,ssrc设置到SSRC of media source

在这里插入图片描述
生成RTPFB状态包chunks
   状态包即每收到一个数据包,都会将包分配一个状态status,状态取决于本次收到的包与上一次收到的包的时间戳差值delta,和本次包序列号与上一次接收包的序列号的差值+1,判断丢包;状态有5种:NotReceived = 0表示丢失的包,SmallDelta=1表示时间差值delta范围为(0,255),LargeDelta=2表示时间差值delta大于255范围,Reserved=3保留,None=4表示初始状态;
包状态和时间戳差值均保存到队列,只要队列中超过7个包状态就创建一个chunks;
   有两种chunks:
   RunLenghtChunk:表示收到的连续7个包的状态是一致
   TwoBitVectorChunks:表示包的状态不一样时创建。
创建chunk后包状态队列清除,且当前状态初始化None,继续下一组。代码如下:

**void FeedbackRtpTransportPacket::FillChunk(//根据包来统计并创建状态包chunks
		  uint16_t previousSequenceNumber, uint16_t sequenceNumber, int16_t delta)
		{
			MS_TRACE();

			auto missingPackets = static_cast<uint16_t>(sequenceNumber - (previousSequenceNumber + 1));//根据序列号连续性判断丢包数

			if (missingPackets > 0)
			{
				// Create a long run chunk before processing this packet, if needed.
				if (this->context.statuses.size() >= 7 && this->context.allSameStatus)
				{
					CreateRunLengthChunk(this->context.currentStatus, this->context.statuses.size());

					this->context.statuses.clear();
					this->context.currentStatus = Status::None;
				}

				this->context.currentStatus = Status::NotReceived;
				size_t representedPackets{ 0u };

				// Fill statuses vector.
				for (uint8_t i{ 0u }; i < missingPackets && this->context.statuses.size() < 7; ++i)
				{
					this->context.statuses.emplace_back(Status::NotReceived);
					representedPackets++;
				}

				// Create a two bit vector if needed.
				if (this->context.statuses.size() == 7)
				{
					// Fill a vector chunk.
					CreateTwoBitVectorChunk(this->context.statuses);

					this->context.statuses.clear();
					this->context.currentStatus = Status::None;
				}

				missingPackets -= representedPackets;

				// Not all missing packets have been represented.
				if (missingPackets != 0)
				{
					// Fill a run length chunk with the remaining missing packets.
					CreateRunLengthChunk(Status::NotReceived, missingPackets);

					this->context.statuses.clear();
					this->context.currentStatus = Status::None;
				}
			}

			Status status;

			if (delta >= 0 && delta <= 255)
				status = Status::SmallDelta;//
			else
				status = Status::LargeDelta;

			// Create a long run chunk before processing this packet, if needed.
			// clang-format off
			if (
				this->context.statuses.size() >= 7 &&
				this->context.allSameStatus &&
				status != this->context.currentStatus
			)
			// clang-format on
			{
				CreateRunLengthChunk(this->context.currentStatus, this->context.statuses.size());

				this->context.statuses.clear();
			}

			this->context.statuses.emplace_back(status);
			this->deltas.push_back(delta);
			this->deltasAndChunksSize += (status == Status::SmallDelta) ? 1u : 2u;

			// Update context info.

			// clang-format off
			if (
				this->context.currentStatus == Status::None ||
				(this->context.allSameStatus && this->context.currentStatus == status)
			)
			// clang-format on
			{
				this->context.allSameStatus = true;
			}
			else
			{
				this->context.allSameStatus = false;
			}

			this->context.currentStatus = status;

			// Not enough packet infos for creating a chunk.
			if (this->context.statuses.size() < 7)
			{
				return;
			}
			// 7 packet infos with heterogeneous status, create the chunk.
			else if (this->context.statuses.size() == 7 && !this->context.allSameStatus)
			{
				// Reset current status.
				this->context.currentStatus = Status::None;

				// Fill a vector chunk and return.
				CreateTwoBitVectorChunk(this->context.statuses);

				this->context.statuses.clear();
			}
		}**

   接收拥塞控制模块会按固定时间间隔向发送方发送反馈包,固定时间为100毫秒,超时时间100毫秒,按照RTPFB报文格式填充并发送。
1.2、发送RTPFB反馈包(REMB)
还研究,后续会补上
1.3、数据包码率与丢包统计
   包统计计算流程如下:
在这里插入图片描述
计算抖动方法:

void RtpStreamRecv::CalculateJitter(uint32_t rtpTimestamp)//rtpTimestamp包的时间戳
	{
		MS_TRACE();

		if (this->params.clockRate == 0u)
			return;

		auto transit =//包发送的时间与包接收时间的差
		  static_cast<int>(DepLibUV::GetTimeMs() - (rtpTimestamp * 1000 / this->params.clockRate));
		int d = transit - this->transit;//本次包的延时接收时间与上一个包的延时接收时间差

		// First transit calculation, save and return.
		if (this->transit == 0)
		{
			this->transit = transit;

			return;
		}

		this->transit = transit;

		if (d < 0)
			d = -d;

		this->jitter += (1. / 16.) * (static_cast<double>(d) - this->jitter);
	}

媒体包统计:统计接受的数据包码率和包计数。

void RtpDataCounter::Update(RTC::RtpPacket* packet)
	{
		uint64_t nowMs = DepLibUV::GetTimeMs();

		this->packets++;//统计接收的数据包个数
		this->rate.Update(packet->GetSize(), nowMs);//统计接收数据包的码率
	}

1.3、数据包转发到其他消费者
   包转发时mediasoup是作为媒体流发送方角色的,mediasoup服务器转发流媒体数据及发送拥塞控制
三、向发送方发送RTCP控制包
   
3.1 RTCP发送间隔
   服务器与客户端进行DTLS握手,交换密钥之后,两者之间的媒体通道标志着打通,客户端的流媒体数据包可以到达服务器,此时开启服务器端rtcp发送定时器,定时向发送端发送RR或RTX报告包。
   RTP的设计目的是允许应用程序自动从几个参与者到数千人的会话规模的扩展。例如,在音频会议上,音频数据流量本质上是自我限制的,因为一次只有一两个人说话,通过多播分布,任何给定链路上的数据速率都相对不变,不依赖于参与者的数量,但是,控制(RTCP消息)流量并不是自我限制的。如果每个参与者的接收报告以固定的速度发送,在同一时间所有参与者同时发送控制消息,则控制流量将与参与者的数量呈线性增长,会导致网络严重拥塞,那么会议就无法正常进行,因此,必须通过动态计算RTCP数据包传输之间的间隔来降低控制消息发送速率。
webrtc代码对于计算RTCP间隔的算法如下:
interval = 360 / (rate/1000)=360000/rate;
interval = random(0.5,1.5)*interval ;
rtcp带宽计算为360kbit/s除以发送码率rate,这个rate是指mediasoup服务器作为发送方向所有消费者发送所有数据(包括rtcp包)的码率,rate单位为bit/s,然后乘以0.5~1.5范围随机生成的因子,乘以随机因子的目的是避免对所有参与者的意外同步。对于音频发送rtcp间隔最大为5秒,对于视频发送rtcp间隔为1秒。
3.2 构造并发送RTCP包
   这里只讨论mediasoup作为媒体流的接收方角色,因此主要的RTCP处理就是接收来自发送方的SR以及给发送方发送RR包。对于接收的SR,SDES和BYE,服务器不做其他处理,只是更新下媒体流的分数,分数表明流接收的质量评估。
   通过前一个发送RR包和本次发送RR包之间的时间间隔内接收到的数据包来构造RR包,RR包结构如下:
在这里插入图片描述
SSRC of packet sender:发送RR包的源,为0,不会为RTCP单独创建源发送
SSRC_1 (SSRC of first source)和SSRC_1 (SSRC of first source):RR包发送的目的源,即我对这个SSRC媒体源发送的RR,其他媒体源忽略。
fraction lost:丢包率
cumulative number of packets lost:总丢包个数
interarrival jitter:包的抖动
last SR (LSR):上一个SR包中的ntp时间
delay since last SR (DLSR):从接收上一个SR到发送RR之间的时间间隔
构造RR包的代码如下,带有注释,

RTC::RTCP::ReceiverReport* RtpStreamRecv::GetRtcpReceiverReport()
	{
		MS_TRACE();

		uint8_t worstRemoteFractionLost{ 0 };

		if (this->params.useInBandFec)
		{
			// Notify the listener so we'll get the worst remote fraction lost.
			static_cast<RTC::RtpStreamRecv::Listener*>(this->listener)
			  ->OnRtpStreamNeedWorstRemoteFractionLost(this, worstRemoteFractionLost);

			if (worstRemoteFractionLost > 0)
				MS_DEBUG_TAG(rtcp, "using worst remote fraction lost:%" PRIu8, worstRemoteFractionLost);
		}

		auto* report = new RTC::RTCP::ReceiverReport();//创建RR包

		report->SetSsrc(GetSsrc());

		uint32_t prevPacketsLost = this->packetsLost;//记录前一个RR包丢包个数

		// Calculate Packets Expected and Lost.
		auto expected = GetExpectedPackets();//从发包开始到当前发送RR包的时间内期望收到的总包个数(根据序列号判断,因为序列号
		//是连续的,如果收到序列号1,2,3,8,10,则期望收到的包个数为10-1+1=10个包,其实序列号为4,5,6,7,9的包都丢失了)

		if (expected > this->mediaTransmissionCounter.GetPacketCount())//mediaTransmissionCounter记录的是实际收到的包个数
			this->packetsLost = expected - this->mediaTransmissionCounter.GetPacketCount();//packetsLost 为实际丢包的个数,注意这是从发包开始到目前为止总丢包个数,
		else
			this->packetsLost = 0u;

		// Calculate Fraction Lost.
		uint32_t expectedInterval = expected - this->expectedPrior;//在上一个发送RR包到当前发送RR包时间间隔内,期望收到的包个数

		this->expectedPrior = expected;

		uint32_t receivedInterval = this->mediaTransmissionCounter.GetPacketCount() - this->receivedPrior;//在上一个发送RR包到当前发送RR包时间间隔内,实际收到的包个数

		this->receivedPrior = this->mediaTransmissionCounter.GetPacketCount();

		int32_t lostInterval = expectedInterval - receivedInterval;//在上一个发送RR包到当前发送RR包时间间隔内,实际丢包个数

		if (expectedInterval == 0 || lostInterval <= 0)
			this->fractionLost = 0;
		else
			this->fractionLost = std::round((static_cast<double>(lostInterval << 8) / expectedInterval));//fractionLost :在上一个发送RR包到当前发送RR包时间间隔内丢包率

		// Worst remote fraction lost is not worse than local one.
		if (worstRemoteFractionLost <= this->fractionLost)
		{
			this->reportedPacketLost += (this->packetsLost - prevPacketsLost);

			report->SetTotalLost(this->reportedPacketLost);
			report->SetFractionLost(this->fractionLost);
		}
		else
		{
			// Recalculate packetsLost.
			uint32_t newLostInterval     = (worstRemoteFractionLost * expectedInterval) >> 8;
			uint32_t newReceivedInterval = expectedInterval - newLostInterval;

			this->reportedPacketLost += (receivedInterval - newReceivedInterval);

			report->SetTotalLost(this->reportedPacketLost);
			report->SetFractionLost(worstRemoteFractionLost);
		}

		// Fill the rest of the report.
		report->SetLastSeq(static_cast<uint32_t>(this->maxSeq) + this->cycles);
		report->SetJitter(this->jitter);

		if (this->lastSrReceived != 0)
		{
			// Get delay in milliseconds.
			auto delayMs = static_cast<uint32_t>(DepLibUV::GetTimeMs() - this->lastSrReceived);
			// Express delay in units of 1/65536 seconds.
			uint32_t dlsr = (delayMs / 1000) << 16;

			dlsr |= uint32_t{ (delayMs % 1000) * 65536 / 1000 };

			report->SetDelaySinceLastSenderReport(dlsr);//设置上一次sr到本次发送RR的间隔时间
			report->SetLastSenderReport(this->lastSrTimestamp);
		}
		else
		{
			report->SetDelaySinceLastSenderReport(0);
			report->SetLastSenderReport(0);
		}

		return report;
	}

三、NACK策略
   丢包重传(NACK)是抵抗网络丢包的重要手段。NACK在接收端检测到数据丢包后,发送NACK报文到发送端;发送端根据NACK报文中的序列号,在发送缓冲区找到对应的数据包,重新发送到接收端。NACK需要发送端,发送缓冲区的支持。
WebRTC中支持音频和视频的NACK重传,我们这里只分析nack机制。接收RTP数据包后,需要传入到重传nack模块处理检测丢包情况,流程如下:
在这里插入图片描述

代码如下:

bool NackGenerator::ReceivePacket(RTC::RtpPacket* packet, bool isRecovered)
	{
		MS_TRACE();

		uint16_t seq    = packet->GetSequenceNumber();
		bool isKeyFrame = packet->IsKeyFrame();

		if (!this->started)
		{
			this->started = true;
			this->lastSeq = seq;

			if (isKeyFrame)
				this->keyFrameList.insert(seq);

			return false;
		}

		// Obviously never nacked, so ignore.
		if (seq == this->lastSeq)
			return false;

		// 判断是否当前包序列号小于上一次收到的序列号,如果是,判断是否是重传包,
		if (SeqManager<uint16_t>::IsSeqLowerThan(seq, this->lastSeq))
		{
			auto it = this->nackList.find(seq);

			// It was a nacked packet.
			if (it != this->nackList.end())
			{//重传包处理
				MS_DEBUG_DEV(
				  "NACKed packet received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", recovered:%s]",
				  packet->GetSsrc(),
				  packet->GetSequenceNumber(),
				  isRecovered ? "true" : "false");

				this->nackList.erase(it);

				return true;
			}

			// Out of order packet or already handled NACKed packet.
			if (!isRecovered)
			{
				MS_WARN_DEV(
				  "ignoring older packet not present in the NACK list [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
				  packet->GetSsrc(),
				  packet->GetSequenceNumber());
			}

			return false;
		}

		if (isKeyFrame)//如果是关键帧,插入到关键帧队列
			this->keyFrameList.insert(seq);
		{
			auto it = this->keyFrameList.lower_bound(seq - MaxPacketAge);//移除队列中最老的包,以至于维持队列10000包的条件

			if (it != this->keyFrameList.begin())
				this->keyFrameList.erase(this->keyFrameList.begin(), it);
		}

		if (isRecovered)
		{//如果包是恢复包,则插入到恢复队列中
			this->recoveredList.insert(seq);

			// Remove old ones so we don't accumulate recovered packets.
			auto it = this->recoveredList.lower_bound(seq - MaxPacketAge);

			if (it != this->recoveredList.begin())
				this->recoveredList.erase(this->recoveredList.begin(), it);

			// Do not let a packet pass if it's newer than last seen seq and came via
			// RTX.
			return false;
		}

		AddPacketsToNackList(this->lastSeq + 1, seq);//插入到nack队列

		this->lastSeq = seq;

		// Check if there are any nacks that are waiting for this seq number.
		std::vector<uint16_t> nackBatch = GetNackBatch(NackFilter::SEQ);//nacklist不为空是发送一次请求重传,并记录重传请求发起次数

		if (!nackBatch.empty())
			this->listener->OnNackGeneratorNackRequired(nackBatch);//发送请求重传包

		// This is important. Otherwise the running timer (filter:TIME) would be
		// interrupted and NACKs would never been sent more than once for each seq.
		if (!this->timer->IsActive())
			MayRunTimer();//启动定时器

		return false;
	}

   nack模块会维护三个队列,重传队列nackList(容量10000包),恢复队列recoveList(容量10000包)和关键帧队列keyFrameList(容量10000包),队列满之后,每收到一个包都需要更新队列,移除最老的包,。如果收到的包序列号比上一个收到的包序列号小,判定为可能是重传包,如果是重传包,则必然之前会存储在重传队列中,如果重传队列中不存在,表明不是重传包,忽略,如果存在,是重传包,并从队列中移除掉。

void NackGenerator::AddPacketsToNackList(uint16_t seqStart, uint16_t seqEnd)
	{
		MS_TRACE();

		// 队列数量满10000包后,移除收到的最老的包,就是移除队列头部包.
		auto it = this->nackList.lower_bound(seqEnd - MaxPacketAge);

		this->nackList.erase(this->nackList.begin(), it);

		// 如果重传队列中包数量到达MaxNackPackets=1000时,清楚关键帧队列,并请求关键帧.
		uint16_t numNewNacks = seqEnd - seqStart;

		if (this->nackList.size() + numNewNacks > MaxNackPackets)
		{
			// clang-format off
			while (
				RemoveNackItemsUntilKeyFrame() &&
				this->nackList.size() + numNewNacks > MaxNackPackets
			)
			// clang-format on
			{
			}

			if (this->nackList.size() + numNewNacks > MaxNackPackets)
			{
				MS_WARN_TAG(
				  rtx, "NACK list full, clearing it and requesting a key frame [seqEnd:%" PRIu16 "]", seqEnd);

				this->nackList.clear();
				this->listener->OnNackGeneratorKeyFrameRequired();//发送请求关键帧

				return;
			}
		}

		for (uint16_t seq = seqStart; seq != seqEnd; ++seq)
		{
			MS_ASSERT(this->nackList.find(seq) == this->nackList.end(), "packet already in the NACK list");

			// Do not send NACK for packets that are already recovered by RTX.
			if (this->recoveredList.find(seq) != this->recoveredList.end())//判断收到的包是否之前需要重传的包,且之前已经收到过发送方
			//发送的重传包了,如果是直接忽略,如果还没有收到重传后的包,则插入重传队列
				continue;

			this->nackList.emplace(std::make_pair(seq, NackInfo{ seq, seq }));
		}
	}

   只要发现有丢包,nacklisg不为空,则发送一个nack包并记录请求重传一次,当请求重传次数达到一定数量(根据服务器类型决定,mediasoup设定为请求重传最大次数为10次),且还没有收到重传包,则不在请求重传直接放弃。代码如下:

std::vector<uint16_t> NackGenerator::GetNackBatch(NackFilter filter)
	{
		MS_TRACE();

		uint64_t nowMs = DepLibUV::GetTimeMs();
		std::vector<uint16_t> nackBatch;

		auto it = this->nackList.begin();

		while (it != this->nackList.end())
		{
			NackInfo& nackInfo = it->second;
			uint16_t seq       = nackInfo.seq;

			// clang-format off
			if (//有重传包时,就发起一次请求重传
				filter == NackFilter::SEQ &&
				nackInfo.sentAtMs == 0 &&
				(
					nackInfo.sendAtSeq == this->lastSeq ||
					SeqManager<uint16_t>::IsSeqHigherThan(this->lastSeq, nackInfo.sendAtSeq)
				)
			)
			// clang-format on
			{
				nackBatch.emplace_back(seq);
				nackInfo.retries++;//重传次数叠加
				nackInfo.sentAtMs = nowMs;

				if (nackInfo.retries >= MaxNackRetries)//达到重传最大次数MaxNackRetries=10后,不在请求发送重传
				{
					MS_WARN_TAG(
					  rtx,
					  "sequence number removed from the NACK list due to max retries [filter:seq, seq:%" PRIu16
					  "]",
					  seq);

					it = this->nackList.erase(it);
				}
				else
				{
					++it;
				}

				continue;
			}

			if (filter == NackFilter::TIME && nowMs - nackInfo.sentAtMs >= this->rtt)
			{
				nackBatch.emplace_back(seq);
				nackInfo.retries++;
				nackInfo.sentAtMs = nowMs;

				if (nackInfo.retries >= MaxNackRetries)
				{
					MS_WARN_TAG(
					  rtx,
					  "sequence number removed from the NACK list due to max retries [filter:time, seq:%" PRIu16
					  "]",
					  seq);

					it = this->nackList.erase(it);
				}
				else
				{
					++it;
				}

				continue;
			}

			++it;
		}

#if MS_LOG_DEV_LEVEL == 3
		if (!nackBatch.empty())
		{
			std::ostringstream seqsStream;
			std::copy(
			  nackBatch.begin(), nackBatch.end() - 1, std::ostream_iterator<uint32_t>(seqsStream, ","));
			seqsStream << nackBatch.back();

			if (filter == NackFilter::SEQ)
				MS_DEBUG_DEV("[filter:SEQ, asking seqs:%s]", seqsStream.str().c_str());
			else
				MS_DEBUG_DEV("[filter:TIME, asking seqs:%s]", seqsStream.str().c_str());
		}
#endif

		return nackBatch;
	}

nack反馈包结构如下:
在这里插入图片描述
前12字节固定,后面紧跟至少一个nackItem,每个item结构包含16为Packet identifier,和16位Bitmap of lost packets。Packet identifier表示当前item下第一个重传包的序列号,Bitmap of lost packets表示当前item下携带的序列号map,一个item最多携带16个。
计算方法如下:
如果重传队列中有s1,s2,s3,s4,s5,s6,s7,s8…,从第一个seq往后的每一个系列号与第一个seq相减再-1,
   即s2包的bitmap偏移量shift=s2-s1-1;
   s3包的bitmap偏移量shift=s3-s1-1;
   s5包的bitmap偏移量shift=s5-s1-1;
只要shit<=15,对应是seq包就在当前item下,否则放在下一个item。
如:nack队列中序列号有100 102 103 106 119 122 126 130 150 152 153 154 180则第一个item的Packet identifier=100,
   seq=102的包shit=102-100-1=1,此时Bitmap of lost packets=0x0002;
   seq=103的包shit=103-100-1=2,此时Bitmap of lost packets=0x0006;
   seq=106的包shit=106-100-1=5,此时Bitmap of lost packets=0x0026;
   seq=119的包shit=119-100-1=18,由于18超过15,无法在16位的Bitmap of lost packets中进行偏移,因此第一个item结束,Bitmap of lost packets=0x0026,seq=119的包放在第二个item中,且第二个item的Packet identifier=119,计算同上,以此类推;代码如下:

inline void RtpStreamRecv::OnNackGeneratorNackRequired(const std::vector<uint16_t>& seqNumbers)
	{
		MS_TRACE();

		MS_ASSERT(this->params.useNack, "NACK required but not supported");

		MS_DEBUG_TAG(
		  rtx,
		  "triggering NACK [ssrc:%" PRIu32 ", first seq:%" PRIu16 ", num packets:%zu]",
		  this->params.ssrc,
		  seqNumbers[0],
		  seqNumbers.size());

		RTC::RTCP::FeedbackRtpNackPacket packet(0, GetSsrc());

		auto it        = seqNumbers.begin();
		const auto end = seqNumbers.end();
		size_t numPacketsRequested{ 0 };

		while (it != end)
		{
			uint16_t seq;
			uint16_t bitmask{ 0 };

			seq = *it;
			++it;

			while (it != end)
			{
				uint16_t shift = *it - seq - 1;

				if (shift > 15)
					break;

				bitmask |= (1 << shift);
				++it;
			}

			auto* nackItem = new RTC::RTCP::FeedbackRtpNackItem(seq, bitmask);

			packet.AddItem(nackItem);

			numPacketsRequested += nackItem->CountRequestedPackets();
		}

		// Ensure that the RTCP packet fits into the RTCP buffer.
		if (packet.GetSize() > RTC::RTCP::BufferSize)
		{
			MS_WARN_TAG(rtx, "cannot send RTCP NACK packet, size too big (%zu bytes)", packet.GetSize());

			return;
		}

		this->nackCount++;
		this->nackPacketCount += numPacketsRequested;

		packet.Serialize(RTC::RTCP::Buffer);

		// Notify the listener.
		static_cast<RTC::RtpStreamRecv::Listener*>(this->listener)->OnRtpStreamSendRtcpPacket(this, &packet);
	}

发送第一次请求重传后,如果nack队列还不为空则启动定时器(mediasoup设置40ms发送一次),定时发送请求重传,如果nack机制不设置重传定时器,则每个丢失包最多只会发送以此请求重传,定时器启动后最多可以发送10次,确保收到重传包,如果10次还没有收到重传包,则忽略,没办法了,只能丢了。

最后

以上就是满意帅哥为你收集整理的mediasoup服务器接收流媒体数据及接收拥塞控制的全部内容,希望文章能够帮你解决mediasoup服务器接收流媒体数据及接收拥塞控制所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部