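RTSP is short for Real Time Streaming Protocol. It provides an extensible framework for controlled, on-demand delivery of real-time data such as audio and video. RTSP offers controls such as pause and fast-forward for streaming media but does not carry the media data itself; in effect it acts as a remote control for the streaming media server. The data itself can be carried over the transport-layer protocols TCP or UDP, and RTSP also provides some efficient mechanisms built on RTP.
Before requesting a video service from the video server, the client first retrieves the presentation description file of that service from a web server over HTTP. In RTSP, each presentation and each of its media streams is identified by an RTSP URL. The presentation as a whole and the properties of its media are defined in the presentation description file, which may include the media encodings, language, RTSP URLs, destination addresses, ports and other parameters. Before asking the server for a continuous media stream, the user must therefore obtain that stream's presentation description file to get the required parameters, and then uses it to locate the video service (server address and port number) and to learn details such as the encoding.
Using this information, the client requests the video service from the video server. Once the service has been initialized, the server creates a new video stream for the client, and the two sides run RTSP to exchange VCR-style control signals for that stream, such as play, pause, fast-forward and rewind. When the service is finished, the client issues a TEARDOWN request. The server delivers the media data to the client over RTP, and the client application can play the data as soon as it arrives. Streaming therefore uses two different protocol stacks between client and server: RTP/RTCP for the media and RTSP/TCP for control.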
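RTSP request message format:

    Method  URI  RTSP-Version  CR LF
    Headers  CR LF  CR LF
    Body  CR LF

The method is any of the commands listed in the OPTIONS response, and the URI is the address of the receiver, for example: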
rtsp://192.168.20.136
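The RTSP version is normally RTSP/1.0. The CR LF at the end of each line is a carriage return followed by a line feed, which the receiver must parse accordingly; the last header must be followed by two CR LF sequences.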
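To make the request layout concrete, here is a minimal sketch that serializes a request in that shape. The helper name buildRtspRequest and its parameters are hypothetical and not taken from any library; it assumes the caller supplies the CSeq value.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of any library): serializes one RTSP request
// as "Method URI RTSP/1.0 CRLF", header lines, a blank line, then the body.
std::string buildRtspRequest(
        const std::string &method,
        const std::string &uri,
        int cseq,
        const std::vector<std::pair<std::string, std::string> > &headers,
        const std::string &body = "") {
    std::string out = method + " " + uri + " RTSP/1.0\r\n";
    out += "CSeq: " + std::to_string(cseq) + "\r\n";
    for (std::size_t i = 0; i < headers.size(); ++i) {
        out += headers[i].first + ": " + headers[i].second + "\r\n";
    }
    if (!body.empty()) {
        out += "Content-Length: " + std::to_string(body.size()) + "\r\n";
    }
    out += "\r\n";  // the blank line that terminates the header section
    out += body;
    return out;
}
```

Calling it with the method DESCRIBE, the URI rtsp://192.168.20.136 and a single Accept header yields a request line, two header lines and the terminating blank line.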
RTSP response message format:

    RTSP-Version  Status-Code  Reason-Phrase  CR LF
    Headers  CR LF  CR LF
    Body  CR LF
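Here the RTSP version is normally RTSP/1.0, the status code is a number (200 means success), and the reason phrase is the text that explains the status code.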
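As an illustration of the response layout, the following sketch splits a status line into its three parts. The struct and function names are made up for this example, and the parser is deliberately strict (three-digit code, version starting with "RTSP/").

```cpp
#include <cstdlib>
#include <string>

// Hypothetical container for the three parts of a status line.
struct RtspStatusLine {
    std::string version;
    int code;
    std::string reason;
};

// Returns true and fills `out` when `line` looks like "RTSP/1.0 200 OK".
bool parseStatusLine(const std::string &line, RtspStatusLine *out) {
    std::string::size_type sp1 = line.find(' ');
    if (sp1 == std::string::npos) {
        return false;
    }
    std::string::size_type sp2 = line.find(' ', sp1 + 1);
    if (sp2 == std::string::npos) {
        return false;
    }

    const std::string codeStr = line.substr(sp1 + 1, sp2 - sp1 - 1);
    if (codeStr.size() != 3) {
        return false;
    }
    for (std::string::size_type i = 0; i < codeStr.size(); ++i) {
        if (codeStr[i] < '0' || codeStr[i] > '9') {
            return false;
        }
    }

    out->version = line.substr(0, sp1);
    out->code = std::atoi(codeStr.c_str());
    out->reason = line.substr(sp2 + 1);
    return out->version.compare(0, 5, "RTSP/") == 0;
}
```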
The following walks through one streaming playback session to show the RTSP state transitions.
C denotes the RTSP client and S the RTSP server:
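    C->S: OPTIONS request    // ask the server which methods are available
    S->C: OPTIONS response   // the response lists all methods the server supports
    C->S: DESCRIBE request   // request the media initialization description from the server
    S->C: DESCRIBE response  // the server returns the media initialization description, mainly SDP
    C->S: SETUP request      // set the session attributes and transport mode, asking the server to create a session
    S->C: SETUP response     // the server creates the session and returns the session identifier and session information
    C->S: PLAY request       // the client asks to start playback
    S->C: PLAY response      // the server acknowledges the request
    S->C:                    // the server sends the media data
    C->S: TEARDOWN request   // the client asks to close the session
    S->C: TEARDOWN response  // the server acknowledges the request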
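Of these, the SETUP and PLAY steps are mandatory.
The OPTIONS step can be skipped if the client and server have agreed in advance on which methods are available.
If the media initialization description can be obtained some other way, the RTSP DESCRIBE request is not needed either.
Whether TEARDOWN is needed depends on the design requirements of the system.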
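The state transitions driven by these requests can be sketched as a small function. This is a simplified reading of the client state machine in RFC 2326 (the recording state is left out), and the names RtspState and nextState are illustrative only.

```cpp
#include <string>

// Simplified client states; RFC 2326 also defines a Recording state.
enum class RtspState { Init, Ready, Playing };

// Returns the state after a successful (200 OK) response to `method`.
// Methods that do not change state (OPTIONS, DESCRIBE, ...) return `s`.
RtspState nextState(RtspState s, const std::string &method) {
    if (method == "TEARDOWN") {
        return RtspState::Init;      // the session is gone from any state
    }
    if (method == "SETUP" && s == RtspState::Init) {
        return RtspState::Ready;     // session created
    }
    if (method == "PLAY" && s == RtspState::Ready) {
        return RtspState::Playing;   // media starts flowing
    }
    if (method == "PAUSE" && s == RtspState::Playing) {
        return RtspState::Ready;     // media stops, session kept
    }
    return s;
}
```

Feeding it the sequence OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN moves the client from Init to Ready to Playing and back to Init, which matches the exchange listed above.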
    Status-Code = "100" ; Continue
                | "200" ; OK
                | "201" ; Created
                | "250" ; Low on Storage Space
                | "300" ; Multiple Choices
                | "301" ; Moved Permanently
                | "302" ; Moved Temporarily
                | "303" ; See Other
                | "304" ; Not Modified
                | "305" ; Use Proxy
                | "400" ; Bad Request
                | "401" ; Unauthorized
                | "402" ; Payment Required
                | "403" ; Forbidden
                | "404" ; Not Found
                | "405" ; Method Not Allowed
                | "406" ; Not Acceptable
                | "407" ; Proxy Authentication Required
                | "408" ; Request Time-out
                | "410" ; Gone
                | "411" ; Length Required
                | "412" ; Precondition Failed
                | "413" ; Request Entity Too Large
                | "414" ; Request-URI Too Large
                | "415" ; Unsupported Media Type
                | "451" ; Parameter Not Understood
                | "452" ; Conference Not Found
                | "453" ; Not Enough Bandwidth
                | "454" ; Session Not Found
                | "455" ; Method Not Valid in This State
                | "456" ; Header Field Not Valid for Resource
                | "457" ; Invalid Range
                | "458" ; Parameter Is Read-Only
                | "459" ; Aggregate operation not allowed
                | "460" ; Only aggregate operation allowed
                | "461" ; Unsupported transport
                | "462" ; Destination unreachable
                | "500" ; Internal Server Error
                | "501" ; Not Implemented
                | "502" ; Bad Gateway
                | "503" ; Service Unavailable
                | "504" ; Gateway Time-out
                | "505" ; RTSP Version not supported
                | "551" ; Option not supported
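    v=<version>                                   (protocol version)
    o=<username> <session id> <version> <network type> <address type> <address>
                                                  (owner/creator and session identifier)
    s=<session name>                              (session name)
    i=<session description>                       (session information)
    u=<URI>                                       (URI of the description)
    e=<email address>                             (email address)
    p=<phone number>                              (phone number)
    c=<network type> <address type> <connection address>
                                                  (connection information)
    b=<modifier>:<bandwidth-value>                (bandwidth information)
    t=<start time> <stop time>                    (time the session is active)
    r=<repeat interval> <active duration> <list of offsets from start-time>
                                                  (zero or more repeat times)
    z=<adjustment time> <offset> <adjustment time> <offset>
                                                  (time zone adjustments)
    k=<method>:<encryption key>                   (encryption key)
    a=<attribute>:<value>                         (zero or more session attribute lines)
    m=<media> <port> <transport> <fmt list>       (media name and transport address)

    Time description:
    t=   (time the session is active)
    r=*  (zero or more repeat times)

    Media description:
    m=   (media name and transport address)
    i=*  (media title)
    c=*  (connection information; optional if included at session level)
    b=*  (bandwidth information)
    k=*  (encryption key)
    a=*  (zero or more media attribute lines)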
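Since every SDP line has the form type=value, a description can be split into fields with very little code. The sketch below is only an illustration (the function name parseSdpLines is made up) and does no validation of field order or content.

```cpp
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Splits an SDP body into (type, value) pairs,
// e.g. ('m', "video 0 RTP/AVP 96"). Malformed lines are skipped.
std::vector<std::pair<char, std::string> > parseSdpLines(const std::string &sdp) {
    std::vector<std::pair<char, std::string> > fields;
    std::istringstream in(sdp);
    std::string line;
    while (std::getline(in, line)) {
        if (!line.empty() && line[line.size() - 1] == '\r') {
            line.erase(line.size() - 1);  // tolerate CRLF line endings
        }
        if (line.size() < 2 || line[1] != '=') {
            continue;                     // blank or malformed line
        }
        fields.push_back(std::make_pair(line[0], line.substr(2)));
    }
    return fields;
}
```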
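The Real-time Transport Protocol (RTP) is a data transport protocol for carrying streaming media in unicast or multicast scenarios. It usually runs over UDP, although TCP, ATM or other protocols can also serve as its carrier. The RTP specification consists of two closely related parts: the RTP data protocol and the RTP control protocol (RTCP).
RTP provides timing information and stream synchronization for end-to-end real-time transport over the Internet, but it does not guarantee quality of service; monitoring of and feedback on the quality of service are provided by RTCP.
When an application sets up an RTP session, it determines a pair of destination transport addresses. A destination transport address consists of a network address and a pair of ports: one port for RTP packets and one for RTCP packets. RTP and RTCP packets are therefore sent separately, so that each kind can be delivered correctly. RTP data goes to an even-numbered UDP port and the corresponding RTCP control data goes to the adjacent odd-numbered UDP port, which together form a UDP port pair.
When sending, RTP takes the media bitstream from the upper layer and wraps it in RTP data packets, while RTCP takes control information from the upper layer and wraps it in RTCP control packets. RTP sends its packets to the even port of the UDP port pair, and RTCP sends its packets to the odd port of the pair.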
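The even/odd port convention can be illustrated with a few lines of code. This is a sketch under the assumption that a candidate port number has already been chosen; PortPair and makePortPair are names invented for the example, and a real implementation would also have to check that both ports can actually be bound.

```cpp
#include <cstdint>

// Hypothetical pair of UDP ports for one RTP session.
struct PortPair {
    uint16_t rtp;   // even port, carries RTP data
    uint16_t rtcp;  // adjacent odd port, carries RTCP control packets
};

// Rounds `candidate` down to an even port for RTP and uses the next
// (odd) port for RTCP.
PortPair makePortPair(uint16_t candidate) {
    PortPair p;
    p.rtp = static_cast<uint16_t>(candidate & 0xFFFE);  // clear the lowest bit
    p.rtcp = static_cast<uint16_t>(p.rtp + 1);
    return p;
}
```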
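If a conference uses both audio and video, the two media are carried in separate RTP sessions, each with its own transport address (IP address plus port). If a user takes part in both sessions, the RTCP packets of each session carry the same canonical name, CNAME. Participants can use the CNAME in the RTCP packets to associate the audio with the video, and then use the timing information in the RTCP packets (NTP timestamps) to synchronize them.
As described above, the main functions of RTCP are monitoring of and feedback on the quality of service, synchronization between media, and identification of the members of a multicast group. During an RTP session, every participant periodically sends RTCP packets. These packets contain statistics such as the number of packets sent and the number of packets lost, so participants can use them to adjust the transmission rate dynamically, or even change the payload type. Used together, RTP and RTCP optimize transmission efficiency with effective feedback and minimal overhead, which makes them well suited to real-time data on the network. RTCP is also carried over UDP, but because it encapsulates only control information its packets are short, so several RTCP packets can be bundled into a single UDP datagram.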
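One of the statistics a receiver reports is the fraction of packets lost. The sketch below follows the fixed-point formula given in RFC 3550 (lost packets shifted left by 8 bits, divided by expected packets); the function name and the idea of passing in the two counters directly are simplifications for this example.

```cpp
#include <cstdint>

// Fraction of packets lost over an interval, as an 8-bit fixed-point
// number (0 = no loss, 255 = almost everything lost), the form carried
// in an RTCP reception report.
uint8_t fractionLost(uint32_t expected, uint32_t received) {
    if (expected == 0 || received >= expected) {
        return 0;  // nothing expected, or duplicates made up for losses
    }
    const uint64_t lost = expected - received;
    return static_cast<uint8_t>((lost << 8) / expected);
}
```

A sender that sees this value rise in incoming reports could, for example, lower its bit rate.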
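RTCP defines five packet types: SR (sender report), RR (receiver report), SDES (source description), BYE (goodbye) and APP (application-defined).
The SR packet has the following format: a header (version, padding flag, reception report count, packet type 200 and length), the SSRC of the sender, a sender-info block (64-bit NTP timestamp, RTP timestamp, sender's packet count and sender's octet count), followed by zero or more reception report blocks.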
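The fixed part of an SR packet, as laid out in RFC 3550, can be read with a sketch like the one below. SenderInfo, readU32 and parseSenderReport are names chosen for this illustration; reception report blocks are ignored.

```cpp
#include <cstddef>
#include <cstdint>

// Fields of the SR header and sender-info block (RFC 3550, section 6.4.1).
struct SenderInfo {
    uint32_t ssrc;         // synchronization source of the sender
    uint64_t ntpTime;      // 64-bit NTP timestamp
    uint32_t rtpTime;      // RTP timestamp matching ntpTime
    uint32_t packetCount;  // sender's packet count
    uint32_t octetCount;   // sender's octet count
};

// Reads a 32-bit big-endian (network order) value.
static uint32_t readU32(const uint8_t *p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Returns true if `data` starts with a well-formed SR header and sender info.
bool parseSenderReport(const uint8_t *data, size_t size, SenderInfo *out) {
    if (size < 28) {
        return false;  // 8 bytes of header + SSRC, 20 bytes of sender info
    }
    if ((data[0] >> 6) != 2) {
        return false;  // version must be 2
    }
    if (data[1] != 200) {
        return false;  // packet type 200 = SR
    }
    out->ssrc = readU32(data + 4);
    out->ntpTime = (uint64_t(readU32(data + 8)) << 32) | readU32(data + 12);
    out->rtpTime = readU32(data + 16);
    out->packetCount = readU32(data + 20);
    out->octetCount = readU32(data + 24);
    return true;
}
```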
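The setDataSource stage is not covered again here. Its main job is to create the playback engine and the Source that matches the URL format, in this case RTSPSource, which is then assigned to mSource.
Let's go straight to the prepare stage:
In the prepare stage we first check whether the source is an SDP file; mIsSDP is passed in when the RTSPSource is constructed. We start with the case mIsSDP = false, in which a MyHandler is created and its connect method is called to establish a connection to the server.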
    void NuPlayer::RTSPSource::prepareAsync() {
        // ...
        sp<AMessage> notify = new AMessage(kWhatNotify, this);

        // The source must currently be in the DISCONNECTED state.
        CHECK_EQ(mState, (int)DISCONNECTED);
        // Move to the CONNECTING state.
        mState = CONNECTING;

        if (mIsSDP) {
            // For SDP, create an SDPLoader that loads the description file
            // from the server.
            mSDPLoader = new SDPLoader(notify,
                    (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0,
                    mHTTPService);

            mSDPLoader->load(mURL.c_str(),
                    mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
        } else {
            // Otherwise use MyHandler to connect.
            mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID);
            mLooper->registerHandler(mHandler);

            mHandler->connect();
        }

        // Start buffering.
        startBufferingIfNecessary();
    }
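Before looking at connect, we need to know two member variables, mConn and mRTPConn. mConn is an ARTSPConnection that talks to the server, sending requests and receiving responses; mRTPConn is an ARTPConnection used to send and receive media data.
The connect method uses mConn to initiate the connection to the server.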
    void connect() {
        // mConn was initialized as: mConn(new ARTSPConnection(mUIDValid, mUID))
        looper()->registerHandler(mConn);
        // mRTPConn was initialized as: mRTPConn(new ARTPConnection)
        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);

        sp<AMessage> notify = new AMessage('biny', this);
        mConn->observeBinaryData(notify);

        // Connect to the server.
        sp<AMessage> reply = new AMessage('conn', this);
        mConn->connect(mOriginalSessionURL.c_str(), reply);
    }
    void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) {
        // Post a kWhatConnect message; note that it carries both the url
        // and the reply message.
        sp<AMessage> msg = new AMessage(kWhatConnect, this);
        msg->setString("url", url);
        msg->setMessage("reply", reply);
        msg->post();
    }
    case kWhatConnect:
        onConnect(msg);
        break;
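ARTSPConnection::onConnect parses host, port, path, mUser and mPass from the URL it was given and calls ::connect to reach the server. On success it calls postReceiveReponseEvent, which schedules the loop that waits for the server's responses.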
void ARTSPConnection::onConnect(const sp<AMessage> &msg) { ++mConnectionID; if (mState != DISCONNECTED) { if (mUIDValid) { HTTPBase::UnRegisterSocketUserTag(mSocket); HTTPBase::UnRegisterSocketUserMark(mSocket); } close(mSocket); mSocket = -1; flushPendingRequests(); } mState = CONNECTING; AString url; //从消息中取下Url CHECK(msg->findString("url", &url)); sp<AMessage> reply; //从消息中取下replay CHECK(msg->findMessage("reply", &reply)); AString host, path; unsigned port; //从URl中解析host,port,path,mUser,mPass if (!ParseURL(url.c_str(), &host, &port, &path, &mUser, &mPass) || (mUser.size() > 0 && mPass.size() == 0)) { //有用户名,但是没有密码,返回错误信息 // If we have a user name but no password we have to give up // right here, since we currently have no way of asking the user // for this information. ALOGE("Malformed rtsp url %s", uriDebugString(url).c_str()); reply->setInt32("result", ERROR_MALFORMED); reply->post(); mState = DISCONNECTED; return; } if (mUser.size() > 0) { ALOGV("user = '%s', pass = '%s'", mUser.c_str(), mPass.c_str()); } struct hostent *ent = gethostbyname(host.c_str()); if (ent == NULL) { ALOGE("Unknown host %s", host.c_str()); reply->setInt32("result", -ENOENT); reply->post(); mState = DISCONNECTED; return; } mSocket = socket(AF_INET, SOCK_STREAM, 0); if (mUIDValid) { HTTPBase::RegisterSocketUserTag(mSocket, mUID,(uint32_t)*(uint32_t*) "RTSP"); HTTPBase::RegisterSocketUserMark(mSocket, mUID); } MakeSocketBlocking(mSocket, false); struct sockaddr_in remote; memset(remote.sin_zero, 0, sizeof(remote.sin_zero)); remote.sin_family = AF_INET; remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; remote.sin_port = htons(port); //连接到服务器 int err = ::connect(mSocket, (const struct sockaddr *)&remote, sizeof(remote)); //返回服务器ip reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr)); if (err < 0) { if (errno == EINPROGRESS) { sp<AMessage> msg = new AMessage(kWhatCompleteConnection, this); msg->setMessage("reply", reply); msg->setInt32("connection-id", mConnectionID); msg->post(); 
return; } reply->setInt32("result", -errno); mState = DISCONNECTED; if (mUIDValid) { HTTPBase::UnRegisterSocketUserTag(mSocket); HTTPBase::UnRegisterSocketUserMark(mSocket); } close(mSocket); mSocket = -1; } else { //成功的花返回result为OK reply->setInt32("result", OK); //设置状态为CONNECTED mState = CONNECTED; mNextCSeq = 1; //发送等待返回消息 postReceiveReponseEvent(); } //‘conn’ reply->post(); } |
Next, let's look at postReceiveReponseEvent:
    void ARTSPConnection::postReceiveReponseEvent() {
        if (mReceiveResponseEventPending) {
            return;
        }

        sp<AMessage> msg = new AMessage(kWhatReceiveResponse, this);
        msg->post();

        mReceiveResponseEventPending = true;
    }
onReceiveResponse then calls receiveRTSPReponse to read the server's reply:
    void ARTSPConnection::onReceiveResponse() {
        mReceiveResponseEventPending = false;

        if (mState != CONNECTED) {
            return;
        }

        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = kSelectTimeoutUs;

        fd_set rs;
        FD_ZERO(&rs);
        FD_SET(mSocket, &rs);

        // Wait (with a timeout) until the socket becomes readable.
        int res = select(mSocket + 1, &rs, NULL, NULL, &tv);

        if (res == 1) {
            MakeSocketBlocking(mSocket, true);
            bool success = receiveRTSPReponse();
            MakeSocketBlocking(mSocket, false);

            if (!success) {
                // Something horrible, irreparable has happened.
                flushPendingRequests();
                return;
            }
        }

        postReceiveReponseEvent();
    }
Note that receiveRTSPReponse serves two purposes: it can receive requests sent by the server, and it can process the responses the server sends back.
bool ARTSPConnection::receiveRTSPReponse() { AString statusLine; if (!receiveLine(&statusLine)) { return false; } if (statusLine == "$") { sp<ABuffer> buffer = receiveBinaryData(); if (buffer == NULL) { return false; } if (mObserveBinaryMessage != NULL) { sp<AMessage> notify = mObserveBinaryMessage->dup(); notify->setBuffer("buffer", buffer); notify->post(); } else { ALOGW("received binary data, but no one cares."); } return true; } //RTSP返回对象 sp<ARTSPResponse> response = new ARTSPResponse; response->mStatusLine = statusLine; ALOGI("status: %s", response->mStatusLine.c_str()); ssize_t space1 = response->mStatusLine.find(" "); if (space1 < 0) { return false; } ssize_t space2 = response->mStatusLine.find(" ", space1 + 1); if (space2 < 0) { return false; } bool isRequest = false; //判断返回的RTSP版本是否正确 if (!IsRTSPVersion(AString(response->mStatusLine, 0, space1))) { CHECK(IsRTSPVersion(AString(response->mStatusLine,space2 + 1,response->mStatusLine.size() - space2 - 1))); isRequest = true; response->mStatusCode = 0; } else { //判断状态码是否正确 AString statusCodeStr(response->mStatusLine, space1 + 1, space2 - space1 - 1); if (!ParseSingleUnsignedLong(statusCodeStr.c_str(), &response->mStatusCode) || response->mStatusCode < 100 || response->mStatusCode > 999) { return false; } } AString line; ssize_t lastDictIndex = -1; for (;;) { if (!receiveLine(&line)) { break; } if (line.empty()) { break; } ALOGV("line: '%s'", line.c_str()); if (line.c_str()[0] == ' ' || line.c_str()[0] == '\t') { // Support for folded header values. if (lastDictIndex < 0) { // First line cannot be a continuation of the previous one. return false; } AString &value = response->mHeaders.editValueAt(lastDictIndex); value.append(line); continue; } ssize_t colonPos = line.find(":"); if (colonPos < 0) { // Malformed header line. 
return false; } AString key(line, 0, colonPos); key.trim(); key.tolower(); line.erase(0, colonPos + 1); lastDictIndex = response->mHeaders.add(key, line); } for (size_t i = 0; i < response->mHeaders.size(); ++i) { response->mHeaders.editValueAt(i).trim(); } unsigned long contentLength = 0; ssize_t i = response->mHeaders.indexOfKey("content-length"); if (i >= 0) { AString value = response->mHeaders.valueAt(i); if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { return false; } } //接收mContent if (contentLength > 0) { response->mContent = new ABuffer(contentLength); if (receive(response->mContent->data(), contentLength) != OK) { return false; } } //isRequest 表示是服务器主动发送的请求,那么将调用handleServerRequest,否则表示是服务器被动响应客户端的请求,那么将通知服务器有响应了notifyResponseListener return isRequest ? handleServerRequest(response) : notifyResponseListener(response); } |
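The "$" branch at the top of receiveRTSPReponse handles binary data that is interleaved on the RTSP connection. RFC 2326 frames such data as a "$" byte, a one-byte channel identifier and a two-byte big-endian length, followed by the payload. The sketch below parses that framing from a buffer; the struct and function names are invented here and are not the ones used in ARTSPConnection.

```cpp
#include <cstddef>
#include <cstdint>

// One interleaved frame: '$' <channel> <length hi> <length lo> <payload...>
struct InterleavedFrame {
    uint8_t channel;         // e.g. 0 for RTP and 1 for RTCP of the first track
    const uint8_t *payload;  // points into the input buffer
    size_t length;           // payload length in bytes
};

// Returns true when `data` begins with one complete interleaved frame.
bool parseInterleavedFrame(const uint8_t *data, size_t size,
                           InterleavedFrame *out) {
    if (size < 4 || data[0] != '$') {
        return false;
    }
    const size_t length = (size_t(data[2]) << 8) | data[3];
    if (size < 4 + length) {
        return false;  // the frame has not been fully received yet
    }
    out->channel = data[1];
    out->payload = data + 4;
    out->length = length;
    return true;
}
```

The channel numbers correspond to the values the client proposes in the interleaved= parameter of the SETUP Transport header.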
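If isRequest is set, the message is a request initiated by the server and handleServerRequest is called. Otherwise it is the server's response to a client request, and notifyResponseListener is called to tell the waiting listener that a response has arrived. Let's look at the implementation of both methods:
handleServerRequest may be a little disappointing: the feature is not implemented yet, so it simply replies to the server with "RTSP/1.0 501 Not Implemented".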
bool ARTSPConnection::handleServerRequest(const sp<ARTSPResponse> &request) { // Implementation of server->client requests is optional for all methods // but we do need to respond, even if it's just to say that we don't // support the method. //这里我们不实现任何答复行为只是简单反馈我们尚未实现这个功能 ssize_t space1 = request->mStatusLine.find(" "); CHECK_GE(space1, 0); AString response; response.append("RTSP/1.0 501 Not Implemented\r\n"); ssize_t i = request->mHeaders.indexOfKey("cseq"); if (i >= 0) { AString value = request->mHeaders.valueAt(i); unsigned long cseq; if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { return false; } response.append("CSeq: "); response.append(cseq); response.append("\r\n"); } response.append("\r\n"); size_t numBytesSent = 0; while (numBytesSent < response.size()) { ssize_t n = send(mSocket, response.c_str() + numBytesSent, response.size() - numBytesSent, 0); if (n < 0 && errno == EINTR) { continue; } if (n <= 0) { if (n == 0) { // Server closed the connection. ALOGE("Server unexpectedly closed the connection."); } else { ALOGE("Error sending rtsp response (%s).", strerror(errno)); } performDisconnect(); return false; } numBytesSent += (size_t)n; } return true; } |
notifyResponseListener is fairly clear: based on the server's response, it looks up the reply Message of the matching request and hands the response back to MyHandler for processing.
    bool ARTSPConnection::notifyResponseListener(
            const sp<ARTSPResponse> &response) {
        ssize_t i;
        // Look up the pending request that this response answers.
        status_t err = findPendingRequest(response, &i);
        if (err != OK) {
            return false;
        }

        // Deliver the server's response to the requester.
        sp<AMessage> reply = mPendingRequests.valueAt(i);
        mPendingRequests.removeItemsAt(i);

        reply->setInt32("result", OK);
        reply->setObject("response", response);
        reply->post();

        return true;
    }
Back to the main flow: here is how MyHandler handles the 'conn' reply:
    case 'conn':
    {
        int32_t result;
        // Read the result of the connection attempt.
        CHECK(msg->findInt32("result", &result));

        if (result == OK) {
            // Send a DESCRIBE request for the session description.
            AString request;
            request = "DESCRIBE ";
            request.append(mSessionURL);
            request.append(" RTSP/1.0\r\n");
            request.append("Accept: application/sdp\r\n");
            request.append("\r\n");

            sp<AMessage> reply = new AMessage('desc', this);
            mConn->sendRequest(request.c_str(), reply);
        } else {
            (new AMessage('disc', this))->post();
        }
        break;
    }
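This part is simple: once the reply arrives, the result is checked, and if it is OK a DESCRIBE request is sent. The method to focus on is onSendRequest, which is important to understand:
onSendRequest first prepares the request, for example by adding the CSeq header, and then calls send to transmit it to the server. It then stores the reply message in the pending-request map, keyed by the CSeq value.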
    void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) {
        sp<AMessage> reply;
        CHECK(msg->findMessage("reply", &reply));

        // Prepare the request before sending it.
        AString request;
        CHECK(msg->findString("request", &request));

        // Just in case we need to re-issue the request with proper authentication
        // later, stash it away.
        reply->setString("original-request", request.c_str(), request.size());

        addAuthentication(&request);
        addUserAgent(&request);

        // Find the boundary between headers and the body.
        ssize_t i = request.find("\r\n\r\n");
        CHECK_GE(i, 0);

        int32_t cseq = mNextCSeq++;

        AString cseqHeader = "CSeq: ";
        cseqHeader.append(cseq);
        cseqHeader.append("\r\n");

        request.insert(cseqHeader, i + 2);

        ALOGV("request: '%s'", request.c_str());

        size_t numBytesSent = 0;
        while (numBytesSent < request.size()) {
            // Keep sending until the whole request has been written.
            ssize_t n = send(mSocket, request.c_str() + numBytesSent,
                             request.size() - numBytesSent, 0);

            // (error handling omitted)

            numBytesSent += (size_t)n;
        }

        // Add the request to mPendingRequests and wait for the server's response.
        mPendingRequests.add(cseq, reply);
    }
Looking back at notifyResponseListener together with onSendRequest and findPendingRequest, the overall event flow becomes visible:
    status_t ARTSPConnection::findPendingRequest(
            const sp<ARTSPResponse> &response, ssize_t *index) const {
        *index = 0;

        ssize_t i = response->mHeaders.indexOfKey("cseq");

        if (i < 0) {
            // This is an unsolicited server->client message.
            *index = -1;
            return OK;
        }

        AString value = response->mHeaders.valueAt(i);

        unsigned long cseq;
        if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
            return ERROR_MALFORMED;
        }

        i = mPendingRequests.indexOfKey(cseq);

        if (i < 0) {
            return -ENOENT;
        }

        *index = i;

        return OK;
    }
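onSendRequest keeps adding requests to mPendingRequests. Each time the server answers, notifyResponseListener is called; it removes the matching reply message from mPendingRequests and posts it to MyHandler for processing. onReceiveResponse then calls postReceiveReponseEvent again to wait for the next response from the server.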
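The request/response matching described above can be reduced to a small stand-alone sketch. It is not the Android implementation: PendingRequests is a hypothetical class that uses std::map and std::function in place of KeyedVector and AMessage, and it only shows the idea of keying callbacks by CSeq.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>

// Hypothetical stand-in for the CSeq-keyed map of pending requests.
class PendingRequests {
public:
    typedef std::function<void(const std::string &)> Callback;

    // Registers a callback and returns the CSeq assigned to the request.
    int add(const Callback &cb) {
        const int cseq = mNextCSeq++;
        mPending[cseq] = cb;
        return cseq;
    }

    // Delivers `response` to the request with the same CSeq and forgets it.
    // Returns false when no such request is pending.
    bool dispatch(int cseq, const std::string &response) {
        std::map<int, Callback>::iterator it = mPending.find(cseq);
        if (it == mPending.end()) {
            return false;
        }
        Callback cb = it->second;
        mPending.erase(it);
        cb(response);
        return true;
    }

    std::size_t size() const { return mPending.size(); }

private:
    int mNextCSeq = 1;
    std::map<int, Callback> mPending;
};
```

Because responses are matched by CSeq rather than by arrival order, several requests can be outstanding at the same time.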
Next, let's look at what happens when the 'desc' message arrives:
case 'desc': { int32_t result; CHECK(msg->findInt32("result", &result)); if (result == OK) { sp<RefBase> obj; CHECK(msg->findObject("response", &obj)); sp<ARTSPResponse> response = static_cast<ARTSPResponse *>(obj.get()); if (response->mStatusCode == 301 || response->mStatusCode == 302) { //重定向连接 //............ } if (response->mStatusCode != 200) { result = UNKNOWN_ERROR; } else if (response->mContent == NULL) { result = ERROR_MALFORMED; ALOGE("The response has no content."); } else { //获得ASessionDescription mSessionDesc = new ASessionDescription; mSessionDesc->setTo(response->mContent->data(),response->mContent->size()); if (!mSessionDesc->isValid()) { //............ } else { //............ if (mSessionDesc->countTracks() < 2) { // There's no actual tracks in this session. // The first "track" is merely session meta // data. ALOGW("Session doesn't contain any playable " "tracks. Aborting."); result = ERROR_UNSUPPORTED; } else { //这里才是真正要处理的代码 setupTrack(1); } } } } break; } |
The code above is long; skipping the less important parts, we go straight to setupTrack.
    void setupTrack(size_t index) {
        sp<APacketSource> source =
            new APacketSource(mSessionDesc, index);

        AString url;
        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));

        AString trackURL;
        // Build the URI of this media track.
        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));

        mTracks.push(TrackInfo());
        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
        // Store the track URI.
        info->mURL = trackURL;
        // Store the APacketSource.
        info->mPacketSource = source;
        info->mUsingInterleavedTCP = false;
        info->mFirstSeqNumInSegment = 0;
        info->mNewSegment = true;
        info->mRTPSocket = -1;
        info->mRTCPSocket = -1;
        info->mRTPAnchor = 0;
        info->mNTPAnchorUs = -1;
        info->mNormalPlayTimeRTP = 0;
        info->mNormalPlayTimeUs = 0ll;

        unsigned long PT;
        AString formatDesc;
        AString formatParams;
        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);

        int32_t timescale;
        int32_t numChannels;
        ASessionDescription::ParseFormatDesc(
                formatDesc.c_str(), &timescale, &numChannels);

        info->mTimeScale = timescale;
        info->mEOSReceived = false;

        ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());

        // Build the SETUP request.
        AString request = "SETUP ";
        request.append(trackURL);
        request.append(" RTSP/1.0\r\n");

        if (mTryTCPInterleaving) {
            size_t interleaveIndex = 2 * (mTracks.size() - 1);
            info->mUsingInterleavedTCP = true;
            info->mRTPSocket = interleaveIndex;
            info->mRTCPSocket = interleaveIndex + 1;

            request.append("Transport: RTP/AVP/TCP;interleaved=");
            request.append(interleaveIndex);
            request.append("-");
            request.append(interleaveIndex + 1);
        } else {
            unsigned rtpPort;
            ARTPConnection::MakePortPair(
                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);

            if (mUIDValid) {
                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
                        (uint32_t)*(uint32_t*) "RTP_");
                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
                        (uint32_t)*(uint32_t*) "RTP_");
                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
            }

            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
            request.append(rtpPort);
            request.append("-");
            request.append(rtpPort + 1);
        }

        request.append("\r\n");

        if (index > 1) {
            request.append("Session: ");
            request.append(mSessionID);
            request.append("\r\n");
        }

        request.append("\r\n");

        sp<AMessage> reply = new AMessage('setu', this);
        reply->setSize("index", index);
        reply->setSize("track-index", mTracks.size() - 1);
        mConn->sendRequest(request.c_str(), reply);
    }
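The logic here is also simple: the information about the track to be fetched is stored in mTracks, and sendRequest issues the SETUP request, whose reply is tagged 'setu'. sendRequest is not described again; let's look at how the 'setu' reply is handled: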
case 'setu': { size_t index; CHECK(msg->findSize("index", &index)); TrackInfo *track = NULL; size_t trackIndex; if (msg->findSize("track-index", &trackIndex)) { track = &mTracks.editItemAt(trackIndex); } int32_t result; CHECK(msg->findInt32("result", &result)); if (result == OK) { CHECK(track != NULL); sp<RefBase> obj; CHECK(msg->findObject("response", &obj)); sp<ARTSPResponse> response = static_cast<ARTSPResponse *>(obj.get()); if (response->mStatusCode != 200) { } else { ssize_t i = response->mHeaders.indexOfKey("session"); CHECK_GE(i, 0); //得到SessionID mSessionID = response->mHeaders.valueAt(i); mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; AString timeoutStr; //........................ sp<AMessage> notify = new AMessage('accu', this); notify->setSize("track-index", trackIndex); i = response->mHeaders.indexOfKey("transport"); CHECK_GE(i, 0); if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { if (!track->mUsingInterleavedTCP) { AString transport = response->mHeaders.valueAt(i); // We are going to continue even if we were // unable to poke a hole into the firewall... pokeAHole( track->mRTPSocket, track->mRTCPSocket, transport); } mRTPConn->addStream( track->mRTPSocket, track->mRTCPSocket, mSessionDesc, index, notify, track->mUsingInterleavedTCP); mSetupTracksSuccessful = true; } else { result = BAD_VALUE; } } } |
The most important steps above are obtaining the session ID and calling mRTPConn->addStream to add a stream to the ARTPConnection. Here is addStream:
    void ARTPConnection::addStream(
            int rtpSocket, int rtcpSocket,
            const sp<ASessionDescription> &sessionDesc,
            size_t index,
            const sp<AMessage> &notify,
            bool injected) {
        sp<AMessage> msg = new AMessage(kWhatAddStream, this);
        msg->setInt32("rtp-socket", rtpSocket);
        msg->setInt32("rtcp-socket", rtcpSocket);
        msg->setObject("session-desc", sessionDesc);
        msg->setSize("index", index);
        msg->setMessage("notify", notify);
        msg->setInt32("injected", injected);
        msg->post();
    }
    case kWhatAddStream:
    {
        onAddStream(msg);
        break;
    }
    void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
        // Append a new StreamInfo entry to mStreams.
        mStreams.push_back(StreamInfo());
        StreamInfo *info = &*--mStreams.end();

        int32_t s;
        // RTP socket.
        CHECK(msg->findInt32("rtp-socket", &s));
        info->mRTPSocket = s;
        // RTCP socket.
        CHECK(msg->findInt32("rtcp-socket", &s));
        info->mRTCPSocket = s;

        int32_t injected;
        CHECK(msg->findInt32("injected", &injected));
        info->mIsInjected = injected;

        // Session description.
        sp<RefBase> obj;
        CHECK(msg->findObject("session-desc", &obj));
        info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());

        CHECK(msg->findSize("index", &info->mIndex));
        CHECK(msg->findMessage("notify", &info->mNotifyMsg));

        info->mNumRTCPPacketsReceived = 0;
        info->mNumRTPPacketsReceived = 0;
        memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));

        // Start polling the sockets.
        if (!injected) {
            postPollEvent();
        }
    }
The part to focus on in the code above is postPollEvent:
    void ARTPConnection::postPollEvent() {
        if (mPollEventPending) {
            return;
        }

        sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
        msg->post();

        mPollEventPending = true;
    }
    case kWhatPollStreams:
    {
        onPollStreams();
        break;
    }
void ARTPConnection::onPollStreams() { mPollEventPending = false; if (mStreams.empty()) { return; } struct timeval tv; tv.tv_sec = 0; tv.tv_usec = kSelectTimeoutUs; fd_set rs; FD_ZERO(&rs); int maxSocket = -1; for (List<StreamInfo>::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { if ((*it).mIsInjected) { continue; } FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTCPSocket, &rs); if (it->mRTPSocket > maxSocket) { maxSocket = it->mRTPSocket; } if (it->mRTCPSocket > maxSocket) { maxSocket = it->mRTCPSocket; } } if (maxSocket == -1) { return; } //选择一个网络请求 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); if (res > 0) { //在这里接收服务器发过来的数据 List<StreamInfo>::iterator it = mStreams.begin(); while (it != mStreams.end()) { if ((*it).mIsInjected) { ++it; continue; } status_t err = OK; //接受从服务器发来的数据 if (FD_ISSET(it->mRTPSocket, &rs)) { //调用的是status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) err = receive(&*it, true); } //接受从服务器发来的数据 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { //调用的是status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) err = receive(&*it, false); } ++it; } } int64_t nowUs = ALooper::GetNowUs(); if (mLastReceiverReportTimeUs <= 0|| mLastReceiverReportTimeUs + 5000000ll <= nowUs) { //新建一个缓存区 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); List<StreamInfo>::iterator it = mStreams.begin(); while (it != mStreams.end()) { StreamInfo *s = &*it; if (s->mIsInjected) { ++it; continue; } if (s->mNumRTCPPacketsReceived == 0) { // We have never received any RTCP packets on this stream, // we don't even know where to send a report. 
++it; continue; } buffer->setRange(0, 0); for (size_t i = 0; i < s->mSources.size(); ++i) { sp<ARTPSource> source = s->mSources.valueAt(i); //填充buffer source->addReceiverReport(buffer); if (mFlags & kRegularlyRequestFIR) { source->addFIR(buffer); } } if (buffer->size() > 0) { ALOGV("Sending RR..."); ssize_t n; do { //通過RTCPSocket發送 n = sendto(s->mRTCPSocket, buffer->data(), buffer->size(), 0,(const struct sockaddr *)&s->mRemoteRTCPAddr, sizeof(s->mRemoteRTCPAddr)); } while (n < 0 && errno == EINTR); CHECK_EQ(n, (ssize_t)buffer->size()); mLastReceiverReportTimeUs = nowUs; } ++it; } } if (!mStreams.empty()) { postPollEvent(); } } |
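onPollStreams uses select (with a timeout) to watch for media data from the server, calls receive to process it, and periodically (every 5 seconds in the code above) sends RTCP receiver reports back to the server.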
    status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
        ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");

        CHECK(!s->mIsInjected);

        sp<ABuffer> buffer = new ABuffer(65536);

        socklen_t remoteAddrLen =
            (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
                ? sizeof(s->mRemoteRTCPAddr) : 0;

        ssize_t nbytes;
        do {
            // Receive data from the server.
            nbytes = recvfrom(
                    receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
                    buffer->data(),
                    buffer->capacity(),
                    0,
                    remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
                    remoteAddrLen > 0 ? &remoteAddrLen : NULL);
        } while (nbytes < 0 && errno == EINTR);

        if (nbytes <= 0) {
            return -ECONNRESET;
        }

        buffer->setRange(0, nbytes);

        // ALOGI("received %d bytes.", buffer->size());

        status_t err;
        // Parse the packet as RTP or RTCP.
        if (receiveRTP) {
            err = parseRTP(s, buffer);
        } else {
            err = parseRTCP(s, buffer);
        }

        return err;
    }
receive calls recvfrom to read the data from the server into a buffer, then calls parseRTP or parseRTCP to process the buffered data.
    status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
        size_t size = buffer->size();

        if (size < 12) {
            // Too short to be a valid RTP header.
            return -1;
        }

        const uint8_t *data = buffer->data();

        if ((data[0] >> 6) != 2) {
            // Unsupported version.
            return -1;
        }

        if (data[0] & 0x20) {
            // Padding present.
            size_t paddingLength = data[size - 1];

            if (paddingLength + 12 > size) {
                // If we removed this much padding we'd end up with something
                // that's too short to be a valid RTP header.
                return -1;
            }

            size -= paddingLength;
        }

        int numCSRCs = data[0] & 0x0f;

        size_t payloadOffset = 12 + 4 * numCSRCs;

        if (size < payloadOffset) {
            // Not enough data to fit the basic header and all the CSRC entries.
            return -1;
        }

        if (data[0] & 0x10) {
            // Header eXtension present.
            if (size < payloadOffset + 4) {
                // Not enough data to fit the basic header, all CSRC entries
                // and the first 4 bytes of the extension header.
                return -1;
            }

            const uint8_t *extensionData = &data[payloadOffset];

            size_t extensionLength =
                4 * (extensionData[2] << 8 | extensionData[3]);

            if (size < payloadOffset + 4 + extensionLength) {
                return -1;
            }

            payloadOffset += 4 + extensionLength;
        }

        uint32_t srcId = u32at(&data[8]);

        sp<ARTPSource> source = findSource(s, srcId);

        uint32_t rtpTime = u32at(&data[4]);

        sp<AMessage> meta = buffer->meta();
        meta->setInt32("ssrc", srcId);
        meta->setInt32("rtp-time", rtpTime);
        meta->setInt32("PT", data[1] & 0x7f);
        meta->setInt32("M", data[1] >> 7);

        buffer->setInt32Data(u16at(&data[2]));
        buffer->setRange(payloadOffset, size - payloadOffset);

        // The key step: hand the packet to
        // void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer).
        source->processRTPPacket(buffer);

        return OK;
    }
parseRTP decodes the buffered data according to the RTP packet format and finally calls ARTPSource::processRTPPacket for further processing.
    void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer) {
        if (queuePacket(buffer) && mAssembler != NULL) {
            mAssembler->onPacketReceived(this);
        }
    }
processRTPPacket calls the assembler to reassemble the data; the most important method here is assembleMore.
void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) { AssemblyStatus status; for (;;) { //assembleMore status = assembleMore(source); if (status == WRONG_SEQUENCE_NUMBER) { if (mFirstFailureTimeUs >= 0) { if (ALooper::GetNowUs() - mFirstFailureTimeUs > 10000ll) { mFirstFailureTimeUs = -1; // LOG(VERBOSE) << "waited too long for packet."; packetLost(); continue; } } else { mFirstFailureTimeUs = ALooper::GetNowUs(); } break; } else { mFirstFailureTimeUs = -1; if (status == NOT_ENOUGH_DATA) { break; } } } } |
    ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::assembleMore(
            const sp<ARTPSource> &source) {
        // Delegate to addPacket.
        AssemblyStatus status = addPacket(source);
        if (status == MALFORMED_PACKET) {
            mAccessUnitDamaged = true;
        }
        return status;
    }
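addPacket consumes the packets in sequence-number order: stale packets are dropped, and a packet whose sequence number is not the expected one yields WRONG_SEQUENCE_NUMBER. Once a packet with a new RTP timestamp arrives, submitAccessUnit is called to submit the completed access unit (AU).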
ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::addPacket( const sp<ARTPSource> &source) { List<sp<ABuffer> > *queue = source->queue(); if (queue->empty()) { return NOT_ENOUGH_DATA; } if (mNextExpectedSeqNoValid) { List<sp<ABuffer> >::iterator it = queue->begin(); while (it != queue->end()) { if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { break; } it = queue->erase(it); } if (queue->empty()) { return NOT_ENOUGH_DATA; } } sp<ABuffer> buffer = *queue->begin(); if (!mNextExpectedSeqNoValid) { mNextExpectedSeqNoValid = true; mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { #if VERBOSE LOG(VERBOSE) << "Not the sequence number I expected"; #endif return WRONG_SEQUENCE_NUMBER; } uint32_t rtpTime; CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); //提交AccessUnit if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { submitAccessUnit(); } mAccessUnitRTPTime = rtpTime; //将缓存添加到mPackets mPackets.push_back(buffer); queue->erase(queue->begin()); ++mNextExpectedSeqNo; return OK; } |
submitAccessUnit posts the 'accu' callback message, which is handled by MyHandler:
    void AMPEG4AudioAssembler::submitAccessUnit() {
        CHECK(!mPackets.empty());

    #if VERBOSE
        LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)";
    #endif

        sp<ABuffer> accessUnit = MakeCompoundFromPackets(mPackets);
        accessUnit = removeLATMFraming(accessUnit);
        CopyTimes(accessUnit, *mPackets.begin());

        if (mAccessUnitDamaged) {
            accessUnit->meta()->setInt32("damaged", true);
        }

        mPackets.clear();
        mAccessUnitDamaged = false;

        // Post the 'accu' callback.
        sp<AMessage> msg = mNotifyMsg->dup();
        msg->setBuffer("access-unit", accessUnit);
        msg->post();
    }
case 'accu':
{
    int32_t timeUpdate;
    if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
        size_t trackIndex;
        CHECK(msg->findSize("track-index", &trackIndex));

        uint32_t rtpTime;
        uint64_t ntpTime;
        CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
        CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));

        onTimeUpdate(trackIndex, rtpTime, ntpTime);
        break;
    }

    int32_t first;
    if (msg->findInt32("first-rtcp", &first)) {
        mReceivedFirstRTCPPacket = true;
        break;
    }

    if (msg->findInt32("first-rtp", &first)) {
        mReceivedFirstRTPPacket = true;
        break;
    }

    ++mNumAccessUnitsReceived;
    postAccessUnitTimeoutCheck();

    size_t trackIndex;
    CHECK(msg->findSize("track-index", &trackIndex));

    if (trackIndex >= mTracks.size()) {
        ALOGV("late packets ignored.");
        break;
    }

    TrackInfo *track = &mTracks.editItemAt(trackIndex);

    int32_t eos;
    if (msg->findInt32("eos", &eos)) {
        ALOGI("received BYE on track index %zu", trackIndex);
        if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
            ALOGI("No time established => fake existing data");

            track->mEOSReceived = true;
            mTryFakeRTCP = true;
            mReceivedFirstRTCPPacket = true;
            fakeTimestamps();
        } else {
            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
        }
        return;
    }

    sp<ABuffer> accessUnit;
    // Extract the access unit from the message
    CHECK(msg->findBuffer("access-unit", &accessUnit));

    uint32_t seqNum = (uint32_t)accessUnit->int32Data();

    if (mSeekPending) {
        ALOGV("we're seeking, dropping stale packet.");
        break;
    }

    if (seqNum < track->mFirstSeqNumInSegment) {
        ALOGV("dropping stale access-unit (%d < %d)",
              seqNum, track->mFirstSeqNumInSegment);
        break;
    }

    if (track->mNewSegment) {
        track->mNewSegment = false;
    }

    // Hand the access unit to onAccessUnitComplete
    onAccessUnitComplete(trackIndex, accessUnit);
    break;
}

After the 'accu' handler extracts the access unit (AU) from the message, it calls onAccessUnitComplete to process it. Let's look at that logic next:
void onAccessUnitComplete(
        int32_t trackIndex, const sp<ABuffer> &accessUnit) {
    ALOGV("onAccessUnitComplete track %d", trackIndex);

    if (!mPlayResponseParsed) {
        ALOGI("play response is not parsed, storing accessunit");
        TrackInfo *track = &mTracks.editItemAt(trackIndex);
        track->mPackets.push_back(accessUnit);
        return;
    }

    handleFirstAccessUnit();

    TrackInfo *track = &mTracks.editItemAt(trackIndex);

    if (!mAllTracksHaveTime) {
        ALOGV("storing accessUnit, no time established yet");
        track->mPackets.push_back(accessUnit);
        return;
    }

    // Flush the access units that were stored earlier, oldest first
    while (!track->mPackets.empty()) {
        sp<ABuffer> accessUnit = *track->mPackets.begin();
        track->mPackets.erase(track->mPackets.begin());

        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
            // postQueueAccessUnit
            postQueueAccessUnit(trackIndex, accessUnit);
        }
    }

    if (addMediaTimestamp(trackIndex, track, accessUnit)) {
        postQueueAccessUnit(trackIndex, accessUnit);
    }

    if (track->mEOSReceived) {
        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
        track->mEOSReceived = false;
    }
}
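The buffering behaviour of onAccessUnitComplete can be hard to see inside the full function, so here is a minimal standalone sketch of just that idea: units are parked while no time base exists, and once it exists the parked units are released in arrival order before the new one. `AccessUnit` and `PendingUnits` are hypothetical names invented for this illustration; they are not part of the Android sources.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for an access unit; only a sequence number is kept.
struct AccessUnit {
    uint32_t seq;
};

// Parks access units until a time base exists, then releases them in order.
class PendingUnits {
public:
    // Returns the units that may be forwarded right now (possibly none).
    std::vector<AccessUnit> onUnit(const AccessUnit &unit, bool timeEstablished) {
        std::vector<AccessUnit> ready;
        if (!timeEstablished) {
            mPending.push_back(unit);  // no time base yet: store the unit
            return ready;
        }
        // Release everything stored earlier, oldest first.
        while (!mPending.empty()) {
            ready.push_back(mPending.front());
            mPending.pop_front();
        }
        ready.push_back(unit);  // the new unit goes out last
        return ready;
    }

    size_t pendingCount() const { return mPending.size(); }

private:
    std::deque<AccessUnit> mPending;
};
```

Feeding units 1 and 2 with `timeEstablished == false` returns nothing; feeding unit 3 with `true` returns 1, 2, 3 in that order, which mirrors the while loop followed by the final postQueueAccessUnit call.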
void postQueueAccessUnit( size_t trackIndex, const sp<ABuffer> &accessUnit) { sp<AMessage> msg = mNotify->dup(); msg->setInt32("what", kWhatAccessUnit); msg->setSize("trackIndex", trackIndex); msg->setBuffer("accessUnit", accessUnit); msg->post(); } |
RTSPSource handles the kWhatAccessUnit message and calls AnotherPacketSource::queueAccessUnit(accessUnit):
case MyHandler::kWhatAccessUnit:
{
    size_t trackIndex;
    CHECK(msg->findSize("trackIndex", &trackIndex));

    if (mTSParser == NULL) {
        CHECK_LT(trackIndex, mTracks.size());
    } else {
        CHECK_EQ(trackIndex, 0u);
    }

    sp<ABuffer> accessUnit;
    CHECK(msg->findBuffer("accessUnit", &accessUnit));

    int32_t damaged;
    if (accessUnit->meta()->findInt32("damaged", &damaged) && damaged) {
        ALOGI("dropping damaged access unit.");
        break;
    }

    if (mTSParser != NULL) {
        // MPEG-TS payload: feed it to the parser in 188-byte packets
        size_t offset = 0;
        status_t err = OK;
        while (offset + 188 <= accessUnit->size()) {
            err = mTSParser->feedTSPacket(
                    accessUnit->data() + offset, 188);
            if (err != OK) {
                break;
            }
            offset += 188;
        }

        if (offset < accessUnit->size()) {
            err = ERROR_MALFORMED;
        }

        if (err != OK) {
            sp<AnotherPacketSource> source = getSource(false /* audio */);
            if (source != NULL) {
                source->signalEOS(err);
            }

            source = getSource(true /* audio */);
            if (source != NULL) {
                source->signalEOS(err);
            }
        }
        break;
    }

    TrackInfo *info = &mTracks.editItemAt(trackIndex);

    sp<AnotherPacketSource> source = info->mSource;
    if (source != NULL) {
        uint32_t rtpTime;
        CHECK(accessUnit->meta()->findInt32("rtp-time", (int32_t *)&rtpTime));

        if (!info->mNPTMappingValid) {
            // This is a live stream, we didn't receive any normal
            // playtime mapping. We won't map to npt time.
            source->queueAccessUnit(accessUnit);
            break;
        }

        int64_t nptUs =
            ((double)rtpTime - (double)info->mRTPTime)
                / info->mTimeScale
                * 1000000ll
                + info->mNormalPlaytimeUs;

        accessUnit->meta()->setInt64("timeUs", nptUs);
        // ...............
        source->queueAccessUnit(accessUnit);
    }
    break;
}

queueAccessUnit(accessUnit) stores the AU in the mBuffers list of AnotherPacketSource, where the decoder later picks it up for decoding and playback:
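The nptUs expression in the kWhatAccessUnit handler maps an RTP timestamp to normal play time (NPT) in microseconds. As a worked sketch of the same arithmetic, the helper below takes the values that the handler reads from `info` as plain parameters. The function name `rtpToNptUs` and its parameter names are made up for this example, and, like the original expression, it does not handle 32-bit RTP timestamp wrap-around.

```cpp
#include <cstdint>

// Maps an RTP timestamp to normal play time (NPT) in microseconds.
//   anchorRtpTime : RTP timestamp that corresponds to anchorNptUs
//   clockRate     : RTP clock ticks per second (for example 90000)
//   anchorNptUs   : NPT, in microseconds, at anchorRtpTime
int64_t rtpToNptUs(uint32_t rtpTime, uint32_t anchorRtpTime,
                   int32_t clockRate, int64_t anchorNptUs) {
    // Work in double so that a timestamp before the anchor yields a
    // negative offset instead of an unsigned underflow.
    double us = ((double)rtpTime - (double)anchorRtpTime)
                    / clockRate
                    * 1000000ll
                    + anchorNptUs;
    return (int64_t)us;
}
```

With a 90 kHz clock, an RTP timestamp 45000 ticks past the anchor is half a second later, so an anchor NPT of 2000000 us becomes 2500000 us.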
void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { int32_t damaged; if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { // LOG(VERBOSE) << "discarding damaged AU"; return; } Mutex::Autolock autoLock(mLock); mBuffers.push_back(buffer); mCondition.signal(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)){ ALOGV("queueing a discontinuity with queueAccessUnit"); mLastQueuedTimeUs = 0ll; mEOSResult = OK; mLatestEnqueuedMeta = NULL; mDiscontinuitySegments.push_back(DiscontinuitySegment()); return; } int64_t lastQueuedTimeUs; CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); mLastQueuedTimeUs = lastQueuedTimeUs; ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); // CHECK(!mDiscontinuitySegments.empty()); DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end()); if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) { tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs; } if (tailSeg.mMaxDequeTimeUs == -1) { tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs; } if (mLatestEnqueuedMeta == NULL) { mLatestEnqueuedMeta = buffer->meta()->dup(); } else { int64_t latestTimeUs = 0; int64_t frameDeltaUs = 0; CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs)); if (lastQueuedTimeUs > latestTimeUs) { mLatestEnqueuedMeta = buffer->meta()->dup(); frameDeltaUs = lastQueuedTimeUs - latestTimeUs; mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) { // For B frames frameDeltaUs = latestTimeUs - lastQueuedTimeUs; mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); } } } |
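At its core, queueAccessUnit is the producer side of a mutex-protected queue: it appends under a lock and signals a condition so that a waiting reader can wake up. The sketch below shows only that producer/consumer shape using the C++ standard library. `Buffer`, `PacketQueue` and `dequeueAccessUnit` are hypothetical names for this illustration; the real class also tracks discontinuities, EOS and per-frame durations, which are left out here.

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical stand-in for an access unit; only the timestamp is kept.
struct Buffer {
    int64_t timeUs;
};

class PacketQueue {
public:
    // Producer side: append under the lock, then wake one waiting reader.
    void queueAccessUnit(const Buffer &buffer) {
        std::lock_guard<std::mutex> lock(mLock);
        mBuffers.push_back(buffer);
        mLastQueuedTimeUs = buffer.timeUs;
        mCondition.notify_one();
    }

    // Consumer side: block until at least one buffer is available.
    Buffer dequeueAccessUnit() {
        std::unique_lock<std::mutex> lock(mLock);
        mCondition.wait(lock, [this] { return !mBuffers.empty(); });
        Buffer front = mBuffers.front();
        mBuffers.pop_front();
        return front;
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(mLock);
        return mBuffers.size();
    }

    int64_t lastQueuedTimeUs() const {
        std::lock_guard<std::mutex> lock(mLock);
        return mLastQueuedTimeUs;
    }

private:
    mutable std::mutex mLock;
    std::condition_variable mCondition;
    std::deque<Buffer> mBuffers;
    int64_t mLastQueuedTimeUs = 0;
};
```

In a player, the network thread would call `queueAccessUnit` and the decoder thread would call `dequeueAccessUnit`; buffers come out in the order they went in.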
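Now the playback flow begins. This part repeats what was covered in the HLS article and is pasted here for convenience. Broadly, the job is to initialize the decoder and then start feeding data from the input buffers into it.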
void NuPlayer::start() { (new AMessage(kWhatStart, this))->post(); } |
case kWhatStart: { ALOGV("kWhatStart"); if (mStarted) { //............... } else { onStart(); } mPausedByClient = false; break; } |
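The start() / kWhatStart pair follows a pattern that recurs throughout this walkthrough: a public method only posts a message, and the real work happens later when the handler switches on the message type. Below is a minimal single-threaded sketch of that pattern. `Player`, `Message` and `runPending` are hypothetical; in the real code an ALooper thread delivers AMessage objects to onMessageReceived.

```cpp
#include <cstdint>
#include <queue>

enum What : uint32_t { kWhatStart, kWhatPause };

// Hypothetical, much simplified message: it only carries its type.
struct Message {
    What what;
};

class Player {
public:
    // Public API: nothing happens here except posting a message.
    void start() { post(Message{kWhatStart}); }
    void pause() { post(Message{kWhatPause}); }

    // Stand-in for the looper thread: delivers all queued messages.
    void runPending() {
        while (!mQueue.empty()) {
            Message msg = mQueue.front();
            mQueue.pop();
            onMessageReceived(msg);
        }
    }

    bool started() const { return mStarted; }
    bool pausedByClient() const { return mPausedByClient; }

private:
    void post(const Message &msg) { mQueue.push(msg); }

    // All state changes happen here, on the handler side.
    void onMessageReceived(const Message &msg) {
        switch (msg.what) {
            case kWhatStart:
                if (!mStarted) {
                    mStarted = true;  // the real code calls onStart()
                }
                mPausedByClient = false;
                break;
            case kWhatPause:
                mPausedByClient = true;
                break;
        }
    }

    std::queue<Message> mQueue;
    bool mStarted = false;
    bool mPausedByClient = true;
};
```

Calling `start()` alone changes nothing; the state flips only once the queued message is delivered, which is why the flow has to be traced from each post to its matching case label.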
void NuPlayer::onStart(int64_t startPositionUs) { if (!mSourceStarted) { mSourceStarted = true; mSource->start(); } mOffloadAudio = false; mAudioEOS = false; mVideoEOS = false; mStarted = true; uint32_t flags = 0; sp<MetaData> audioMeta = mSource->getFormatMeta(true /* audio */); audio_stream_type_t streamType = AUDIO_STREAM_MUSIC; if (mAudioSink != NULL) { streamType = mAudioSink->getAudioStreamType(); } sp<AMessage> videoFormat = mSource->getFormat(false /* audio */); sp<AMessage> notify = new AMessage(kWhatRendererNotify, this); ++mRendererGeneration; notify->setInt32("generation", mRendererGeneration); mRenderer = new Renderer(mAudioSink, notify, flags); mRendererLooper = new ALooper; mRendererLooper->setName("NuPlayerRenderer"); mRendererLooper->start(false, false, ANDROID_PRIORITY_AUDIO); mRendererLooper->registerHandler(mRenderer); status_t err = mRenderer->setPlaybackSettings(mPlaybackSettings); float rate = getFrameRate(); if (rate > 0) { mRenderer->setVideoFrameRate(rate); } if (mVideoDecoder != NULL) { mVideoDecoder->setRenderer(mRenderer); } if (mAudioDecoder != NULL) { mAudioDecoder->setRenderer(mRenderer); } postScanSources(); } |
Next, let's look at how the decoder is initialized:
void NuPlayer::postScanSources() { if (mScanSourcesPending) { return; } sp<AMessage> msg = new AMessage(kWhatScanSources, this); msg->setInt32("generation", mScanSourcesGeneration); msg->post(); mScanSourcesPending = true; } |
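postScanSources uses a pending flag so that repeated calls do not flood the message queue: a second request is dropped while one is still waiting to be handled. A minimal sketch of that coalescing idea follows. `ScanScheduler` and its method names are hypothetical, and the generation counter that the real message carries is omitted.

```cpp
// Coalesces repeated scan requests into one outstanding message.
class ScanScheduler {
public:
    // Returns true if a new scan request was actually posted.
    bool postScan() {
        if (mPending) {
            return false;  // one is already queued, nothing to do
        }
        mPending = true;
        ++mPostedCount;  // stands in for msg->post()
        return true;
    }

    // Called when the handler starts processing the scan message.
    void onScan() { mPending = false; }

    int postedCount() const { return mPostedCount; }

private:
    bool mPending = false;
    int mPostedCount = 0;
};
```

The flag is cleared at the top of the handler, matching `mScanSourcesPending = false` in the kWhatScanSources case, so a new scan can be requested while the current one runs.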
case kWhatScanSources: { int32_t generation; mScanSourcesPending = false; bool mHadAnySourcesBefore = (mAudioDecoder != NULL) || (mVideoDecoder != NULL); // initialize video before audio because successful initialization of // video may change deep buffer mode of audio. if (mSurface != NULL) { instantiateDecoder(false, &mVideoDecoder); } // Don't try to re-open audio sink if there's an existing decoder. if (mAudioSink != NULL && mAudioDecoder == NULL) { instantiateDecoder(true, &mAudioDecoder); } } |
status_t NuPlayer::instantiateDecoder(bool audio, sp<DecoderBase> *decoder) {
    // Get the format of the audio or video track
    sp<AMessage> format = mSource->getFormat(audio);
    format->setInt32("priority", 0 /* realtime */);

    if (audio) {
        sp<AMessage> notify = new AMessage(kWhatAudioNotify, this);
        ++mAudioDecoderGeneration;
        notify->setInt32("generation", mAudioDecoderGeneration);
        determineAudioModeChange();

        if (mOffloadAudio) {
            //....................
        } else {
            *decoder = new Decoder(notify, mSource, mPID, mRenderer);
        }
    } else {
        sp<AMessage> notify = new AMessage(kWhatVideoNotify, this);
        ++mVideoDecoderGeneration;
        notify->setInt32("generation", mVideoDecoderGeneration);

        *decoder = new Decoder(
                notify, mSource, mPID, mRenderer, mSurface, mCCDecoder);
        //...........................
    }

    // Initialize the decoder
    (*decoder)->init();
    // Configure the decoder
    (*decoder)->configure(format);
    //.........
    return OK;
}

This is where the decoder is created and initialized.
void NuPlayer::DecoderBase::configure(const sp<AMessage> &format) {
    sp<AMessage> msg = new AMessage(kWhatConfigure, this);
    msg->setMessage("format", format);
    msg->post();
}

void NuPlayer::DecoderBase::init() {
    mDecoderLooper->registerHandler(this);
}

void NuPlayer::Decoder::onConfigure(const sp<AMessage> &format) {
    // Create the MediaCodec
    mCodec = MediaCodec::CreateByType(
            mCodecLooper, mime.c_str(), false /* encoder */, NULL /* err */, mPid);

    // Configure the MediaCodec
    err = mCodec->configure(format, mSurface, NULL /* crypto */, 0 /* flags */);

    // For video, record the width and height
    if (!mIsAudio) {
        int32_t width, height;
        if (mOutputFormat->findInt32("width", &width)
                && mOutputFormat->findInt32("height", &height)) {
            mStats->setInt32("width", width);
            mStats->setInt32("height", height);
        }
    }

    // Start the MediaCodec
    err = mCodec->start();
}
sp<MediaCodec> MediaCodec::CreateByType(const sp<ALooper> &looper, const char *mime, bool encoder, status_t *err, pid_t pid) { sp<MediaCodec> codec = new MediaCodec(looper, pid); const status_t ret = codec->init(mime, true /* nameIsType */, encoder); return ret == OK ? codec : NULL; // NULL deallocates codec. } |
The following shows that mCodec is an ACodec object:

status_t MediaCodec::init(const AString &name, bool nameIsType, bool encoder) {
    mResourceManagerService->init();

    if (nameIsType || !strncasecmp(name.c_str(), "omx.", 4)) {
        // Create the codec based on the name
        mCodec = new ACodec;
    } else if (!nameIsType
            && !strncasecmp(name.c_str(), "android.filter.", 15)) {
    } else {
    }

    sp<AMessage> msg = new AMessage(kWhatInit, this);
    msg->setString("name", name);
    msg->setInt32("nameIsType", nameIsType);

    if (nameIsType) {
        msg->setInt32("encoder", encoder);
    }
    return err;
}
case kWhatInit: { //.................... mCodec->initiateAllocateComponent(format); break; } |
void ACodec::initiateAllocateComponent(const sp<AMessage> &msg) { msg->setWhat(kWhatAllocateComponent); msg->setTarget(this); msg->post(); } |
case ACodec::kWhatAllocateComponent: { onAllocateComponent(msg); handled = true; break; } |
This is where the codec component (the decoder in this flow) is instantiated and the state changes to loaded:
bool ACodec::UninitializedState::onAllocateComponent(const sp<AMessage> &msg) { Vector<OMXCodec::CodecNameAndQuirks> matchingCodecs; AString mime; AString componentName; uint32_t quirks = 0; int32_t encoder = false; if (msg->findString("componentName", &componentName)) { ssize_t index = matchingCodecs.add(); OMXCodec::CodecNameAndQuirks *entry = &matchingCodecs.editItemAt(index); entry->mName = String8(componentName.c_str()); if (!OMXCodec::findCodecQuirks(componentName.c_str(), &entry->mQuirks)) { entry->mQuirks = 0; } } else { CHECK(msg->findString("mime", &mime)); if (!msg->findInt32("encoder", &encoder)) { encoder = false; } OMXCodec::findMatchingCodecs( mime.c_str(), encoder, // createEncoder NULL, // matchComponentName 0, // flags &matchingCodecs); } sp<CodecObserver> observer = new CodecObserver; IOMX::node_id node = 0; status_t err = NAME_NOT_FOUND; for (size_t matchIndex = 0; matchIndex < matchingCodecs.size();++matchIndex) { componentName = matchingCodecs.itemAt(matchIndex).mName.string(); quirks = matchingCodecs.itemAt(matchIndex).mQuirks; pid_t tid = gettid(); int prevPriority = androidGetThreadPriority(tid); androidSetThreadPriority(tid, ANDROID_PRIORITY_FOREGROUND); err = omx->allocateNode(componentName.c_str(), observer, &node); androidSetThreadPriority(tid, prevPriority); node = 0; } notify = new AMessage(kWhatOMXMessageList, mCodec); observer->setNotificationMessage(notify); mCodec->mComponentName = componentName; mCodec->mRenderTracker.setComponentName(componentName); mCodec->mFlags = 0; mCodec->mQuirks = quirks; mCodec->mOMX = omx; mCodec->mNode = node; { sp<AMessage> notify = mCodec->mNotify->dup(); notify->setInt32("what", CodecBase::kWhatComponentAllocated); notify->setString("componentName", mCodec->mComponentName.c_str()); notify->post(); } mCodec->changeState(mCodec->mLoadedState); return true; } |
Configuring the decoder:
status_t MediaCodec::configure( const sp<AMessage> &format, const sp<Surface> &surface, const sp<ICrypto> &crypto, uint32_t flags) { sp<AMessage> msg = new AMessage(kWhatConfigure, this); if (mIsVideo) { format->findInt32("width", &mVideoWidth); format->findInt32("height", &mVideoHeight); if (!format->findInt32("rotation-degrees", &mRotationDegrees)) { mRotationDegrees = 0; } } msg->setMessage("format", format); msg->setInt32("flags", flags); msg->setObject("surface", surface); //..................... // save msg for reset mConfigureMsg = msg; //..................... for (int i = 0; i <= kMaxRetry; ++i) { if (i > 0) { // Don't try to reclaim resource for the first time. if (!mResourceManagerService->reclaimResource(resources)) { break; } } sp<AMessage> response; err = PostAndAwaitResponse(msg, &response); //..................... } return err; } |
case kWhatConfigure:
{
    sp<AReplyToken> replyID;
    CHECK(msg->senderAwaitsResponse(&replyID));

    sp<RefBase> obj;
    CHECK(msg->findObject("surface", &obj));

    sp<AMessage> format;
    CHECK(msg->findMessage("format", &format));

    int32_t push;
    if (msg->findInt32("push-blank-buffers-on-shutdown", &push) && push != 0) {
        mFlags |= kFlagPushBlankBuffersOnShutdown;
    }

    if (obj != NULL) {
        format->setObject("native-window", obj);
        status_t err = handleSetSurface(static_cast<Surface *>(obj.get()));
        if (err != OK) {
            PostReplyWithError(replyID, err);
            break;
        }
    } else {
        handleSetSurface(NULL);
    }

    mReplyID = replyID;
    setState(CONFIGURING);

    void *crypto;
    uint32_t flags;
    CHECK(msg->findInt32("flags", (int32_t *)&flags));

    if (flags & CONFIGURE_FLAG_ENCODE) {
        format->setInt32("encoder", true);
        mFlags |= kFlagIsEncoder;
    }

    // This is the key call
    mCodec->initiateConfigureComponent(format);
    break;
}
void ACodec::initiateConfigureComponent(const sp<AMessage> &msg) { msg->setWhat(kWhatConfigureComponent); msg->setTarget(this); msg->post(); } |
case ACodec::kWhatConfigureComponent: { onConfigureComponent(msg); handled = true; break; } |
bool ACodec::LoadedState::onConfigureComponent( const sp<AMessage> &msg) { ALOGV("onConfigureComponent"); CHECK(mCodec->mNode != 0); status_t err = OK; AString mime; if (!msg->findString("mime", &mime)) { err = BAD_VALUE; } else { err = mCodec->configureCodec(mime.c_str(), msg); } { sp<AMessage> notify = mCodec->mNotify->dup(); notify->setInt32("what", CodecBase::kWhatComponentConfigured); notify->setMessage("input-format", mCodec->mInputFormat); notify->setMessage("output-format", mCodec->mOutputFormat); notify->post(); } return true; } |
case CodecBase::kWhatComponentConfigured: { if (mState == UNINITIALIZED || mState == INITIALIZED) { // In case a kWhatError message came in and replied with error, // we log a warning and ignore. ALOGW("configure interrupted by error, current state %d", mState); break; } CHECK_EQ(mState, CONFIGURING); // reset input surface flag mHaveInputSurface = false; CHECK(msg->findMessage("input-format", &mInputFormat)); CHECK(msg->findMessage("output-format", &mOutputFormat)); int32_t usingSwRenderer; if (mOutputFormat->findInt32("using-sw-renderer", &usingSwRenderer) && usingSwRenderer) { mFlags |= kFlagUsesSoftwareRenderer; } setState(CONFIGURED); (new AMessage)->postReply(mReplyID); break; } |
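This is where the most detailed decoder configuration happens. It deserves a dedicated study when time permits; this post sticks to analyzing the overall flow: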
status_t ACodec::configureCodec( const char *mime, const sp<AMessage> &msg) { int32_t encoder; if (!msg->findInt32("encoder", &encoder)) { encoder = false; } sp<AMessage> inputFormat = new AMessage(); sp<AMessage> outputFormat = mNotify->dup(); // will use this for kWhatOutputFormatChanged mIsEncoder = encoder; mInputMetadataType = kMetadataBufferTypeInvalid; mOutputMetadataType = kMetadataBufferTypeInvalid; status_t err = setComponentRole(encoder /* isEncoder */, mime); if (err != OK) { return err; } int32_t bitRate = 0; // FLAC encoder doesn't need a bitrate, other encoders do if (encoder && strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_FLAC) && !msg->findInt32("bitrate", &bitRate)) { return INVALID_OPERATION; } int32_t storeMeta; if (encoder && msg->findInt32("store-metadata-in-buffers", &storeMeta) && storeMeta != 0) { err = mOMX->storeMetaDataInBuffers(mNode, kPortIndexInput, OMX_TRUE, &mInputMetadataType); if (err != OK) { ALOGE("[%s] storeMetaDataInBuffers (input) failed w/ err %d", mComponentName.c_str(), err); return err; } // For this specific case we could be using camera source even if storeMetaDataInBuffers // returns Gralloc source. Pretend that we are; this will force us to use nBufferSize. 
if (mInputMetadataType == kMetadataBufferTypeGrallocSource) { mInputMetadataType = kMetadataBufferTypeCameraSource; } uint32_t usageBits; if (mOMX->getParameter( mNode, (OMX_INDEXTYPE)OMX_IndexParamConsumerUsageBits, &usageBits, sizeof(usageBits)) == OK) { inputFormat->setInt32( "using-sw-read-often", !!(usageBits & GRALLOC_USAGE_SW_READ_OFTEN)); } } int32_t prependSPSPPS = 0; if (encoder && msg->findInt32("prepend-sps-pps-to-idr-frames", &prependSPSPPS) && prependSPSPPS != 0) { OMX_INDEXTYPE index; err = mOMX->getExtensionIndex( mNode, "OMX.google.android.index.prependSPSPPSToIDRFrames", &index); if (err == OK) { PrependSPSPPSToIDRFramesParams params; InitOMXParams(&params); params.bEnable = OMX_TRUE; err = mOMX->setParameter( mNode, index, &params, sizeof(params)); } if (err != OK) { ALOGE("Encoder could not be configured to emit SPS/PPS before " "IDR frames. (err %d)", err); return err; } } // Only enable metadata mode on encoder output if encoder can prepend // sps/pps to idr frames, since in metadata mode the bitstream is in an // opaque handle, to which we don't have access. 
int32_t video = !strncasecmp(mime, "video/", 6); mIsVideo = video; if (encoder && video) { OMX_BOOL enable = (OMX_BOOL) (prependSPSPPS && msg->findInt32("store-metadata-in-buffers-output", &storeMeta) && storeMeta != 0); err = mOMX->storeMetaDataInBuffers(mNode, kPortIndexOutput, enable, &mOutputMetadataType); if (err != OK) { ALOGE("[%s] storeMetaDataInBuffers (output) failed w/ err %d", mComponentName.c_str(), err); } if (!msg->findInt64( "repeat-previous-frame-after", &mRepeatFrameDelayUs)) { mRepeatFrameDelayUs = -1ll; } if (!msg->findInt64("max-pts-gap-to-encoder", &mMaxPtsGapUs)) { mMaxPtsGapUs = -1ll; } if (!msg->findFloat("max-fps-to-encoder", &mMaxFps)) { mMaxFps = -1; } if (!msg->findInt64("time-lapse", &mTimePerCaptureUs)) { mTimePerCaptureUs = -1ll; } if (!msg->findInt32( "create-input-buffers-suspended", (int32_t*)&mCreateInputBuffersSuspended)) { mCreateInputBuffersSuspended = false; } } // NOTE: we only use native window for video decoders sp<RefBase> obj; bool haveNativeWindow = msg->findObject("native-window", &obj) && obj != NULL && video && !encoder; mLegacyAdaptiveExperiment = false; if (video && !encoder) { inputFormat->setInt32("adaptive-playback", false); int32_t usageProtected; if (msg->findInt32("protected", &usageProtected) && usageProtected) { if (!haveNativeWindow) { ALOGE("protected output buffers must be sent to an ANativeWindow"); return PERMISSION_DENIED; } mFlags |= kFlagIsGrallocUsageProtected; mFlags |= kFlagPushBlankBuffersToNativeWindowOnShutdown; } } if (haveNativeWindow) { sp<ANativeWindow> nativeWindow = static_cast<ANativeWindow *>(static_cast<Surface *>(obj.get())); // START of temporary support for automatic FRC - THIS WILL BE REMOVED int32_t autoFrc; if (msg->findInt32("auto-frc", &autoFrc)) { bool enabled = autoFrc; OMX_CONFIG_BOOLEANTYPE config; InitOMXParams(&config); config.bEnabled = (OMX_BOOL)enabled; status_t temp = mOMX->setConfig( mNode, (OMX_INDEXTYPE)OMX_IndexConfigAutoFramerateConversion, &config, 
sizeof(config)); if (temp == OK) { outputFormat->setInt32("auto-frc", enabled); } else if (enabled) { ALOGI("codec does not support requested auto-frc (err %d)", temp); } } // END of temporary support for automatic FRC int32_t tunneled; if (msg->findInt32("feature-tunneled-playback", &tunneled) && tunneled != 0) { ALOGI("Configuring TUNNELED video playback."); mTunneled = true; int32_t audioHwSync = 0; if (!msg->findInt32("audio-hw-sync", &audioHwSync)) { ALOGW("No Audio HW Sync provided for video tunnel"); } err = configureTunneledVideoPlayback(audioHwSync, nativeWindow); if (err != OK) { ALOGE("configureTunneledVideoPlayback(%d,%p) failed!", audioHwSync, nativeWindow.get()); return err; } int32_t maxWidth = 0, maxHeight = 0; if (msg->findInt32("max-width", &maxWidth) && msg->findInt32("max-height", &maxHeight)) { err = mOMX->prepareForAdaptivePlayback( mNode, kPortIndexOutput, OMX_TRUE, maxWidth, maxHeight); if (err != OK) { ALOGW("[%s] prepareForAdaptivePlayback failed w/ err %d", mComponentName.c_str(), err); // allow failure err = OK; } else { inputFormat->setInt32("max-width", maxWidth); inputFormat->setInt32("max-height", maxHeight); inputFormat->setInt32("adaptive-playback", true); } } } else { ALOGV("Configuring CPU controlled video playback."); mTunneled = false; // Explicity reset the sideband handle of the window for // non-tunneled video in case the window was previously used // for a tunneled video playback. err = native_window_set_sideband_stream(nativeWindow.get(), NULL); if (err != OK) { ALOGE("set_sideband_stream(NULL) failed! 
(err %d).", err); return err; } // Always try to enable dynamic output buffers on native surface err = mOMX->storeMetaDataInBuffers( mNode, kPortIndexOutput, OMX_TRUE, &mOutputMetadataType); if (err != OK) { ALOGE("[%s] storeMetaDataInBuffers failed w/ err %d", mComponentName.c_str(), err); // if adaptive playback has been requested, try JB fallback // NOTE: THIS FALLBACK MECHANISM WILL BE REMOVED DUE TO ITS // LARGE MEMORY REQUIREMENT // we will not do adaptive playback on software accessed // surfaces as they never had to respond to changes in the // crop window, and we don't trust that they will be able to. int usageBits = 0; bool canDoAdaptivePlayback; if (nativeWindow->query( nativeWindow.get(), NATIVE_WINDOW_CONSUMER_USAGE_BITS, &usageBits) != OK) { canDoAdaptivePlayback = false; } else { canDoAdaptivePlayback = (usageBits & (GRALLOC_USAGE_SW_READ_MASK | GRALLOC_USAGE_SW_WRITE_MASK)) == 0; } int32_t maxWidth = 0, maxHeight = 0; if (canDoAdaptivePlayback && msg->findInt32("max-width", &maxWidth) && msg->findInt32("max-height", &maxHeight)) { ALOGV("[%s] prepareForAdaptivePlayback(%dx%d)", mComponentName.c_str(), maxWidth, maxHeight); err = mOMX->prepareForAdaptivePlayback( mNode, kPortIndexOutput, OMX_TRUE, maxWidth, maxHeight); ALOGW_IF(err != OK, "[%s] prepareForAdaptivePlayback failed w/ err %d", mComponentName.c_str(), err); if (err == OK) { inputFormat->setInt32("max-width", maxWidth); inputFormat->setInt32("max-height", maxHeight); inputFormat->setInt32("adaptive-playback", true); } } // allow failure err = OK; } else { ALOGV("[%s] storeMetaDataInBuffers succeeded", mComponentName.c_str()); CHECK(storingMetadataInDecodedBuffers()); mLegacyAdaptiveExperiment = ADebug::isExperimentEnabled( "legacy-adaptive", !msg->contains("no-experiments")); inputFormat->setInt32("adaptive-playback", true); } int32_t push; if (msg->findInt32("push-blank-buffers-on-shutdown", &push) && push != 0) { mFlags |= kFlagPushBlankBuffersToNativeWindowOnShutdown; } } int32_t 
rotationDegrees; if (msg->findInt32("rotation-degrees", &rotationDegrees)) { mRotationDegrees = rotationDegrees; } else { mRotationDegrees = 0; } } if (video) { // determine need for software renderer bool usingSwRenderer = false; if (haveNativeWindow && mComponentName.startsWith("OMX.google.")) { usingSwRenderer = true; haveNativeWindow = false; } if (encoder) { err = setupVideoEncoder(mime, msg); } else { err = setupVideoDecoder(mime, msg, haveNativeWindow); } if (err != OK) { return err; } if (haveNativeWindow) { mNativeWindow = static_cast<Surface *>(obj.get()); } // initialize native window now to get actual output format // TODO: this is needed for some encoders even though they don't use native window err = initNativeWindow(); if (err != OK) { return err; } // fallback for devices that do not handle flex-YUV for native buffers if (haveNativeWindow) { int32_t requestedColorFormat = OMX_COLOR_FormatUnused; if (msg->findInt32("color-format", &requestedColorFormat) && requestedColorFormat == OMX_COLOR_FormatYUV420Flexible) { status_t err = getPortFormat(kPortIndexOutput, outputFormat); if (err != OK) { return err; } int32_t colorFormat = OMX_COLOR_FormatUnused; OMX_U32 flexibleEquivalent = OMX_COLOR_FormatUnused; if (!outputFormat->findInt32("color-format", &colorFormat)) { ALOGE("ouptut port did not have a color format (wrong domain?)"); return BAD_VALUE; } ALOGD("[%s] Requested output format %#x and got %#x.", mComponentName.c_str(), requestedColorFormat, colorFormat); if (!isFlexibleColorFormat( mOMX, mNode, colorFormat, haveNativeWindow, &flexibleEquivalent) || flexibleEquivalent != (OMX_U32)requestedColorFormat) { // device did not handle flex-YUV request for native window, fall back // to SW renderer ALOGI("[%s] Falling back to software renderer", mComponentName.c_str()); mNativeWindow.clear(); mNativeWindowUsageBits = 0; haveNativeWindow = false; usingSwRenderer = true; if (storingMetadataInDecodedBuffers()) { err = mOMX->storeMetaDataInBuffers( mNode, 
kPortIndexOutput, OMX_FALSE, &mOutputMetadataType); mOutputMetadataType = kMetadataBufferTypeInvalid; // just in case // TODO: implement adaptive-playback support for bytebuffer mode. // This is done by SW codecs, but most HW codecs don't support it. inputFormat->setInt32("adaptive-playback", false); } if (err == OK) { err = mOMX->enableGraphicBuffers(mNode, kPortIndexOutput, OMX_FALSE); } if (mFlags & kFlagIsGrallocUsageProtected) { // fallback is not supported for protected playback err = PERMISSION_DENIED; } else if (err == OK) { err = setupVideoDecoder(mime, msg, false); } } } } if (usingSwRenderer) { outputFormat->setInt32("using-sw-renderer", 1); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_MPEG)) { int32_t numChannels, sampleRate; if (!msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate)) { // Since we did not always check for these, leave them optional // and have the decoder figure it all out. err = OK; } else { err = setupRawAudioFormat( encoder ? 
kPortIndexInput : kPortIndexOutput, sampleRate, numChannels); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AAC)) { int32_t numChannels, sampleRate; if (!msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate)) { err = INVALID_OPERATION; } else { int32_t isADTS, aacProfile; int32_t sbrMode; int32_t maxOutputChannelCount; int32_t pcmLimiterEnable; drcParams_t drc; if (!msg->findInt32("is-adts", &isADTS)) { isADTS = 0; } if (!msg->findInt32("aac-profile", &aacProfile)) { aacProfile = OMX_AUDIO_AACObjectNull; } if (!msg->findInt32("aac-sbr-mode", &sbrMode)) { sbrMode = -1; } if (!msg->findInt32("aac-max-output-channel_count", &maxOutputChannelCount)) { maxOutputChannelCount = -1; } if (!msg->findInt32("aac-pcm-limiter-enable", &pcmLimiterEnable)) { // value is unknown pcmLimiterEnable = -1; } if (!msg->findInt32("aac-encoded-target-level", &drc.encodedTargetLevel)) { // value is unknown drc.encodedTargetLevel = -1; } if (!msg->findInt32("aac-drc-cut-level", &drc.drcCut)) { // value is unknown drc.drcCut = -1; } if (!msg->findInt32("aac-drc-boost-level", &drc.drcBoost)) { // value is unknown drc.drcBoost = -1; } if (!msg->findInt32("aac-drc-heavy-compression", &drc.heavyCompression)) { // value is unknown drc.heavyCompression = -1; } if (!msg->findInt32("aac-target-ref-level", &drc.targetRefLevel)) { // value is unknown drc.targetRefLevel = -1; } err = setupAACCodec( encoder, numChannels, sampleRate, bitRate, aacProfile, isADTS != 0, sbrMode, maxOutputChannelCount, drc, pcmLimiterEnable); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_NB)) { err = setupAMRCodec(encoder, false /* isWAMR */, bitRate); } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_WB)) { err = setupAMRCodec(encoder, true /* isWAMR */, bitRate); } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_G711_ALAW) || !strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_G711_MLAW)) { // These are PCM-like formats with a fixed sample rate but // a variable number of 
channels. int32_t numChannels; if (!msg->findInt32("channel-count", &numChannels)) { err = INVALID_OPERATION; } else { int32_t sampleRate; if (!msg->findInt32("sample-rate", &sampleRate)) { sampleRate = 8000; } err = setupG711Codec(encoder, sampleRate, numChannels); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_FLAC)) { int32_t numChannels = 0, sampleRate = 0, compressionLevel = -1; if (encoder && (!msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate))) { ALOGE("missing channel count or sample rate for FLAC encoder"); err = INVALID_OPERATION; } else { if (encoder) { if (!msg->findInt32( "complexity", &compressionLevel) && !msg->findInt32( "flac-compression-level", &compressionLevel)) { compressionLevel = 5; // default FLAC compression level } else if (compressionLevel < 0) { ALOGW("compression level %d outside [0..8] range, " "using 0", compressionLevel); compressionLevel = 0; } else if (compressionLevel > 8) { ALOGW("compression level %d outside [0..8] range, " "using 8", compressionLevel); compressionLevel = 8; } } err = setupFlacCodec( encoder, numChannels, sampleRate, compressionLevel); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_RAW)) { int32_t numChannels, sampleRate; if (encoder || !msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate)) { err = INVALID_OPERATION; } else { err = setupRawAudioFormat(kPortIndexInput, sampleRate, numChannels); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AC3)) { int32_t numChannels; int32_t sampleRate; if (!msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate)) { err = INVALID_OPERATION; } else { err = setupAC3Codec(encoder, numChannels, sampleRate); } } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_EAC3)) { int32_t numChannels; int32_t sampleRate; if (!msg->findInt32("channel-count", &numChannels) || !msg->findInt32("sample-rate", &sampleRate)) { err = INVALID_OPERATION; } else { err = 
setupEAC3Codec(encoder, numChannels, sampleRate); } } if (err != OK) { return err; } if (!msg->findInt32("encoder-delay", &mEncoderDelay)) { mEncoderDelay = 0; } if (!msg->findInt32("encoder-padding", &mEncoderPadding)) { mEncoderPadding = 0; } if (msg->findInt32("channel-mask", &mChannelMask)) { mChannelMaskPresent = true; } else { mChannelMaskPresent = false; } int32_t maxInputSize; if (msg->findInt32("max-input-size", &maxInputSize)) { err = setMinBufferSize(kPortIndexInput, (size_t)maxInputSize); } else if (!strcmp("OMX.Nvidia.aac.decoder", mComponentName.c_str())) { err = setMinBufferSize(kPortIndexInput, 8192); // XXX } int32_t priority; if (msg->findInt32("priority", &priority)) { err = setPriority(priority); } int32_t rateInt = -1; float rateFloat = -1; if (!msg->findFloat("operating-rate", &rateFloat)) { msg->findInt32("operating-rate", &rateInt); rateFloat = (float)rateInt; // 16MHz (FLINTMAX) is OK for upper bound. } if (rateFloat > 0) { err = setOperatingRate(rateFloat, video); } mBaseOutputFormat = outputFormat; err = getPortFormat(kPortIndexInput, inputFormat); if (err == OK) { err = getPortFormat(kPortIndexOutput, outputFormat); if (err == OK) { mInputFormat = inputFormat; mOutputFormat = outputFormat; } } return err; } |
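Much of configureCodec is the same small move repeated many times: look up an optional key in the format message, fall back to a default when it is missing, and clamp the value when it is out of range. The FLAC compression level branch is a compact instance, sketched below in isolation. `Format`, `findInt32` (as a free function) and `resolveFlacCompressionLevel` are hypothetical helpers for this illustration; a `std::map` stands in for AMessage.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the AMessage format: string keys to int values.
using Format = std::map<std::string, int>;

// Mirrors the shape of AMessage::findInt32: true when the key exists.
bool findInt32(const Format &format, const std::string &key, int *out) {
    Format::const_iterator it = format.find(key);
    if (it == format.end()) {
        return false;
    }
    *out = it->second;
    return true;
}

// "complexity" wins over "flac-compression-level"; the default is 5 and
// the result is clamped to the valid range [0, 8].
int resolveFlacCompressionLevel(const Format &format) {
    int level = -1;
    if (!findInt32(format, "complexity", &level)
            && !findInt32(format, "flac-compression-level", &level)) {
        return 5;  // neither key present: default level
    }
    if (level < 0) {
        return 0;
    }
    if (level > 8) {
        return 8;
    }
    return level;
}
```

Because of the short-circuit `&&`, the second key is only consulted when "complexity" is absent, which is the same precedence the original branch has.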
At this point the initialization and configuration of the decoder are complete. Let's look at the decoder's start phase:
status_t MediaCodec::start() { sp<AMessage> msg = new AMessage(kWhatStart, this); status_t err; Vector<MediaResource> resources; const char *type = (mFlags & kFlagIsSecure) ? kResourceSecureCodec : kResourceNonSecureCodec; const char *subtype = mIsVideo ? kResourceVideoCodec : kResourceAudioCodec; resources.push_back(MediaResource(String8(type), String8(subtype), 1)); // Don't know the buffer size at this point, but it's fine to use 1 because // the reclaimResource call doesn't consider the requester's buffer size for now. resources.push_back(MediaResource(String8(kResourceGraphicMemory), 1)); for (int i = 0; i <= kMaxRetry; ++i) { if (i > 0) { // Don't try to reclaim resource for the first time. if (!mResourceManagerService->reclaimResource(resources)) { break; } // Recover codec from previous error before retry start. err = reset(); if (err != OK) { ALOGE("retrying start: failed to reset codec"); break; } sp<AMessage> response; err = PostAndAwaitResponse(mConfigureMsg, &response); if (err != OK) { ALOGE("retrying start: failed to configure codec"); break; } } sp<AMessage> response; err = PostAndAwaitResponse(msg, &response); if (!isResourceError(err)) { break; } } return err; } |
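MediaCodec::start() wraps the actual request in a retry loop: the first attempt runs directly, and later attempts run only if resources could be reclaimed from other clients and the previous failure was a resource error. The sketch below isolates that control flow. `Status`, `runWithReclaim` and the two callbacks are hypothetical, and treating only `NoMemory` as a resource error is an assumption made for this example; the reset and reconfigure steps of the real loop are folded into the `attempt` callback.

```cpp
#include <functional>

enum class Status { Ok, NoMemory, InvalidOperation };

// Assumption for this sketch: only NoMemory counts as a resource error.
inline bool isResourceError(Status s) { return s == Status::NoMemory; }

// attempt: performs the operation once.
// reclaim: tries to free resources held by others; false means give up.
Status runWithReclaim(int maxRetry,
                      const std::function<Status()> &attempt,
                      const std::function<bool()> &reclaim) {
    Status err = Status::Ok;
    for (int i = 0; i <= maxRetry; ++i) {
        if (i > 0) {
            // Never reclaim before the first attempt.
            if (!reclaim()) {
                break;
            }
        }
        err = attempt();
        if (!isResourceError(err)) {
            break;  // success, or an error that retrying cannot fix
        }
    }
    return err;
}
```

If reclaiming fails, the loop exits with the last resource error, so the caller still sees why the start did not succeed.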
case kWhatStart:
{
    sp<AReplyToken> replyID;
    CHECK(msg->senderAwaitsResponse(&replyID));

    if (mState == FLUSHED) {
        setState(STARTED);
        if (mHavePendingInputBuffers) {
            onInputBufferAvailable();
            mHavePendingInputBuffers = false;
        }
        // This is the call we focus on.
        mCodec->signalResume();
        //..................
        PostReplyWithError(replyID, OK);
        break;
    } else if (mState != CONFIGURED) {
        PostReplyWithError(replyID, INVALID_OPERATION);
        break;
    }

    mReplyID = replyID;
    setState(STARTING);

    mCodec->initiateStart();
    break;
}
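The branching in the kWhatStart handler boils down to a small decision table: FLUSHED resumes, CONFIGURED starts, anything else is rejected. The sketch below restates it with hypothetical types (`State`, `Action`, `StartOutcome`, `handleStart`), none of which exist in MediaCodec. It also leaves out the pending-input-buffer handling of the FLUSHED branch.

```cpp
#include <cassert>

enum class State { Uninitialized, Configured, Starting, Started, Flushed };
enum class Action { None, SignalResume, InitiateStart };

struct StartOutcome {
    bool ok;        // false corresponds to replying INVALID_OPERATION
    State next;     // state after the request has been handled
    Action action;  // which ACodec entry point would be invoked
};

inline StartOutcome handleStart(State current) {
    StartOutcome out{false, current, Action::None};
    if (current == State::Flushed) {
        out = {true, State::Started, Action::SignalResume};
    } else if (current == State::Configured) {
        out = {true, State::Starting, Action::InitiateStart};
    }
    // A rejected request must leave the state untouched.
    assert(out.ok || out.next == current);
    return out;
}
```

Note that only the FLUSHED branch ends directly in a started state; from CONFIGURED the codec first enters STARTING.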
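First, initiateStart is called to initialize the decoder state. In LoadedState this asks the OMX component to move to OMX_StateIdle: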
void ACodec::initiateStart() {
    (new AMessage(kWhatStart, this))->post();
}

case ACodec::kWhatStart:
{
    onStart();
    handled = true;
    break;
}

void ACodec::LoadedState::onStart() {
    ALOGV("onStart");

    status_t err = mCodec->mOMX->sendCommand(
            mCodec->mNode, OMX_CommandStateSet, OMX_StateIdle);
    if (err != OK) {
        mCodec->signalError(OMX_ErrorUndefined, makeNoSideEffectStatus(err));
    } else {
        mCodec->changeState(mCodec->mLoadedToIdleState);
    }
}
Next, signalResume starts fetching input data for decoding:
void ACodec::signalResume() {
    (new AMessage(kWhatResume, this))->post();
}

case kWhatResume:
{
    resume();
    handled = true;
    break;
}

void ACodec::ExecutingState::resume() {
    submitOutputBuffers();

    // Post all available input buffers
    if (mCodec->mBuffers[kPortIndexInput].size() == 0u) {
        ALOGW("[%s] we don't have any input buffers to resume",
                mCodec->mComponentName.c_str());
    }

    for (size_t i = 0; i < mCodec->mBuffers[kPortIndexInput].size(); i++) {
        BufferInfo *info = &mCodec->mBuffers[kPortIndexInput].editItemAt(i);
        if (info->mStatus == BufferInfo::OWNED_BY_US) {
            postFillThisBuffer(info);
        }
    }

    mActive = true;
}

void ACodec::BaseState::postFillThisBuffer(BufferInfo *info) {
    if (mCodec->mPortEOS[kPortIndexInput]) {
        return;
    }

    CHECK_EQ((int)info->mStatus, (int)BufferInfo::OWNED_BY_US);

    sp<AMessage> notify = mCodec->mNotify->dup();
    notify->setInt32("what", CodecBase::kWhatFillThisBuffer);
    notify->setInt32("buffer-id", info->mBufferID);

    info->mData->meta()->clear();
    notify->setBuffer("buffer", info->mData);

    sp<AMessage> reply = new AMessage(kWhatInputBufferFilled, mCodec);
    reply->setInt32("buffer-id", info->mBufferID);

    notify->setMessage("reply", reply);

    notify->post();

    info->mStatus = BufferInfo::OWNED_BY_UPSTREAM;
}
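The key idea in resume() and postFillThisBuffer() is ownership tracking: a buffer is only offered upstream while ACodec owns it, and ownership moves together with the request. Here is a rough sketch of that bookkeeping with hypothetical types (`Owner`, `BufferSketch`) and helper names. What happens after the reply arrives is outside the quoted code, so the sketch simply marks the buffer as ours again.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

enum class Owner { Us, Upstream };

struct BufferSketch {
    uint32_t id;
    Owner owner;
};

// Imitates resume() + postFillThisBuffer(): every input buffer we own is
// offered upstream, unless input EOS was already seen. Returns the posted ids.
inline std::vector<uint32_t> postOwnedInputBuffers(
        std::vector<BufferSketch>& buffers, bool inputEos) {
    std::vector<uint32_t> posted;
    if (inputEos) return posted;  // postFillThisBuffer() returns early after EOS
    for (BufferSketch& info : buffers) {
        if (info.owner != Owner::Us) continue;  // never post a buffer twice
        posted.push_back(info.id);
        info.owner = Owner::Upstream;  // ownership travels with the request
    }
    return posted;
}

// Imitates the reply path (the kWhatInputBufferFilled message): the buffer
// must currently be upstream, and it is handed back to us.
inline void onInputBufferFilled(std::vector<BufferSketch>& buffers, uint32_t id) {
    for (BufferSketch& info : buffers) {
        if (info.id == id) {
            assert(info.owner == Owner::Upstream);
            info.owner = Owner::Us;
            return;
        }
    }
}
```

Calling `postOwnedInputBuffers` twice in a row posts nothing the second time, which is the property the OWNED_BY_US check protects.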
case CodecBase::kWhatFillThisBuffer:
{
    //..........
    if (mFlags & kFlagIsAsync) {
        if (!mHaveInputSurface) {
            if (mState == FLUSHED) {
                mHavePendingInputBuffers = true;
            } else {
                onInputBufferAvailable();
            }
        }
    } else if (mFlags & kFlagDequeueInputPending) {
        CHECK(handleDequeueInputBuffer(mDequeueInputReplyID));

        ++mDequeueInputTimeoutGeneration;
        mFlags &= ~kFlagDequeueInputPending;
        mDequeueInputReplyID = 0;
    } else {
        postActivityNotificationIfPossible();
    }
    break;
}

void MediaCodec::onInputBufferAvailable() {
    int32_t index;
    while ((index = dequeuePortBuffer(kPortIndexInput)) >= 0) {
        sp<AMessage> msg = mCallback->dup();
        msg->setInt32("callbackID", CB_INPUT_AVAILABLE);
        msg->setInt32("index", index);
        msg->post();
    }
}
Remember where this mCallback comes from?
void NuPlayer::Decoder::onConfigure(const sp<AMessage> &format) {
    //.................
    sp<AMessage> reply = new AMessage(kWhatCodecNotify, this);
    mCodec->setCallback(reply);
    //..................
}

status_t MediaCodec::setCallback(const sp<AMessage> &callback) {
    sp<AMessage> msg = new AMessage(kWhatSetCallback, this);
    msg->setMessage("callback", callback);

    sp<AMessage> response;
    return PostAndAwaitResponse(msg, &response);
}

case kWhatSetCallback:
{
    sp<AReplyToken> replyID;
    CHECK(msg->senderAwaitsResponse(&replyID));

    sp<AMessage> callback;
    CHECK(msg->findMessage("callback", &callback));

    mCallback = callback;

    if (mCallback != NULL) {
        mFlags |= kFlagIsAsync;
    } else {
        mFlags &= ~kFlagIsAsync;
    }

    sp<AMessage> response = new AMessage;
    response->postReply(replyID);
    break;
}
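Two things happen in the code above: installing a callback flips the codec into asynchronous mode, and each free input buffer index is then reported through a copy of that callback tagged with CB_INPUT_AVAILABLE. A minimal sketch of that mechanism follows, using a `std::function` in place of the `AMessage` callback. The class name, the flag bit and the callback id value are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical values; the real constants are defined by MediaCodec.
constexpr uint32_t kFlagIsAsyncSketch = 1u << 0;
constexpr int kCbInputAvailableSketch = 1;

class CodecCallbackSketch {
public:
    using Callback = std::function<void(int callbackId, int index)>;

    // Like kWhatSetCallback: a non-empty callback enables async mode,
    // an empty one disables it again.
    void setCallback(Callback cb) {
        callback_ = std::move(cb);
        if (callback_) {
            flags_ |= kFlagIsAsyncSketch;
        } else {
            flags_ &= ~kFlagIsAsyncSketch;
        }
    }

    bool isAsync() const { return (flags_ & kFlagIsAsyncSketch) != 0; }

    // Like onInputBufferAvailable(): one callback invocation per free input
    // index. Returns how many indices were reported.
    int reportFreeInputs(std::deque<int>& freeIndices) {
        assert(isAsync());  // only meaningful once a callback is installed
        int reported = 0;
        while (!freeIndices.empty()) {
            int index = freeIndices.front();
            freeIndices.pop_front();
            callback_(kCbInputAvailableSketch, index);
            ++reported;
        }
        return reported;
    }

private:
    Callback callback_;
    uint32_t flags_ = 0;
};
```

The receiver only has to switch on the callback id and read the index, which is what the NuPlayer::Decoder side does next.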
So from the above we know that the next thing invoked is the CB_INPUT_AVAILABLE case under kWhatCodecNotify:
case MediaCodec::CB_INPUT_AVAILABLE:
{
    int32_t index;
    CHECK(msg->findInt32("index", &index));

    handleAnInputBuffer(index);
    break;
}

bool NuPlayer::Decoder::handleAnInputBuffer(size_t index) {
    if (isDiscontinuityPending()) {
        return false;
    }

    sp<ABuffer> buffer;
    mCodec->getInputBuffer(index, &buffer);

    if (buffer == NULL) {
        handleError(UNKNOWN_ERROR);
        return false;
    }

    if (index >= mInputBuffers.size()) {
        for (size_t i = mInputBuffers.size(); i <= index; ++i) {
            mInputBuffers.add();
            mMediaBuffers.add();
            mInputBufferIsDequeued.add();
            mMediaBuffers.editItemAt(i) = NULL;
            mInputBufferIsDequeued.editItemAt(i) = false;
        }
    }
    mInputBuffers.editItemAt(index) = buffer;

    //CHECK_LT(bufferIx, mInputBuffers.size());

    if (mMediaBuffers[index] != NULL) {
        mMediaBuffers[index]->release();
        mMediaBuffers.editItemAt(index) = NULL;
    }
    mInputBufferIsDequeued.editItemAt(index) = true;

    if (!mCSDsToSubmit.isEmpty()) {
        sp<AMessage> msg = new AMessage();
        msg->setSize("buffer-ix", index);

        sp<ABuffer> buffer = mCSDsToSubmit.itemAt(0);
        ALOGI("[%s] resubmitting CSD", mComponentName.c_str());
        msg->setBuffer("buffer", buffer);
        mCSDsToSubmit.removeAt(0);
        CHECK(onInputBufferFetched(msg));
        return true;
    }

    while (!mPendingInputMessages.empty()) {
        sp<AMessage> msg = *mPendingInputMessages.begin();
        if (!onInputBufferFetched(msg)) {
            break;
        }
        mPendingInputMessages.erase(mPendingInputMessages.begin());
    }

    if (!mInputBufferIsDequeued.editItemAt(index)) {
        return true;
    }

    mDequeuedInputBuffers.push_back(index);

    onRequestInputBuffers();
    return true;
}
void NuPlayer::DecoderBase::onRequestInputBuffers() {
    if (mRequestInputBuffersPending) {
        return;
    }

    // doRequestBuffers() return true if we should request more data
    if (doRequestBuffers()) {
        mRequestInputBuffersPending = true;

        sp<AMessage> msg = new AMessage(kWhatRequestInputBuffers, this);
        msg->post(10 * 1000ll);
    }
}

bool NuPlayer::Decoder::doRequestBuffers() {
    // mRenderer is only NULL if we have a legacy widevine source that
    // is not yet ready. In this case we must not fetch input.
    if (isDiscontinuityPending() || mRenderer == NULL) {
        return false;
    }
    status_t err = OK;
    while (err == OK && !mDequeuedInputBuffers.empty()) {
        size_t bufferIx = *mDequeuedInputBuffers.begin();
        sp<AMessage> msg = new AMessage();
        msg->setSize("buffer-ix", bufferIx);
        err = fetchInputData(msg);
        if (err != OK && err != ERROR_END_OF_STREAM) {
            // if EOS, need to queue EOS buffer
            break;
        }
        mDequeuedInputBuffers.erase(mDequeuedInputBuffers.begin());

        if (!mPendingInputMessages.empty()
                || !onInputBufferFetched(msg)) {
            mPendingInputMessages.push_back(msg);
        }
    }

    return err == -EWOULDBLOCK
            && mSource->feedMoreTSData() == OK;
}
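One detail in doRequestBuffers() is easy to miss: a freshly fetched message is handed to onInputBufferFetched() only when nothing is already waiting in mPendingInputMessages. Otherwise it is appended, so earlier messages keep their place in line. The sketch below isolates that ordering rule together with the draining loop from handleAnInputBuffer(). The function names are hypothetical, and a `std::deque` stands in for the pending-message list.

```cpp
#include <deque>

// Deliver `msg` right away only if nothing is already waiting and the
// consumer accepts it; otherwise append it so earlier messages go first.
// Returns true when the message was consumed immediately.
template <typename Msg, typename Consume>
bool deliverOrQueue(std::deque<Msg>& pending, const Msg& msg, Consume consume) {
    if (!pending.empty() || !consume(msg)) {
        pending.push_back(msg);
        return false;
    }
    return true;
}

// Retry queued messages from the front, stopping at the first refusal so
// that the original order is preserved.
template <typename Msg, typename Consume>
void drainPending(std::deque<Msg>& pending, Consume consume) {
    while (!pending.empty()) {
        if (!consume(pending.front())) {
            break;
        }
        pending.pop_front();
    }
}
```

Because the emptiness check comes first in the condition, the consumer is never even asked about a new message while older ones are still queued.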
status_t NuPlayer::Decoder::fetchInputData(sp<AMessage> &reply) {
    sp<ABuffer> accessUnit;
    bool dropAccessUnit;
    do {
        status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit);

        if (err == -EWOULDBLOCK) {
            return err;
        } else if (err != OK) {
            if (err == INFO_DISCONTINUITY) {
                int32_t type;
                CHECK(accessUnit->meta()->findInt32("discontinuity", &type));

                bool formatChange =
                        (mIsAudio && (type & ATSParser::DISCONTINUITY_AUDIO_FORMAT))
                        || (!mIsAudio && (type & ATSParser::DISCONTINUITY_VIDEO_FORMAT));

                bool timeChange = (type & ATSParser::DISCONTINUITY_TIME) != 0;

                ALOGI("%s discontinuity (format=%d, time=%d)",
                        mIsAudio ? "audio" : "video", formatChange, timeChange);

                bool seamlessFormatChange = false;
                sp<AMessage> newFormat = mSource->getFormat(mIsAudio);
                if (formatChange) {
                    seamlessFormatChange = supportsSeamlessFormatChange(newFormat);
                    // treat seamless format change separately
                    formatChange = !seamlessFormatChange;
                }

                // For format or time change, return EOS to queue EOS input,
                // then wait for EOS on output.
                if (formatChange /* not seamless */) {
                    mFormatChangePending = true;
                    err = ERROR_END_OF_STREAM;
                } else if (timeChange) {
                    rememberCodecSpecificData(newFormat);
                    mTimeChangePending = true;
                    err = ERROR_END_OF_STREAM;
                } else if (seamlessFormatChange) {
                    // reuse existing decoder and don't flush
                    rememberCodecSpecificData(newFormat);
                    continue;
                } else {
                    // This stream is unaffected by the discontinuity
                    return -EWOULDBLOCK;
                }
            }

            // reply should only be returned without a buffer set
            // when there is an error (including EOS)
            CHECK(err != OK);

            reply->setInt32("err", err);
            return ERROR_END_OF_STREAM;
        }

        dropAccessUnit = false;
        if (!mIsAudio
                && !mIsSecure
                && mRenderer->getVideoLateByUs() > 100000ll
                && mIsVideoAVC
                && !IsAVCReferenceFrame(accessUnit)) {
            dropAccessUnit = true;
            ++mNumInputFramesDropped;
        }
    } while (dropAccessUnit);

    // ALOGV("returned a valid buffer of %s data", mIsAudio ? "audio" : "video");
#if 0
    int64_t mediaTimeUs;
    CHECK(accessUnit->meta()->findInt64("timeUs", &mediaTimeUs));
    ALOGV("[%s] feeding input buffer at media time %.3f",
         mIsAudio ? "audio" : "video",
         mediaTimeUs / 1E6);
#endif

    if (mCCDecoder != NULL) {
        mCCDecoder->decode(accessUnit);
    }

    reply->setBuffer("buffer", accessUnit);

    return OK;
}
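The frame-dropping condition inside the do/while loop of fetchInputData() is worth isolating: an access unit is skipped only when it is non-secure AVC video, rendering is more than 100 ms late, and the frame is not a reference frame. Below is a small sketch of that predicate. The `DropInputs` struct and the function name are hypothetical, and the two fields marked in the comments stand in for values the real code obtains from the renderer and from IsAVCReferenceFrame().

```cpp
#include <cstdint>

// 100 ms in microseconds: the literal used in fetchInputData().
constexpr int64_t kVideoLateThresholdUs = 100000;

struct DropInputs {
    bool isAudio;
    bool isSecure;
    bool isVideoAvc;
    bool isReferenceFrame;  // what IsAVCReferenceFrame() would report
    int64_t videoLateByUs;  // what mRenderer->getVideoLateByUs() would report
};

// True when the access unit may be discarded before decoding. Reference
// frames are always kept because later frames depend on them.
inline bool shouldDropAccessUnit(const DropInputs& in) {
    return !in.isAudio
            && !in.isSecure
            && in.videoLateByUs > kVideoLateThresholdUs
            && in.isVideoAvc
            && !in.isReferenceFrame;
}
```

The comparison is strict, so a frame that is exactly 100 ms late is still decoded.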