/* * Copyright (C) 2010 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef MY_HANDLER_H_ #define MY_HANDLER_H_ //#define LOG_NDEBUG 0 #ifndef LOG_TAG #define LOG_TAG "MyHandler" #endif #include #include "APacketSource.h" #include "ARTPConnection.h" #include "ARTSPConnection.h" #include "ASessionDescription.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "HTTPBase.h" #if LOG_NDEBUG #define UNUSED_UNLESS_VERBOSE(x) (void)(x) #else #define UNUSED_UNLESS_VERBOSE(x) #endif // If no access units are received within 5 secs, assume that the rtp // stream has ended and signal end of stream. static int64_t kAccessUnitTimeoutUs = 10000000ll; // If no access units arrive for the first 10 secs after starting the // stream, assume none ever will and signal EOS or switch transports. static int64_t kStartupTimeoutUs = 10000000ll; static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; static int64_t kPauseDelayUs = 3000000ll; static int64_t kTearDownTimeoutUs = 3000000ll; namespace android { static bool GetAttribute(const char *s, const char *key, AString *value) { value->clear(); size_t keyLen = strlen(key); for (;;) { while (isspace(*s)) { ++s; } const char *colonPos = strchr(s, ';'); size_t len = (colonPos == NULL) ? 
strlen(s) : colonPos - s; if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { value->setTo(&s[keyLen + 1], len - keyLen - 1); return true; } if (colonPos == NULL) { return false; } s = colonPos + 1; } } struct MyHandler : public AHandler { enum { kWhatConnected = 'conn', kWhatDisconnected = 'disc', kWhatSeekPaused = 'spau', kWhatSeekDone = 'sdon', kWhatAccessUnit = 'accU', kWhatEOS = 'eos!', kWhatSeekDiscontinuity = 'seeD', kWhatNormalPlayTimeMapping = 'nptM', kWhatCancelCheck = 'canC', kWhatByeReceived = 'byeR', }; MyHandler( const char *url, const sp ¬ify, bool uidValid = false, uid_t uid = 0) : mNotify(notify), mUIDValid(uidValid), mUID(uid), mNetLooper(new ALooper), mOriginalSessionURL(url), mSessionURL(url), mSetupTracksSuccessful(false), mSeekPending(false), mFirstAccessUnit(true), mAllTracksHaveTime(false), mNTPAnchorUs(-1), mMediaAnchorUs(-1), mLastMediaTimeUs(0), mNumAccessUnitsReceived(0), mCheckPending(false), mCheckGeneration(0), mCheckTimeoutGeneration(0), mTryTCPInterleaving(false), mTryFakeRTCP(false), mReceivedFirstRTCPPacket(false), mReceivedFirstRTPPacket(false), mSeekable(true), mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), mKeepAliveGeneration(0), mPausing(false), mPauseGeneration(0), mPlayResponseParsed(false) { mConn = AVMediaServiceFactory::get()->createARTSPConnection( mUIDValid, uid); mRTPConn = AVMediaServiceFactory::get()->createARTPConnection(); mNetLooper->setName("rtsp net"); mNetLooper->start(false /* runOnCallingThread */, false /* canCallJava */, PRIORITY_HIGHEST); char value[PROPERTY_VALUE_MAX] = {0}; property_get("rtsp.transport.TCP", value, "false"); if (!strcasecmp(value, "true")) { mTryTCPInterleaving = true; } else { mTryTCPInterleaving = false; } // Strip any authentication info from the session url, we don't // want to transmit user/pass in cleartext. 
AString host, path, user, pass; unsigned port; CHECK(ARTSPConnection::ParseURL( mSessionURL.c_str(), &host, &port, &path, &user, &pass)); if (user.size() > 0) { mSessionURL.clear(); mSessionURL.append("rtsp://"); mSessionURL.append(host); mSessionURL.append(":"); mSessionURL.append(AStringPrintf("%u", port)); mSessionURL.append(path); ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); } mSessionHost = host; } void connect() { looper()->registerHandler(mConn); (1 ? mNetLooper : looper())->registerHandler(mRTPConn); sp notify = new AMessage('biny', this); mConn->observeBinaryData(notify); sp reply = new AMessage('conn', this); mConn->connect(mOriginalSessionURL.c_str(), reply); } void loadSDP(const sp& desc) { looper()->registerHandler(mConn); (1 ? mNetLooper : looper())->registerHandler(mRTPConn); sp notify = new AMessage('biny', this); mConn->observeBinaryData(notify); sp reply = new AMessage('sdpl', this); reply->setObject("description", desc); mConn->connect(mOriginalSessionURL.c_str(), reply); } AString getControlURL() { AString sessionLevelControlURL; if (mSessionDesc->findAttribute( 0, "a=control", &sessionLevelControlURL)) { if (sessionLevelControlURL.compare("*") == 0) { return mBaseURL; } else { AString controlURL; CHECK(MakeURL( mBaseURL.c_str(), sessionLevelControlURL.c_str(), &controlURL)); return controlURL; } } else { return mSessionURL; } } void disconnect() { (new AMessage('abor', this))->post(); } void seek(int64_t timeUs) { sp msg = new AMessage('seek', this); msg->setInt64("time", timeUs); mPauseGeneration++; msg->post(); } void continueSeekAfterPause(int64_t timeUs) { sp msg = new AMessage('see1', this); msg->setInt64("time", timeUs); msg->post(); } bool isSeekable() const { return mSeekable; } void pause() { sp msg = new AMessage('paus', this); mPauseGeneration++; msg->setInt32("pausecheck", mPauseGeneration); msg->post(kPauseDelayUs); } void resume() { sp msg = new AMessage('resu', this); mPauseGeneration++; msg->post(); } void 
cancelTimeoutCheck() {
        // Posts 'canC', whose handler bumps mCheckGeneration so that pending
        // 'chek' messages compare as outdated.
        sp msg = new AMessage('canC', this);
        msg->post();
    }

    // Appends an 8-byte RTCP receiver-report header (packet type 201, length
    // field 1, SSRC 0xdeadbeef) at the current end of buf and grows its
    // range by 8. Note that the range is reset to start at offset 0.
    static void addRR(const sp &buf) {
        uint8_t *ptr = buf->data() + buf->size();
        ptr[0] = 0x80 | 0;
        ptr[1] = 201;  // RR
        ptr[2] = 0;
        ptr[3] = 1;
        ptr[4] = 0xde;  // SSRC
        ptr[5] = 0xad;
        ptr[6] = 0xbe;
        ptr[7] = 0xef;

        buf->setRange(0, buf->size() + 8);
    }

    // Appends an RTCP SDES packet (packet type 202) at the current end of
    // buffer. It carries a CNAME item "stagefright@<local address of s>" and
    // a TOOL item holding MakeUserAgent(), is zero-padded to a multiple of
    // four bytes, and has its length field (bytes 2..3) filled in last.
    static void addSDES(int s, const sp &buffer) {
        struct sockaddr_in addr;
        socklen_t addrSize = sizeof(addr);
        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
            // Fall back to 0.0.0.0 if the local address cannot be queried.
            inet_aton("0.0.0.0", &(addr.sin_addr));
        }

        uint8_t *data = buffer->data() + buffer->size();
        data[0] = 0x80 | 1;
        data[1] = 202;  // SDES
        data[4] = 0xde;  // SSRC
        data[5] = 0xad;
        data[6] = 0xbe;
        data[7] = 0xef;

        size_t offset = 8;

        data[offset++] = 1;  // CNAME

        AString cname = "stagefright@";
        cname.append(inet_ntoa(addr.sin_addr));
        // NOTE(review): item lengths are stored in a single byte, so a value
        // above 255 would be truncated here and for the TOOL item below.
        data[offset++] = cname.size();

        memcpy(&data[offset], cname.c_str(), cname.size());
        offset += cname.size();

        data[offset++] = 6;  // TOOL

        AString tool = MakeUserAgent();

        data[offset++] = tool.size();

        memcpy(&data[offset], tool.c_str(), tool.size());
        offset += tool.size();

        // Item type 0 ends the item list.
        data[offset++] = 0;

        if ((offset % 4) > 0) {
            size_t count = 4 - (offset % 4);
            // The cases fall through, so exactly 'count' zero bytes are
            // written.
            switch (count) {
                case 3:
                    data[offset++] = 0;
                case 2:
                    data[offset++] = 0;
                case 1:
                    data[offset++] = 0;
            }
        }

        // Length in 32-bit words minus one, stored big-endian.
        size_t numWords = (offset / 4) - 1;
        data[2] = numWords >> 8;
        data[3] = numWords & 0xff;

        buffer->setRange(buffer->offset(), buffer->size() + offset);
    }

    // In case we're behind NAT, fire off two UDP packets to the remote
    // rtp/rtcp ports to poke a hole into the firewall for future incoming
    // packets. We're going to send an RR/SDES RTCP packet to both of them.
    //
    // transport is the server's Transport header value; its "source" and
    // "server_port" attributes select the destination. Returns false if the
    // host lookup fails, "server_port" is missing or invalid, or a send
    // comes up short; returns true without sending anything when the
    // destination is INADDR_NONE or a loopback address.
    //
    // NOTE(review): the 'setu' handler in this file calls
    // AVMediaServiceUtils::get()->pokeAHole(...) and not this member.
    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
        struct sockaddr_in addr;
        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
        addr.sin_family = AF_INET;

        AString source;
        AString server_port;
        if (!GetAttribute(transport.c_str(),
                          "source",
                          &source)) {
            ALOGW("Missing 'source' field in Transport response. Using "
                 "RTSP endpoint address.");

            struct hostent *ent = gethostbyname(mSessionHost.c_str());
            if (ent == NULL) {
                ALOGE("Failed to look up address of session host '%s'",
                     mSessionHost.c_str());

                return false;
            }

            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
        } else {
            addr.sin_addr.s_addr = inet_addr(source.c_str());
        }

        if (!GetAttribute(transport.c_str(),
                                 "server_port",
                                 &server_port)) {
            ALOGI("Missing 'server_port' field in Transport response.");
            return false;
        }

        // Both ports must be in 1..65535 and the RTCP port must directly
        // follow the RTP port. An odd RTP port only triggers a warning
        // below, although the error message mentions it.
        int rtpPort, rtcpPort;
        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
                || rtpPort <= 0 || rtpPort > 65535
                || rtcpPort <=0 || rtcpPort > 65535
                || rtcpPort != rtpPort + 1) {
            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
                 " RTP port must be even, RTCP port must be one higher.",
                 server_port.c_str());

            return false;
        }

        if (rtpPort & 1) {
            ALOGW("Server picked an odd RTP port, it should've picked an "
                 "even one, we'll let it pass for now, but this may break "
                 "in the future.");
        }

        // inet_addr() could not parse "source"; nothing to send to.
        if (addr.sin_addr.s_addr == INADDR_NONE) {
            return true;
        }

        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
            // No firewalls to traverse on the loopback interface.
            return true;
        }

        // Make up an RR/SDES RTCP packet.
sp buf = new ABuffer(65536); buf->setRange(0, 0); addRR(buf); addSDES(rtpSocket, buf); addr.sin_port = htons(rtpPort); ssize_t n = sendto( rtpSocket, buf->data(), buf->size(), 0, (const sockaddr *)&addr, sizeof(addr)); if (n < (ssize_t)buf->size()) { ALOGE("failed to poke a hole for RTP packets"); return false; } addr.sin_port = htons(rtcpPort); n = sendto( rtcpSocket, buf->data(), buf->size(), 0, (const sockaddr *)&addr, sizeof(addr)); if (n < (ssize_t)buf->size()) { ALOGE("failed to poke a hole for RTCP packets"); return false; } ALOGV("successfully poked holes."); return true; } static bool isLiveStream(const sp &desc) { AString attrLiveStream; if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { ssize_t semicolonPos = attrLiveStream.find(";", 2); const char* liveStreamValue; if (semicolonPos < 0) { liveStreamValue = attrLiveStream.c_str(); } else { AString valString; valString.setTo(attrLiveStream, semicolonPos + 1, attrLiveStream.size() - semicolonPos - 1); liveStreamValue = valString.c_str(); } uint32_t value = strtoul(liveStreamValue, NULL, 10); if (value == 1) { ALOGV("found live stream"); return true; } } else { // It is a live stream if no duration is returned int64_t durationUs; if (!desc->getDurationUs(&durationUs)) { ALOGV("No duration found, assume live stream"); return true; } } return false; } virtual void onMessageReceived(const sp &msg) { switch (msg->what()) { case 'conn': { int32_t result; CHECK(msg->findInt32("result", &result)); ALOGI("connection request completed with result %d (%s)", result, strerror(-result)); if (result == OK) { AString request; request = "DESCRIBE "; request.append(mSessionURL); request.append(" RTSP/1.0\r\n"); request.append("Accept: application/sdp\r\n"); request.append("\r\n"); sp reply = new AMessage('desc', this); mConn->sendRequest(request.c_str(), reply); } else { (new AMessage('disc', this))->post(); } break; } case 'disc': { ++mKeepAliveGeneration; int32_t reconnect; if (msg->findInt32("reconnect", 
&reconnect) && reconnect) {
                    sp reply = new AMessage('conn', this);
                    mConn->connect(mOriginalSessionURL.c_str(), reply);
                } else {
                    (new AMessage('quit', this))->post();
                }
                break;
            }

            // Response to DESCRIBE: follow 301/302 redirects, otherwise
            // parse the SDP body, work out the base and control URLs and
            // start setting up the first track.
            case 'desc':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("DESCRIBE completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    // NOTE(review): the template arguments of "sp" and
                    // "static_cast" are missing in this copy of the file.
                    sp obj;
                    CHECK(msg->findObject("response", &obj));
                    sp response =
                        static_cast(obj.get());

                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
                        // Redirect: adopt the "location" header as the new
                        // session URL and reconnect. A redirect without that
                        // header trips the CHECK below.
                        ssize_t i = response->mHeaders.indexOfKey("location");
                        CHECK_GE(i, 0);

                        mOriginalSessionURL = response->mHeaders.valueAt(i);
                        mSessionURL = mOriginalSessionURL;

                        // Strip any authentication info from the session url, we don't
                        // want to transmit user/pass in cleartext.
                        // NOTE(review): mSessionHost is not updated on this
                        // path - confirm that is intended.
                        AString host, path, user, pass;
                        unsigned port;
                        if (ARTSPConnection::ParseURL(
                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
                                && user.size() > 0) {
                            mSessionURL.clear();
                            mSessionURL.append("rtsp://");
                            mSessionURL.append(host);
                            mSessionURL.append(":");
                            mSessionURL.append(AStringPrintf("%u", port));
                            mSessionURL.append(path);

                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
                        }

                        sp reply = new AMessage('conn', this);
                        mConn->connect(mOriginalSessionURL.c_str(), reply);
                        break;
                    }

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else if (response->mContent == NULL) {
                        result = ERROR_MALFORMED;
                        ALOGE("The response has no content.");
                    } else {
                        mSessionDesc = new ASessionDescription;

                        mSessionDesc->setTo(
                                response->mContent->data(),
                                response->mContent->size());

                        if (!mSessionDesc->isValid()) {
                            ALOGE("Failed to parse session description.");
                            result = ERROR_MALFORMED;
                        } else {
                            // Base URL precedence: "content-base" header,
                            // then "content-location", then the session URL.
                            ssize_t i = response->mHeaders.indexOfKey("content-base");
                            if (i >= 0) {
                                mBaseURL = response->mHeaders.valueAt(i);
                            } else {
                                i = response->mHeaders.indexOfKey("content-location");
                                if (i >= 0) {
                                    mBaseURL = response->mHeaders.valueAt(i);
                                } else {
                                    mBaseURL = mSessionURL;
                                }
                            }

                            mSeekable = !isLiveStream(mSessionDesc);

                            if
(!mBaseURL.startsWith("rtsp://")) {
                                // Some misbehaving servers specify a relative
                                // URL in one of the locations above, combine
                                // it with the absolute session URL to get
                                // something usable...

                                ALOGW("Server specified a non-absolute base URL"
                                     ", combining it with the session URL to "
                                     "get something usable...");

                                AString tmp;
                                CHECK(MakeURL(
                                            mSessionURL.c_str(),
                                            mBaseURL.c_str(),
                                            &tmp));

                                mBaseURL = tmp;
                            }

                            mControlURL = getControlURL();

                            if (mSessionDesc->countTracks() < 2) {
                                // There's no actual tracks in this session.
                                // The first "track" is merely session meta
                                // data.

                                ALOGW("Session doesn't contain any playable "
                                     "tracks. Aborting.");
                                result = ERROR_UNSUPPORTED;
                            } else {
                                setupTrack(1);
                            }
                        }
                    }
                }

                if (result != OK) {
                    sp reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            // Connect result for loadSDP(): the session description comes
            // from the message ("description"), not from a DESCRIBE response.
            case 'sdpl':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("SDP connection request completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    sp obj;
                    CHECK(msg->findObject("description", &obj));
                    mSessionDesc =
                        static_cast(obj.get());

                    if (!mSessionDesc->isValid()) {
                        ALOGE("Failed to parse session description.");
                        result = ERROR_MALFORMED;
                    } else {
                        mBaseURL = mSessionURL;

                        mSeekable = !isLiveStream(mSessionDesc);

                        mControlURL = getControlURL();

                        if (mSessionDesc->countTracks() < 2) {
                            // There's no actual tracks in this session.
                            // The first "track" is merely session meta
                            // data.

                            ALOGW("Session doesn't contain any playable "
                                 "tracks. Aborting.");
                            result = ERROR_UNSUPPORTED;
                        } else {
                            setupTrack(1);
                        }
                    }
                }

                if (result != OK) {
                    sp reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            // Response to SETUP for SDP track "index". On success the
            // session id and keep-alive timeout are recorded and the track's
            // sockets are handed to the RTP connection. Then either the next
            // track is set up, PLAY is sent, or the session is disconnected.
            case 'setu':
            {
                size_t index;
                CHECK(msg->findSize("index", &index));

                // "track-index" is optional; without it there is no
                // TrackInfo entry and trackIndex stays unset.
                TrackInfo *track = NULL;
                size_t trackIndex;
                if (msg->findSize("track-index", &trackIndex)) {
                    track = &mTracks.editItemAt(trackIndex);
                }

                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("SETUP(%zu) completed with result %d (%s)",
                     index, result, strerror(-result));

                if (result == OK) {
                    CHECK(track != NULL);

                    sp obj;
                    CHECK(msg->findObject("response", &obj));
                    sp response =
                        static_cast(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        ssize_t i = response->mHeaders.indexOfKey("session");
                        CHECK_GE(i, 0);

                        mSessionID = response->mHeaders.valueAt(i);

                        // Use the server's "timeout" attribute only if it
                        // parses completely and is at least 15 seconds;
                        // otherwise keep the default.
                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                        AString timeoutStr;
                        if (GetAttribute(
                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
                            char *end;
                            unsigned long timeoutSecs =
                                strtoul(timeoutStr.c_str(), &end, 10);

                            if (end == timeoutStr.c_str() || *end != '\0') {
                                ALOGW("server specified malformed timeout '%s'",
                                     timeoutStr.c_str());

                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                            } else if (timeoutSecs < 15) {
                                ALOGW("server specified too short a timeout "
                                     "(%lu secs), using default.",
                                     timeoutSecs);

                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                            } else {
                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;

                                ALOGI("server specified timeout of %lu secs.",
                                     timeoutSecs);
                            }
                        }

                        AVMediaServiceUtils::get()->setServerTimeoutUs(mKeepAliveTimeoutUs);

                        i = mSessionID.find(";");
                        if (i >= 0) {
                            // Remove options, i.e. ";timeout=90"
                            mSessionID.erase(i, mSessionID.size() - i);
                        }

                        // Notifications from the RTP connection for this
                        // track arrive as 'accu'.
                        sp notify = new AMessage('accu', this);
                        notify->setSize("track-index", trackIndex);

                        i = response->mHeaders.indexOfKey("transport");
                        CHECK_GE(i, 0);

                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
                            if (!track->mUsingInterleavedTCP) {
                                AString transport = response->mHeaders.valueAt(i);

                                // We are going to continue even if we were
                                // unable to poke a hole into the firewall...
                                AVMediaServiceUtils::get()->pokeAHole(
                                        this,
                                        track->mRTPSocket,
                                        track->mRTCPSocket,
                                        transport,
                                        mSessionHost);
                            }

                            mRTPConn->addStream(
                                    track->mRTPSocket, track->mRTCPSocket,
                                    mSessionDesc, index,
                                    notify, track->mUsingInterleavedTCP,
                                    mConn->isIPV6());

                            mSetupTracksSuccessful = true;
                        } else {
                            result = BAD_VALUE;
                        }
                    }
                }

                if (result != OK) {
                    // Drop the failed track; for UDP tracks also untag and
                    // close its sockets.
                    if (track) {
                        if (!track->mUsingInterleavedTCP) {
                            // Clear the tag
                            if (mUIDValid) {
                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
                            }

                            close(track->mRTPSocket);
                            close(track->mRTCPSocket);
                        }

                        mTracks.removeItemsAt(trackIndex);
                    }
                }

                ++index;
                if (result == OK && index < mSessionDesc->countTracks()) {
                    setupTrack(index);
                } else if (mSetupTracksSuccessful) {
                    // At least one track was set up: start keep-alives and
                    // send PLAY.
                    ++mKeepAliveGeneration;
                    postKeepAlive();

                    AString request = "PLAY ";
                    request.append(mControlURL);
                    request.append(" RTSP/1.0\r\n");

                    request.append("Session: ");
                    request.append(mSessionID);
                    request.append("\r\n");
                    AVMediaServiceUtils::get()->appendRange(&request);
                    request.append("\r\n");

                    sp reply = new AMessage('play', this);
                    mConn->sendRequest(request.c_str(), reply);
                } else {
                    sp reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            // Response to the initial PLAY: parse it and arm the startup
            // timeout ('tiou'); disconnect on failure.
            case 'play':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    sp obj;
                    CHECK(msg->findObject("response", &obj));
                    sp response =
static_cast(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        // Arm the startup timeout. The generation stored in
                        // the message lets 'tiou' recognise outdated timers.
                        sp timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);
                    }
                }

                if (result != OK) {
                    sp reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }

                break;
            }

            // Keep-alive timer fired: send an OPTIONS request for the
            // session unless the timer belongs to an older generation.
            case 'aliv':
            {
                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));

                if (generation != mKeepAliveGeneration) {
                    // obsolete event.
                    break;
                }

                AString request;
                request.append("OPTIONS ");
                request.append(mSessionURL);
                request.append(" RTSP/1.0\r\n");
                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");
                request.append("\r\n");

                sp reply = new AMessage('opts', this);
                reply->setInt32("generation", mKeepAliveGeneration);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            // Response to the keep-alive OPTIONS request: schedule the next
            // keep-alive if the generation is still current. The result is
            // only logged.
            case 'opts':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("OPTIONS completed with result %d (%s)",
                     result, strerror(-result));

                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));

                if (generation != mKeepAliveGeneration) {
                    // obsolete event.
break;
                }

                postKeepAlive();
                break;
            }

            // Abort the session: queue EOS on every track (only once a first
            // access unit has been seen), release the UDP sockets, reset the
            // per-session state and send TEARDOWN. "reconnect" is forwarded
            // to the 'tear' reply.
            case 'abor':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    TrackInfo *info = &mTracks.editItemAt(i);

                    if (!mFirstAccessUnit) {
                        postQueueEOS(i, ERROR_END_OF_STREAM);
                    }

                    if (!info->mUsingInterleavedTCP) {
                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);

                        // Clear the tag
                        if (mUIDValid) {
                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
                        }

                        close(info->mRTPSocket);
                        close(info->mRTCPSocket);
                    }
                }
                mTracks.clear();
                mSetupTracksSuccessful = false;
                mSeekPending = false;
                mFirstAccessUnit = true;
                mAllTracksHaveTime = false;
                mNTPAnchorUs = -1;
                mMediaAnchorUs = -1;
                mNumAccessUnitsReceived = 0;
                mReceivedFirstRTCPPacket = false;
                mReceivedFirstRTPPacket = false;
                mPausing = false;
                mSeekable = true;

                sp reply = new AMessage('tear', this);

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                AString request;
                request = "TEARDOWN ";

                // XXX should use aggregate url from SDP here...
                request.append(mSessionURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                mConn->sendRequest(request.c_str(), reply);

                // If the response of teardown hasn't been received in 3 seconds,
                // post 'tear' message to avoid ANR.
if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
                    // Only armed when not reconnecting; the delayed message
                    // reports -ECONNABORTED as the TEARDOWN result.
                    sp teardown = new AMessage('tear', this);
                    teardown->setInt32("result", -ECONNABORTED);
                    teardown->post(kTearDownTimeoutUs);
                }

                break;
            }

            // TEARDOWN finished (or its local timeout fired): disconnect,
            // forwarding "reconnect" to 'disc'.
            case 'tear':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("TEARDOWN completed with result %d (%s)",
                     result, strerror(-result));

                sp reply = new AMessage('disc', this);

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                mConn->disconnect(reply);
                break;
            }

            // Final step: notify the client with kWhatDisconnected and
            // result UNKNOWN_ERROR.
            case 'quit':
            {
                sp msg = mNotify->dup();
                msg->setInt32("what", kWhatDisconnected);
                msg->setInt32("result", UNKNOWN_ERROR);
                msg->post();
                break;
            }

            // Periodic access-unit timeout check: abort if nothing arrived
            // since the previous check, otherwise reset the counter and
            // re-post this same message.
            case 'chek':
            {
                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));
                if (generation != mCheckGeneration) {
                    // This is an outdated message. Ignore.
                    break;
                }

                if (mNumAccessUnitsReceived == 0) {
#if 1
                    ALOGI("stream ended? aborting.");
                    (new AMessage('abor', this))->post();
                    break;
#else
                    ALOGI("haven't seen an AU in a looong time.");
#endif
                }

                mNumAccessUnitsReceived = 0;
                msg->post(kAccessUnitTimeoutUs);
                break;
            }

            // Notification from the RTP connection for one track: a time
            // update, the first RTCP/RTP packet markers, an end-of-stream
            // (BYE) indication, or an access unit.
            case 'accu':
            {
                int32_t timeUpdate;
                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
                    size_t trackIndex;
                    CHECK(msg->findSize("track-index", &trackIndex));

                    uint32_t rtpTime;
                    uint64_t ntpTime;
                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));

                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
                    break;
                }

                int32_t first;
                if (msg->findInt32("first-rtcp", &first)) {
                    mReceivedFirstRTCPPacket = true;
                    break;
                }

                if (msg->findInt32("first-rtp", &first)) {
                    mReceivedFirstRTPPacket = true;
                    break;
                }

                // Everything below counts as activity for the 'chek' timer.
                ++mNumAccessUnitsReceived;
                postAccessUnitTimeoutCheck();

                size_t trackIndex;
                CHECK(msg->findSize("track-index", &trackIndex));

                if (trackIndex >= mTracks.size()) {
                    ALOGV("late packets ignored.");
                    break;
                }

                TrackInfo *track = &mTracks.editItemAt(trackIndex);

                int32_t eos;
                if (msg->findInt32("eos", &eos)) {
                    ALOGI("received BYE on track index %zu", trackIndex);
                    // Optionally tell the client about the BYE; controlled
                    // by the "rtcp.bye.notify" property.
                    char value[PROPERTY_VALUE_MAX] = {0};
                    if (property_get("rtcp.bye.notify", value, "false")
                            && !strcasecmp(value, "true")) {
                        sp msg = mNotify->dup();
                        msg->setInt32("what", kWhatByeReceived);
                        msg->post();
                    }
                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
                        ALOGI("No time established => fake existing data");

                        track->mEOSReceived = true;
                        mTryFakeRTCP = true;
                        mReceivedFirstRTCPPacket = true;
                        fakeTimestamps();
                    } else {
                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
                    }
                    return;
                }

                sp accessUnit;
                CHECK(msg->findBuffer("access-unit", &accessUnit));

                uint32_t seqNum = (uint32_t)accessUnit->int32Data();

                if (mSeekPending) {
                    ALOGV("we're seeking, dropping stale packet.");
                    break;
                }

                if (seqNum < track->mFirstSeqNumInSegment) {
                    ALOGV("dropping stale access-unit (%d < %d)",
                         seqNum, track->mFirstSeqNumInSegment);
                    break;
                }

                if (track->mNewSegment) {
                    track->mNewSegment = false;
                }

                onAccessUnitComplete(trackIndex, accessUnit);
                break;
            }

            // Delayed pause request from pause(): ignored if outdated or if
            // the stream is not seekable (live); otherwise suspends the
            // access-unit timeout check and sends PAUSE.
            case 'paus':
            {
                int32_t generation;
                CHECK(msg->findInt32("pausecheck", &generation));
                if (generation != mPauseGeneration) {
                    ALOGV("Ignoring outdated pause message.");
                    break;
                }

                if (!mSeekable) {
                    ALOGW("This is a live stream, ignoring pause request.");
                    break;
                }
                // Setting mCheckPending keeps postAccessUnitTimeoutCheck()
                // from posting; the generation bump outdates queued checks.
                mCheckPending = true;
                ++mCheckGeneration;
                mPausing = true;

                AString request = "PAUSE ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                sp reply = new AMessage('pau2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            // Response to PAUSE: outdate any pending 'tiou' timer and log
            // the result.
            case 'pau2':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));
                mCheckTimeoutGeneration++;

                ALOGI("PAUSE completed with result %d (%s)",
                     result, strerror(-result));
                break;
            }

            // Resume request from resume(): sends PLAY, but only if we are
            // paused and no seek is in flight.
            case 'resu':
            {
                if (mPausing && mSeekPending) {
                    // If seeking, Play will be sent from see1 instead
                    break;
                }

                if (!mPausing) {
                    // Dont send PLAY if we have not paused
                    break;
                }
                AString request = "PLAY ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");
request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                sp reply = new AMessage('res2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            // Response to the PLAY sent by 'resu': re-enable the access-unit
            // timeout check, parse the response and re-arm the 'tiou' timer;
            // abort on failure.
            case 'res2':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                mCheckPending = false;
                postAccessUnitTimeoutCheck();

                if (result == OK) {
                    sp obj;
                    CHECK(msg->findObject("response", &obj));
                    sp response =
                        static_cast(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        // Post new timeout in order to make sure to use
                        // fake timestamps if no new Sender Reports arrive
                        sp timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);
                    }
                }

                if (result != OK) {
                    ALOGE("resume failed, aborting.");
                    (new AMessage('abor', this))->post();
                }

                mPausing = false;
                break;
            }

            // Seek request from seek(): for non-seekable streams report
            // kWhatSeekDone right away; otherwise pause the session first
            // (or skip the PAUSE if one was already sent) and continue in
            // 'see0'.
            case 'seek':
            {
                if (!mSeekable) {
                    ALOGW("This is a live stream, ignoring seek request.");

                    sp msg = mNotify->dup();
                    msg->setInt32("what", kWhatSeekDone);
                    msg->post();
                    break;
                }

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                mSeekPending = true;

                // Disable the access unit timeout until we resumed
                // playback again.
                mCheckPending = true;
                ++mCheckGeneration;

                sp reply = new AMessage('see0', this);
                reply->setInt64("time", timeUs);

                if (mPausing) {
                    // PAUSE already sent
                    ALOGI("Pause already sent");
                    reply->post();
                    break;
                }
                AString request = "PAUSE ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            // First seek step completed; tell the client via
            // kWhatSeekPaused.
            case 'see0':
            {
                // Session is paused now.
status_t err = OK;
                // "result" is absent when 'seek' posted this message
                // directly (pause already sent); err then stays OK.
                msg->findInt32("result", &err);

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                sp notify = mNotify->dup();
                notify->setInt32("what", kWhatSeekPaused);
                notify->setInt32("err", err);
                notify->setInt64("time", timeUs);
                notify->post();
                break;

            }

            // Second seek step (posted by continueSeekAfterPause()): queue a
            // seek discontinuity on every track, reset the time anchors and
            // send PLAY with a Range header for the target position.
            case 'see1':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    TrackInfo *info = &mTracks.editItemAt(i);

                    postQueueSeekDiscontinuity(i);
                    info->mEOSReceived = false;

                    info->mRTPAnchor = 0;
                    info->mNTPAnchorUs = -1;
                }

                mAllTracksHaveTime = false;
                mNTPAnchorUs = -1;

                // Start new timeoutgeneration to avoid getting timeout
                // before PLAY response arrive
                sp timeout = new AMessage('tiou', this);
                mCheckTimeoutGeneration++;
                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                timeout->post(kStartupTimeoutUs);

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                AString request = "PLAY ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                // The position is sent in whole seconds (integer division).
                request.append(
                        AStringPrintf(
                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));

                request.append("\r\n");

                sp reply = new AMessage('see2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            // Response to the seek's PLAY: re-enable the access-unit timeout
            // check, parse the response, re-arm 'tiou' and report
            // kWhatSeekDone; abort on failure.
            case 'see2':
            {
                if (mTracks.size() == 0) {
                    // We have already hit abor, break
                    break;
                }

                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                mCheckPending = false;
                postAccessUnitTimeoutCheck();

                if (result == OK) {
                    sp obj;
                    CHECK(msg->findObject("response", &obj));
                    sp response =
                        static_cast(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        // Post new timeout in order to make sure to use
                        // fake timestamps if no new Sender Reports arrive
                        sp timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);

                        // A successful response without an "rtp-info"
                        // header trips this CHECK.
                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
                        CHECK_GE(i, 0);
ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());

                        ALOGI("seek completed.");
                    }
                }

                if (result != OK) {
                    ALOGE("seek failed, aborting.");
                    (new AMessage('abor', this))->post();
                }

                // kWhatSeekDone is reported whether or not the seek worked.
                mPausing = false;
                mSeekPending = false;

                sp msg = mNotify->dup();
                msg->setInt32("what", kWhatSeekDone);
                msg->post();
                break;
            }

            // Binary data observed on the RTSP connection: hand the buffer
            // to the RTP connection under the "index" stored in its meta.
            case 'biny':
            {
                sp buffer;
                CHECK(msg->findBuffer("buffer", &buffer));

                int32_t index;
                CHECK(buffer->meta()->findInt32("index", &index));

                mRTPConn->injectPacket(index, buffer);
                break;
            }

            // Startup timeout. With no RTCP packet seen so far: fall back to
            // fake timestamps if data arrived on all channels, retry over
            // TCP interleaving if no RTP arrived at all, otherwise abort.
            // With RTCP seen but not every track timed: use fake timestamps.
            case 'tiou':
            {
                int32_t timeoutGenerationCheck;
                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
                    // This is an outdated message. Ignore.
                    // This typically happens if a lot of seeks are
                    // performed, since new timeout messages now are
                    // posted at seek as well.
                    break;
                }
                if (!mReceivedFirstRTCPPacket) {
                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
                        ALOGW("We received RTP packets but no RTCP packets, "
                             "using fake timestamps.");

                        mTryFakeRTCP = true;

                        mReceivedFirstRTCPPacket = true;

                        fakeTimestamps();
                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
                        ALOGW("Never received any data, switching transports.");

                        mTryTCPInterleaving = true;

                        sp msg = new AMessage('abor', this);
                        msg->setInt32("reconnect", true);
                        msg->post();
                    } else {
                        ALOGW("Never received any data, disconnecting.");
                        (new AMessage('abor', this))->post();
                    }
                } else {
                    if (!mAllTracksHaveTime) {
                        ALOGW("We received some RTCP packets, but time "
                              "could not be established on all tracks, now "
                              "using fake timestamps");

                        fakeTimestamps();
                    }
                }
                break;
            }

            // Posted by cancelTimeoutCheck(): outdates pending 'chek'
            // messages.
            case 'canC':
            {
                ALOGV("cancel checking timeout");
                mCheckGeneration++;
                break;
            }

            default:
                TRESPASS();
                break;
        }
    }

    // Schedules the next keep-alive ('aliv') after 90% of the keep-alive
    // timeout, tagged with the current keep-alive generation.
    void postKeepAlive() {
        sp msg = new AMessage('aliv', this);
        msg->setInt32("generation", mKeepAliveGeneration);
        msg->post((mKeepAliveTimeoutUs * 9) / 10);
    }

    // Arms the access-unit timeout check ('chek') unless mCheckPending is
    // already set.
    void postAccessUnitTimeoutCheck() {
        if (mCheckPending) {
            return;
        }

        mCheckPending = true;
        sp check = new AMessage('chek', this);
check->setInt32("generation", mCheckGeneration); check->post(kAccessUnitTimeoutUs); } static void SplitString( const AString &s, const char *separator, List *items) { items->clear(); size_t start = 0; while (start < s.size()) { ssize_t offset = s.find(separator, start); if (offset < 0) { items->push_back(AString(s, start, s.size() - start)); break; } items->push_back(AString(s, start, offset - start)); start = offset + strlen(separator); } } void parsePlayResponse(const sp &response) { mPlayResponseParsed = true; if (mTracks.size() == 0) { ALOGV("parsePlayResponse: late packets ignored."); return; } ssize_t i = response->mHeaders.indexOfKey("range"); if (i < 0) { // Server doesn't even tell use what range it is going to // play, therefore we won't support seeking. return; } AString range = response->mHeaders.valueAt(i); ALOGV("Range: %s", range.c_str()); AString val; CHECK(GetAttribute(range.c_str(), "npt", &val)); float npt1, npt2; if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { // This is a live stream and therefore not seekable. 
ALOGI("This is a live stream"); return; } i = response->mHeaders.indexOfKey("rtp-info"); CHECK_GE(i, 0); AString rtpInfo = response->mHeaders.valueAt(i); List streamInfos; SplitString(rtpInfo, ",", &streamInfos); int n = 1; for (List::iterator it = streamInfos.begin(); it != streamInfos.end(); ++it) { (*it).trim(); ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); CHECK(GetAttribute((*it).c_str(), "url", &val)); size_t trackIndex = 0; while (trackIndex < mTracks.size() && !(AVMediaServiceUtils::get()->parseTrackURL( mTracks.editItemAt(trackIndex).mURL, val) || val == mTracks.editItemAt(trackIndex).mURL)) { ++trackIndex; } CHECK_LT(trackIndex, mTracks.size()); CHECK(GetAttribute((*it).c_str(), "seq", &val)); char *end; unsigned long seq = strtoul(val.c_str(), &end, 10); TrackInfo *info = &mTracks.editItemAt(trackIndex); info->mFirstSeqNumInSegment = seq; info->mNewSegment = true; CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); uint32_t rtpTime = strtoul(val.c_str(), &end, 10); ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); info->mNormalPlayTimeRTP = rtpTime; info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); if (!mFirstAccessUnit) { postNormalPlayTimeMapping( trackIndex, info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); } ++n; } } sp getTrackFormat(size_t index, int32_t *timeScale) { CHECK_GE(index, 0u); CHECK_LT(index, mTracks.size()); const TrackInfo &info = mTracks.itemAt(index); *timeScale = info.mTimeScale; return info.mPacketSource->getFormat(); } size_t countTracks() const { return mTracks.size(); } private: struct TrackInfo { AString mURL; int mRTPSocket; int mRTCPSocket; bool mUsingInterleavedTCP; uint32_t mFirstSeqNumInSegment; bool mNewSegment; uint32_t mRTPAnchor; int64_t mNTPAnchorUs; int32_t mTimeScale; bool mEOSReceived; uint32_t mNormalPlayTimeRTP; int64_t mNormalPlayTimeUs; sp mPacketSource; // Stores packets temporarily while no notion of time // has been established yet. 
List > mPackets; }; sp mNotify; bool mUIDValid; uid_t mUID; sp mNetLooper; sp mConn; sp mRTPConn; sp mSessionDesc; AString mOriginalSessionURL; // This one still has user:pass@ AString mSessionURL; AString mSessionHost; AString mBaseURL; AString mControlURL; AString mSessionID; bool mSetupTracksSuccessful; bool mSeekPending; bool mFirstAccessUnit; bool mAllTracksHaveTime; int64_t mNTPAnchorUs; int64_t mMediaAnchorUs; int64_t mLastMediaTimeUs; int64_t mNumAccessUnitsReceived; bool mCheckPending; int32_t mCheckGeneration; int32_t mCheckTimeoutGeneration; bool mTryTCPInterleaving; bool mTryFakeRTCP; bool mReceivedFirstRTCPPacket; bool mReceivedFirstRTPPacket; bool mSeekable; int64_t mKeepAliveTimeoutUs; int32_t mKeepAliveGeneration; bool mPausing; int32_t mPauseGeneration; Vector mTracks; bool mPlayResponseParsed; void setupTrack(size_t index) { sp source = new APacketSource(mSessionDesc, index); if (source->initCheck() != OK) { ALOGW("Unsupported format. Ignoring track #%zu.", index); sp reply = new AMessage('setu', this); reply->setSize("index", index); reply->setInt32("result", ERROR_UNSUPPORTED); reply->post(); return; } AString url; CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); AString trackURL; CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); info->mURL = trackURL; info->mPacketSource = source; info->mUsingInterleavedTCP = false; info->mFirstSeqNumInSegment = 0; info->mNewSegment = true; info->mRTPSocket = -1; info->mRTCPSocket = -1; info->mRTPAnchor = 0; info->mNTPAnchorUs = -1; info->mNormalPlayTimeRTP = 0; info->mNormalPlayTimeUs = 0ll; unsigned long PT; AString formatDesc; AString formatParams; mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); int32_t timescale; int32_t numChannels; ASessionDescription::ParseFormatDesc( formatDesc.c_str(), ×cale, &numChannels); info->mTimeScale = timescale; info->mEOSReceived = false; ALOGV("track 
#%zu URL=%s", mTracks.size(), trackURL.c_str()); AString request = "SETUP "; request.append(trackURL); request.append(" RTSP/1.0\r\n"); if (mTryTCPInterleaving) { size_t interleaveIndex = 2 * (mTracks.size() - 1); info->mUsingInterleavedTCP = true; info->mRTPSocket = interleaveIndex; info->mRTCPSocket = interleaveIndex + 1; request.append("Transport: RTP/AVP/TCP;interleaved="); request.append(interleaveIndex); request.append("-"); request.append(interleaveIndex + 1); } else { unsigned rtpPort; AVMediaServiceUtils::get()->makePortPair( &info->mRTPSocket, &info->mRTCPSocket, &rtpPort, mConn->isIPV6()); if (mUIDValid) { HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, (uint32_t)*(uint32_t*) "RTP_"); HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, (uint32_t)*(uint32_t*) "RTP_"); HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); } request.append("Transport: RTP/AVP/UDP;unicast;client_port="); request.append(rtpPort); request.append("-"); request.append(rtpPort + 1); } request.append("\r\n"); if (index > 1) { request.append("Session: "); request.append(mSessionID); request.append("\r\n"); } request.append("\r\n"); sp reply = new AMessage('setu', this); reply->setSize("index", index); reply->setSize("track-index", mTracks.size() - 1); mConn->sendRequest(request.c_str(), reply); } static bool MakeURL(const char *baseURL, const char *url, AString *out) { out->clear(); if (strncasecmp("rtsp://", baseURL, 7)) { // Base URL must be absolute return false; } if (!strncasecmp("rtsp://", url, 7)) { // "url" is already an absolute URL, ignore base URL. 
out->setTo(url); return true; } size_t n = strlen(baseURL); out->setTo(baseURL); if (baseURL[n - 1] != '/') { out->append("/"); } out->append(url); return true; } void fakeTimestamps() { mNTPAnchorUs = -1ll; for (size_t i = 0; i < mTracks.size(); ++i) { onTimeUpdate(i, 0, 0ll); } } bool dataReceivedOnAllChannels() { TrackInfo *track; for (size_t i = 0; i < mTracks.size(); ++i) { track = &mTracks.editItemAt(i); if (track->mPackets.empty()) { return false; } } return true; } void handleFirstAccessUnit() { if (mFirstAccessUnit) { sp msg = mNotify->dup(); msg->setInt32("what", kWhatConnected); msg->post(); if (mSeekable) { for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *info = &mTracks.editItemAt(i); postNormalPlayTimeMapping( i, info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); } } mFirstAccessUnit = false; } } void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", trackIndex, rtpTime, (long long)ntpTime); int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); TrackInfo *track = &mTracks.editItemAt(trackIndex); track->mRTPAnchor = rtpTime; track->mNTPAnchorUs = ntpTimeUs; if (mNTPAnchorUs < 0) { mNTPAnchorUs = ntpTimeUs; mMediaAnchorUs = mLastMediaTimeUs; } if (!mAllTracksHaveTime) { bool allTracksHaveTime = (mTracks.size() > 0); for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *track = &mTracks.editItemAt(i); if (track->mNTPAnchorUs < 0) { allTracksHaveTime = false; break; } } if (allTracksHaveTime) { mAllTracksHaveTime = true; ALOGI("Time now established for all tracks."); } } if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { handleFirstAccessUnit(); // Time is now established, lets start timestamping immediately for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *trackInfo = &mTracks.editItemAt(i); while (!trackInfo->mPackets.empty()) { sp accessUnit = *trackInfo->mPackets.begin(); trackInfo->mPackets.erase(trackInfo->mPackets.begin()); if 
(addMediaTimestamp(i, trackInfo, accessUnit)) { postQueueAccessUnit(i, accessUnit); } } } for (size_t i = 0; i < mTracks.size(); ++i) { TrackInfo *trackInfo = &mTracks.editItemAt(i); if (trackInfo->mEOSReceived) { postQueueEOS(i, ERROR_END_OF_STREAM); trackInfo->mEOSReceived = false; } } } } void onAccessUnitComplete( int32_t trackIndex, const sp &accessUnit) { ALOGV("onAccessUnitComplete track %d", trackIndex); if(!mPlayResponseParsed){ ALOGI("play response is not parsed, storing accessunit"); TrackInfo *track = &mTracks.editItemAt(trackIndex); track->mPackets.push_back(accessUnit); return; } handleFirstAccessUnit(); TrackInfo *track = &mTracks.editItemAt(trackIndex); if (!mAllTracksHaveTime) { ALOGV("storing accessUnit, no time established yet"); track->mPackets.push_back(accessUnit); return; } while (!track->mPackets.empty()) { sp accessUnit = *track->mPackets.begin(); track->mPackets.erase(track->mPackets.begin()); if (addMediaTimestamp(trackIndex, track, accessUnit)) { postQueueAccessUnit(trackIndex, accessUnit); } } if (addMediaTimestamp(trackIndex, track, accessUnit)) { postQueueAccessUnit(trackIndex, accessUnit); } if (track->mEOSReceived) { postQueueEOS(trackIndex, ERROR_END_OF_STREAM); track->mEOSReceived = false; } } bool addMediaTimestamp( int32_t trackIndex, const TrackInfo *track, const sp &accessUnit) { UNUSED_UNLESS_VERBOSE(trackIndex); uint32_t rtpTime; CHECK(accessUnit->meta()->findInt32( "rtp-time", (int32_t *)&rtpTime)); int64_t relRtpTimeUs = (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) / track->mTimeScale; int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; if (mediaTimeUs > mLastMediaTimeUs) { mLastMediaTimeUs = mediaTimeUs; } if (mediaTimeUs < 0 && !mSeekable) { ALOGV("dropping early accessUnit."); return false; } ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 
accessUnit->meta()->setInt64("timeUs", mediaTimeUs); return true; } void postQueueAccessUnit( size_t trackIndex, const sp &accessUnit) { sp msg = mNotify->dup(); msg->setInt32("what", kWhatAccessUnit); msg->setSize("trackIndex", trackIndex); msg->setBuffer("accessUnit", accessUnit); msg->post(); } void postQueueEOS(size_t trackIndex, status_t finalResult) { sp msg = mNotify->dup(); msg->setInt32("what", kWhatEOS); msg->setSize("trackIndex", trackIndex); msg->setInt32("finalResult", finalResult); msg->post(); } void postQueueSeekDiscontinuity(size_t trackIndex) { sp msg = mNotify->dup(); msg->setInt32("what", kWhatSeekDiscontinuity); msg->setSize("trackIndex", trackIndex); msg->post(); } void postNormalPlayTimeMapping( size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { sp msg = mNotify->dup(); msg->setInt32("what", kWhatNormalPlayTimeMapping); msg->setSize("trackIndex", trackIndex); msg->setInt32("rtpTime", rtpTime); msg->setInt64("nptUs", nptUs); msg->post(); } DISALLOW_EVIL_CONSTRUCTORS(MyHandler); }; } // namespace android #endif // MY_HANDLER_H_