From 2bfdd428c56c7524d1a11979f200a1762866032d Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 11 Oct 2011 15:24:07 -0700 Subject: NuPlayer is now taking on the task of streaming over RTSP. Change-Id: Ie204db8810807f1e7981959e34dc0149e5d9563a --- media/libstagefright/rtsp/MyHandler.h | 167 +++++++++++++++++++++------------- 1 file changed, 102 insertions(+), 65 deletions(-) (limited to 'media/libstagefright/rtsp/MyHandler.h') diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h index 8128813..af7dd23 100644 --- a/media/libstagefright/rtsp/MyHandler.h +++ b/media/libstagefright/rtsp/MyHandler.h @@ -94,12 +94,24 @@ static bool GetAttribute(const char *s, const char *key, AString *value) { } struct MyHandler : public AHandler { + enum { + kWhatConnected = 'conn', + kWhatDisconnected = 'disc', + kWhatSeekDone = 'sdon', + + kWhatAccessUnit = 'accU', + kWhatEOS = 'eos!', + kWhatSeekDiscontinuity = 'seeD', + kWhatNormalPlayTimeMapping = 'nptM', + }; + MyHandler( - const char *url, const sp &looper, + const char *url, + const sp ¬ify, bool uidValid = false, uid_t uid = 0) - : mUIDValid(uidValid), + : mNotify(notify), + mUIDValid(uidValid), mUID(uid), - mLooper(looper), mNetLooper(new ALooper), mConn(new ARTSPConnection(mUIDValid, mUID)), mRTPConn(new ARTPConnection), @@ -145,12 +157,9 @@ struct MyHandler : public AHandler { mSessionHost = host; } - void connect(const sp &doneMsg) { - mDoneMsg = doneMsg; - - mLooper->registerHandler(this); - mLooper->registerHandler(mConn); - (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); + void connect() { + looper()->registerHandler(mConn); + (1 ? mNetLooper : looper())->registerHandler(mRTPConn); sp notify = new AMessage('biny', id()); mConn->observeBinaryData(notify); @@ -159,33 +168,16 @@ struct MyHandler : public AHandler { mConn->connect(mOriginalSessionURL.c_str(), reply); } - void disconnect(const sp &doneMsg) { - mDoneMsg = doneMsg; - + void disconnect() { (new AMessage('abor', id()))->post(); } - void seek(int64_t timeUs, const sp &doneMsg) { + void seek(int64_t timeUs) { sp msg = new AMessage('seek', id()); msg->setInt64("time", timeUs); - msg->setMessage("doneMsg", doneMsg); msg->post(); } - int64_t getNormalPlayTimeUs() { - int64_t maxTimeUs = 0; - for (size_t i = 0; i < mTracks.size(); ++i) { - int64_t timeUs = mTracks.editItemAt(i).mPacketSource - ->getNormalPlayTimeUs(); - - if (i == 0 || timeUs > maxTimeUs) { - maxTimeUs = timeUs; - } - } - - return maxTimeUs; - } - static void addRR(const sp &buf) { uint8_t *ptr = buf->data() + buf->size(); ptr[0] = 0x80 | 0; @@ -619,7 +611,9 @@ struct MyHandler : public AHandler { for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *info = &mTracks.editItemAt(i); - info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); + if (!mFirstAccessUnit) { + postQueueEOS(i, ERROR_END_OF_STREAM); + } if (!info->mUsingInterleavedTCP) { mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); @@ -690,11 +684,10 @@ struct MyHandler : public AHandler { case 'quit': { - if (mDoneMsg != NULL) { - mDoneMsg->setInt32("result", UNKNOWN_ERROR); - mDoneMsg->post(); - mDoneMsg = NULL; - } + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatDisconnected); + msg->setInt32("result", UNKNOWN_ERROR); + msg->post(); break; } @@ -795,17 +788,12 @@ struct MyHandler : public AHandler { case 'seek': { - sp doneMsg; - CHECK(msg->findMessage("doneMsg", &doneMsg)); - - if (mSeekPending) { - doneMsg->post(); - break; - } - if (!mSeekable) { LOGW("This is a live stream, ignoring seek request."); - doneMsg->post(); + + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDone); + msg->post(); break; } @@ -831,7 +819,6 @@ struct MyHandler : public AHandler { sp reply = new AMessage('see1', id()); reply->setInt64("time", timeUs); - reply->setMessage("doneMsg", doneMsg); mConn->sendRequest(request.c_str(), reply); break; } @@ -842,7 +829,8 @@ struct MyHandler : public AHandler { for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *info = &mTracks.editItemAt(i); - info->mPacketSource->flushQueue(); + postQueueSeekDiscontinuity(i); + info->mRTPAnchor = 0; info->mNTPAnchorUs = -1; } @@ -866,11 +854,7 @@ struct MyHandler : public AHandler { request.append("\r\n"); - sp doneMsg; - CHECK(msg->findMessage("doneMsg", &doneMsg)); - sp reply = new AMessage('see2', id()); - reply->setMessage("doneMsg", doneMsg); mConn->sendRequest(request.c_str(), reply); break; } @@ -915,10 +899,9 @@ struct MyHandler : public AHandler { mSeekPending = false; - sp doneMsg; - CHECK(msg->findMessage("doneMsg", &doneMsg)); - - doneMsg->post(); + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDone); + msg->post(); break; } @@ -1056,8 +1039,14 @@ struct MyHandler : public AHandler { LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); - info->mPacketSource->setNormalPlayTimeMapping( - rtpTime, (int64_t)(npt1 * 1E6)); + info->mNormalPlayTimeRTP = rtpTime; + info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); + + if (!mFirstAccessUnit) { + postNormalPlayTimeMapping( + trackIndex, + info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); + } ++n; } @@ -1065,11 +1054,15 @@ struct MyHandler : public AHandler { mSeekable = true; } - sp getPacketSource(size_t index) { + sp getTrackFormat(size_t index, int32_t *timeScale) { CHECK_GE(index, 0u); CHECK_LT(index, mTracks.size()); - return mTracks.editItemAt(index).mPacketSource; + const TrackInfo &info = mTracks.itemAt(index); + + *timeScale = info.mTimeScale; + + return info.mPacketSource->getFormat(); } size_t countTracks() const { @@ -1089,6 +1082,9 @@ private: int64_t mNTPAnchorUs; int32_t mTimeScale; + uint32_t mNormalPlayTimeRTP; + int64_t mNormalPlayTimeUs; + sp mPacketSource; // Stores packets temporarily while no notion of time @@ -1096,9 +1092,9 @@ private: List > mPackets; }; + sp mNotify; bool mUIDValid; uid_t mUID; - sp mLooper; sp mNetLooper; sp mConn; sp mRTPConn; @@ -1127,8 +1123,6 @@ private: Vector mTracks; - sp mDoneMsg; - void setupTrack(size_t index) { sp source = new APacketSource(mSessionDesc, index); @@ -1158,6 +1152,8 @@ private: info->mNewSegment = true; info->mRTPAnchor = 0; info->mNTPAnchorUs = -1; + info->mNormalPlayTimeRTP = 0; + info->mNormalPlayTimeUs = 0ll; unsigned long PT; AString formatDesc; @@ -1283,9 +1279,17 @@ private: LOGV("onAccessUnitComplete track %d", trackIndex); if (mFirstAccessUnit) { - mDoneMsg->setInt32("result", OK); - mDoneMsg->post(); - mDoneMsg = NULL; + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatConnected); + msg->post(); + + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *info = &mTracks.editItemAt(i); + + postNormalPlayTimeMapping( + i, + info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); + } mFirstAccessUnit = false; } @@ -1303,12 +1307,12 @@ private: track->mPackets.erase(track->mPackets.begin()); if (addMediaTimestamp(trackIndex, track, accessUnit)) { - track->mPacketSource->queueAccessUnit(accessUnit); + postQueueAccessUnit(trackIndex, accessUnit); } } if (addMediaTimestamp(trackIndex, track, accessUnit)) { - track->mPacketSource->queueAccessUnit(accessUnit); + postQueueAccessUnit(trackIndex, accessUnit); } } @@ -1344,6 +1348,39 @@ private: return true; } + void postQueueAccessUnit( + size_t trackIndex, const sp &accessUnit) { + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatAccessUnit); + msg->setSize("trackIndex", trackIndex); + msg->setObject("accessUnit", accessUnit); + msg->post(); + } + + void postQueueEOS(size_t trackIndex, status_t finalResult) { + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatEOS); + msg->setSize("trackIndex", trackIndex); + msg->setInt32("finalResult", finalResult); + msg->post(); + } + + void postQueueSeekDiscontinuity(size_t trackIndex) { + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDiscontinuity); + msg->setSize("trackIndex", trackIndex); + msg->post(); + } + + void postNormalPlayTimeMapping( + size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatNormalPlayTimeMapping); + msg->setSize("trackIndex", trackIndex); + msg->setInt32("rtpTime", rtpTime); + msg->setInt64("nptUs", nptUs); + msg->post(); + } DISALLOW_EVIL_CONSTRUCTORS(MyHandler); }; -- cgit v1.1