diff options
Diffstat (limited to 'media/libstagefright/rtsp')
23 files changed, 1194 insertions, 146 deletions
diff --git a/media/libstagefright/rtsp/AAMRAssembler.cpp b/media/libstagefright/rtsp/AAMRAssembler.cpp index c56578b..154ba31 100644 --- a/media/libstagefright/rtsp/AAMRAssembler.cpp +++ b/media/libstagefright/rtsp/AAMRAssembler.cpp @@ -178,12 +178,8 @@ ARTPAssembler::AssemblyStatus AAMRAssembler::addPacket( } } - uint64_t ntpTime; - CHECK(buffer->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - sp<ABuffer> accessUnit = new ABuffer(totalSize); - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, buffer); size_t dstOffset = 0; for (size_t i = 0; i < tableOfContents.size(); ++i) { diff --git a/media/libstagefright/rtsp/AAVCAssembler.cpp b/media/libstagefright/rtsp/AAVCAssembler.cpp index b22de2c..6b1e292 100644 --- a/media/libstagefright/rtsp/AAVCAssembler.cpp +++ b/media/libstagefright/rtsp/AAVCAssembler.cpp @@ -155,7 +155,7 @@ bool AAVCAssembler::addSingleTimeAggregationPacket(const sp<ABuffer> &buffer) { sp<ABuffer> unit = new ABuffer(nalSize); memcpy(unit->data(), &data[2], nalSize); - PropagateTimes(buffer, unit); + CopyTimes(unit, buffer); addSingleNALUnit(unit); @@ -287,7 +287,7 @@ ARTPAssembler::AssemblyStatus AAVCAssembler::addFragmentedNALUnit( ++totalSize; sp<ABuffer> unit = new ABuffer(totalSize); - PropagateTimes(buffer, unit); + CopyTimes(unit, *queue->begin()); unit->data()[0] = (nri << 5) | nalType; @@ -325,10 +325,6 @@ void AAVCAssembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mNALUnits.size() << " nal units)"; #endif - uint64_t ntpTime; - CHECK((*mNALUnits.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; for (List<sp<ABuffer> >::iterator it = mNALUnits.begin(); it != mNALUnits.end(); ++it) { @@ -347,7 +343,7 @@ void AAVCAssembler::submitAccessUnit() { offset += nal->size(); } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mNALUnits.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AH263Assembler.cpp b/media/libstagefright/rtsp/AH263Assembler.cpp index 2818041..498295c 100644 --- a/media/libstagefright/rtsp/AH263Assembler.cpp +++ b/media/libstagefright/rtsp/AH263Assembler.cpp @@ -128,10 +128,6 @@ void AH263Assembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; #endif - uint64_t ntpTime; - CHECK((*mPackets.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; List<sp<ABuffer> >::iterator it = mPackets.begin(); while (it != mPackets.end()) { @@ -155,7 +151,7 @@ void AH263Assembler::submitAccessUnit() { ++it; } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mPackets.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp index 6e46361..b0d2c64 100644 --- a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp @@ -103,10 +103,6 @@ void AMPEG4AudioAssembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; #endif - uint64_t ntpTime; - CHECK((*mPackets.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; List<sp<ABuffer> >::iterator it = mPackets.begin(); while (it != mPackets.end()) { @@ -142,7 +138,7 @@ void AMPEG4AudioAssembler::submitAccessUnit() { ++it; } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mPackets.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp new file mode 100644 index 0000000..7dd3e3f --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AMPEG4ElementaryAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> + +#include <stdint.h> + +#define BE_VERBOSE 0 + +namespace android { + +// static +AMPEG4ElementaryAssembler::AMPEG4ElementaryAssembler(const sp<AMessage> ¬ify) + : mNotifyMsg(notify), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { +} + +AMPEG4ElementaryAssembler::~AMPEG4ElementaryAssembler() { +} + +ARTPAssembler::AssemblyStatus AMPEG4ElementaryAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { +#if BE_VERBOSE + LOG(VERBOSE) << "Not the sequence number I expected"; +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + mPackets.push_back(buffer); + // hexdump(buffer->data(), buffer->size()); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AMPEG4ElementaryAssembler::submitAccessUnit() { + CHECK(!mPackets.empty()); + +#if BE_VERBOSE + LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " nal units)"; +#endif + + size_t totalSize = 0; + for (List<sp<ABuffer> >::iterator it = mPackets.begin(); + it != mPackets.end(); ++it) { + totalSize += (*it)->size(); + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + for (List<sp<ABuffer> >::iterator it = mPackets.begin(); + it != mPackets.end(); ++it) { + sp<ABuffer> nal = *it; + memcpy(accessUnit->data() + offset, nal->data(), nal->size()); + offset += nal->size(); + } + + CopyTimes(accessUnit, *mPackets.begin()); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mPackets.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setObject("access-unit", accessUnit); + msg->post(); +} + +ARTPAssembler::AssemblyStatus AMPEG4ElementaryAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addPacket(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +void AMPEG4ElementaryAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + LOG(VERBOSE) << "packetLost (expected " << mNextExpectedSeqNo << ")"; + + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +void AMPEG4ElementaryAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h new file mode 100644 index 0000000..1566d00 --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_MPEG4_ELEM_ASSEMBLER_H_ + +#define A_MPEG4_ELEM_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; + +struct AMPEG4ElementaryAssembler : public ARTPAssembler { + AMPEG4ElementaryAssembler(const sp<AMessage> ¬ify); + +protected: + virtual ~AMPEG4ElementaryAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mPackets; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AMPEG4ElementaryAssembler); +}; + +} // namespace android + +#endif // A_MPEG4_ELEM_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/APacketSource.cpp b/media/libstagefright/rtsp/APacketSource.cpp index 224b4bf..2d7738b 100644 --- a/media/libstagefright/rtsp/APacketSource.cpp +++ b/media/libstagefright/rtsp/APacketSource.cpp @@ -18,6 +18,11 @@ #include "ASessionDescription.h" +#include "avc_utils.h" + +#include <ctype.h> + +#include <media/stagefright/foundation/ABitReader.h> #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> @@ -37,6 +42,10 @@ static bool GetAttribute(const char *s, const char *key, AString *value) { size_t keyLen = strlen(key); for (;;) { + while (isspace(*s)) { + ++s; + } + const char *colonPos = strchr(s, ';'); size_t len = @@ -90,7 +99,11 @@ static sp<ABuffer> decodeHex(const AString &s) { return buffer; } -static sp<ABuffer> MakeAVCCodecSpecificData(const char *params) { +static sp<ABuffer> MakeAVCCodecSpecificData( + const char *params, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + AString val; if (!GetAttribute(params, "profile-level-id", &val)) { return NULL; @@ -172,6 +185,11 @@ static sp<ABuffer> MakeAVCCodecSpecificData(const char *params) { memcpy(out, nal->data(), nal->size()); out += nal->size(); + + if (i == 0) { + FindAVCDimensions(nal, width, height); + LOG(INFO) << "dimensions " << *width << "x" << *height; + } } *out++ = numPicParameterSets; @@ -187,7 +205,7 @@ static sp<ABuffer> MakeAVCCodecSpecificData(const char *params) { out += nal->size(); } - hexdump(csd->data(), csd->size()); + // hexdump(csd->data(), csd->size()); return csd; } @@ -224,23 +242,201 @@ sp<ABuffer> MakeAACCodecSpecificData(const char *params) { csd->data()[sizeof(kStaticESDS)] = (x >> 8) & 0xff; csd->data()[sizeof(kStaticESDS) + 1] = x & 0xff; - hexdump(csd->data(), csd->size()); + // hexdump(csd->data(), csd->size()); + + return csd; +} + +static size_t GetSizeWidth(size_t x) { + size_t n = 1; + while (x > 127) { + ++n; + x >>= 7; + } + return n; +} + +static uint8_t *EncodeSize(uint8_t *dst, size_t x) { + while (x > 127) { + *dst++ = (x & 0x7f) | 0x80; + x >>= 7; + } + *dst++ = x; + return dst; +} + +static bool ExtractDimensionsFromVOLHeader( + const sp<ABuffer> &config, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + + const uint8_t *ptr = config->data(); + size_t offset = 0; + bool foundVOL = false; + while (offset + 3 < config->size()) { + if (memcmp("\x00\x00\x01", &ptr[offset], 3) + || (ptr[offset + 3] & 0xf0) != 0x20) { + ++offset; + continue; + } + + foundVOL = true; + break; + } + + if (!foundVOL) { + return false; + } + + ABitReader br(&ptr[offset + 4], config->size() - offset - 4); + br.skipBits(1); // random_accessible_vol + unsigned video_object_type_indication = br.getBits(8); + + CHECK_NE(video_object_type_indication, + 0x21u /* Fine Granularity Scalable */); + + unsigned video_object_layer_verid; + unsigned video_object_layer_priority; + if (br.getBits(1)) { + video_object_layer_verid = br.getBits(4); + video_object_layer_priority = br.getBits(3); + } + unsigned aspect_ratio_info = br.getBits(4); + if (aspect_ratio_info == 0x0f /* extended PAR */) { + br.skipBits(8); // par_width + br.skipBits(8); // par_height + } + if (br.getBits(1)) { // vol_control_parameters + br.skipBits(2); // chroma_format + br.skipBits(1); // low_delay + if (br.getBits(1)) { // vbv_parameters + TRESPASS(); + } + } + unsigned video_object_layer_shape = br.getBits(2); + CHECK_EQ(video_object_layer_shape, 0x00u /* rectangular */); + + CHECK(br.getBits(1)); // marker_bit + unsigned vop_time_increment_resolution = br.getBits(16); + CHECK(br.getBits(1)); // marker_bit + + if (br.getBits(1)) { // fixed_vop_rate + // range [0..vop_time_increment_resolution) + + // vop_time_increment_resolution + // 2 => 0..1, 1 bit + // 3 => 0..2, 2 bits + // 4 => 0..3, 2 bits + // 5 => 0..4, 3 bits + // ... + + CHECK_GT(vop_time_increment_resolution, 0u); + --vop_time_increment_resolution; + + unsigned numBits = 0; + while (vop_time_increment_resolution > 0) { + ++numBits; + vop_time_increment_resolution >>= 1; + } + + br.skipBits(numBits); // fixed_vop_time_increment + } + + CHECK(br.getBits(1)); // marker_bit + unsigned video_object_layer_width = br.getBits(13); + CHECK(br.getBits(1)); // marker_bit + unsigned video_object_layer_height = br.getBits(13); + CHECK(br.getBits(1)); // marker_bit + + unsigned interlaced = br.getBits(1); + + *width = video_object_layer_width; + *height = video_object_layer_height; + + LOG(INFO) << "VOL dimensions = " << *width << "x" << *height; + + return true; +} + +sp<ABuffer> MakeMPEG4VideoCodecSpecificData( + const char *params, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + + AString val; + CHECK(GetAttribute(params, "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + + if (!ExtractDimensionsFromVOLHeader(config, width, height)) { + return NULL; + } + + size_t len1 = config->size() + GetSizeWidth(config->size()) + 1; + size_t len2 = len1 + GetSizeWidth(len1) + 1 + 13; + size_t len3 = len2 + GetSizeWidth(len2) + 1 + 3; + + sp<ABuffer> csd = new ABuffer(len3); + uint8_t *dst = csd->data(); + *dst++ = 0x03; + dst = EncodeSize(dst, len2 + 3); + *dst++ = 0x00; // ES_ID + *dst++ = 0x00; + *dst++ = 0x00; // streamDependenceFlag, URL_Flag, OCRstreamFlag + + *dst++ = 0x04; + dst = EncodeSize(dst, len1 + 13); + *dst++ = 0x01; // Video ISO/IEC 14496-2 Simple Profile + for (size_t i = 0; i < 12; ++i) { + *dst++ = 0x00; + } + + *dst++ = 0x05; + dst = EncodeSize(dst, config->size()); + memcpy(dst, config->data(), config->size()); + dst += config->size(); + + // hexdump(csd->data(), csd->size()); return csd; } +static bool GetClockRate(const AString &desc, uint32_t *clockRate) { + ssize_t slashPos = desc.find("/"); + if (slashPos < 0) { + return false; + } + + const char *s = desc.c_str() + slashPos + 1; + + char *end; + unsigned long x = strtoul(s, &end, 10); + + if (end == s || (*end != '\0' && *end != '/')) { + return false; + } + + *clockRate = x; + + return true; +} + APacketSource::APacketSource( const sp<ASessionDescription> &sessionDesc, size_t index) : mInitCheck(NO_INIT), mFormat(new MetaData), mEOSResult(OK), - mFirstAccessUnit(true), - mFirstAccessUnitNTP(0) { + mRTPTimeBase(0), + mNormalPlayTimeBaseUs(0), + mLastNormalPlayTimeUs(0) { unsigned long PT; AString desc; AString params; sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + CHECK(GetClockRate(desc, &mClockRate)); + int64_t durationUs; if (sessionDesc->getDurationUs(&durationUs)) { mFormat->setInt64(kKeyDuration, durationUs); @@ -253,25 +449,42 @@ APacketSource::APacketSource( mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_AVC); int32_t width, height; - sessionDesc->getDimensions(index, PT, &width, &height); - - mFormat->setInt32(kKeyWidth, width); - mFormat->setInt32(kKeyHeight, height); + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + width = -1; + height = -1; + } + int32_t encWidth, encHeight; sp<ABuffer> codecSpecificData = - MakeAVCCodecSpecificData(params.c_str()); + MakeAVCCodecSpecificData(params.c_str(), &encWidth, &encHeight); if (codecSpecificData != NULL) { + if (width < 0) { + // If no explicit width/height given in the sdp, use the dimensions + // extracted from the first sequence parameter set. + width = encWidth; + height = encHeight; + } + mFormat->setData( kKeyAVCC, 0, codecSpecificData->data(), codecSpecificData->size()); + } else if (width < 0) { + mInitCheck = ERROR_UNSUPPORTED; + return; } + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); } else if (!strncmp(desc.c_str(), "H263-2000/", 10) || !strncmp(desc.c_str(), "H263-1998/", 10)) { mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_H263); int32_t width, height; - sessionDesc->getDimensions(index, PT, &width, &height); + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } mFormat->setInt32(kKeyWidth, width); mFormat->setInt32(kKeyHeight, height); @@ -317,6 +530,36 @@ APacketSource::APacketSource( if (sampleRate != 16000 || numChannels != 1) { mInitCheck = ERROR_UNSUPPORTED; } + } else if (!strncmp(desc.c_str(), "MP4V-ES/", 8)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_MPEG4); + + int32_t width, height; + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + width = -1; + height = -1; + } + + int32_t encWidth, encHeight; + sp<ABuffer> codecSpecificData = + MakeMPEG4VideoCodecSpecificData( + params.c_str(), &encWidth, &encHeight); + + if (codecSpecificData != NULL) { + mFormat->setData( + kKeyESDS, 0, + codecSpecificData->data(), codecSpecificData->size()); + + if (width < 0) { + width = encWidth; + height = encHeight; + } + } else if (width < 0) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); } else { mInitCheck = ERROR_UNSUPPORTED; } @@ -330,9 +573,6 @@ status_t APacketSource::initCheck() const { } status_t APacketSource::start(MetaData *params) { - mFirstAccessUnit = true; - mFirstAccessUnitNTP = 0; - return OK; } @@ -356,6 +596,8 @@ status_t APacketSource::read( if (!mBuffers.empty()) { const sp<ABuffer> buffer = *mBuffers.begin(); + updateNormalPlayTime_l(buffer); + MediaBuffer *mediaBuffer = new MediaBuffer(buffer->size()); int64_t timeUs; @@ -373,6 +615,16 @@ status_t APacketSource::read( return mEOSResult; } +void APacketSource::updateNormalPlayTime_l(const sp<ABuffer> &buffer) { + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + mLastNormalPlayTimeUs = + (((double)rtpTime - (double)mRTPTimeBase) / mClockRate) + * 1000000ll + + mNormalPlayTimeBaseUs; +} + void APacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { int32_t damaged; if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { @@ -380,25 +632,6 @@ void APacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { return; } - uint64_t ntpTime; - CHECK(buffer->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - - if (mFirstAccessUnit) { - mFirstAccessUnit = false; - mFirstAccessUnitNTP = ntpTime; - } - - if (ntpTime > mFirstAccessUnitNTP) { - ntpTime -= mFirstAccessUnitNTP; - } else { - ntpTime = 0; - } - - int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); - - buffer->meta()->setInt64("timeUs", timeUs); - Mutex::Autolock autoLock(mLock); mBuffers.push_back(buffer); mCondition.signal(); @@ -412,31 +645,22 @@ void APacketSource::signalEOS(status_t result) { mCondition.signal(); } -int64_t APacketSource::getQueuedDuration(bool *eos) { +void APacketSource::flushQueue() { Mutex::Autolock autoLock(mLock); + mBuffers.clear(); +} - *eos = (mEOSResult != OK); - - if (mBuffers.empty()) { - return 0; - } - - sp<ABuffer> buffer = *mBuffers.begin(); - - uint64_t ntpTime; - CHECK(buffer->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - - int64_t firstTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); - - buffer = *--mBuffers.end(); - - CHECK(buffer->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); +int64_t APacketSource::getNormalPlayTimeUs() { + Mutex::Autolock autoLock(mLock); + return mLastNormalPlayTimeUs; +} - int64_t lastTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); +void APacketSource::setNormalPlayTimeMapping( + uint32_t rtpTime, int64_t normalPlayTimeUs) { + Mutex::Autolock autoLock(mLock); - return lastTimeUs - firstTimeUs; + mRTPTimeBase = rtpTime; + mNormalPlayTimeBaseUs = normalPlayTimeUs; } } // namespace android diff --git a/media/libstagefright/rtsp/APacketSource.h b/media/libstagefright/rtsp/APacketSource.h index 647da6e..3833ab1 100644 --- a/media/libstagefright/rtsp/APacketSource.h +++ b/media/libstagefright/rtsp/APacketSource.h @@ -43,7 +43,12 @@ struct APacketSource : public MediaSource { void queueAccessUnit(const sp<ABuffer> &buffer); void signalEOS(status_t result); - int64_t getQueuedDuration(bool *eos); + void flushQueue(); + + int64_t getNormalPlayTimeUs(); + + void setNormalPlayTimeMapping( + uint32_t rtpTime, int64_t normalPlayTimeUs); protected: virtual ~APacketSource(); @@ -58,8 +63,14 @@ private: List<sp<ABuffer> > mBuffers; status_t mEOSResult; - bool mFirstAccessUnit; - uint64_t mFirstAccessUnitNTP; + uint32_t mClockRate; + + uint32_t mRTPTimeBase; + int64_t mNormalPlayTimeBaseUs; + + int64_t mLastNormalPlayTimeUs; + + void updateNormalPlayTime_l(const sp<ABuffer> &buffer); DISALLOW_EVIL_CONSTRUCTORS(APacketSource); }; diff --git a/media/libstagefright/rtsp/ARTPAssembler.cpp b/media/libstagefright/rtsp/ARTPAssembler.cpp index 24225b8..9ba2b37 100644 --- a/media/libstagefright/rtsp/ARTPAssembler.cpp +++ b/media/libstagefright/rtsp/ARTPAssembler.cpp @@ -35,18 +35,6 @@ ARTPAssembler::ARTPAssembler() : mFirstFailureTimeUs(-1) { } -void ARTPAssembler::PropagateTimes( - const sp<ABuffer> &from, const sp<ABuffer> &to) { - uint32_t rtpTime; - CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); - - uint64_t ntpTime = 0; - CHECK(from->meta()->findInt64("ntp-time", (int64_t *)&ntpTime)); - - to->meta()->setInt32("rtp-time", rtpTime); - to->meta()->setInt64("ntp-time", ntpTime); -} - void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) { AssemblyStatus status; for (;;) { @@ -75,4 +63,19 @@ void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) { } } +// static +void ARTPAssembler::CopyTimes(const sp<ABuffer> &to, const sp<ABuffer> &from) { + uint64_t ntpTime; + CHECK(from->meta()->findInt64("ntp-time", (int64_t *)&ntpTime)); + + uint32_t rtpTime; + CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + to->meta()->setInt64("ntp-time", ntpTime); + to->meta()->setInt32("rtp-time", rtpTime); + + // Copy the seq number. + to->setInt32Data(from->int32Data()); +} + } // namespace android diff --git a/media/libstagefright/rtsp/ARTPAssembler.h b/media/libstagefright/rtsp/ARTPAssembler.h index e598088..70ea186 100644 --- a/media/libstagefright/rtsp/ARTPAssembler.h +++ b/media/libstagefright/rtsp/ARTPAssembler.h @@ -40,12 +40,11 @@ struct ARTPAssembler : public RefBase { virtual void onByeReceived() = 0; protected: - static void PropagateTimes( - const sp<ABuffer> &from, const sp<ABuffer> &to); - virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source) = 0; virtual void packetLost() = 0; + static void CopyTimes(const sp<ABuffer> &to, const sp<ABuffer> &from); + private: int64_t mFirstFailureTimeUs; diff --git a/media/libstagefright/rtsp/ARTPConnection.cpp b/media/libstagefright/rtsp/ARTPConnection.cpp index 469af3e..42a22b7 100644 --- a/media/libstagefright/rtsp/ARTPConnection.cpp +++ b/media/libstagefright/rtsp/ARTPConnection.cpp @@ -57,6 +57,8 @@ struct ARTPConnection::StreamInfo { int32_t mNumRTCPPacketsReceived; struct sockaddr_in mRemoteRTCPAddr; + + bool mIsInjected; }; ARTPConnection::ARTPConnection(uint32_t flags) @@ -72,13 +74,15 @@ void ARTPConnection::addStream( int rtpSocket, int rtcpSocket, const sp<ASessionDescription> &sessionDesc, size_t index, - const sp<AMessage> ¬ify) { + const sp<AMessage> ¬ify, + bool injected) { sp<AMessage> msg = new AMessage(kWhatAddStream, id()); msg->setInt32("rtp-socket", rtpSocket); msg->setInt32("rtcp-socket", rtcpSocket); msg->setObject("session-desc", sessionDesc); msg->setSize("index", index); msg->setMessage("notify", notify); + msg->setInt32("injected", injected); msg->post(); } @@ -154,6 +158,12 @@ void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { break; } + case kWhatInjectPacket: + { + onInjectPacket(msg); + break; + } + default: { TRESPASS(); @@ -172,6 +182,11 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) { CHECK(msg->findInt32("rtcp-socket", &s)); info->mRTCPSocket = s; + int32_t injected; + CHECK(msg->findInt32("injected", &injected)); + + info->mIsInjected = injected; + sp<RefBase> obj; CHECK(msg->findObject("session-desc", &obj)); info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); @@ -182,7 +197,9 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) { info->mNumRTCPPacketsReceived = 0; memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); - postPollEvent(); + if (!injected) { + postPollEvent(); + } } void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { @@ -231,6 +248,10 @@ void ARTPConnection::onPollStreams() { int maxSocket = -1; for (List<StreamInfo>::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { + if ((*it).mIsInjected) { + continue; + } + FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTCPSocket, &rs); @@ -248,6 +269,10 @@ void ARTPConnection::onPollStreams() { if (res > 0) { for (List<StreamInfo>::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { + if ((*it).mIsInjected) { + continue; + } + if (FD_ISSET(it->mRTPSocket, &rs)) { receive(&*it, true); } @@ -301,6 +326,8 @@ void ARTPConnection::onPollStreams() { } status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { + CHECK(!s->mIsInjected); + sp<ABuffer> buffer = new ABuffer(65536); socklen_t remoteAddrLen = @@ -321,6 +348,8 @@ status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { buffer->setRange(0, nbytes); + // LOG(INFO) << "received " << buffer->size() << " bytes."; + status_t err; if (receiveRTP) { err = parseRTP(s, buffer); @@ -557,5 +586,42 @@ sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { return source; } +void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { + sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); + msg->setInt32("index", index); + msg->setObject("buffer", buffer); + msg->post(); +} + +void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { + int32_t index; + CHECK(msg->findInt32("index", &index)); + + sp<RefBase> obj; + CHECK(msg->findObject("buffer", &obj)); + + sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); + + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end() + && it->mRTPSocket != index && it->mRTCPSocket != index) { + ++it; + } + + if (it == mStreams.end()) { + TRESPASS(); + } + + StreamInfo *s = &*it; + + status_t err; + if (it->mRTPSocket == index) { + err = parseRTP(s, buffer); + } else { + ++s->mNumRTCPPacketsReceived; + err = parseRTCP(s, buffer); + } +} + } // namespace android diff --git a/media/libstagefright/rtsp/ARTPConnection.h b/media/libstagefright/rtsp/ARTPConnection.h index c535199..77f81fa 100644 --- a/media/libstagefright/rtsp/ARTPConnection.h +++ b/media/libstagefright/rtsp/ARTPConnection.h @@ -38,10 +38,13 @@ struct ARTPConnection : public AHandler { void addStream( int rtpSocket, int rtcpSocket, const sp<ASessionDescription> &sessionDesc, size_t index, - const sp<AMessage> ¬ify); + const sp<AMessage> ¬ify, + bool injected); void removeStream(int rtpSocket, int rtcpSocket); + void injectPacket(int index, const sp<ABuffer> &buffer); + // Creates a pair of UDP datagram sockets bound to adjacent ports // (the rtpSocket is bound to an even port, the rtcpSocket to the // next higher port). @@ -57,6 +60,7 @@ private: kWhatAddStream, kWhatRemoveStream, kWhatPollStreams, + kWhatInjectPacket, }; static const int64_t kSelectTimeoutUs; @@ -72,6 +76,7 @@ private: void onAddStream(const sp<AMessage> &msg); void onRemoveStream(const sp<AMessage> &msg); void onPollStreams(); + void onInjectPacket(const sp<AMessage> &msg); void onSendReceiverReports(); status_t receive(StreamInfo *info, bool receiveRTP); diff --git a/media/libstagefright/rtsp/ARTPSession.cpp b/media/libstagefright/rtsp/ARTPSession.cpp index e082078..d2c56f7 100644 --- a/media/libstagefright/rtsp/ARTPSession.cpp +++ b/media/libstagefright/rtsp/ARTPSession.cpp @@ -83,7 +83,8 @@ status_t ARTPSession::setup(const sp<ASessionDescription> &desc) { sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id()); notify->setSize("track-index", mTracks.size() - 1); - mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify); + mRTPConn->addStream( + rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */); info->mPacketSource = source; } diff --git a/media/libstagefright/rtsp/ARTPSource.cpp b/media/libstagefright/rtsp/ARTPSource.cpp index 225f6e8..775c4ee 100644 --- a/media/libstagefright/rtsp/ARTPSource.cpp +++ b/media/libstagefright/rtsp/ARTPSource.cpp @@ -20,6 +20,7 @@ #include "AAVCAssembler.h" #include "AH263Assembler.h" #include "AMPEG4AudioAssembler.h" +#include "AMPEG4ElementaryAssembler.h" #include "ASessionDescription.h" #include <media/stagefright/foundation/ABuffer.h> @@ -63,6 +64,9 @@ ARTPSource::ARTPSource( mAssembler = new AAMRAssembler(notify, false /* isWide */, params); } else if (!strncmp(desc.c_str(), "AMR-WB/", 7)) { mAssembler = new AAMRAssembler(notify, true /* isWide */, params); + } else if (!strncmp(desc.c_str(), "MP4V-ES/", 8)) { + mAssembler = new AMPEG4ElementaryAssembler(notify); + mIssueFIRRequests = true; } else { TRESPASS(); } diff --git a/media/libstagefright/rtsp/ARTPWriter.cpp b/media/libstagefright/rtsp/ARTPWriter.cpp index d6dd597..d4eed7c 100644 --- a/media/libstagefright/rtsp/ARTPWriter.cpp +++ b/media/libstagefright/rtsp/ARTPWriter.cpp @@ -134,10 +134,10 @@ status_t ARTPWriter::start(MetaData *params) { return OK; } -void ARTPWriter::stop() { +status_t ARTPWriter::stop() { Mutex::Autolock autoLock(mLock); if (!(mFlags & kFlagStarted)) { - return; + return OK; } (new AMessage(kWhatStop, mReflector->id()))->post(); @@ -145,9 +145,11 @@ void ARTPWriter::stop() { while (mFlags & kFlagStarted) { mCondition.wait(mLock); } + return OK; } -void ARTPWriter::pause() { +status_t ARTPWriter::pause() { + return OK; } static void StripStartcode(MediaBuffer *buffer) { diff --git a/media/libstagefright/rtsp/ARTPWriter.h b/media/libstagefright/rtsp/ARTPWriter.h index b1b8b45..fdc8d23 100644 --- a/media/libstagefright/rtsp/ARTPWriter.h +++ b/media/libstagefright/rtsp/ARTPWriter.h @@ -40,8 +40,8 @@ struct ARTPWriter : public MediaWriter { virtual status_t addSource(const sp<MediaSource> &source); virtual bool reachedEOS(); virtual status_t start(MetaData *params); - virtual void stop(); - virtual void pause(); + virtual status_t stop(); + virtual status_t pause(); virtual void onMessageReceived(const sp<AMessage> &msg); diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp index 9826990..fd6aa62 100644 --- a/media/libstagefright/rtsp/ARTSPConnection.cpp +++ b/media/libstagefright/rtsp/ARTSPConnection.cpp @@ -19,6 +19,7 @@ #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaErrors.h> #include <arpa/inet.h> #include <fcntl.h> @@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest( msg->post(); } +void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id()); + msg->setMessage("reply", reply); + msg->post(); +} + void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case kWhatConnect: @@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { onReceiveResponse(); break; + case kWhatObserveBinaryData: + { + CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); + break; + } + default: TRESPASS(); break; @@ -136,6 +149,20 @@ bool ARTSPConnection::ParseURL( return true; } +static void MakeSocketBlocking(int s, bool blocking) { + // Make socket non-blocking. + int flags = fcntl(s, F_GETFL, 0); + CHECK_NE(flags, -1); + + if (blocking) { + flags &= ~O_NONBLOCK; + } else { + flags |= O_NONBLOCK; + } + + CHECK_NE(fcntl(s, F_SETFL, flags), -1); +} + void ARTSPConnection::onConnect(const sp<AMessage> &msg) { ++mConnectionID; @@ -150,10 +177,7 @@ void ARTSPConnection::onConnect(const sp<AMessage> &msg) { mSocket = socket(AF_INET, SOCK_STREAM, 0); - // Make socket non-blocking. - int flags = fcntl(mSocket, F_GETFL, 0); - CHECK_NE(flags, -1); - CHECK_NE(fcntl(mSocket, F_SETFL, flags | O_NONBLOCK), -1); + MakeSocketBlocking(mSocket, false); AString url; CHECK(msg->findString("url", &url)); @@ -210,7 +234,7 @@ void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) { mSocket = -1; flushPendingRequests(); - } + } sp<AMessage> reply; CHECK(msg->findMessage("reply", &reply)); @@ -347,7 +371,13 @@ void ARTSPConnection::onReceiveResponse() { CHECK_GE(res, 0); if (res == 1) { - if (!receiveRTSPReponse()) { + MakeSocketBlocking(mSocket, true); + + bool success = receiveRTSPReponse(); + + MakeSocketBlocking(mSocket, false); + + if (!success) { // Something horrible, irreparable has happened. flushPendingRequests(); return; @@ -379,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() { mReceiveResponseEventPending = true; } -bool ARTSPConnection::receiveLine(AString *line) { - line->clear(); - - bool sawCR = false; - for (;;) { - char c; - ssize_t n = recv(mSocket, &c, 1, 0); +status_t ARTSPConnection::receive(void *data, size_t size) { + size_t offset = 0; + while (offset < size) { + ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0); if (n == 0) { // Server closed the connection. - return false; + return ERROR_IO; } else if (n < 0) { if (errno == EINTR) { continue; @@ -397,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) { TRESPASS(); } + offset += (size_t)n; + } + + return OK; +} + +bool ARTSPConnection::receiveLine(AString *line) { + line->clear(); + + bool sawCR = false; + for (;;) { + char c; + if (receive(&c, 1) != OK) { + return false; + } + if (sawCR && c == '\n') { line->erase(line->size() - 1, 1); return true; @@ -404,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) { line->append(&c, 1); + if (c == '$' && line->size() == 1) { + // Special-case for interleaved binary data. + return true; + } + sawCR = (c == '\r'); } } +sp<ABuffer> ARTSPConnection::receiveBinaryData() { + uint8_t x[3]; + if (receive(x, 3) != OK) { + return NULL; + } + + sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]); + if (receive(buffer->data(), buffer->size()) != OK) { + return NULL; + } + + buffer->meta()->setInt32("index", (int32_t)x[0]); + + return buffer; +} + bool ARTSPConnection::receiveRTSPReponse() { - sp<ARTSPResponse> response = new ARTSPResponse; + AString statusLine; - if (!receiveLine(&response->mStatusLine)) { + if (!receiveLine(&statusLine)) { return false; } + if (statusLine == "$") { + sp<ABuffer> buffer = receiveBinaryData(); + + if (buffer == NULL) { + return false; + } + + if (mObserveBinaryMessage != NULL) { + sp<AMessage> notify = mObserveBinaryMessage->dup(); + notify->setObject("buffer", buffer); + notify->post(); + } else { + LOG(WARNING) << "received binary data, but no one cares."; + } + + return true; + } + + sp<ARTSPResponse> response = new ARTSPResponse; + response->mStatusLine = statusLine; + LOG(INFO) << "status: " << response->mStatusLine; ssize_t space1 = response->mStatusLine.find(" "); diff --git a/media/libstagefright/rtsp/ARTSPConnection.h b/media/libstagefright/rtsp/ARTSPConnection.h index 3577a2f..96e0d5b 100644 --- a/media/libstagefright/rtsp/ARTSPConnection.h +++ b/media/libstagefright/rtsp/ARTSPConnection.h @@ -40,6 +40,8 @@ struct ARTSPConnection : public AHandler { void sendRequest(const char *request, const sp<AMessage> &reply); + void observeBinaryData(const sp<AMessage> &reply); + protected: virtual ~ARTSPConnection(); virtual void onMessageReceived(const sp<AMessage> &msg); @@ -57,6 +59,7 @@ private: kWhatCompleteConnection = 'comc', kWhatSendRequest = 'sreq', kWhatReceiveResponse = 'rres', + kWhatObserveBinaryData = 'obin', }; static const int64_t kSelectTimeoutUs; @@ -69,6 +72,8 @@ private: KeyedVector<int32_t, sp<AMessage> > mPendingRequests; + sp<AMessage> mObserveBinaryMessage; + void onConnect(const sp<AMessage> &msg); void onDisconnect(const sp<AMessage> &msg); void onCompleteConnection(const sp<AMessage> &msg); @@ -80,7 +85,9 @@ private: // Return false iff something went unrecoverably wrong. bool receiveRTSPReponse(); + status_t receive(void *data, size_t size); bool receiveLine(AString *line); + sp<ABuffer> receiveBinaryData(); bool notifyResponseListener(const sp<ARTSPResponse> &response); static bool ParseURL( diff --git a/media/libstagefright/rtsp/ARTSPController.cpp b/media/libstagefright/rtsp/ARTSPController.cpp index 195323e..a89946b 100644 --- a/media/libstagefright/rtsp/ARTSPController.cpp +++ b/media/libstagefright/rtsp/ARTSPController.cpp @@ -26,34 +26,70 @@ namespace android { ARTSPController::ARTSPController(const sp<ALooper> &looper) - : mLooper(looper) { + : mState(DISCONNECTED), + mLooper(looper) { + mReflector = new AHandlerReflector<ARTSPController>(this); + looper->registerHandler(mReflector); } ARTSPController::~ARTSPController() { + CHECK_EQ((int)mState, (int)DISCONNECTED); + mLooper->unregisterHandler(mReflector->id()); } status_t ARTSPController::connect(const char *url) { - if (mHandler != NULL) { + Mutex::Autolock autoLock(mLock); + + if (mState != DISCONNECTED) { return ERROR_ALREADY_CONNECTED; } + sp<AMessage> msg = new AMessage(kWhatConnectDone, mReflector->id()); + mHandler = new MyHandler(url, mLooper); - mHandler->connect(); - sleep(10); + mState = CONNECTING; + + mHandler->connect(msg); + + while (mState == CONNECTING) { + mCondition.wait(mLock); + } + + if (mState != CONNECTED) { + mHandler.clear(); + } - return OK; + return mConnectionResult; } void ARTSPController::disconnect() { - if (mHandler == NULL) { + Mutex::Autolock autoLock(mLock); + + if (mState != CONNECTED) { return; } - mHandler->disconnect(); + sp<AMessage> msg = new AMessage(kWhatDisconnectDone, mReflector->id()); + mHandler->disconnect(msg); + + while (mState == CONNECTED) { + mCondition.wait(mLock); + } + mHandler.clear(); } +void ARTSPController::seek(int64_t timeUs) { + Mutex::Autolock autoLock(mLock); + + if (mState != CONNECTED) { + return; + } + + mHandler->seek(timeUs); +} + size_t ARTSPController::countTracks() { if (mHandler == NULL) { return 0; @@ -75,4 +111,36 @@ sp<MetaData> ARTSPController::getTrackMetaData( return mHandler->getPacketSource(index)->getFormat(); } +void ARTSPController::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatConnectDone: + { + Mutex::Autolock autoLock(mLock); + + CHECK(msg->findInt32("result", &mConnectionResult)); + mState = (mConnectionResult == OK) ? CONNECTED : DISCONNECTED; + + mCondition.signal(); + break; + } + + case kWhatDisconnectDone: + { + Mutex::Autolock autoLock(mLock); + mState = DISCONNECTED; + mCondition.signal(); + break; + } + + default: + TRESPASS(); + break; + } +} + +int64_t ARTSPController::getNormalPlayTimeUs() { + CHECK(mHandler != NULL); + return mHandler->getNormalPlayTimeUs(); +} + } // namespace android diff --git a/media/libstagefright/rtsp/ASessionDescription.cpp b/media/libstagefright/rtsp/ASessionDescription.cpp index 8187e0c..4a8cce8 100644 --- a/media/libstagefright/rtsp/ASessionDescription.cpp +++ b/media/libstagefright/rtsp/ASessionDescription.cpp @@ -203,13 +203,18 @@ void ASessionDescription::getFormatType( } } -void ASessionDescription::getDimensions( +bool ASessionDescription::getDimensions( size_t index, unsigned long PT, int32_t *width, int32_t *height) const { + *width = 0; + *height = 0; + char key[20]; sprintf(key, "a=framesize:%lu", PT); AString value; - CHECK(findAttribute(index, key, &value)); + if (!findAttribute(index, key, &value)) { + return false; + } const char *s = value.c_str(); char *end; @@ -221,6 +226,8 @@ void ASessionDescription::getDimensions( *height = strtoul(s, &end, 10); CHECK_GT(end, s); CHECK_EQ(*end, '\0'); + + return true; } bool ASessionDescription::getDurationUs(int64_t *durationUs) const { diff --git a/media/libstagefright/rtsp/ASessionDescription.h b/media/libstagefright/rtsp/ASessionDescription.h index b26980f..a3fa79e 100644 --- a/media/libstagefright/rtsp/ASessionDescription.h +++ b/media/libstagefright/rtsp/ASessionDescription.h @@ -44,7 +44,7 @@ struct ASessionDescription : public RefBase { size_t index, unsigned long *PT, AString *desc, AString *params) const; - void getDimensions( + bool getDimensions( size_t index, unsigned long PT, int32_t *width, int32_t *height) const; diff --git a/media/libstagefright/rtsp/Android.mk b/media/libstagefright/rtsp/Android.mk index 7f3659f..ed16059 100644 --- a/media/libstagefright/rtsp/Android.mk +++ b/media/libstagefright/rtsp/Android.mk @@ -7,6 +7,7 @@ LOCAL_SRC_FILES:= \ AAVCAssembler.cpp \ AH263Assembler.cpp \ AMPEG4AudioAssembler.cpp \ + AMPEG4ElementaryAssembler.cpp \ APacketSource.cpp \ ARTPAssembler.cpp \ ARTPConnection.cpp \ diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h index 8be8914..8337c93 100644 --- a/media/libstagefright/rtsp/MyHandler.h +++ b/media/libstagefright/rtsp/MyHandler.h @@ -23,14 +23,46 @@ #include "ARTSPConnection.h" #include "ASessionDescription.h" +#include <ctype.h> + #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/ALooper.h> #include <media/stagefright/foundation/AMessage.h> #include <media/stagefright/MediaErrors.h> +#define USE_TCP_INTERLEAVED 0 + namespace android { +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + struct MyHandler : public AHandler { MyHandler(const char *url, const sp<ALooper> &looper) : mLooper(looper), @@ -38,25 +70,56 @@ struct MyHandler : public AHandler { mConn(new ARTSPConnection), mRTPConn(new ARTPConnection), mSessionURL(url), - mSetupTracksSuccessful(false) { + mSetupTracksSuccessful(false), + mSeekPending(false), + mFirstAccessUnit(true), + mFirstAccessUnitNTP(0), + mNumAccessUnitsReceived(0), + mCheckPending(false) { mNetLooper->start(false /* runOnCallingThread */, false /* canCallJava */, PRIORITY_HIGHEST); } - void connect() { + void connect(const sp<AMessage> &doneMsg) { + mDoneMsg = doneMsg; + mLooper->registerHandler(this); mLooper->registerHandler(mConn); (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); - sp<AMessage> reply = new AMessage('conn', id()); + sp<AMessage> notify = new AMessage('biny', id()); + mConn->observeBinaryData(notify); + + sp<AMessage> reply = new AMessage('conn', id()); mConn->connect(mSessionURL.c_str(), reply); } - void disconnect() { - sp<AMessage> reply = new AMessage('disc', id()); - mConn->disconnect(reply); + void disconnect(const sp<AMessage> &doneMsg) { + mDoneMsg = doneMsg; + + (new AMessage('abor', id()))->post(); + } + + void seek(int64_t timeUs) { + sp<AMessage> msg = new AMessage('seek', id()); + msg->setInt64("time", timeUs); + msg->post(); + } + + int64_t getNormalPlayTimeUs() { + int64_t maxTimeUs = 0; + for (size_t i = 0; i < mTracks.size(); ++i) { + int64_t timeUs = mTracks.editItemAt(i).mPacketSource + ->getNormalPlayTimeUs(); + + if (i == 0 || timeUs > maxTimeUs) { + maxTimeUs = timeUs; + } + } + + return maxTimeUs; } virtual void onMessageReceived(const sp<AMessage> &msg) { @@ -79,14 +142,14 @@ struct MyHandler : public AHandler { sp<AMessage> reply = new AMessage('desc', id()); mConn->sendRequest(request.c_str(), reply); + } else { + (new AMessage('disc', id()))->post(); } break; } case 'disc': { - LOG(INFO) << "disconnect completed"; - (new AMessage('quit', id()))->post(); break; } @@ -173,8 +236,10 @@ struct MyHandler : public AHandler { if (result != OK) { if (track) { - close(track->mRTPSocket); - close(track->mRTCPSocket); + if (!track->mUsingInterleavedTCP) { + close(track->mRTPSocket); + close(track->mRTCPSocket); + } mTracks.removeItemsAt(trackIndex); } @@ -206,7 +271,7 @@ struct MyHandler : public AHandler { mRTPConn->addStream( track->mRTPSocket, track->mRTCPSocket, mSessionDesc, index, - notify); + notify, track->mUsingInterleavedTCP); mSetupTracksSuccessful = true; } @@ -250,8 +315,14 @@ struct MyHandler : public AHandler { CHECK_EQ(response->mStatusCode, 200u); - sp<AMessage> msg = new AMessage('abor', id()); - msg->post(60000000ll); + parsePlayResponse(response); + + mDoneMsg->setInt32("result", OK); + mDoneMsg->post(); + mDoneMsg = NULL; + + sp<AMessage> timeout = new AMessage('tiou', id()); + timeout->post(10000000ll); } else { sp<AMessage> reply = new AMessage('disc', id()); mConn->disconnect(reply); @@ -301,24 +372,96 @@ struct MyHandler : public AHandler { case 'quit': { + if (mDoneMsg != NULL) { + mDoneMsg->setInt32("result", UNKNOWN_ERROR); + mDoneMsg->post(); + mDoneMsg = NULL; + } + break; + } + + case 'chek': + { + if (mNumAccessUnitsReceived == 0) { + LOG(INFO) << "stream ended? aborting."; + (new AMessage('abor', id()))->post(); + break; + } + + mNumAccessUnitsReceived = 0; + msg->post(500000); break; } case 'accu': { + ++mNumAccessUnitsReceived; + + if (!mCheckPending) { + mCheckPending = true; + sp<AMessage> check = new AMessage('chek', id()); + check->post(500000); + } + size_t trackIndex; CHECK(msg->findSize("track-index", &trackIndex)); + TrackInfo *track = &mTracks.editItemAt(trackIndex); + + int32_t eos; + if (msg->findInt32("eos", &eos)) { + LOG(INFO) << "received BYE on track index " << trackIndex; +#if 0 + track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); +#endif + return; + } + sp<RefBase> obj; CHECK(msg->findObject("access-unit", &obj)); sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); + uint32_t seqNum = (uint32_t)accessUnit->int32Data(); + + if (seqNum < track->mFirstSeqNumInSegment) { + LOG(INFO) << "dropping stale access-unit " + << "(" << seqNum << " < " + << track->mFirstSeqNumInSegment << ")"; + break; + } + uint64_t ntpTime; CHECK(accessUnit->meta()->findInt64( "ntp-time", (int64_t *)&ntpTime)); - accessUnit->meta()->setInt64("ntp-time", ntpTime); + uint32_t rtpTime; + CHECK(accessUnit->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); + + if (track->mNewSegment) { + track->mNewSegment = false; + + LOG(VERBOSE) << "first segment unit ntpTime=" + << StringPrintf("0x%016llx", ntpTime) + << " rtpTime=" << rtpTime + << " seq=" << seqNum; + } + + if (mFirstAccessUnit) { + mFirstAccessUnit = false; + mFirstAccessUnitNTP = ntpTime; + } + + if (ntpTime >= mFirstAccessUnitNTP) { + ntpTime -= mFirstAccessUnitNTP; + } else { + ntpTime = 0; + } + + int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); + + accessUnit->meta()->setInt64("timeUs", timeUs); #if 0 int32_t damaged; @@ -334,12 +477,200 @@ struct MyHandler : public AHandler { break; } + case 'seek': + { + if (mSeekPending) { + break; + } + + int64_t timeUs; + CHECK(msg->findInt64("time", &timeUs)); + + mSeekPending = true; + + AString request = "PAUSE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('see1', id()); + reply->setInt64("time", timeUs); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'see1': + { + // Session is paused now. + for (size_t i = 0; i < mTracks.size(); ++i) { + mTracks.editItemAt(i).mPacketSource->flushQueue(); + } + + int64_t timeUs; + CHECK(msg->findInt64("time", &timeUs)); + + AString request = "PLAY "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append( + StringPrintf( + "Range: npt=%lld-\r\n", timeUs / 1000000ll)); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('see2', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'see2': + { + CHECK(mSeekPending); + + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "PLAY completed with result " + << result << " (" << strerror(-result) << ")"; + + CHECK_EQ(result, (status_t)OK); + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + CHECK_EQ(response->mStatusCode, 200u); + + parsePlayResponse(response); + + LOG(INFO) << "seek completed."; + mSeekPending = false; + break; + } + + case 'biny': + { + sp<RefBase> obj; + CHECK(msg->findObject("buffer", &obj)); + sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); + + int32_t index; + CHECK(buffer->meta()->findInt32("index", &index)); + + mRTPConn->injectPacket(index, buffer); + break; + } + + case 'tiou': + { + if (mFirstAccessUnit) { + LOG(WARNING) << "Never received any data, disconnecting."; + (new AMessage('abor', id()))->post(); + } + break; + } + default: TRESPASS(); break; } } + static void SplitString( + const AString &s, const char *separator, List<AString> *items) { + items->clear(); + size_t start = 0; + while (start < s.size()) { + ssize_t offset = s.find(separator, start); + + if (offset < 0) { + items->push_back(AString(s, start, s.size() - start)); + break; + } + + items->push_back(AString(s, start, offset - start)); + start = offset + strlen(separator); + } + } + + void parsePlayResponse(const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("range"); + if (i < 0) { + // Server doesn't even tell use what range it is going to + // play, therefore we won't support seeking. + return; + } + + AString range = response->mHeaders.valueAt(i); + LOG(VERBOSE) << "Range: " << range; + + AString val; + CHECK(GetAttribute(range.c_str(), "npt", &val)); + float npt1, npt2; + + if (val == "now-") { + // This is a live stream and therefore not seekable. + return; + } else { + CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2); + } + + i = response->mHeaders.indexOfKey("rtp-info"); + CHECK_GE(i, 0); + + AString rtpInfo = response->mHeaders.valueAt(i); + List<AString> streamInfos; + SplitString(rtpInfo, ",", &streamInfos); + + int n = 1; + for (List<AString>::iterator it = streamInfos.begin(); + it != streamInfos.end(); ++it) { + (*it).trim(); + LOG(VERBOSE) << "streamInfo[" << n << "] = " << *it; + + CHECK(GetAttribute((*it).c_str(), "url", &val)); + + size_t trackIndex = 0; + while (trackIndex < mTracks.size() + && !(val == mTracks.editItemAt(trackIndex).mURL)) { + ++trackIndex; + } + CHECK_LT(trackIndex, mTracks.size()); + + CHECK(GetAttribute((*it).c_str(), "seq", &val)); + + char *end; + unsigned long seq = strtoul(val.c_str(), &end, 10); + + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mFirstSeqNumInSegment = seq; + info->mNewSegment = true; + + CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); + + uint32_t rtpTime = strtoul(val.c_str(), &end, 10); + + LOG(VERBOSE) << "track #" << n + << ": rtpTime=" << rtpTime << " <=> npt=" << npt1; + + info->mPacketSource->setNormalPlayTimeMapping( + rtpTime, (int64_t)(npt1 * 1E6)); + + ++n; + } + } + sp<APacketSource> getPacketSource(size_t index) { CHECK_GE(index, 0u); CHECK_LT(index, mTracks.size()); @@ -361,15 +692,26 @@ private: AString mBaseURL; AString mSessionID; bool mSetupTracksSuccessful; + bool mSeekPending; + bool mFirstAccessUnit; + uint64_t mFirstAccessUnitNTP; + int64_t mNumAccessUnitsReceived; + bool mCheckPending; struct TrackInfo { + AString mURL; int mRTPSocket; int mRTCPSocket; + bool mUsingInterleavedTCP; + uint32_t mFirstSeqNumInSegment; + bool mNewSegment; sp<APacketSource> mPacketSource; }; Vector<TrackInfo> mTracks; + sp<AMessage> mDoneMsg; + void setupTrack(size_t index) { sp<APacketSource> source = new APacketSource(mSessionDesc, index); @@ -392,20 +734,39 @@ private: mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + info->mURL = trackURL; info->mPacketSource = source; + info->mUsingInterleavedTCP = false; + info->mFirstSeqNumInSegment = 0; + info->mNewSegment = true; - unsigned rtpPort; - ARTPConnection::MakePortPair( - &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + LOG(VERBOSE) << "track #" << mTracks.size() << " URL=" << trackURL; AString request = "SETUP "; request.append(trackURL); request.append(" RTSP/1.0\r\n"); +#if USE_TCP_INTERLEAVED + size_t interleaveIndex = 2 * (mTracks.size() - 1); + info->mUsingInterleavedTCP = true; + info->mRTPSocket = interleaveIndex; + info->mRTCPSocket = interleaveIndex + 1; + + request.append("Transport: RTP/AVP/TCP;interleaved="); + request.append(interleaveIndex); + request.append("-"); + request.append(interleaveIndex + 1); +#else + unsigned rtpPort; + ARTPConnection::MakePortPair( + &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + request.append("Transport: RTP/AVP/UDP;unicast;client_port="); request.append(rtpPort); request.append("-"); request.append(rtpPort + 1); +#endif + request.append("\r\n"); if (index > 1) { |