我正在尝试向 udp 端口上传出的 RTP 数据包添加一些 RTP 标头扩展值。因此,我向
rtph265pay
元素添加了 add_probe() 方法。但是我根本没有看到探测器被调用。
这是我的管道的简化版本
"rtspsrc location=<myrtspurl> name=rtspsrc latency=100 \
rtspsrc. ! rtph265depay ! tee name=t \
t. ! queue ! h265parse ! rtph265pay name=rtph265pay config-interval=-1 \
! udpsink name=udpsink host=127.0.0.1 port=56001";
我验证了我能够在接收管道上播放数据包。所以数据正在传递。但我不明白为什么我没有收到 add_probe() 的回调。
我在使用
rtph264pay
发送 H264 的 rtsp 流上尝试了完全相同的代码。所以这似乎是 H265 独有的行为。
另一个观察结果是,当我将 add_probe() 添加到 rtph265pay 之后的
udpsink
接收器元素时。我也没有收到回电。
我已经在 Rust 中实现了这个,这是我添加回调的方式。
rtspsrc.connect_pad_added(move |_, src_pad| {
match src_pad.current_caps() {
Some(caps) => {
let new_pad_struct = caps.structure(0).expect("Failed to get first structure of caps for audio");
for i in 0..new_pad_struct.n_fields() {
match new_pad_struct.nth_field_name(i).unwrap().as_str() {
"media" => {
let media_type = new_pad_struct.value("media").unwrap();
let field_value = media_type.get::<&str>().unwrap();
println!("field_value={}", field_value);
if field_value == "video" {
bin_ref.debug_to_dot_file(gst::DebugGraphDetails::all(), "PLAYING");
rtppay_src_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
println!("adding probe for rtppay");
if let Some(probe_data) = probe_info.data.as_mut() {
if let gst::PadProbeData::Buffer(ref mut buffer) = probe_data {
let size = buffer.size();
match buffer.pts() {
Some(pts) => {
println!("ptstime={}", pts.seconds())
},
None => {
println!("No PTS, cannot get bandwidth")
}
}
let b = buffer.get_mut().unwrap();
let mut rtp_buffer = RTPBuffer::from_buffer_writable(b).unwrap();
let pts = rtp_buffer.buffer().pts().unwrap();
// Convert the PTS to bytes
let pts_bytes = pts.to_be_bytes();
let extension_data = &pts_bytes[..];
let appbits = 5; // Custom application-specific bits
let id = 1; // Custom extension ID
let result = rtp_buffer.add_extension_twobytes_header(appbits, id, extension_data);
if let Err(e) = result {
eprintln!("Failed to add RTP header extension: {:?}", e);
}
}
}
gst::PadProbeReturn::Ok
});
udpsink_sink_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
println!("adding pad probe to udpsink");
if let Some(probe_data) = probe_info.data.as_ref() {
if let gst::PadProbeData::Buffer(buffer) = probe_data {
let rtp_buffer = RTPBuffer::from_buffer_readable(buffer).unwrap();
// Check for RTP extension header
if let Some((appbits, extension_data)) = rtp_buffer.extension_twobytes_header(1, 0) { //extension_twobytes_header(1, 0) {
println!("RTP Extension present:");
println!("App bits: {}", appbits);
println!("Extension data: {:?}", extension_data);
// Convert the extension data back to PTS
if extension_data.len() != 0 {
let mut pts_bytes = [0u8; 8];
pts_bytes[..4].copy_from_slice(&extension_data[..4]); // Copy the first 4 bytes
let pts = u64::from_be_bytes(pts_bytes);
//let pts = u64::from_be_bytes(extension_data.try_into().unwrap());
println!("Extracted PTS from RTP extension: {}", pts);
}
} else {
println!("No RTP Extension found");
}
match rtp_buffer.buffer().pts() {
Some(pts) => {
println!("udpsink buffer.pts={}", pts.seconds());
},
None => {
println!("No PTS, cannot get bandwidth");
}
}
}
}
gst::PadProbeReturn::Ok
}).unwrap();
}
}
_ => {}
}
}
}
_ => {}
}
});
请让我知道我错过了什么。我感谢您的帮助!
您应该使用
GST_PAD_PROBE_TYPE_BUFFER_LIST
来表示 rtph265pay