diff options
Diffstat (limited to 'media/libstagefright/rtsp/MyHandler.h')
-rw-r--r-- | media/libstagefright/rtsp/MyHandler.h | 1522 |
1 files changed, 0 insertions, 1522 deletions
diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h deleted file mode 100644 index deee30f..0000000 --- a/media/libstagefright/rtsp/MyHandler.h +++ /dev/null @@ -1,1522 +0,0 @@ -/* - * Copyright (C) 2010 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MY_HANDLER_H_ - -#define MY_HANDLER_H_ - -//#define LOG_NDEBUG 0 -#define LOG_TAG "MyHandler" -#include <utils/Log.h> - -#include "APacketSource.h" -#include "ARTPConnection.h" -#include "ARTSPConnection.h" -#include "ASessionDescription.h" - -#include <ctype.h> -#include <cutils/properties.h> - -#include <media/stagefright/foundation/ABuffer.h> -#include <media/stagefright/foundation/ADebug.h> -#include <media/stagefright/foundation/ALooper.h> -#include <media/stagefright/foundation/AMessage.h> -#include <media/stagefright/MediaErrors.h> - -#include <arpa/inet.h> -#include <sys/socket.h> -#include <netdb.h> - -#include "HTTPBase.h" - -// If no access units are received within 5 secs, assume that the rtp -// stream has ended and signal end of stream. -static int64_t kAccessUnitTimeoutUs = 10000000ll; - -// If no access units arrive for the first 10 secs after starting the -// stream, assume none ever will and signal EOS or switch transports. -static int64_t kStartupTimeoutUs = 10000000ll; - -static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; - -namespace android { - -static void MakeUserAgentString(AString *s) { - s->setTo("stagefright/1.1 (Linux;Android "); - -#if (PROPERTY_VALUE_MAX < 8) -#error "PROPERTY_VALUE_MAX must be at least 8" -#endif - - char value[PROPERTY_VALUE_MAX]; - property_get("ro.build.version.release", value, "Unknown"); - s->append(value); - s->append(")"); -} - -static bool GetAttribute(const char *s, const char *key, AString *value) { - value->clear(); - - size_t keyLen = strlen(key); - - for (;;) { - while (isspace(*s)) { - ++s; - } - - const char *colonPos = strchr(s, ';'); - - size_t len = - (colonPos == NULL) ? strlen(s) : colonPos - s; - - if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { - value->setTo(&s[keyLen + 1], len - keyLen - 1); - return true; - } - - if (colonPos == NULL) { - return false; - } - - s = colonPos + 1; - } -} - -struct MyHandler : public AHandler { - enum { - kWhatConnected = 'conn', - kWhatDisconnected = 'disc', - kWhatSeekDone = 'sdon', - - kWhatAccessUnit = 'accU', - kWhatEOS = 'eos!', - kWhatSeekDiscontinuity = 'seeD', - kWhatNormalPlayTimeMapping = 'nptM', - }; - - MyHandler( - const char *url, - const sp<AMessage> ¬ify, - bool uidValid = false, uid_t uid = 0) - : mNotify(notify), - mUIDValid(uidValid), - mUID(uid), - mNetLooper(new ALooper), - mConn(new ARTSPConnection(mUIDValid, mUID)), - mRTPConn(new ARTPConnection), - mOriginalSessionURL(url), - mSessionURL(url), - mSetupTracksSuccessful(false), - mSeekPending(false), - mFirstAccessUnit(true), - mAllTracksHaveTime(false), - mNTPAnchorUs(-1), - mMediaAnchorUs(-1), - mLastMediaTimeUs(0), - mNumAccessUnitsReceived(0), - mCheckPending(false), - mCheckGeneration(0), - mTryTCPInterleaving(false), - mTryFakeRTCP(false), - mReceivedFirstRTCPPacket(false), - mReceivedFirstRTPPacket(false), - mSeekable(false), - mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), - mKeepAliveGeneration(0) { - mNetLooper->setName("rtsp net"); - mNetLooper->start(false /* runOnCallingThread */, - false /* canCallJava */, - PRIORITY_HIGHEST); - - // Strip any authentication info from the session url, we don't - // want to transmit user/pass in cleartext. - AString host, path, user, pass; - unsigned port; - CHECK(ARTSPConnection::ParseURL( - mSessionURL.c_str(), &host, &port, &path, &user, &pass)); - - if (user.size() > 0) { - mSessionURL.clear(); - mSessionURL.append("rtsp://"); - mSessionURL.append(host); - mSessionURL.append(":"); - mSessionURL.append(StringPrintf("%u", port)); - mSessionURL.append(path); - - ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); - } - - mSessionHost = host; - } - - void connect() { - looper()->registerHandler(mConn); - (1 ? mNetLooper : looper())->registerHandler(mRTPConn); - - sp<AMessage> notify = new AMessage('biny', id()); - mConn->observeBinaryData(notify); - - sp<AMessage> reply = new AMessage('conn', id()); - mConn->connect(mOriginalSessionURL.c_str(), reply); - } - - void disconnect() { - (new AMessage('abor', id()))->post(); - } - - void seek(int64_t timeUs) { - sp<AMessage> msg = new AMessage('seek', id()); - msg->setInt64("time", timeUs); - msg->post(); - } - - static void addRR(const sp<ABuffer> &buf) { - uint8_t *ptr = buf->data() + buf->size(); - ptr[0] = 0x80 | 0; - ptr[1] = 201; // RR - ptr[2] = 0; - ptr[3] = 1; - ptr[4] = 0xde; // SSRC - ptr[5] = 0xad; - ptr[6] = 0xbe; - ptr[7] = 0xef; - - buf->setRange(0, buf->size() + 8); - } - - static void addSDES(int s, const sp<ABuffer> &buffer) { - struct sockaddr_in addr; - socklen_t addrSize = sizeof(addr); - CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); - - uint8_t *data = buffer->data() + buffer->size(); - data[0] = 0x80 | 1; - data[1] = 202; // SDES - data[4] = 0xde; // SSRC - data[5] = 0xad; - data[6] = 0xbe; - data[7] = 0xef; - - size_t offset = 8; - - data[offset++] = 1; // CNAME - - AString cname = "stagefright@"; - cname.append(inet_ntoa(addr.sin_addr)); - data[offset++] = cname.size(); - - memcpy(&data[offset], cname.c_str(), cname.size()); - offset += cname.size(); - - data[offset++] = 6; // TOOL - - AString tool; - MakeUserAgentString(&tool); - - data[offset++] = tool.size(); - - memcpy(&data[offset], tool.c_str(), tool.size()); - offset += tool.size(); - - data[offset++] = 0; - - if ((offset % 4) > 0) { - size_t count = 4 - (offset % 4); - switch (count) { - case 3: - data[offset++] = 0; - case 2: - data[offset++] = 0; - case 1: - data[offset++] = 0; - } - } - - size_t numWords = (offset / 4) - 1; - data[2] = numWords >> 8; - data[3] = numWords & 0xff; - - buffer->setRange(buffer->offset(), buffer->size() + offset); - } - - // In case we're behind NAT, fire off two UDP packets to the remote - // rtp/rtcp ports to poke a hole into the firewall for future incoming - // packets. We're going to send an RR/SDES RTCP packet to both of them. - bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { - struct sockaddr_in addr; - memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); - addr.sin_family = AF_INET; - - AString source; - AString server_port; - if (!GetAttribute(transport.c_str(), - "source", - &source)) { - ALOGW("Missing 'source' field in Transport response. Using " - "RTSP endpoint address."); - - struct hostent *ent = gethostbyname(mSessionHost.c_str()); - if (ent == NULL) { - ALOGE("Failed to look up address of session host '%s'", - mSessionHost.c_str()); - - return false; - } - - addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; - } else { - addr.sin_addr.s_addr = inet_addr(source.c_str()); - } - - if (!GetAttribute(transport.c_str(), - "server_port", - &server_port)) { - ALOGI("Missing 'server_port' field in Transport response."); - return false; - } - - int rtpPort, rtcpPort; - if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 - || rtpPort <= 0 || rtpPort > 65535 - || rtcpPort <=0 || rtcpPort > 65535 - || rtcpPort != rtpPort + 1) { - ALOGE("Server picked invalid RTP/RTCP port pair %s," - " RTP port must be even, RTCP port must be one higher.", - server_port.c_str()); - - return false; - } - - if (rtpPort & 1) { - ALOGW("Server picked an odd RTP port, it should've picked an " - "even one, we'll let it pass for now, but this may break " - "in the future."); - } - - if (addr.sin_addr.s_addr == INADDR_NONE) { - return true; - } - - if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { - // No firewalls to traverse on the loopback interface. - return true; - } - - // Make up an RR/SDES RTCP packet. - sp<ABuffer> buf = new ABuffer(65536); - buf->setRange(0, 0); - addRR(buf); - addSDES(rtpSocket, buf); - - addr.sin_port = htons(rtpPort); - - ssize_t n = sendto( - rtpSocket, buf->data(), buf->size(), 0, - (const sockaddr *)&addr, sizeof(addr)); - - if (n < (ssize_t)buf->size()) { - ALOGE("failed to poke a hole for RTP packets"); - return false; - } - - addr.sin_port = htons(rtcpPort); - - n = sendto( - rtcpSocket, buf->data(), buf->size(), 0, - (const sockaddr *)&addr, sizeof(addr)); - - if (n < (ssize_t)buf->size()) { - ALOGE("failed to poke a hole for RTCP packets"); - return false; - } - - ALOGV("successfully poked holes."); - - return true; - } - - virtual void onMessageReceived(const sp<AMessage> &msg) { - switch (msg->what()) { - case 'conn': - { - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("connection request completed with result %d (%s)", - result, strerror(-result)); - - if (result == OK) { - AString request; - request = "DESCRIBE "; - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - request.append("Accept: application/sdp\r\n"); - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('desc', id()); - mConn->sendRequest(request.c_str(), reply); - } else { - (new AMessage('disc', id()))->post(); - } - break; - } - - case 'disc': - { - ++mKeepAliveGeneration; - - int32_t reconnect; - if (msg->findInt32("reconnect", &reconnect) && reconnect) { - sp<AMessage> reply = new AMessage('conn', id()); - mConn->connect(mOriginalSessionURL.c_str(), reply); - } else { - (new AMessage('quit', id()))->post(); - } - break; - } - - case 'desc': - { - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("DESCRIBE completed with result %d (%s)", - result, strerror(-result)); - - if (result == OK) { - sp<RefBase> obj; - CHECK(msg->findObject("response", &obj)); - sp<ARTSPResponse> response = - static_cast<ARTSPResponse *>(obj.get()); - - if (response->mStatusCode == 302) { - ssize_t i = response->mHeaders.indexOfKey("location"); - CHECK_GE(i, 0); - - mSessionURL = response->mHeaders.valueAt(i); - - AString request; - request = "DESCRIBE "; - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - request.append("Accept: application/sdp\r\n"); - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('desc', id()); - mConn->sendRequest(request.c_str(), reply); - break; - } - - if (response->mStatusCode != 200) { - result = UNKNOWN_ERROR; - } else { - mSessionDesc = new ASessionDescription; - - mSessionDesc->setTo( - response->mContent->data(), - response->mContent->size()); - - if (!mSessionDesc->isValid()) { - ALOGE("Failed to parse session description."); - result = ERROR_MALFORMED; - } else { - ssize_t i = response->mHeaders.indexOfKey("content-base"); - if (i >= 0) { - mBaseURL = response->mHeaders.valueAt(i); - } else { - i = response->mHeaders.indexOfKey("content-location"); - if (i >= 0) { - mBaseURL = response->mHeaders.valueAt(i); - } else { - mBaseURL = mSessionURL; - } - } - - if (!mBaseURL.startsWith("rtsp://")) { - // Some misbehaving servers specify a relative - // URL in one of the locations above, combine - // it with the absolute session URL to get - // something usable... - - ALOGW("Server specified a non-absolute base URL" - ", combining it with the session URL to " - "get something usable..."); - - AString tmp; - CHECK(MakeURL( - mSessionURL.c_str(), - mBaseURL.c_str(), - &tmp)); - - mBaseURL = tmp; - } - - if (mSessionDesc->countTracks() < 2) { - // There's no actual tracks in this session. - // The first "track" is merely session meta - // data. - - ALOGW("Session doesn't contain any playable " - "tracks. Aborting."); - result = ERROR_UNSUPPORTED; - } else { - setupTrack(1); - } - } - } - } - - if (result != OK) { - sp<AMessage> reply = new AMessage('disc', id()); - mConn->disconnect(reply); - } - break; - } - - case 'setu': - { - size_t index; - CHECK(msg->findSize("index", &index)); - - TrackInfo *track = NULL; - size_t trackIndex; - if (msg->findSize("track-index", &trackIndex)) { - track = &mTracks.editItemAt(trackIndex); - } - - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("SETUP(%d) completed with result %d (%s)", - index, result, strerror(-result)); - - if (result == OK) { - CHECK(track != NULL); - - sp<RefBase> obj; - CHECK(msg->findObject("response", &obj)); - sp<ARTSPResponse> response = - static_cast<ARTSPResponse *>(obj.get()); - - if (response->mStatusCode != 200) { - result = UNKNOWN_ERROR; - } else { - ssize_t i = response->mHeaders.indexOfKey("session"); - CHECK_GE(i, 0); - - mSessionID = response->mHeaders.valueAt(i); - - mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; - AString timeoutStr; - if (GetAttribute( - mSessionID.c_str(), "timeout", &timeoutStr)) { - char *end; - unsigned long timeoutSecs = - strtoul(timeoutStr.c_str(), &end, 10); - - if (end == timeoutStr.c_str() || *end != '\0') { - ALOGW("server specified malformed timeout '%s'", - timeoutStr.c_str()); - - mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; - } else if (timeoutSecs < 15) { - ALOGW("server specified too short a timeout " - "(%lu secs), using default.", - timeoutSecs); - - mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; - } else { - mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; - - ALOGI("server specified timeout of %lu secs.", - timeoutSecs); - } - } - - i = mSessionID.find(";"); - if (i >= 0) { - // Remove options, i.e. ";timeout=90" - mSessionID.erase(i, mSessionID.size() - i); - } - - sp<AMessage> notify = new AMessage('accu', id()); - notify->setSize("track-index", trackIndex); - - i = response->mHeaders.indexOfKey("transport"); - CHECK_GE(i, 0); - - if (!track->mUsingInterleavedTCP) { - AString transport = response->mHeaders.valueAt(i); - - // We are going to continue even if we were - // unable to poke a hole into the firewall... - pokeAHole( - track->mRTPSocket, - track->mRTCPSocket, - transport); - } - - mRTPConn->addStream( - track->mRTPSocket, track->mRTCPSocket, - mSessionDesc, index, - notify, track->mUsingInterleavedTCP); - - mSetupTracksSuccessful = true; - } - } - - if (result != OK) { - if (track) { - if (!track->mUsingInterleavedTCP) { - // Clear the tag - if (mUIDValid) { - HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); - HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); - } - - close(track->mRTPSocket); - close(track->mRTCPSocket); - } - - mTracks.removeItemsAt(trackIndex); - } - } - - ++index; - if (index < mSessionDesc->countTracks()) { - setupTrack(index); - } else if (mSetupTracksSuccessful) { - ++mKeepAliveGeneration; - postKeepAlive(); - - AString request = "PLAY "; - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('play', id()); - mConn->sendRequest(request.c_str(), reply); - } else { - sp<AMessage> reply = new AMessage('disc', id()); - mConn->disconnect(reply); - } - break; - } - - case 'play': - { - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("PLAY completed with result %d (%s)", - result, strerror(-result)); - - if (result == OK) { - sp<RefBase> obj; - CHECK(msg->findObject("response", &obj)); - sp<ARTSPResponse> response = - static_cast<ARTSPResponse *>(obj.get()); - - if (response->mStatusCode != 200) { - result = UNKNOWN_ERROR; - } else { - parsePlayResponse(response); - - sp<AMessage> timeout = new AMessage('tiou', id()); - timeout->post(kStartupTimeoutUs); - } - } - - if (result != OK) { - sp<AMessage> reply = new AMessage('disc', id()); - mConn->disconnect(reply); - } - - break; - } - - case 'aliv': - { - int32_t generation; - CHECK(msg->findInt32("generation", &generation)); - - if (generation != mKeepAliveGeneration) { - // obsolete event. - break; - } - - AString request; - request.append("OPTIONS "); - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('opts', id()); - reply->setInt32("generation", mKeepAliveGeneration); - mConn->sendRequest(request.c_str(), reply); - break; - } - - case 'opts': - { - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("OPTIONS completed with result %d (%s)", - result, strerror(-result)); - - int32_t generation; - CHECK(msg->findInt32("generation", &generation)); - - if (generation != mKeepAliveGeneration) { - // obsolete event. - break; - } - - postKeepAlive(); - break; - } - - case 'abor': - { - for (size_t i = 0; i < mTracks.size(); ++i) { - TrackInfo *info = &mTracks.editItemAt(i); - - if (!mFirstAccessUnit) { - postQueueEOS(i, ERROR_END_OF_STREAM); - } - - if (!info->mUsingInterleavedTCP) { - mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); - - // Clear the tag - if (mUIDValid) { - HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); - HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); - } - - close(info->mRTPSocket); - close(info->mRTCPSocket); - } - } - mTracks.clear(); - mSetupTracksSuccessful = false; - mSeekPending = false; - mFirstAccessUnit = true; - mAllTracksHaveTime = false; - mNTPAnchorUs = -1; - mMediaAnchorUs = -1; - mNumAccessUnitsReceived = 0; - mReceivedFirstRTCPPacket = false; - mReceivedFirstRTPPacket = false; - mSeekable = false; - - sp<AMessage> reply = new AMessage('tear', id()); - - int32_t reconnect; - if (msg->findInt32("reconnect", &reconnect) && reconnect) { - reply->setInt32("reconnect", true); - } - - AString request; - request = "TEARDOWN "; - - // XXX should use aggregate url from SDP here... - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - - request.append("\r\n"); - - mConn->sendRequest(request.c_str(), reply); - break; - } - - case 'tear': - { - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("TEARDOWN completed with result %d (%s)", - result, strerror(-result)); - - sp<AMessage> reply = new AMessage('disc', id()); - - int32_t reconnect; - if (msg->findInt32("reconnect", &reconnect) && reconnect) { - reply->setInt32("reconnect", true); - } - - mConn->disconnect(reply); - break; - } - - case 'quit': - { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatDisconnected); - msg->setInt32("result", UNKNOWN_ERROR); - msg->post(); - break; - } - - case 'chek': - { - int32_t generation; - CHECK(msg->findInt32("generation", &generation)); - if (generation != mCheckGeneration) { - // This is an outdated message. Ignore. - break; - } - - if (mNumAccessUnitsReceived == 0) { -#if 1 - ALOGI("stream ended? aborting."); - (new AMessage('abor', id()))->post(); - break; -#else - ALOGI("haven't seen an AU in a looong time."); -#endif - } - - mNumAccessUnitsReceived = 0; - msg->post(kAccessUnitTimeoutUs); - break; - } - - case 'accu': - { - int32_t timeUpdate; - if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { - size_t trackIndex; - CHECK(msg->findSize("track-index", &trackIndex)); - - uint32_t rtpTime; - uint64_t ntpTime; - CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); - CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); - - onTimeUpdate(trackIndex, rtpTime, ntpTime); - break; - } - - int32_t first; - if (msg->findInt32("first-rtcp", &first)) { - mReceivedFirstRTCPPacket = true; - break; - } - - if (msg->findInt32("first-rtp", &first)) { - mReceivedFirstRTPPacket = true; - break; - } - - ++mNumAccessUnitsReceived; - postAccessUnitTimeoutCheck(); - - size_t trackIndex; - CHECK(msg->findSize("track-index", &trackIndex)); - - if (trackIndex >= mTracks.size()) { - ALOGV("late packets ignored."); - break; - } - - TrackInfo *track = &mTracks.editItemAt(trackIndex); - - int32_t eos; - if (msg->findInt32("eos", &eos)) { - ALOGI("received BYE on track index %d", trackIndex); -#if 0 - track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); -#endif - return; - } - - sp<ABuffer> accessUnit; - CHECK(msg->findBuffer("access-unit", &accessUnit)); - - uint32_t seqNum = (uint32_t)accessUnit->int32Data(); - - if (mSeekPending) { - ALOGV("we're seeking, dropping stale packet."); - break; - } - - if (seqNum < track->mFirstSeqNumInSegment) { - ALOGV("dropping stale access-unit (%d < %d)", - seqNum, track->mFirstSeqNumInSegment); - break; - } - - if (track->mNewSegment) { - track->mNewSegment = false; - } - - onAccessUnitComplete(trackIndex, accessUnit); - break; - } - - case 'seek': - { - if (!mSeekable) { - ALOGW("This is a live stream, ignoring seek request."); - - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatSeekDone); - msg->post(); - break; - } - - int64_t timeUs; - CHECK(msg->findInt64("time", &timeUs)); - - mSeekPending = true; - - // Disable the access unit timeout until we resumed - // playback again. - mCheckPending = true; - ++mCheckGeneration; - - AString request = "PAUSE "; - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('see1', id()); - reply->setInt64("time", timeUs); - mConn->sendRequest(request.c_str(), reply); - break; - } - - case 'see1': - { - // Session is paused now. - for (size_t i = 0; i < mTracks.size(); ++i) { - TrackInfo *info = &mTracks.editItemAt(i); - - postQueueSeekDiscontinuity(i); - - info->mRTPAnchor = 0; - info->mNTPAnchorUs = -1; - } - - mAllTracksHaveTime = false; - mNTPAnchorUs = -1; - - int64_t timeUs; - CHECK(msg->findInt64("time", &timeUs)); - - AString request = "PLAY "; - request.append(mSessionURL); - request.append(" RTSP/1.0\r\n"); - - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - - request.append( - StringPrintf( - "Range: npt=%lld-\r\n", timeUs / 1000000ll)); - - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('see2', id()); - mConn->sendRequest(request.c_str(), reply); - break; - } - - case 'see2': - { - CHECK(mSeekPending); - - int32_t result; - CHECK(msg->findInt32("result", &result)); - - ALOGI("PLAY completed with result %d (%s)", - result, strerror(-result)); - - mCheckPending = false; - postAccessUnitTimeoutCheck(); - - if (result == OK) { - sp<RefBase> obj; - CHECK(msg->findObject("response", &obj)); - sp<ARTSPResponse> response = - static_cast<ARTSPResponse *>(obj.get()); - - if (response->mStatusCode != 200) { - result = UNKNOWN_ERROR; - } else { - parsePlayResponse(response); - - ssize_t i = response->mHeaders.indexOfKey("rtp-info"); - CHECK_GE(i, 0); - - ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); - - ALOGI("seek completed."); - } - } - - if (result != OK) { - ALOGE("seek failed, aborting."); - (new AMessage('abor', id()))->post(); - } - - mSeekPending = false; - - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatSeekDone); - msg->post(); - break; - } - - case 'biny': - { - sp<ABuffer> buffer; - CHECK(msg->findBuffer("buffer", &buffer)); - - int32_t index; - CHECK(buffer->meta()->findInt32("index", &index)); - - mRTPConn->injectPacket(index, buffer); - break; - } - - case 'tiou': - { - if (!mReceivedFirstRTCPPacket) { - if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { - ALOGW("We received RTP packets but no RTCP packets, " - "using fake timestamps."); - - mTryFakeRTCP = true; - - mReceivedFirstRTCPPacket = true; - - fakeTimestamps(); - } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { - ALOGW("Never received any data, switching transports."); - - mTryTCPInterleaving = true; - - sp<AMessage> msg = new AMessage('abor', id()); - msg->setInt32("reconnect", true); - msg->post(); - } else { - ALOGW("Never received any data, disconnecting."); - (new AMessage('abor', id()))->post(); - } - } else { - if (!mAllTracksHaveTime) { - ALOGW("We received some RTCP packets, but time " - "could not be established on all tracks, now " - "using fake timestamps"); - - fakeTimestamps(); - } - } - break; - } - - default: - TRESPASS(); - break; - } - } - - void postKeepAlive() { - sp<AMessage> msg = new AMessage('aliv', id()); - msg->setInt32("generation", mKeepAliveGeneration); - msg->post((mKeepAliveTimeoutUs * 9) / 10); - } - - void postAccessUnitTimeoutCheck() { - if (mCheckPending) { - return; - } - - mCheckPending = true; - sp<AMessage> check = new AMessage('chek', id()); - check->setInt32("generation", mCheckGeneration); - check->post(kAccessUnitTimeoutUs); - } - - static void SplitString( - const AString &s, const char *separator, List<AString> *items) { - items->clear(); - size_t start = 0; - while (start < s.size()) { - ssize_t offset = s.find(separator, start); - - if (offset < 0) { - items->push_back(AString(s, start, s.size() - start)); - break; - } - - items->push_back(AString(s, start, offset - start)); - start = offset + strlen(separator); - } - } - - void parsePlayResponse(const sp<ARTSPResponse> &response) { - mSeekable = false; - - ssize_t i = response->mHeaders.indexOfKey("range"); - if (i < 0) { - // Server doesn't even tell use what range it is going to - // play, therefore we won't support seeking. - return; - } - - AString range = response->mHeaders.valueAt(i); - ALOGV("Range: %s", range.c_str()); - - AString val; - CHECK(GetAttribute(range.c_str(), "npt", &val)); - - float npt1, npt2; - if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { - // This is a live stream and therefore not seekable. - - ALOGI("This is a live stream"); - return; - } - - i = response->mHeaders.indexOfKey("rtp-info"); - CHECK_GE(i, 0); - - AString rtpInfo = response->mHeaders.valueAt(i); - List<AString> streamInfos; - SplitString(rtpInfo, ",", &streamInfos); - - int n = 1; - for (List<AString>::iterator it = streamInfos.begin(); - it != streamInfos.end(); ++it) { - (*it).trim(); - ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); - - CHECK(GetAttribute((*it).c_str(), "url", &val)); - - size_t trackIndex = 0; - while (trackIndex < mTracks.size() - && !(val == mTracks.editItemAt(trackIndex).mURL)) { - ++trackIndex; - } - CHECK_LT(trackIndex, mTracks.size()); - - CHECK(GetAttribute((*it).c_str(), "seq", &val)); - - char *end; - unsigned long seq = strtoul(val.c_str(), &end, 10); - - TrackInfo *info = &mTracks.editItemAt(trackIndex); - info->mFirstSeqNumInSegment = seq; - info->mNewSegment = true; - - CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); - - uint32_t rtpTime = strtoul(val.c_str(), &end, 10); - - ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); - - info->mNormalPlayTimeRTP = rtpTime; - info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); - - if (!mFirstAccessUnit) { - postNormalPlayTimeMapping( - trackIndex, - info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); - } - - ++n; - } - - mSeekable = true; - } - - sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { - CHECK_GE(index, 0u); - CHECK_LT(index, mTracks.size()); - - const TrackInfo &info = mTracks.itemAt(index); - - *timeScale = info.mTimeScale; - - return info.mPacketSource->getFormat(); - } - - size_t countTracks() const { - return mTracks.size(); - } - -private: - struct TrackInfo { - AString mURL; - int mRTPSocket; - int mRTCPSocket; - bool mUsingInterleavedTCP; - uint32_t mFirstSeqNumInSegment; - bool mNewSegment; - - uint32_t mRTPAnchor; - int64_t mNTPAnchorUs; - int32_t mTimeScale; - - uint32_t mNormalPlayTimeRTP; - int64_t mNormalPlayTimeUs; - - sp<APacketSource> mPacketSource; - - // Stores packets temporarily while no notion of time - // has been established yet. - List<sp<ABuffer> > mPackets; - }; - - sp<AMessage> mNotify; - bool mUIDValid; - uid_t mUID; - sp<ALooper> mNetLooper; - sp<ARTSPConnection> mConn; - sp<ARTPConnection> mRTPConn; - sp<ASessionDescription> mSessionDesc; - AString mOriginalSessionURL; // This one still has user:pass@ - AString mSessionURL; - AString mSessionHost; - AString mBaseURL; - AString mSessionID; - bool mSetupTracksSuccessful; - bool mSeekPending; - bool mFirstAccessUnit; - - bool mAllTracksHaveTime; - int64_t mNTPAnchorUs; - int64_t mMediaAnchorUs; - int64_t mLastMediaTimeUs; - - int64_t mNumAccessUnitsReceived; - bool mCheckPending; - int32_t mCheckGeneration; - bool mTryTCPInterleaving; - bool mTryFakeRTCP; - bool mReceivedFirstRTCPPacket; - bool mReceivedFirstRTPPacket; - bool mSeekable; - int64_t mKeepAliveTimeoutUs; - int32_t mKeepAliveGeneration; - - Vector<TrackInfo> mTracks; - - void setupTrack(size_t index) { - sp<APacketSource> source = - new APacketSource(mSessionDesc, index); - - if (source->initCheck() != OK) { - ALOGW("Unsupported format. Ignoring track #%d.", index); - - sp<AMessage> reply = new AMessage('setu', id()); - reply->setSize("index", index); - reply->setInt32("result", ERROR_UNSUPPORTED); - reply->post(); - return; - } - - AString url; - CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); - - AString trackURL; - CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); - - mTracks.push(TrackInfo()); - TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); - info->mURL = trackURL; - info->mPacketSource = source; - info->mUsingInterleavedTCP = false; - info->mFirstSeqNumInSegment = 0; - info->mNewSegment = true; - info->mRTPAnchor = 0; - info->mNTPAnchorUs = -1; - info->mNormalPlayTimeRTP = 0; - info->mNormalPlayTimeUs = 0ll; - - unsigned long PT; - AString formatDesc; - AString formatParams; - mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); - - int32_t timescale; - int32_t numChannels; - ASessionDescription::ParseFormatDesc( - formatDesc.c_str(), ×cale, &numChannels); - - info->mTimeScale = timescale; - - ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); - - AString request = "SETUP "; - request.append(trackURL); - request.append(" RTSP/1.0\r\n"); - - if (mTryTCPInterleaving) { - size_t interleaveIndex = 2 * (mTracks.size() - 1); - info->mUsingInterleavedTCP = true; - info->mRTPSocket = interleaveIndex; - info->mRTCPSocket = interleaveIndex + 1; - - request.append("Transport: RTP/AVP/TCP;interleaved="); - request.append(interleaveIndex); - request.append("-"); - request.append(interleaveIndex + 1); - } else { - unsigned rtpPort; - ARTPConnection::MakePortPair( - &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); - - if (mUIDValid) { - HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, - (uint32_t)*(uint32_t*) "RTP_"); - HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, - (uint32_t)*(uint32_t*) "RTP_"); - } - - request.append("Transport: RTP/AVP/UDP;unicast;client_port="); - request.append(rtpPort); - request.append("-"); - request.append(rtpPort + 1); - } - - request.append("\r\n"); - - if (index > 1) { - request.append("Session: "); - request.append(mSessionID); - request.append("\r\n"); - } - - request.append("\r\n"); - - sp<AMessage> reply = new AMessage('setu', id()); - reply->setSize("index", index); - reply->setSize("track-index", mTracks.size() - 1); - mConn->sendRequest(request.c_str(), reply); - } - - static bool MakeURL(const char *baseURL, const char *url, AString *out) { - out->clear(); - - if (strncasecmp("rtsp://", baseURL, 7)) { - // Base URL must be absolute - return false; - } - - if (!strncasecmp("rtsp://", url, 7)) { - // "url" is already an absolute URL, ignore base URL. - out->setTo(url); - return true; - } - - size_t n = strlen(baseURL); - if (baseURL[n - 1] == '/') { - out->setTo(baseURL); - out->append(url); - } else { - const char *slashPos = strrchr(baseURL, '/'); - - if (slashPos > &baseURL[6]) { - out->setTo(baseURL, slashPos - baseURL); - } else { - out->setTo(baseURL); - } - - out->append("/"); - out->append(url); - } - - return true; - } - - void fakeTimestamps() { - mNTPAnchorUs = -1ll; - for (size_t i = 0; i < mTracks.size(); ++i) { - onTimeUpdate(i, 0, 0ll); - } - } - - void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { - ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", - trackIndex, rtpTime, ntpTime); - - int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); - - TrackInfo *track = &mTracks.editItemAt(trackIndex); - - track->mRTPAnchor = rtpTime; - track->mNTPAnchorUs = ntpTimeUs; - - if (mNTPAnchorUs < 0) { - mNTPAnchorUs = ntpTimeUs; - mMediaAnchorUs = mLastMediaTimeUs; - } - - if (!mAllTracksHaveTime) { - bool allTracksHaveTime = true; - for (size_t i = 0; i < mTracks.size(); ++i) { - TrackInfo *track = &mTracks.editItemAt(i); - if (track->mNTPAnchorUs < 0) { - allTracksHaveTime = false; - break; - } - } - if (allTracksHaveTime) { - mAllTracksHaveTime = true; - ALOGI("Time now established for all tracks."); - } - } - } - - void onAccessUnitComplete( - int32_t trackIndex, const sp<ABuffer> &accessUnit) { - ALOGV("onAccessUnitComplete track %d", trackIndex); - - if (mFirstAccessUnit) { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatConnected); - msg->post(); - - if (mSeekable) { - for (size_t i = 0; i < mTracks.size(); ++i) { - TrackInfo *info = &mTracks.editItemAt(i); - - postNormalPlayTimeMapping( - i, - info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); - } - } - - mFirstAccessUnit = false; - } - - TrackInfo *track = &mTracks.editItemAt(trackIndex); - - if (!mAllTracksHaveTime) { - ALOGV("storing accessUnit, no time established yet"); - track->mPackets.push_back(accessUnit); - return; - } - - while (!track->mPackets.empty()) { - sp<ABuffer> accessUnit = *track->mPackets.begin(); - track->mPackets.erase(track->mPackets.begin()); - - if (addMediaTimestamp(trackIndex, track, accessUnit)) { - postQueueAccessUnit(trackIndex, accessUnit); - } - } - - if (addMediaTimestamp(trackIndex, track, accessUnit)) { - postQueueAccessUnit(trackIndex, accessUnit); - } - } - - bool addMediaTimestamp( - int32_t trackIndex, const TrackInfo *track, - const sp<ABuffer> &accessUnit) { - uint32_t rtpTime; - CHECK(accessUnit->meta()->findInt32( - "rtp-time", (int32_t *)&rtpTime)); - - int64_t relRtpTimeUs = - (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) - / track->mTimeScale; - - int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; - - int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; - - if (mediaTimeUs > mLastMediaTimeUs) { - mLastMediaTimeUs = mediaTimeUs; - } - - if (mediaTimeUs < 0) { - ALOGV("dropping early accessUnit."); - return false; - } - - ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", - trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); - - accessUnit->meta()->setInt64("timeUs", mediaTimeUs); - - return true; - } - - void postQueueAccessUnit( - size_t trackIndex, const sp<ABuffer> &accessUnit) { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatAccessUnit); - msg->setSize("trackIndex", trackIndex); - msg->setBuffer("accessUnit", accessUnit); - msg->post(); - } - - void postQueueEOS(size_t trackIndex, status_t finalResult) { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatEOS); - msg->setSize("trackIndex", trackIndex); - msg->setInt32("finalResult", finalResult); - msg->post(); - } - - void postQueueSeekDiscontinuity(size_t trackIndex) { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatSeekDiscontinuity); - msg->setSize("trackIndex", trackIndex); - msg->post(); - } - - void postNormalPlayTimeMapping( - size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatNormalPlayTimeMapping); - msg->setSize("trackIndex", trackIndex); - msg->setInt32("rtpTime", rtpTime); - msg->setInt64("nptUs", nptUs); - msg->post(); - } - - DISALLOW_EVIL_CONSTRUCTORS(MyHandler); -}; - -} // namespace android - -#endif // MY_HANDLER_H_ |