WebRTC RTP Sender
Abstract | WebRTC RTP Sender
Authors | Walter Fan
Status | WIP
Updated | 2024-08-21
Overview
The RTCPeerConnection method getSenders() returns an array of RTCRtpSender objects, each of which represents the RTP sender responsible for transmitting one track’s data.
A sender object provides methods and properties for examining and controlling the encoding and transmission of the track’s data.
The RTPSender described here is the C++ implementation, analogous to the RTCRtpSender defined in the JavaScript API.
Implementation
RtpState
RtpSender maintains the following state:
struct RtpState {
  uint16_t sequence_number;
  uint32_t start_timestamp;
  uint32_t timestamp;
  int64_t capture_time_ms;
  int64_t last_timestamp_time_ms;
  bool ssrc_has_acked;
};
Packet History
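RtpSender maintains a queue that stores the packets it has already sent, called the Packet History. The queue is ordered by sequence number: old packets sit at the front and new packets at the back.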
RtpPacketHistory* const packet_history_;
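Because sequence numbers wrap around, the packet at the back of the queue may have a smaller sequence number than the packets before it.
RtpPacketHistory contains a queue whose elements are of type StoredPacket, declared as std::deque<StoredPacket> packet_history_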
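The wraparound behaviour described above can be sketched as follows. This is a minimal illustration, not the actual RtpPacketHistory code: the names StoredPacketSketch, PacketHistorySketch and IsNewerSeq are hypothetical, and the sketch assumes packets are stored with consecutive sequence numbers, so that a packet's position in the deque equals its 16-bit distance from the oldest stored packet.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical, simplified stand-in for the real StoredPacket.
struct StoredPacketSketch {
  uint16_t sequence_number;
};

// Returns true if `a` is newer than `b` under 16-bit wraparound, i.e. `a` is
// ahead of `b` by less than half of the sequence number space.
inline bool IsNewerSeq(uint16_t a, uint16_t b) {
  return a != b && static_cast<uint16_t>(a - b) < 0x8000;
}

// Hypothetical history: oldest packet at the front, newest at the back.
class PacketHistorySketch {
 public:
  // Assumption: callers insert packets in send order with consecutive
  // sequence numbers (no gaps).
  void Put(uint16_t sequence_number) {
    packets_.push_back(StoredPacketSketch{sequence_number});
  }

  // Returns the deque index of `sequence_number`, or -1 if it is not stored.
  // The unsigned 16-bit subtraction handles wraparound: with 65534 at the
  // front, sequence number 0 maps to index 2.
  int IndexOf(uint16_t sequence_number) const {
    if (packets_.empty()) {
      return -1;
    }
    const uint16_t first = packets_.front().sequence_number;
    const size_t index = static_cast<uint16_t>(sequence_number - first);
    if (index >= packets_.size()) {
      return -1;
    }
    return static_cast<int>(index);
  }

 private:
  std::deque<StoredPacketSketch> packets_;
};
```

With packets 65534, 65535, 0 and 1 stored in that order, the packet at the back (1) is numerically smaller than the one at the front (65534), yet IsNewerSeq(1, 65534) still reports it as newer and the index lookup still finds it.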
Handling NACK
When a NACK request arrives, the sender looks up each requested packet in the send history by sequence number and retransmits it.
void RTPSender::OnReceivedNack(
    const std::vector<uint16_t>& nack_sequence_numbers,
    int64_t avg_rtt) {
  packet_history_->SetRtt(TimeDelta::Millis(5 + avg_rtt));
  for (uint16_t seq_no : nack_sequence_numbers) {
    const int32_t bytes_sent = ReSendPacket(seq_no);
    if (bytes_sent < 0) {
      // Failed to send one Sequence number. Give up the rest in this nack.
      RTC_LOG(LS_WARNING) << "Failed resending RTP packet " << seq_no
                          << ", Discard rest of packets.";
      break;
    }
  }
}
Resending RTP packets
int32_t RTPSender::ReSendPacket(uint16_t packet_id) {
  int32_t packet_size = 0;
  const bool rtx = (RtxStatus() & kRtxRetransmitted) > 0;
  std::unique_ptr<RtpPacketToSend> packet =
      packet_history_->GetPacketAndMarkAsPending(
          packet_id, [&](const RtpPacketToSend& stored_packet) {
            // Check if we're overusing retransmission bitrate.
            // TODO(sprang): Add histograms for nack success or failure
            // reasons.
            packet_size = stored_packet.size();
            std::unique_ptr<RtpPacketToSend> retransmit_packet;
            if (retransmission_rate_limiter_ &&
                !retransmission_rate_limiter_->TryUseRate(packet_size)) {
              return retransmit_packet;
            }
            // Walter: if RTX is enabled, build an RTX packet; otherwise
            // resend on the original stream with the same SSRC.
            if (rtx) {
              retransmit_packet = BuildRtxPacket(stored_packet);
            } else {
              retransmit_packet =
                  std::make_unique<RtpPacketToSend>(stored_packet);
            }
            if (retransmit_packet) {
              retransmit_packet->set_retransmitted_sequence_number(
                  stored_packet.SequenceNumber());
            }
            return retransmit_packet;
          });
  if (packet_size == 0) {
    // Packet not found or already queued for retransmission, ignore.
    RTC_DCHECK(!packet);
    return 0;
  }
  if (!packet) {
    // Packet was found, but lambda helper above chose not to create
    // `retransmit_packet` out of it.
    return -1;
  }
  packet->set_packet_type(RtpPacketMediaType::kRetransmission);
  packet->set_fec_protect_packet(false);
  std::vector<std::unique_ptr<RtpPacketToSend>> packets;
  packets.emplace_back(std::move(packet));
  paced_sender_->EnqueuePackets(std::move(packets));
  return packet_size;
}
Building RTX packets
std::unique_ptr<RtpPacketToSend> RTPSender::BuildRtxPacket(
    const RtpPacketToSend& packet) {
  std::unique_ptr<RtpPacketToSend> rtx_packet;
  // Walter: first add the original RTP header.
  // Add original RTP header.
  {
    MutexLock lock(&send_mutex_);
    if (!sending_media_)
      return nullptr;
    RTC_DCHECK(rtx_ssrc_);
    // Walter: change the payload type to the payload type used for
    // retransmitted RTP packets.
    // Replace payload type.
    auto kv = rtx_payload_type_map_.find(packet.PayloadType());
    if (kv == rtx_payload_type_map_.end())
      return nullptr;
    rtx_packet = std::make_unique<RtpPacketToSend>(&rtp_header_extension_map_,
                                                   max_packet_size_);
    rtx_packet->SetPayloadType(kv->second);
    // Walter: change the SSRC to the SSRC used for RTX packets.
    // Replace SSRC.
    rtx_packet->SetSsrc(*rtx_ssrc_);
    CopyHeaderAndExtensionsToRtxPacket(packet, rtx_packet.get());
    // Walter: RTX packets and original packets use different SSRCs and are
    // associated through MID and RRID. The rule is: same MID, and the RRID
    // header extension of the RTX packet carries the RID of the original
    // packet.
    // RTX packets are sent on an SSRC different from the main media, so the
    // decision to attach MID and/or RRID header extensions is completely
    // separate from that of the main media SSRC.
    //
    // Note that RTX packets must use the RepairedRtpStreamId (RRID) header
    // extension instead of the RtpStreamId (RID) header extension even though
    // the payload is identical.
    if (always_send_mid_and_rid_ || !rtx_ssrc_has_acked_) {
      // These are no-ops if the corresponding header extension is not
      // registered.
      if (!mid_.empty()) {
        rtx_packet->SetExtension<RtpMid>(mid_);
      }
      if (!rid_.empty()) {
        rtx_packet->SetExtension<RepairedRtpStreamId>(rid_);
      }
    }
  }
  RTC_DCHECK(rtx_packet);
  uint8_t* rtx_payload =
      rtx_packet->AllocatePayload(packet.payload_size() + kRtxHeaderSize);
  if (rtx_payload == nullptr)
    return nullptr;
  // Walter: add the sequence number of the original packet.
  // Add OSN (original sequence number).
  ByteWriter<uint16_t>::WriteBigEndian(rtx_payload, packet.SequenceNumber());
  // Walter: copy the payload of the original packet.
  // Add original payload data.
  auto payload = packet.payload();
  memcpy(rtx_payload + kRtxHeaderSize, payload.data(), payload.size());
  // Walter: copy the additional data of the original packet.
  // Add original additional data.
  rtx_packet->set_additional_data(packet.additional_data());
  // Walter: copy the capture time of the original packet.
  // Copy capture time so e.g. TransmissionOffset is correctly set.
  rtx_packet->set_capture_time(packet.capture_time());
  return rtx_packet;
}
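To make the RTX payload layout built by BuildRtxPacket easier to see in isolation, here is a minimal standalone sketch. It covers only the payload: a 2-byte original sequence number (OSN) in network byte order followed by the original payload, as defined by RFC 4588. The function names and kRtxHeaderSizeSketch are hypothetical and are not part of WebRTC; the header, SSRC and extension handling shown above is left out.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Size of the RTX payload header: the 16-bit OSN (hypothetical constant name).
constexpr size_t kRtxHeaderSizeSketch = 2;

// Builds an RTX payload: OSN in big-endian order, then the original payload.
std::vector<uint8_t> BuildRtxPayloadSketch(
    uint16_t original_sequence_number,
    const std::vector<uint8_t>& original_payload) {
  std::vector<uint8_t> rtx_payload;
  rtx_payload.reserve(kRtxHeaderSizeSketch + original_payload.size());
  rtx_payload.push_back(static_cast<uint8_t>(original_sequence_number >> 8));
  rtx_payload.push_back(static_cast<uint8_t>(original_sequence_number & 0xFF));
  rtx_payload.insert(rtx_payload.end(), original_payload.begin(),
                     original_payload.end());
  return rtx_payload;
}

// Receiver-side inverse: recovers the OSN and the original payload.
// Returns false if the RTX payload is too short to contain an OSN.
bool ParseRtxPayloadSketch(const std::vector<uint8_t>& rtx_payload,
                           uint16_t* original_sequence_number,
                           std::vector<uint8_t>* original_payload) {
  if (rtx_payload.size() < kRtxHeaderSizeSketch) {
    return false;
  }
  *original_sequence_number =
      static_cast<uint16_t>((rtx_payload[0] << 8) | rtx_payload[1]);
  original_payload->assign(rtx_payload.begin() + kRtxHeaderSizeSketch,
                           rtx_payload.end());
  return true;
}
```

The receiver uses the OSN to place the recovered payload back at its original position in the media stream, which is why the RTX stream can use its own SSRC and its own sequence number space.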
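ReSendPacket(uint16_t packet_id), the method that handles retransmissions, consults a rate limiter, retransmission_rate_limiter_. If the limiter is set, the retransmission rate is checked and the packet is sent only when the rate stays within the limit. Otherwise no packet is built, ReSendPacket returns -1, and OnReceivedNack gives up on the remaining sequence numbers in that NACK.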
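The idea behind such a limiter can be sketched with a simple sliding window. This is only an illustration under stated assumptions and not WebRTC's RateLimiter: the class name is hypothetical, the window length and maximum rate are constructor parameters chosen for the example, and the current time is passed in explicitly (the real TryUseRate takes only the packet size) so that the behaviour is deterministic.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>

// Hypothetical sliding-window limiter for retransmission bitrate.
class RetransmissionRateLimiterSketch {
 public:
  RetransmissionRateLimiterSketch(int64_t window_ms, int64_t max_rate_bps)
      : window_ms_(window_ms), max_rate_bps_(max_rate_bps) {}

  // Returns true and records the packet if sending it keeps the average rate
  // over the window at or below the maximum; returns false otherwise.
  bool TryUseRate(size_t packet_size_bytes, int64_t now_ms) {
    // Forget packets that have fallen out of the window.
    while (!samples_.empty() &&
           samples_.front().first <= now_ms - window_ms_) {
      bytes_in_window_ -= samples_.front().second;
      samples_.pop_front();
    }
    const int64_t bits_after =
        static_cast<int64_t>(bytes_in_window_ + packet_size_bytes) * 8;
    // rate_bps = bits * 1000 / window_ms, compared without division.
    if (bits_after * 1000 > max_rate_bps_ * window_ms_) {
      return false;
    }
    samples_.emplace_back(now_ms, packet_size_bytes);
    bytes_in_window_ += packet_size_bytes;
    return true;
  }

 private:
  const int64_t window_ms_;
  const int64_t max_rate_bps_;
  std::deque<std::pair<int64_t, size_t>> samples_;  // (send time, size)
  size_t bytes_in_window_ = 0;
};
```

With a 1000 ms window and a limit of 8000 bps, the budget is 1000 bytes per window: a 600-byte retransmission at t = 0 is accepted, a second 600-byte one at t = 100 is rejected, and a 400-byte one at t = 100 still fits.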