diff options
author | Andreas Huber <andih@google.com> | 2010-08-27 13:29:08 -0700 |
---|---|---|
committer | Andreas Huber <andih@google.com> | 2010-08-27 13:29:08 -0700 |
commit | 8d342970108926c4ea355c90d26a2a353ec0fd47 (patch) | |
tree | 6ff39167defd677f8a31b3d8887481ad65a88b87 /media/libstagefright/rtsp/MyHandler.h | |
parent | cc6adf524c1bb3bfaa5be464b50b8bcca899761c (diff) | |
download | frameworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.zip frameworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.tar.gz frameworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.tar.bz2 |
Better support for rtsp (normal play-)time display. Better seek support, timeout if no packets arrive for too long.
Change-Id: Id491541a6ae501604cda815f8e961a3bfe26db7d
related-to-bug: 2556656
Diffstat (limited to 'media/libstagefright/rtsp/MyHandler.h')
-rw-r--r-- | media/libstagefright/rtsp/MyHandler.h | 214 |
1 files changed, 202 insertions, 12 deletions
diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h index b10c881..0685a47 100644 --- a/media/libstagefright/rtsp/MyHandler.h +++ b/media/libstagefright/rtsp/MyHandler.h @@ -23,6 +23,8 @@ #include "ARTSPConnection.h" #include "ASessionDescription.h" +#include <ctype.h> + #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/ALooper.h> @@ -33,6 +35,34 @@ namespace android { +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + struct MyHandler : public AHandler { MyHandler(const char *url, const sp<ALooper> &looper) : mLooper(looper), @@ -43,7 +73,9 @@ struct MyHandler : public AHandler { mSetupTracksSuccessful(false), mSeekPending(false), mFirstAccessUnit(true), - mFirstAccessUnitNTP(0) { + mFirstAccessUnitNTP(0), + mNumAccessUnitsReceived(0), + mCheckPending(false) { mNetLooper->start(false /* runOnCallingThread */, false /* canCallJava */, @@ -76,6 +108,20 @@ struct MyHandler : public AHandler { msg->post(); } + int64_t getNormalPlayTimeUs() { + int64_t maxTimeUs = 0; + for (size_t i = 0; i < mTracks.size(); ++i) { + int64_t timeUs = mTracks.editItemAt(i).mPacketSource + ->getNormalPlayTimeUs(); + + if (i == 0 || timeUs > maxTimeUs) { + maxTimeUs = timeUs; + } + } + + return maxTimeUs; + } + virtual void onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case 'conn': @@ -269,6 +315,8 @@ struct MyHandler : public AHandler { CHECK_EQ(response->mStatusCode, 200u); + parsePlayResponse(response); + mDoneMsg->setInt32("result", OK); mDoneMsg->post(); mDoneMsg = NULL; @@ -332,16 +380,38 @@ struct MyHandler : public AHandler { break; } + case 'chek': + { + if (mNumAccessUnitsReceived == 0) { + LOG(INFO) << "stream ended? aborting."; + (new AMessage('abor', id()))->post(); + break; + } + + mNumAccessUnitsReceived = 0; + msg->post(500000); + break; + } + case 'accu': { + ++mNumAccessUnitsReceived; + + if (!mCheckPending) { + mCheckPending = true; + sp<AMessage> check = new AMessage('chek', id()); + check->post(500000); + } + size_t trackIndex; CHECK(msg->findSize("track-index", &trackIndex)); + TrackInfo *track = &mTracks.editItemAt(trackIndex); + int32_t eos; if (msg->findInt32("eos", &eos)) { LOG(INFO) << "received BYE on track index " << trackIndex; #if 0 - TrackInfo *track = &mTracks.editItemAt(trackIndex); track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); #endif return; @@ -352,10 +422,32 @@ struct MyHandler : public AHandler { sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); + uint32_t seqNum = (uint32_t)accessUnit->int32Data(); + + if (seqNum < track->mFirstSeqNumInSegment) { + LOG(INFO) << "dropping stale access-unit " + << "(" << seqNum << " < " + << track->mFirstSeqNumInSegment << ")"; + break; + } + uint64_t ntpTime; CHECK(accessUnit->meta()->findInt64( "ntp-time", (int64_t *)&ntpTime)); + uint32_t rtpTime; + CHECK(accessUnit->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); + + if (track->mNewSegment) { + track->mNewSegment = false; + + LOG(VERBOSE) << "first segment unit ntpTime=" + << StringPrintf("0x%016llx", ntpTime) + << " rtpTime=" << rtpTime + << " seq=" << seqNum; + } + if (mFirstAccessUnit) { mFirstAccessUnit = false; mFirstAccessUnitNTP = ntpTime; @@ -414,6 +506,11 @@ struct MyHandler : public AHandler { case 'see1': { + // Session is paused now. + for (size_t i = 0; i < mTracks.size(); ++i) { + mTracks.editItemAt(i).mPacketSource->flushQueue(); + } + int64_t timeUs; CHECK(msg->findInt64("time", &timeUs)); @@ -440,15 +537,13 @@ struct MyHandler : public AHandler { { CHECK(mSeekPending); - LOG(INFO) << "seek completed."; - mSeekPending = false; - int32_t result; CHECK(msg->findInt32("result", &result)); - if (result != OK) { - LOG(ERROR) << "seek FAILED"; - break; - } + + LOG(INFO) << "PLAY completed with result " + << result << " (" << strerror(-result) << ")"; + + CHECK_EQ(result, (status_t)OK); sp<RefBase> obj; CHECK(msg->findObject("response", &obj)); @@ -457,9 +552,10 @@ struct MyHandler : public AHandler { CHECK_EQ(response->mStatusCode, 200u); - for (size_t i = 0; i < mTracks.size(); ++i) { - mTracks.editItemAt(i).mPacketSource->flushQueue(); - } + parsePlayResponse(response); + + LOG(INFO) << "seek completed."; + mSeekPending = false; break; } @@ -491,6 +587,90 @@ struct MyHandler : public AHandler { } } + static void SplitString( + const AString &s, const char *separator, List<AString> *items) { + items->clear(); + size_t start = 0; + while (start < s.size()) { + ssize_t offset = s.find(separator, start); + + if (offset < 0) { + items->push_back(AString(s, start, s.size() - start)); + break; + } + + items->push_back(AString(s, start, offset - start)); + start = offset + strlen(separator); + } + } + + void parsePlayResponse(const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("range"); + if (i < 0) { + // Server doesn't even tell use what range it is going to + // play, therefore we won't support seeking. + return; + } + + AString range = response->mHeaders.valueAt(i); + LOG(VERBOSE) << "Range: " << range; + + AString val; + CHECK(GetAttribute(range.c_str(), "npt", &val)); + float npt1, npt2; + + if (val == "now-") { + // This is a live stream and therefore not seekable. + return; + } else { + CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2); + } + + i = response->mHeaders.indexOfKey("rtp-info"); + CHECK_GE(i, 0); + + AString rtpInfo = response->mHeaders.valueAt(i); + List<AString> streamInfos; + SplitString(rtpInfo, ",", &streamInfos); + + int n = 1; + for (List<AString>::iterator it = streamInfos.begin(); + it != streamInfos.end(); ++it) { + (*it).trim(); + LOG(VERBOSE) << "streamInfo[" << n << "] = " << *it; + + CHECK(GetAttribute((*it).c_str(), "url", &val)); + + size_t trackIndex = 0; + while (trackIndex < mTracks.size() + && !(val == mTracks.editItemAt(trackIndex).mURL)) { + ++trackIndex; + } + CHECK_LT(trackIndex, mTracks.size()); + + CHECK(GetAttribute((*it).c_str(), "seq", &val)); + + char *end; + unsigned long seq = strtoul(val.c_str(), &end, 10); + + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mFirstSeqNumInSegment = seq; + info->mNewSegment = true; + + CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); + + uint32_t rtpTime = strtoul(val.c_str(), &end, 10); + + LOG(VERBOSE) << "track #" << n + << ": rtpTime=" << rtpTime << " <=> npt=" << npt1; + + info->mPacketSource->setNormalPlayTimeMapping( + rtpTime, (int64_t)(npt1 * 1E6)); + + ++n; + } + } + sp<APacketSource> getPacketSource(size_t index) { CHECK_GE(index, 0u); CHECK_LT(index, mTracks.size()); @@ -515,11 +695,16 @@ private: bool mSeekPending; bool mFirstAccessUnit; uint64_t mFirstAccessUnitNTP; + int64_t mNumAccessUnitsReceived; + bool mCheckPending; struct TrackInfo { + AString mURL; int mRTPSocket; int mRTCPSocket; bool mUsingInterleavedTCP; + uint32_t mFirstSeqNumInSegment; + bool mNewSegment; sp<APacketSource> mPacketSource; }; @@ -549,8 +734,13 @@ private: mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + info->mURL = trackURL; info->mPacketSource = source; info->mUsingInterleavedTCP = false; + info->mFirstSeqNumInSegment = 0; + info->mNewSegment = true; + + LOG(VERBOSE) << "track #" << mTracks.size() << " URL=" << trackURL; AString request = "SETUP "; request.append(trackURL); |