summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/rtsp/MyHandler.h
diff options
context:
space:
mode:
authorAndreas Huber <andih@google.com>2010-08-27 13:29:08 -0700
committerAndreas Huber <andih@google.com>2010-08-27 13:29:08 -0700
commit8d342970108926c4ea355c90d26a2a353ec0fd47 (patch)
tree6ff39167defd677f8a31b3d8887481ad65a88b87 /media/libstagefright/rtsp/MyHandler.h
parentcc6adf524c1bb3bfaa5be464b50b8bcca899761c (diff)
downloadframeworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.zip
frameworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.tar.gz
frameworks_av-8d342970108926c4ea355c90d26a2a353ec0fd47.tar.bz2
Better support for rtsp (normal play-)time display. Better seek support, timeout if no packets arrive for too long.
Change-Id: Id491541a6ae501604cda815f8e961a3bfe26db7d related-to-bug: 2556656
Diffstat (limited to 'media/libstagefright/rtsp/MyHandler.h')
-rw-r--r--media/libstagefright/rtsp/MyHandler.h214
1 files changed, 202 insertions, 12 deletions
diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h
index b10c881..0685a47 100644
--- a/media/libstagefright/rtsp/MyHandler.h
+++ b/media/libstagefright/rtsp/MyHandler.h
@@ -23,6 +23,8 @@
#include "ARTSPConnection.h"
#include "ASessionDescription.h"
+#include <ctype.h>
+
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/ALooper.h>
@@ -33,6 +35,34 @@
namespace android {
+static bool GetAttribute(const char *s, const char *key, AString *value) {
+ value->clear();
+
+ size_t keyLen = strlen(key);
+
+ for (;;) {
+ while (isspace(*s)) {
+ ++s;
+ }
+
+ const char *colonPos = strchr(s, ';');
+
+ size_t len =
+ (colonPos == NULL) ? strlen(s) : colonPos - s;
+
+ if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
+ value->setTo(&s[keyLen + 1], len - keyLen - 1);
+ return true;
+ }
+
+ if (colonPos == NULL) {
+ return false;
+ }
+
+ s = colonPos + 1;
+ }
+}
+
struct MyHandler : public AHandler {
MyHandler(const char *url, const sp<ALooper> &looper)
: mLooper(looper),
@@ -43,7 +73,9 @@ struct MyHandler : public AHandler {
mSetupTracksSuccessful(false),
mSeekPending(false),
mFirstAccessUnit(true),
- mFirstAccessUnitNTP(0) {
+ mFirstAccessUnitNTP(0),
+ mNumAccessUnitsReceived(0),
+ mCheckPending(false) {
mNetLooper->start(false /* runOnCallingThread */,
false /* canCallJava */,
@@ -76,6 +108,20 @@ struct MyHandler : public AHandler {
msg->post();
}
+ int64_t getNormalPlayTimeUs() {
+ int64_t maxTimeUs = 0;
+ for (size_t i = 0; i < mTracks.size(); ++i) {
+ int64_t timeUs = mTracks.editItemAt(i).mPacketSource
+ ->getNormalPlayTimeUs();
+
+ if (i == 0 || timeUs > maxTimeUs) {
+ maxTimeUs = timeUs;
+ }
+ }
+
+ return maxTimeUs;
+ }
+
virtual void onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case 'conn':
@@ -269,6 +315,8 @@ struct MyHandler : public AHandler {
CHECK_EQ(response->mStatusCode, 200u);
+ parsePlayResponse(response);
+
mDoneMsg->setInt32("result", OK);
mDoneMsg->post();
mDoneMsg = NULL;
@@ -332,16 +380,38 @@ struct MyHandler : public AHandler {
break;
}
+ case 'chek':
+ {
+ if (mNumAccessUnitsReceived == 0) {
+ LOG(INFO) << "stream ended? aborting.";
+ (new AMessage('abor', id()))->post();
+ break;
+ }
+
+ mNumAccessUnitsReceived = 0;
+ msg->post(500000);
+ break;
+ }
+
case 'accu':
{
+ ++mNumAccessUnitsReceived;
+
+ if (!mCheckPending) {
+ mCheckPending = true;
+ sp<AMessage> check = new AMessage('chek', id());
+ check->post(500000);
+ }
+
size_t trackIndex;
CHECK(msg->findSize("track-index", &trackIndex));
+ TrackInfo *track = &mTracks.editItemAt(trackIndex);
+
int32_t eos;
if (msg->findInt32("eos", &eos)) {
LOG(INFO) << "received BYE on track index " << trackIndex;
#if 0
- TrackInfo *track = &mTracks.editItemAt(trackIndex);
track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
#endif
return;
@@ -352,10 +422,32 @@ struct MyHandler : public AHandler {
sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
+ uint32_t seqNum = (uint32_t)accessUnit->int32Data();
+
+ if (seqNum < track->mFirstSeqNumInSegment) {
+ LOG(INFO) << "dropping stale access-unit "
+ << "(" << seqNum << " < "
+ << track->mFirstSeqNumInSegment << ")";
+ break;
+ }
+
uint64_t ntpTime;
CHECK(accessUnit->meta()->findInt64(
"ntp-time", (int64_t *)&ntpTime));
+ uint32_t rtpTime;
+ CHECK(accessUnit->meta()->findInt32(
+ "rtp-time", (int32_t *)&rtpTime));
+
+ if (track->mNewSegment) {
+ track->mNewSegment = false;
+
+ LOG(VERBOSE) << "first segment unit ntpTime="
+ << StringPrintf("0x%016llx", ntpTime)
+ << " rtpTime=" << rtpTime
+ << " seq=" << seqNum;
+ }
+
if (mFirstAccessUnit) {
mFirstAccessUnit = false;
mFirstAccessUnitNTP = ntpTime;
@@ -414,6 +506,11 @@ struct MyHandler : public AHandler {
case 'see1':
{
+ // Session is paused now.
+ for (size_t i = 0; i < mTracks.size(); ++i) {
+ mTracks.editItemAt(i).mPacketSource->flushQueue();
+ }
+
int64_t timeUs;
CHECK(msg->findInt64("time", &timeUs));
@@ -440,15 +537,13 @@ struct MyHandler : public AHandler {
{
CHECK(mSeekPending);
- LOG(INFO) << "seek completed.";
- mSeekPending = false;
-
int32_t result;
CHECK(msg->findInt32("result", &result));
- if (result != OK) {
- LOG(ERROR) << "seek FAILED";
- break;
- }
+
+ LOG(INFO) << "PLAY completed with result "
+ << result << " (" << strerror(-result) << ")";
+
+ CHECK_EQ(result, (status_t)OK);
sp<RefBase> obj;
CHECK(msg->findObject("response", &obj));
@@ -457,9 +552,10 @@ struct MyHandler : public AHandler {
CHECK_EQ(response->mStatusCode, 200u);
- for (size_t i = 0; i < mTracks.size(); ++i) {
- mTracks.editItemAt(i).mPacketSource->flushQueue();
- }
+ parsePlayResponse(response);
+
+ LOG(INFO) << "seek completed.";
+ mSeekPending = false;
break;
}
@@ -491,6 +587,90 @@ struct MyHandler : public AHandler {
}
}
+ static void SplitString(
+ const AString &s, const char *separator, List<AString> *items) {
+ items->clear();
+ size_t start = 0;
+ while (start < s.size()) {
+ ssize_t offset = s.find(separator, start);
+
+ if (offset < 0) {
+ items->push_back(AString(s, start, s.size() - start));
+ break;
+ }
+
+ items->push_back(AString(s, start, offset - start));
+ start = offset + strlen(separator);
+ }
+ }
+
+ void parsePlayResponse(const sp<ARTSPResponse> &response) {
+ ssize_t i = response->mHeaders.indexOfKey("range");
+ if (i < 0) {
+ // Server doesn't even tell use what range it is going to
+ // play, therefore we won't support seeking.
+ return;
+ }
+
+ AString range = response->mHeaders.valueAt(i);
+ LOG(VERBOSE) << "Range: " << range;
+
+ AString val;
+ CHECK(GetAttribute(range.c_str(), "npt", &val));
+ float npt1, npt2;
+
+ if (val == "now-") {
+ // This is a live stream and therefore not seekable.
+ return;
+ } else {
+ CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2);
+ }
+
+ i = response->mHeaders.indexOfKey("rtp-info");
+ CHECK_GE(i, 0);
+
+ AString rtpInfo = response->mHeaders.valueAt(i);
+ List<AString> streamInfos;
+ SplitString(rtpInfo, ",", &streamInfos);
+
+ int n = 1;
+ for (List<AString>::iterator it = streamInfos.begin();
+ it != streamInfos.end(); ++it) {
+ (*it).trim();
+ LOG(VERBOSE) << "streamInfo[" << n << "] = " << *it;
+
+ CHECK(GetAttribute((*it).c_str(), "url", &val));
+
+ size_t trackIndex = 0;
+ while (trackIndex < mTracks.size()
+ && !(val == mTracks.editItemAt(trackIndex).mURL)) {
+ ++trackIndex;
+ }
+ CHECK_LT(trackIndex, mTracks.size());
+
+ CHECK(GetAttribute((*it).c_str(), "seq", &val));
+
+ char *end;
+ unsigned long seq = strtoul(val.c_str(), &end, 10);
+
+ TrackInfo *info = &mTracks.editItemAt(trackIndex);
+ info->mFirstSeqNumInSegment = seq;
+ info->mNewSegment = true;
+
+ CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
+
+ uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
+
+ LOG(VERBOSE) << "track #" << n
+ << ": rtpTime=" << rtpTime << " <=> npt=" << npt1;
+
+ info->mPacketSource->setNormalPlayTimeMapping(
+ rtpTime, (int64_t)(npt1 * 1E6));
+
+ ++n;
+ }
+ }
+
sp<APacketSource> getPacketSource(size_t index) {
CHECK_GE(index, 0u);
CHECK_LT(index, mTracks.size());
@@ -515,11 +695,16 @@ private:
bool mSeekPending;
bool mFirstAccessUnit;
uint64_t mFirstAccessUnitNTP;
+ int64_t mNumAccessUnitsReceived;
+ bool mCheckPending;
struct TrackInfo {
+ AString mURL;
int mRTPSocket;
int mRTCPSocket;
bool mUsingInterleavedTCP;
+ uint32_t mFirstSeqNumInSegment;
+ bool mNewSegment;
sp<APacketSource> mPacketSource;
};
@@ -549,8 +734,13 @@ private:
mTracks.push(TrackInfo());
TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
+ info->mURL = trackURL;
info->mPacketSource = source;
info->mUsingInterleavedTCP = false;
+ info->mFirstSeqNumInSegment = 0;
+ info->mNewSegment = true;
+
+ LOG(VERBOSE) << "track #" << mTracks.size() << " URL=" << trackURL;
AString request = "SETUP ";
request.append(trackURL);