大家好,欢迎来到IT知识分享网。
jrtplib
作用
- jrtplib是一个基于C++编写的面向对象的库,旨在帮助开发人员使用RFC3550中描述的实时传输协议(RTP),目前已经可以运行在Windows、Linux、FreeBSD、 Solaris、Unix和VxWorks等多种操作系统上。
- 该库使用户能够发送和接收数据使用RTP,无需担心SSRC冲突、调度和传输RTCP数据等。用户只需提供库通过发送有效负载数据,库为用户提供访问权限输入RTP和RTCP数据。
- 该库提供了几个类,这些类有助于创建RTP应用程序。大多数用户可能只需要RTPSession类来构建应用程序,或者从RTPSecureSession派生一个类来支持SRTP。这些类提供了发送RTP数据的必要功能,并在内部处理RTCP部分。
依赖
- 依赖:
JThread:- 第一种是用 jthread 库提供的线程自动在后台执行对数据的接收。第二种是用户自己调用 RTPSession 中的 Poll 方法。如果采取第一种方法则要安装 jthread 库
- 在jrtplib的configure中,会查找系统是否有编译了jthread库,如果有,那么编译的jrtp库会开启对jthread的支持。。
JRTPLIB 2.x系列和3.x系列
网上一般有JRTPLIB 2.x系列和3.x系列两种版本:
简单来说:
- 2.x系列代码量少使用简单,但是只支持RFC 1889不支持RFC 3550()
- 3.x支持RFC 3550,但代码量稍多,以及使用也稍显复杂。
详细的说:
- 最重要的变化之一可能是,3.x版本基于RFC 3550,2x版本基于已经过时的RFC 1889
- 此外,创建2.x系列的想法是,用户只需要使用RTPSession类,这意味着其他类本身不是很有用。另一方面,该版本旨在提供许多有用的组件,以帮助用户构建支持RTP的应用程序。
- 在3.x版本中,特定于传输RTP数据包的底层协议的代码捆绑在一个类中,该类从名为RTPTTransmiter的类继承其接口。这使得不同的底层应用程序变得容易需要支持的协议。目前支持IPv4上的UDP和IPv6上的UDP。
- 对于诸如混音器或转换器之类的应用程序,使用RTPSession类并不是一个好的解决方案。其他组件也可以用于此目的:传输组件、SSRC表、RTCP调度程序等。使用这些组件,构建各种应用程序应该更容易。
编译
下载
首先从JRTPLIB的网站来使用下载最新的源码包,我现在的是jrtplib-3.11.2.zip 。
请注意,此版本至少需要JThread 1.3.0
我下载的是jthread-1.3.3.zip
编译
方法一(推荐)
工程目录如下
- include:自己创建的,存放自己写的头文件
- src:自己创建的,存放自己写的源文件,稍后在下文将会说明
- lib:自己创建的,存放当前这个工程的所有依赖
- jthread-1.3.3,是刚刚
jthread-1.3.3.zip的解压 - jrtplib-3.11.2,是刚刚
jrtplib-3.11.2.zip的解压 - export:存放编译脚本和生成的依赖头文件和库文件。当前写了两个脚本
- build_jthread.sh:用来编译jthread
- build_jrtp.sh:用来编译jrtplib
- jthread-1.3.3,是刚刚
build_jthread.sh
其内容为:
#!/bin/bash currentPath=$(pwd) libPath=$(pwd)/../jthread-1.3.3 if [ -d "./jthread" ]; then rm -rf jthread fi mkdir jthread cd jthread installPath=$(pwd) cd ${libPath} if [ -d "./build" ]; then rm -rf build fi mkdir build cd build cmake -DCMAKE_INSTALL_PREFIX=${installPath} .. make make install cd ..
build_jrtp.sh
其内容为:
#!/bin/bash currentPath=$(pwd) libPath=$(pwd)/../jrtplib-3.11.2 if [ -d "./jrtplib" ]; then rm -rf jrtplib fi mkdir jrtplib cd jrtplib installPath=$(pwd) cd ${libPath} if [ -d "./build" ]; then rm -rf build fi mkdir build cd build cmake -DCMAKE_INSTALL_PREFIX=${installPath} .. make make install cd ..
修改jrtplib-3.11.2中的CMakeLists.txt文件第37行,加上路径。
find_package(JThread PATHS /home/oceanstar/CLionProjects/jrtp_test/lib/export/jthread)
开始编译
cd export chmod 777 *.sh ./build_jthread.sh ./build_jrtp.sh
置于对应的makefile怎么写,可以看
方法二(不推荐)
编译JTHREAD
将下载的压缩包解压后进入jthread-1.3.3目录中
cd jthread-1.3.3 mkdir build cd build cmake ../ -DCMAKE_INSTALL_PREFIX=../../export make make install
我们来看下应该怎么引用生成的库和头。 在build下有个pkgconfig,下面生成了一个jthread.pc
编译jrtplib
将下载的源码解压缩
cd jrtplib-3.11.2 mkdir build cd build cmake ../ -DCMAKE_INSTALL_PREFIX=../../export make && make install
使用
工程结构
最外层CMakeLists.txt
cmake_minimum_required(VERSION 3.16) project(jrtp_test) set(CMAKE_CXX_STANDARD 14) add_subdirectory (src)
src目录下的CMakeLists.txt
include_directories( ${
CMAKE_SOURCE_DIR}/include ${
CMAKE_SOURCE_DIR}/lib/export/jrtplib/include/jrtplib3 ${
CMAKE_SOURCE_DIR}/lib/export/jthread/include/jthread ) link_directories( ${
CMAKE_SOURCE_DIR}/lib/export/jrtplib/lib ${
CMAKE_SOURCE_DIR}/lib/export/jthread/lib ) add_definitions("-Wall -g") aux_source_directory(. SRC_LIST) add_executable(${
PROJECT_NAME} ${
SRC_LIST} ) set (EXECUTABLE_OUTPUT_PATH ${
PROJECT_SOURCE_DIR}/bin) target_link_libraries( ${
PROJECT_NAME} -ljrtp -ljthread -lpthread )
实例一
必须先进入命名空间:
using namespace jrtplib;
初始化
RTPSession rtpSession;
(2)然后调用 Create() 方法来对初始化一个会话。
- 要真正创建会话,必须调用create成员函数,该函数接受三个参数:
- 第一个参数是RTPSessionParams类型,用于指定会话的常规选项。
- 必须显式设置[此类的要发送的数据的时间戳单位,即调用
SetOwnTimestampUnit],否则将无法成功创建会话 - 其他会话参数可能取决于您打算使用的实际RTP配置文件。
- 必须显式设置[此类的要发送的数据的时间戳单位,即调用
- 第二个参数是指向RTPTransimissionParams实例的指针,并描述传输组件的参数
- 可以
SetPortbase指定接收数据的端口,注意端口不能是奇数,否者运行时会出现错误:
- 可以
- 第三个参数选择要使用的传输组件的类型。默认情况下,使用UDP / IPv4传输器,对于这个特定的传输器,传输参数应该是RTPUDPv4TransmissionParams类型。
(3)处理返回值:
- 第一个参数是RTPSessionParams类型,用于指定会话的常规选项。
- 如果RTP会话创建过程失败,Create()方法将会返回一个负数,通过它虽然可以很容易地判断出函数调用究竟是成功的还是失败的,但却很难明白出错的原因到底什么。
- JRTPLIB采用了统一的错误处理机制,它提供的所有函数如果返回负数就表明出现了某种形式的错误,而具体的出错信息则可以通过调用 RTPGetErrorString()函数得到。RTPGetErrorString()函数将错误代码作为参数传入,然后返回该错误代码所对应的错误信息。
#include <stdio.h> #include "rtpsessionparams.h" #include "rtpudpv4transmitter.h" #include "rtpsession.h" using namespace jrtplib; int main(void) {
double tsunit; int ListenPort; RTPSession rtpSession; RTPSessionParams SessParams; RTPUDPv4TransmissionParams TransParams; tsunit = 1.0/8000.0; /* 1/8000表示1秒钟采样8000次,即录音时的8KHz*/ SessParams.SetOwnTimestampUnit(tsunit); // 时间戳:1秒钟8000个样本 SessParams.SetAcceptOwnPackets(true); // 设置是否接收属于本身的数据,true-接收,false-不接收 ListenPort = 5440; TransParams.SetPortbase(ListenPort); // 设置本地接收的端口号 int iErrNum = rtpSession.Create(SessParams, &TransParams); std::string RtpError = RTPGetErrorString(iErrNum); if (iErrNum < 0){
printf( "Create RTP Session error! Reason: %s!\r\n", RtpError.c_str() ); return -1; } printf( "Create RTP Session OK! Reason: %s!\r\n", RtpError.c_str() ); return 0; }
数据发送
当RTP会话成功建立起来之后,接下来就可以进行流媒体数据的实时传输了。
(1)首先要设置RTP和RTCP数据应该发送到哪个目的地
- RTP协议允许同一会话存在多个目标地址,这可以通过调用RTPSession类的AddDestination()、 DeleteDestination()和ClearDestinations()方法来完成。
- 此函数接受RTPAddress类型的参数。这是一个抽象类,对于IPv4上的UDP发送器,实际使用的类是RTPIPv4Address。
- 例如,下面的语句表示的是让RTP会话将数据发送到本地主机的6000端口:
char destIp [16] = "127.0.0.1"; int destPort = 10000; RTPIPv4Address addr(ntohl(inet_addr(destIp)), destPort); iErrNum = rtpSession.AddDestination(addr); if (iErrNum < 0) {
printf( "rtpSession AddDestination error! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); exit(-1); } printf( "rtpSession AddDestination ok! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() );
(2)目标地址全部指定之后,接着就可以调用 RTPSession 类的 SendPacket() 方法,向所有的目标地址发送流媒体数据。
int SendPacket(const void *data,size_t len); int SendPacket(const void *data,size_t len, uint8_t pt,bool mark,uint32_t timestampinc);
- SendPacket()最典型的用法是类似于下面的语句,其中第一个参数是要被发送的数据,而第二个参数则指明将要发送数据的长度,再往后依次是RTP负载类型、标识和时戳增量。
sess.SendPacket(buffer, 5, 0, false, 10);
- 对于同一个 RTP 会话来讲,负载类型、标识和时戳增量通常来讲都是相同的,JRTPLIB 允许将它们设置为会话的默认参数,这是通过调用 RTPSession 类的 SetDefaultPayloadType()、SetDefaultMark() 和SetDefaultTimeStampIncrement() 方法来完成的。
session.SetDefaultPayloadType(96); //希望使用负载类型96 session.SetDefaultMark(false); //想要指出何时收到了来自其他人的数据包 session.SetDefaultTimestampIncrement(160); //假设在一分钟的时间内,我们想要发送包含20毫秒(或160个样本)的数据包
- 之后在进行数据发送时只需指明要发送的数据及其长度就可以了:
sess.SendPacket(buffer, 5);
接收数据
- 如果库是在JThread支持下编译的,那么传入的数据将在后台处理。
- 如果在编译时没有启用JThread支持,或者在会话参数中指定不应该使用轮询线程,那么必须定期调用RTPSession成员函数poll来处理传入数据,并在必要时发送RTCP数据。
方法1: 自己使用PollData方法来接收发送过来的RTP或者 RTCP数据报。(不推荐)
在调用session.Poll()函数时,程序报错This function is not available when using the RTP poll thread feature。
- 网上搜索说是防火墙问题,导致数据未收到,经检查我的防火墙一直是关着的,跟这个应该没关系。
- 注意到jrtplib源码中的example1示例,其中当需要调用Poll函数(对于流媒体数据的接收端,首先需要调用 RTPSession 类的 PollData() 方法来接收发送过来的 RTP 或者RTCP 数据报。JRTPLIB-3.7中修改PollData()方法为Poll(),使用都一样)时,会有个宏定义,最后发现应该是与JThread这个库有关,当你使用这个库的时候,就不需要你自己去主动调用Poll函数,而是JThread会帮你做。所以如果不使用JThread那么,就需要使用Poll函数,可以测试下不安装JThread的情况下是否就不会报错了。而我这边是安装并使用JThread的,所以代码中不再需要自己手动去Poll,所以不调用该函数即可!
方法2:传入的数据将在后台处理。
- 在调用RTPSession成员函数RTPSession::BeginDataAccess和RTPSession::EndDataAccess之间,可以完成有关会话参与者、数据包检索等的信息。这可以确保后台线程不会试图更改您试图访问的数据。
- 由于同一个 RTP 会话中允许有多个参与者(源),你既可以通过调用 RTPSession 类的GotoFirstSource() 和 GotoNextSource() 方法来遍历所有的源,也可以通过调用 RTPSession 类的GotoFirstSourceWithData() 和 GotoNextSourceWithData() 方法来遍历那些携带有数据的源。
- 在从 RTP 会话中检测出有效的数据源之后,接下去就可以调用 RTPSession 类的 GetNextPacket() 方法从中抽取 RTP 数据报,有的话返回非NULL,获取数据长度和收到的数据,可对数据进行处理,当接收到的 RTP 数据报处理完之后,一定要记得需要调用DeletePacket及时释放。
// 开始接收数据 rtpSession.BeginDataAccess(); if (rtpSession.GotoFirstSource()) {
do {
RTPPacket *packet; while ((packet = rtpSession.GetNextPacket()) != 0) {
// 获取接收数据长度 unsigned int recvSize = packet->GetPayloadLength(); // 获取接收数据 unsigned char * recvData = (unsigned char *)packet->GetPayloadData(); std::cout << "Got packet with extended sequence number " << packet->GetExtendedSequenceNumber() << " from SSRC " << packet->GetSSRC() << "; recvSize " << recvSize << "[" << recvData << "]" << std::endl; // 删除数据包 rtpSession.DeletePacket(packet); } } while (rtpSession.GotoNextSource()); } rtpSession.EndDataAccess();
- JRTPLIB 为 RTP 数据报定义了三种接收模式,其中每种接收模式都具体规定了哪些到达的 RTP 数据报将会被接受,而哪些到达的 RTP 数据报将会被拒绝。通过调用 RTPSession 类的 SetReceiveMode() 方法可以设置下列这些接收模式:
- RECEIVEMODE_ALL 缺省的接收模式,所有到达的 RTP 数据报都将被接受;
- RECEIVEMODE_IGNORESOME 除了某些特定的发送者之外,所有到达的 RTP 数据报都将被接受,而被拒绝的发送者列表可以通过调用 AddToIgnoreList()、DeleteFromIgnoreList() 和 ClearIgnoreList() 方法来进行设置;
- RECEIVEMODE_ACCEPTSOME 除了某些特定的发送者之外,所有到达的 RTP 数据报都将被拒绝,而被接受的发送者列表可以通过调用 AddToAcceptList ()、DeleteFromAcceptList 和 ClearAcceptList () 方法来进行设置。 下面是采用第三种接收模式的程序示例。
rtpSession.BeginDataAccess(); if (rtpSession.GotoFirstSourceWithData()) {
do {
//rtpSession.AddToAcceptList(&addresss); rtpSession.SetReceiveMode(RTPTransmitter::ReceiveMode::AcceptAll); RTPPacket *pack; pack = rtpSession.GetNextPacket(); // 处理接收到的数据 delete pack; } while (rtpSession.GotoNextSourceWithData()); } rtpSession.EndDataAccess();
控制信息
- JRTPLIB 是一个高度封装后的RTP库,程序员在使用它时很多时候并不用关心RTCP数据报是如何被发送和接收的,因为这些都可以由JRTPLIB自己来完成。 只要 Poll()或者SendPacket()方法被成功调用,JRTPLIB就能够自动对到达的 RTCP数据报进行处理,并且还会在需要的时候发送RTCP数据报,从而能够确保整个RTP会话过程的正确性。
- 而另一方面,通过调用RTPSession类提供的SetLocalName()、SetLocalEMail()、 SetLocalLocation()、SetLocalPhone()、SetLocalTool()和SetLocalNote()方法, JRTPLIB又允许程序员对RTP会话的控制信息进行设置。所有这些方法在调用时都带有两个参数,其中第一个参数是一个char型的指针,指向将要被设置的数据;而第二个参数则是一个int型的数值,表明该数据中的前面多少个字符将会被使用。例如下面的语句可以被用来设置控制信息中的电子邮件地址:
rtpSession.SetLocalEMail("",19);
- 在RTP 会话过程中,不是所有的控制信息都需要被发送,通过调用RTPSession类提供的 EnableSendName()、EnableSendEMail()、EnableSendLocation()、EnableSendPhone ()、EnableSendTool()和EnableSendNote()方法,可以为当前RTP会话选择将被发送的控制信息。
销毁
发送退出记得释放内存即可,但是接收退出有两点要注意:
- 第一点是若是开始接收数据BeginDataAccess一定要调用EndDataAccess否则不会关掉jthread线程,不会马上退出,退出不了也就无法重新Create
- 第二点是接收了数据包则一定要调用DeletePacket数据包,然后调用销毁和等待退出,只要调用了EndDataAccess,AboutWait基本上是立即返回的,秒开秒关。
rtpSession.Destroy(); rtpSession.AbortWait();
整体代码
- 发送端
#include <stdio.h> #include "rtpsessionparams.h" #include "rtpudpv4transmitter.h" #include "rtpsession.h" using namespace jrtplib; int main(void) {
char destIp [16] = "127.0.0.1"; int destPort = 10000; RTPSession rtpSession; RTPSessionParams SessParams; RTPUDPv4TransmissionParams TransParams; SessParams.SetOwnTimestampUnit(1.0/8000.0); // 时间戳:1秒钟8000个样本 int iErrNum = rtpSession.Create(SessParams, &TransParams); if (iErrNum < 0){
printf( "Create RTP Session error! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); exit(-1); } printf( "Create RTP Session OK! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); // 指定RTP数据接收端 RTPIPv4Address addr(ntohl(inet_addr(destIp)), destPort); iErrNum = rtpSession.AddDestination(addr); if (iErrNum < 0) {
printf( "rtpSession AddDestination error! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); exit(-1); } printf( "rtpSession AddDestination ok! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); // 设置RTP会话默认参数 rtpSession.SetDefaultPayloadType(0); rtpSession.SetDefaultMark(false); rtpSession.SetDefaultTimestampIncrement(0); // 发送流媒体数据 char buffer[128]; int index = 1; do {
sprintf(buffer, "%d: RTP packet", index ++); rtpSession.SendPacket(buffer, strlen(buffer)); printf("Send packet [%s]!\n", buffer); } while(1); rtpSession.Destroy(); rtpSession.AbortWait(); return 0; }
- 接收端
#include <stdio.h> #include <iostream> #include "rtpsessionparams.h" #include "rtpudpv4transmitter.h" #include "rtpsession.h" #include "rtppacket.h" using namespace jrtplib; int main(void) {
int localPort = 10000; RTPSession rtpSession; RTPSessionParams SessParams; RTPUDPv4TransmissionParams TransParams; SessParams.SetOwnTimestampUnit(1.0/8000.0); // 时间戳:1秒钟8000个样本 TransParams.SetPortbase(localPort); // 设置本地接收的端口号 int iErrNum = rtpSession.Create(SessParams, &TransParams); if (iErrNum < 0){
printf( "Create RTP Session error! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); exit(-1); } printf( "Create RTP Session OK! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() ); // 开始接收数据 rtpSession.BeginDataAccess(); if (rtpSession.GotoFirstSource()) {
do {
RTPPacket *packet; while ((packet = rtpSession.GetNextPacket()) != 0) {
// 获取接收数据长度 unsigned int recvSize = packet->GetPayloadLength(); // 获取接收数据 unsigned char * recvData = (unsigned char *)packet->GetPayloadData(); std::cout << "Got packet with extended sequence number " << packet->GetExtendedSequenceNumber() << " from SSRC " << packet->GetSSRC() << "; recvSize " << recvSize << "[" << recvData << "]" << std::endl; // 删除数据包 rtpSession.DeletePacket(packet); } } while (rtpSession.GotoNextSource()); } rtpSession.EndDataAccess(); rtpSession.Destroy(); rtpSession.AbortWait(); return 0; }
实例二
下面是官方例程的实例一:向指定目的地发送数据包。注意我的库是支持jthread的
#include "rtpsession.h" #include "rtpudpv4transmitter.h" #include "rtpipv4address.h" #include "rtpsessionparams.h" #include "rtperrors.h" #include "rtplibraryversion.h" #include <stdlib.h> #include <stdio.h> #include <iostream> #include <string> using namespace jrtplib; void checkerror(int rtperr) {
if (rtperr < 0) {
std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl; exit(-1); } } int main(void) {
RTPSession sess; uint16_t portbase,destport; uint32_t destip; std::string ipstr; int status,i,num; std::cout << "Using version " << RTPLibraryVersion::GetVersion().GetVersionString() << std::endl; std::cout << "Enter local portbase:" << std::endl; std::cin >> portbase; std::cout << std::endl; std::cout << "Enter the destination IP address" << std::endl; std::cin >> ipstr; destip = inet_addr(ipstr.c_str()); if (destip == INADDR_NONE){
std::cerr << "Bad IP address specified" << std::endl; return -1; } //inet_addr函数以网络字节顺序返回一个值,但我们需要按主机字节顺序的IP地址,所以我们使用对ntohl的调用 destip = ntohl(destip); std::cout << "Enter the destination port" << std::endl; std::cin >> destport; std::cout << std::endl; std::cout << "Number of packets you wish to be sent:" << std::endl; std::cin >> num; RTPUDPv4TransmissionParams transparams; RTPSessionParams sessparams; 重要提示:必须设置本地时间戳单位,否则RTCP发送方报告信息将计算错误。 /// 在这种情况下,我们将每秒发送10个样本,因此我们将时间戳单位设置为(1.0/10.0) sessparams.SetOwnTimestampUnit(1.0/10.0); sessparams.SetAcceptOwnPackets(true); transparams.SetPortbase(portbase); status = sess.Create(sessparams,&transparams); checkerror(status); RTPIPv4Address addr(destip,destport); status = sess.AddDestination(addr); checkerror(status); for (i = 1 ; i <= num ; i++) {
printf("\nSending packet %d/%d\n",i,num); // send the packet status = sess.SendPacket((void *)"",10,0,false,10); checkerror(status); sess.BeginDataAccess(); // check incoming packets if (sess.GotoFirstSourceWithData()) {
do {
RTPPacket *pack; while ((pack = sess.GetNextPacket()) != NULL) {
// You can examine the data here printf("Got packet !\n"); // we don't longer need the packet, so // we'll delete it sess.DeletePacket(pack); } } while (sess.GotoNextSourceWithData()); } sess.EndDataAccess(); #ifndef RTP_SUPPORT_THREAD status = sess.Poll(); checkerror(status); #endif // RTP_SUPPORT_THREAD RTPTime::Wait(RTPTime(1,0)); } sess.BYEDestroy(RTPTime(10,0),0,0); return 0; }
怎么开发一个基于RTP协议的流媒体播放器
如何实现发送
我们需要创建一个RTPSession的发送对象,然后初始化相关的参数:
RTPSession session; RTPSessionParams sessionparams; sessionparams.SetOwnTimestampUnit(1.0 / 90000.0); sessionparams.SetAcceptOwnPackets(true); RTPUDPv4TransmissionParams transparams; transparams.SetPortbase(8000); //这个端口必须未被占用 int status = session.Create(sessionparams, &transparams); if (status < 0) {
//std::cerr << RTPGetErrorString(status) << std::endl; return - 1; } #if 1 RTPIPv4Address addr(ntohl(inet_addr(m_szDestIP)), m_nDestPort); status = session.AddDestination(addr); #else unsigned long addr = ntohl(inet_addr(m_szDestIP)); status = session.AddDestination(addr, m_nDestPort); #endif if (status < 0) {
//std::cerr << RTPGetErrorString(status) << std::endl; return -2; } session.SetDefaultPayloadType(96); session.SetDefaultMark(false); session.SetDefaultTimestampIncrement(90000.0 / 25.0);
这里初始化的参数包括RTP头的Payload类型(赋值为96),时间单位(1.0/90000.0),时间戳增量(90000/25=3600),以及Rtp头的MarkerBit的默认值。
接着读取一个视频文件,每次读1K字节,然后调用jrtplib的RTPSession::SendPacket函数发送数据:
FILE *fp_open; uint8_t buff[1024 * 5] = {
0 }; DWORD bufsize = 1024; //每次读1024字节,不超过1400就行 DWORD dwReadBytesPerSec = 2*1024*1024/8; //读取速度 RTPTime delay(bufsize*1.0/ dwReadBytesPerSec); //读取文件 fp_open = fopen(m_szFilePath, "rb"); while (!feof(fp_open) && g_RTPSendThreadRun) {
int true_size = fread(buff, 1, bufsize, fp_open); int status = session.SendPacket(buff, true_size); Sleep(1000* bufsize/dwReadBytesPerSec); //RTPTime::Wait(delay); //delay for a few milliseconds }
(注意:这里读文件数据只是简单地将文件数据块读出来然后直接发送,没有对视频帧做二次封装和处理,对于某些格式比如H264,一般要求要以NALU单元来传输,以FU-A分片方式打包,然后再封装到RTP包里面,而这里没有采取这种方式,大家要注意区分。)
如何实现接收
接收的实现较为复杂一些,用到了多线程技术和缓冲队列建议用到两条线程,一条用于接收RTP包,从中提取出视频数据;另一条线程用于解码视频,并把视频帧转成RGB格式后显示到窗口中。
- 用到两条线程的好处是:可以并行接收和解码,两个工作相互独立,提高视频帧的处理效率,减少播放延时。
- 而如果用一条线程来做,它既要接收又要解码,线程中处理一个帧的时间就长一些,而这时又不能接收数据,很可能造成后面的数据包丢掉。所以,用双线程的”分工合作“方式处理效率更高。
- 两条线程之间需要维护一个队列,其中一条线程收到数据后放到队列里,然后另外一个线程从队列里读取数据,这是一个典型的”生产者-消费者“的模型,我们需要实现一个先入先出的队列来转运”视频帧“,这个队列的定义如下:
std::list<PacketNode_t> m_packetList; //包列表
其中,PacketNode_t结构体的定义为:
typedef struct {
unsigned length; uint8_t *buf; }PacketNode_t;
下面对接收线程和解码线程的工作流程作详细介绍。
首先,程序在接收前需要创建两个线程:
g_RTPRecvThreadRun = true; g_decoding_thread_run = true; DWORD threadID = 0; m_hRecvThread = CreateThread(NULL, 0, RTPRecvThread, this, 0, &threadID); m_hDecodeThread = CreateThread(NULL, 0, decoding_thread, this, 0, &threadID);
RTPRecvThread是RTP数据的接收线程,实现方式如下:
DWORD WINAPI RTPRecvThread(void* param) {
TRACE("RTPRecvThread began! \n"); CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)param; RTPSession session; //WSADATA dat; //WSAStartup(MAKEWORD(2, 2), &dat); RTPSessionParams sessionparams; sessionparams.SetOwnTimestampUnit(1.0 / 90000.0); //sessionparams.SetAcceptOwnPackets(true); RTPUDPv4TransmissionParams transparams; transparams.SetPortbase(m_nRecvPort); //接收端口 int oldBufSize = transparams.GetRTPReceiveBuffer(); transparams.SetRTPReceiveBuffer(oldBufSize * 2); int status = session.Create(sessionparams, &transparams); int newBufSize = transparams.GetRTPReceiveBuffer(); int oldBufSizec = transparams.GetRTCPReceiveBuffer(); transparams.SetRTCPReceiveBuffer(oldBufSizec * 2); int newBufSizec = transparams.GetRTCPReceiveBuffer(); while (g_RTPRecvThreadRun) {
session.BeginDataAccess(); if (session.GotoFirstSourceWithData()) {
do {
RTPPacket *pack; while ((pack = session.GetNextPacket()) != NULL) {
int nPayType = pack->GetPayloadType(); int nLen = pack->GetPayloadLength(); unsigned char *pPayData = pack->GetPayloadData(); int nPackLen = pack->GetPacketLength(); unsigned char *pPackData = pack->GetPacketData(); int csrc_cont = pack->GetCSRCCount(); int ssrc = pack->GetSSRC(); int nTimestamp = pack->GetTimestamp(); int nSeqNum = pack->GetSequenceNumber(); #if 0 Writebuf((char*)pPayData, nLen); #else pThisDlg->m_cs.Lock(); //if (pThisDlg->m_packetList.size() < MAX_PACKET_COUNT) {
PacketNode_t temNode; temNode.length = nLen; temNode.buf = new uint8_t[nLen]; memcpy(temNode.buf, pPayData, nLen); pThisDlg->m_packetList.push_back(temNode); //存包列表 } pThisDlg->m_cs.Unlock(); #endif session.DeletePacket(pack); } } while (session.GotoNextSourceWithData()); } else {
//Sleep(10); } session.EndDataAccess(); Sleep(1); } session.Destroy(); TRACE("RTPRecvThread end! \n"); return 0; }
- 接收线程里创建了一个RTPSession对象,这个对象是用于接收RTP包,前面一部分代码用于初始化一些参数,包括:接收端口,时间戳单位,接收缓冲区大小。然后,进入一个循环,在里面不停地读取RTP数据包,如果session.GetNextPacket()返回的指针不为空,则表示读取到一个数据包,返回的指针变量是一个RTPPacket*类型,其指向的成员变量包括RTP头的各个字段的值,以及Payload数据的内存地址和大小。我们关键要提取出Payload的数据和大小,然后把它作为一个元素插入到缓冲队列中(如下面代码所示:)
pThisDlg->m_cs.Lock(); PacketNode_t temNode; temNode.length = nLen; temNode.buf = new uint8_t[nLen]; memcpy(temNode.buf, pPayData, nLen); pThisDlg->m_packetList.push_back(temNode); //存包列表 pThisDlg->m_cs.Unlock();
上面的接收线程实现了一个“生成者”,而“消费者”是实现在另外一个线程—decoding_thread,这个线程做的工作是解码。
- 。这个线程调用了很多FFmpeg的函数,但基本的流程是:打开一个文件源或URL地址-》从源中读取各个流的信息-》初始化解码器-》解码和显示。
- 因为我们是从网络中收数据,所以是一个网络源,从网络源中读取数据有两种方式:
- 一种是用FFmpeg内置的协议栈的支持,比如RTSP/RTMP/RTP
- 还有一种方式是我们传数据给FFmpeg,FFmpeg从内存中读取我们送的数据,然后用它的Demuxer和Parser来进行分析,分离出视频和音频。
- 这里程序使用的是第二种方式,即从网络中探测数据,然后送数据给FFmpeg去解析。探测网络数据需要调用FFmpeg的av_probe_input_buffer函数,这个函数要传入一个内存缓冲区地址和一个回调函数指针,其中回调函数是用来从网络中读数据的(即我们放到缓冲队列里的数据包)。下面的fill_iobuffer就是读数据的回调函数,而pIOBuffer指向用于存放读取数据的缓冲区地址,FFmpeg就是从这里读取数据。
pIObuffer = (uint8_t*)av_malloc(4096); pb = avio_alloc_context( pIObuffer, 4096, 0, param, fill_iobuffer, NULL, NULL); if (av_probe_input_buffer(pb, &piFmt, "", NULL, 0, 0) < 0)//探测从内存中获取到的媒体流的格式 {
TRACE("Error: probe format failed\n"); return -1; } else {
TRACE("input format:%s[%s]\n", piFmt->name, piFmt->long_name); }
- 回调函数fill_iobuffer调用了一个ReadBuf的函数:
int fill_iobuffer(void* opaque, uint8_t* buf, int bufSize) {
ASSERT(opaque != NULL); CPlayStreamDlg* p_CPSDecoderDlg = (CPlayStreamDlg*)opaque; //TRACE("ReadBuf----- \n"); int nBytes = ReadBuf((char*)buf, bufSize, (void*)p_CPSDecoderDlg); return (nBytes > 0) ? bufSize : -1; }
static int ReadBuf(char* data, int len, void* pContext) {
CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)pContext; int data_to_read = len; char * pReadPtr = data; while (g_RTPRecvThreadRun) {
int nRead = pThisDlg->ReadNetPacket((uint8_t*)pReadPtr, data_to_read); if (nRead < 0) {
Sleep(10); continue; } pReadPtr += nRead; data_to_read -= nRead; if (data_to_read > 0) {
Sleep(10); continue; } break; } return (data_to_read > 0) ? -1 : len; }
- ReadBuf函数的作用就不用解释了,大家一看就明白了。它实现了一个我们前面说的“消费者”,从前面实现的缓冲队列中读取数据包,读取之后就会从队列中删除相应的元素。如果队列不为空,则直接从前面的元素读取;如果无数据,则继续等待。
- 读了视频帧数据之后,就到了解码,解码的代码如下:
while (g_decoding_thread_run) {
av_read_frame(pFormatContext, pAVPacket); if(pAVPacket->stream_index == video_stream_index) {
avcodec_decode_video2(pCodecCtx, pFrame, &got_picture, pAVPacket); if(got_picture) {
p_uint8_t_temp = pFrame->data[1]; pFrame->data[1] = pFrame->data[2]; pFrame->data[2] = p_uint8_t_temp; pFrame->data[0] += pFrame->linesize[0] * (pCodecCtx->height - 1); pFrame->linesize[0] *= -1; pFrame->data[1] += pFrame->linesize[1] * (pCodecCtx->height / 2 - 1); pFrame->linesize[1] *= -1; pFrame->data[2] += pFrame->linesize[2] * (pCodecCtx->height / 2 - 1); pFrame->linesize[2] *= -1; got_picture = sws_scale(img_convert_ctx, pFrame->data, pFrame->linesize, 0, pCodecCtx->height, RGB24Data, RGB24Linesize); got_picture = StretchDIBits(hDC, 0, 0, PlayingWidth, PlayingHeight, 0, 0, pCodecCtx->width, pCodecCtx->height, RGB24Data[0], (BITMAPINFO*)&bmpinfo, DIB_RGB_COLORS, SRCCOPY); } } av_free_packet(pAVPacket); }
- FFmpeg从解码器输出的格式是YUV的,我们要转成RGB图像格式显示,所以调用了sws_scale函数来转换,最后调用Windows GDI函数—StretchDiBits来把图像显示到指定的窗口区域。
- 如果要停止解码,则退出线程的时候记得要释放FFmpeg创建的资源:
if (pFormatContext) {
avformat_close_input(&pFormatContext); pFormatContext = NULL; } sws_freeContext(img_convert_ctx); av_freep(&RGB24Data[0]); av_frame_free(&pFrame); //avcodec_close(pCodecCtx); //av_free(pIObuffer); //调用了avformat_close_input会自动释放pIObuffer ReleaseDC(hwnd, hDC);
到此为止,一个简单的流媒体播放器的实现过程就介绍完了。
参考
- Linux下几种RTP协议实现的比较和JRTPLIB编程讲解
- jrtplib简介
- 流媒体协议之JRTPLIB的使用
- 基于JRTPLib的rtp编程例程
- 如何发送和接收RTP包,用FFmpeg分离、解码
- 编译jrtplib
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118426.html












