diff options
author | Anatol Pomozov <anatol.pomozov@gmail.com> | 2012-03-28 09:12:55 -0700 |
---|---|---|
committer | Anatol Pomozov <anatol.pomozov@gmail.com> | 2012-03-28 12:02:47 -0700 |
commit | b0b2b4d890cf3bfb274797a759642b4e733343d7 (patch) | |
tree | 12ad21cbad346f02d542aa4d672ffd76407d58a9 /media/libstagefright/rtsp | |
parent | 51f8eec23a2bcc2cc190373cdd1195972d9b8804 (diff) | |
parent | 5a5491c17d74bd2c80cf451c6ddbba22d5d5f08a (diff) | |
download | frameworks_av-b0b2b4d890cf3bfb274797a759642b4e733343d7.zip frameworks_av-b0b2b4d890cf3bfb274797a759642b4e733343d7.tar.gz frameworks_av-b0b2b4d890cf3bfb274797a759642b4e733343d7.tar.bz2 |
Merge media files with history from frameworks/base.git
Diffstat (limited to 'media/libstagefright/rtsp')
35 files changed, 10168 insertions, 0 deletions
diff --git a/media/libstagefright/rtsp/AAMRAssembler.cpp b/media/libstagefright/rtsp/AAMRAssembler.cpp new file mode 100644 index 0000000..fb8abc5 --- /dev/null +++ b/media/libstagefright/rtsp/AAMRAssembler.cpp @@ -0,0 +1,234 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "AAMRAssembler" +#include <utils/Log.h> + +#include "AAMRAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/Utils.h> + +namespace android { + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + if (len == keyLen && !strncmp(s, key, keyLen)) { + value->setTo("1"); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +AAMRAssembler::AAMRAssembler( + const sp<AMessage> ¬ify, bool isWide, const AString ¶ms) + : mIsWide(isWide), + mNotifyMsg(notify), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0) { + AString value; + CHECK(GetAttribute(params.c_str(), "octet-align", &value) && value == "1"); + CHECK(!GetAttribute(params.c_str(), "crc", &value) || value == "0"); + CHECK(!GetAttribute(params.c_str(), "interleaving", &value)); +} + +AAMRAssembler::~AAMRAssembler() { +} + +ARTPAssembler::AssemblyStatus AAMRAssembler::assembleMore( + const sp<ARTPSource> &source) { + return addPacket(source); +} + +static size_t getFrameSize(bool isWide, unsigned FT) { + static const size_t kFrameSizeNB[9] = { + 95, 103, 118, 134, 148, 159, 204, 244, 39 + }; + static const size_t kFrameSizeWB[10] = { + 132, 177, 253, 285, 317, 365, 397, 461, 477, 40 + }; + + if (FT == 15) { + return 1; + } + + size_t frameSize = isWide ? kFrameSizeWB[FT] : kFrameSizeNB[FT]; + + // Round up bits to bytes and add 1 for the header byte. + frameSize = (frameSize + 7) / 8 + 1; + + return frameSize; +} + +ARTPAssembler::AssemblyStatus AAMRAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { + ALOGV("Not the sequence number I expected"); + + return WRONG_SEQUENCE_NUMBER; + } + + // hexdump(buffer->data(), buffer->size()); + + if (buffer->size() < 1) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + ALOGV("AMR packet too short."); + + return MALFORMED_PACKET; + } + + unsigned payloadHeader = buffer->data()[0]; + unsigned CMR = payloadHeader >> 4; + CHECK_EQ(payloadHeader & 0x0f, 0u); // RR + + Vector<uint8_t> tableOfContents; + + size_t offset = 1; + size_t totalSize = 0; + for (;;) { + if (offset >= buffer->size()) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + ALOGV("Unable to parse TOC."); + + return MALFORMED_PACKET; + } + + uint8_t toc = buffer->data()[offset++]; + + unsigned FT = (toc >> 3) & 0x0f; + if ((toc & 3) != 0 + || (mIsWide && FT > 9 && FT != 15) + || (!mIsWide && FT > 8 && FT != 15)) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + ALOGV("Illegal TOC entry."); + + return MALFORMED_PACKET; + } + + totalSize += getFrameSize(mIsWide, (toc >> 3) & 0x0f); + + tableOfContents.push(toc); + + if (0 == (toc & 0x80)) { + break; + } + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + CopyTimes(accessUnit, buffer); + + size_t dstOffset = 0; + for (size_t i = 0; i < tableOfContents.size(); ++i) { + uint8_t toc = tableOfContents[i]; + + size_t frameSize = getFrameSize(mIsWide, (toc >> 3) & 0x0f); + + if (offset + frameSize - 1 > buffer->size()) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + ALOGV("AMR packet too short."); + + return MALFORMED_PACKET; + } + + accessUnit->data()[dstOffset++] = toc; + memcpy(accessUnit->data() + dstOffset, + buffer->data() + offset, frameSize - 1); + + offset += frameSize - 1; + dstOffset += frameSize - 1; + } + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", accessUnit); + msg->post(); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AAMRAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; +} + +void AAMRAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AAMRAssembler.h b/media/libstagefright/rtsp/AAMRAssembler.h new file mode 100644 index 0000000..d55e109 --- /dev/null +++ b/media/libstagefright/rtsp/AAMRAssembler.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_AMR_ASSEMBLER_H_ + +#define A_AMR_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> + +#include <stdint.h> + +namespace android { + +struct AMessage; +struct AString; + +struct AAMRAssembler : public ARTPAssembler { + AAMRAssembler( + const sp<AMessage> ¬ify, bool isWide, + const AString ¶ms); + +protected: + virtual ~AAMRAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + bool mIsWide; + + sp<AMessage> mNotifyMsg; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + + DISALLOW_EVIL_CONSTRUCTORS(AAMRAssembler); +}; + +} // namespace android + +#endif // A_AMR_ASSEMBLER_H_ + diff --git a/media/libstagefright/rtsp/AAVCAssembler.cpp b/media/libstagefright/rtsp/AAVCAssembler.cpp new file mode 100644 index 0000000..7ea132e --- /dev/null +++ b/media/libstagefright/rtsp/AAVCAssembler.cpp @@ -0,0 +1,376 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "AAVCAssembler" +#include <utils/Log.h> + +#include "AAVCAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> + +#include <stdint.h> + +namespace android { + +// static +AAVCAssembler::AAVCAssembler(const sp<AMessage> ¬ify) + : mNotifyMsg(notify), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { +} + +AAVCAssembler::~AAVCAssembler() { +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::addNALUnit( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { + ALOGV("Not the sequence number I expected"); + + return WRONG_SEQUENCE_NUMBER; + } + + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if (size < 1 || (data[0] & 0x80)) { + // Corrupt. + + ALOGV("Ignoring corrupt buffer."); + queue->erase(queue->begin()); + + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + unsigned nalType = data[0] & 0x1f; + if (nalType >= 1 && nalType <= 23) { + addSingleNALUnit(buffer); + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return OK; + } else if (nalType == 28) { + // FU-A + return addFragmentedNALUnit(queue); + } else if (nalType == 24) { + // STAP-A + bool success = addSingleTimeAggregationPacket(buffer); + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return success ? OK : MALFORMED_PACKET; + } else { + ALOGV("Ignoring unsupported buffer (nalType=%d)", nalType); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return MALFORMED_PACKET; + } +} + +void AAVCAssembler::addSingleNALUnit(const sp<ABuffer> &buffer) { + ALOGV("addSingleNALUnit of size %d", buffer->size()); +#if !LOG_NDEBUG + hexdump(buffer->data(), buffer->size()); +#endif + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (!mNALUnits.empty() && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + mNALUnits.push_back(buffer); +} + +bool AAVCAssembler::addSingleTimeAggregationPacket(const sp<ABuffer> &buffer) { + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if (size < 3) { + ALOGV("Discarding too small STAP-A packet."); + return false; + } + + ++data; + --size; + while (size >= 2) { + size_t nalSize = (data[0] << 8) | data[1]; + + if (size < nalSize + 2) { + ALOGV("Discarding malformed STAP-A packet."); + return false; + } + + sp<ABuffer> unit = new ABuffer(nalSize); + memcpy(unit->data(), &data[2], nalSize); + + CopyTimes(unit, buffer); + + addSingleNALUnit(unit); + + data += 2 + nalSize; + size -= 2 + nalSize; + } + + if (size != 0) { + ALOGV("Unexpected padding at end of STAP-A packet."); + } + + return true; +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::addFragmentedNALUnit( + List<sp<ABuffer> > *queue) { + CHECK(!queue->empty()); + + sp<ABuffer> buffer = *queue->begin(); + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + CHECK(size > 0); + unsigned indicator = data[0]; + + CHECK((indicator & 0x1f) == 28); + + if (size < 2) { + ALOGV("Ignoring malformed FU buffer (size = %d)", size); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + if (!(data[1] & 0x80)) { + // Start bit not set on the first buffer. + + ALOGV("Start bit not set on first buffer"); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + uint32_t nalType = data[1] & 0x1f; + uint32_t nri = (data[0] >> 5) & 3; + + uint32_t expectedSeqNo = (uint32_t)buffer->int32Data() + 1; + size_t totalSize = size - 2; + size_t totalCount = 1; + bool complete = false; + + if (data[1] & 0x40) { + // Huh? End bit also set on the first buffer. + + ALOGV("Grrr. This isn't fragmented at all."); + + complete = true; + } else { + List<sp<ABuffer> >::iterator it = ++queue->begin(); + while (it != queue->end()) { + ALOGV("sequence length %d", totalCount); + + const sp<ABuffer> &buffer = *it; + + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if ((uint32_t)buffer->int32Data() != expectedSeqNo) { + ALOGV("sequence not complete, expected seqNo %d, got %d", + expectedSeqNo, (uint32_t)buffer->int32Data()); + + return WRONG_SEQUENCE_NUMBER; + } + + if (size < 2 + || data[0] != indicator + || (data[1] & 0x1f) != nalType + || (data[1] & 0x80)) { + ALOGV("Ignoring malformed FU buffer."); + + // Delete the whole start of the FU. + + it = queue->begin(); + for (size_t i = 0; i <= totalCount; ++i) { + it = queue->erase(it); + } + + mNextExpectedSeqNo = expectedSeqNo + 1; + + return MALFORMED_PACKET; + } + + totalSize += size - 2; + ++totalCount; + + expectedSeqNo = expectedSeqNo + 1; + + if (data[1] & 0x40) { + // This is the last fragment. + complete = true; + break; + } + + ++it; + } + } + + if (!complete) { + return NOT_ENOUGH_DATA; + } + + mNextExpectedSeqNo = expectedSeqNo; + + // We found all the fragments that make up the complete NAL unit. + + // Leave room for the header. So far totalSize did not include the + // header byte. + ++totalSize; + + sp<ABuffer> unit = new ABuffer(totalSize); + CopyTimes(unit, *queue->begin()); + + unit->data()[0] = (nri << 5) | nalType; + + size_t offset = 1; + List<sp<ABuffer> >::iterator it = queue->begin(); + for (size_t i = 0; i < totalCount; ++i) { + const sp<ABuffer> &buffer = *it; + + ALOGV("piece #%d/%d", i + 1, totalCount); +#if !LOG_NDEBUG + hexdump(buffer->data(), buffer->size()); +#endif + + memcpy(unit->data() + offset, buffer->data() + 2, buffer->size() - 2); + offset += buffer->size() - 2; + + it = queue->erase(it); + } + + unit->setRange(0, totalSize); + + addSingleNALUnit(unit); + + ALOGV("successfully assembled a NAL unit from fragments."); + + return OK; +} + +void AAVCAssembler::submitAccessUnit() { + CHECK(!mNALUnits.empty()); + + ALOGV("Access unit complete (%d nal units)", mNALUnits.size()); + + size_t totalSize = 0; + for (List<sp<ABuffer> >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + totalSize += 4 + (*it)->size(); + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + for (List<sp<ABuffer> >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + memcpy(accessUnit->data() + offset, "\x00\x00\x00\x01", 4); + offset += 4; + + sp<ABuffer> nal = *it; + memcpy(accessUnit->data() + offset, nal->data(), nal->size()); + offset += nal->size(); + } + + CopyTimes(accessUnit, *mNALUnits.begin()); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mNALUnits.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", accessUnit); + msg->post(); +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addNALUnit(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +void AAVCAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ALOGV("packetLost (expected %d)", mNextExpectedSeqNo); + + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +void AAVCAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AAVCAssembler.h b/media/libstagefright/rtsp/AAVCAssembler.h new file mode 100644 index 0000000..bf389ec --- /dev/null +++ b/media/libstagefright/rtsp/AAVCAssembler.h @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_AVC_ASSEMBLER_H_ + +#define A_AVC_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; + +struct AAVCAssembler : public ARTPAssembler { + AAVCAssembler(const sp<AMessage> ¬ify); + +protected: + virtual ~AAVCAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mNALUnits; + + AssemblyStatus addNALUnit(const sp<ARTPSource> &source); + void addSingleNALUnit(const sp<ABuffer> &buffer); + AssemblyStatus addFragmentedNALUnit(List<sp<ABuffer> > *queue); + bool addSingleTimeAggregationPacket(const sp<ABuffer> &buffer); + + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AAVCAssembler); +}; + +} // namespace android + +#endif // A_AVC_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/AH263Assembler.cpp b/media/libstagefright/rtsp/AH263Assembler.cpp new file mode 100644 index 0000000..ded70fa --- /dev/null +++ b/media/libstagefright/rtsp/AH263Assembler.cpp @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AH263Assembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/Utils.h> + +namespace android { + +AH263Assembler::AH263Assembler(const sp<AMessage> ¬ify) + : mNotifyMsg(notify), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { +} + +AH263Assembler::~AH263Assembler() { +} + +ARTPAssembler::AssemblyStatus AH263Assembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addPacket(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +ARTPAssembler::AssemblyStatus AH263Assembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { +#if VERBOSE + LOG(VERBOSE) << "Not the sequence number I expected"; +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + // hexdump(buffer->data(), buffer->size()); + + if (buffer->size() < 2) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return MALFORMED_PACKET; + } + + unsigned payloadHeader = U16_AT(buffer->data()); + CHECK_EQ(payloadHeader >> 11, 0u); // RR=0 + unsigned P = (payloadHeader >> 10) & 1; + CHECK_EQ((payloadHeader >> 9) & 1, 0u); // V=0 + CHECK_EQ((payloadHeader >> 3) & 0x3f, 0u); // PLEN=0 + CHECK_EQ(payloadHeader & 7, 0u); // PEBIT=0 + + if (P) { + buffer->data()[0] = 0x00; + buffer->data()[1] = 0x00; + } else { + buffer->setRange(buffer->offset() + 2, buffer->size() - 2); + } + + mPackets.push_back(buffer); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AH263Assembler::submitAccessUnit() { + CHECK(!mPackets.empty()); + +#if VERBOSE + LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; +#endif + + size_t totalSize = 0; + List<sp<ABuffer> >::iterator it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + totalSize += unit->size(); + ++it; + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + memcpy((uint8_t *)accessUnit->data() + offset, + unit->data(), unit->size()); + + offset += unit->size(); + + ++it; + } + + CopyTimes(accessUnit, *mPackets.begin()); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mPackets.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", accessUnit); + msg->post(); +} + +void AH263Assembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +void AH263Assembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/AH263Assembler.h b/media/libstagefright/rtsp/AH263Assembler.h new file mode 100644 index 0000000..2b6c625 --- /dev/null +++ b/media/libstagefright/rtsp/AH263Assembler.h @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_H263_ASSEMBLER_H_ + +#define A_H263_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> + +#include <stdint.h> + +namespace android { + +struct AMessage; + +struct AH263Assembler : public ARTPAssembler { + AH263Assembler(const sp<AMessage> ¬ify); + +protected: + virtual ~AH263Assembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mPackets; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AH263Assembler); +}; + +} // namespace android + +#endif // A_H263_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp new file mode 100644 index 0000000..24c2f30 --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp @@ -0,0 +1,591 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "AMPEG4AudioAssembler" + +#include "AMPEG4AudioAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/foundation/ABitReader.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaErrors.h> + +#include <ctype.h> + +namespace android { + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +static sp<ABuffer> decodeHex(const AString &s) { + if ((s.size() % 2) != 0) { + return NULL; + } + + size_t outLen = s.size() / 2; + sp<ABuffer> buffer = new ABuffer(outLen); + uint8_t *out = buffer->data(); + + uint8_t accum = 0; + for (size_t i = 0; i < s.size(); ++i) { + char c = s.c_str()[i]; + unsigned value; + if (c >= '0' && c <= '9') { + value = c - '0'; + } else if (c >= 'a' && c <= 'f') { + value = c - 'a' + 10; + } else if (c >= 'A' && c <= 'F') { + value = c - 'A' + 10; + } else { + return NULL; + } + + accum = (accum << 4) | value; + + if (i & 1) { + *out++ = accum; + + accum = 0; + } + } + + return buffer; +} + +static status_t parseAudioObjectType( + ABitReader *bits, unsigned *audioObjectType) { + *audioObjectType = bits->getBits(5); + if ((*audioObjectType) == 31) { + *audioObjectType = 32 + bits->getBits(6); + } + + return OK; +} + +static status_t parseGASpecificConfig( + ABitReader *bits, + unsigned audioObjectType, unsigned channelConfiguration) { + unsigned frameLengthFlag = bits->getBits(1); + unsigned dependsOnCoreCoder = bits->getBits(1); + if (dependsOnCoreCoder) { + /* unsigned coreCoderDelay = */bits->getBits(1); + } + unsigned extensionFlag = bits->getBits(1); + + if (!channelConfiguration) { + // program_config_element + return ERROR_UNSUPPORTED; // XXX to be implemented + } + + if (audioObjectType == 6 || audioObjectType == 20) { + /* unsigned layerNr = */bits->getBits(3); + } + + if (extensionFlag) { + if (audioObjectType == 22) { + /* unsigned numOfSubFrame = */bits->getBits(5); + /* unsigned layerLength = */bits->getBits(11); + } else if (audioObjectType == 17 || audioObjectType == 19 + || audioObjectType == 20 || audioObjectType == 23) { + /* unsigned aacSectionDataResilienceFlag = */bits->getBits(1); + /* unsigned aacScalefactorDataResilienceFlag = */bits->getBits(1); + /* unsigned aacSpectralDataResilienceFlag = */bits->getBits(1); + } + + unsigned extensionFlag3 = bits->getBits(1); + CHECK_EQ(extensionFlag3, 0u); // TBD in version 3 + } + + return OK; +} + +static status_t parseAudioSpecificConfig(ABitReader *bits, sp<ABuffer> *asc) { + const uint8_t *dataStart = bits->data(); + size_t totalNumBits = bits->numBitsLeft(); + + unsigned audioObjectType; + CHECK_EQ(parseAudioObjectType(bits, &audioObjectType), (status_t)OK); + + unsigned samplingFreqIndex = bits->getBits(4); + if (samplingFreqIndex == 0x0f) { + /* unsigned samplingFrequency = */bits->getBits(24); + } + + unsigned channelConfiguration = bits->getBits(4); + + unsigned extensionAudioObjectType = 0; + unsigned sbrPresent = 0; + + if (audioObjectType == 5) { + extensionAudioObjectType = audioObjectType; + sbrPresent = 1; + unsigned extensionSamplingFreqIndex = bits->getBits(4); + if (extensionSamplingFreqIndex == 0x0f) { + /* unsigned extensionSamplingFrequency = */bits->getBits(24); + } + CHECK_EQ(parseAudioObjectType(bits, &audioObjectType), (status_t)OK); + } + + CHECK((audioObjectType >= 1 && audioObjectType <= 4) + || (audioObjectType >= 6 && audioObjectType <= 7) + || audioObjectType == 17 + || (audioObjectType >= 19 && audioObjectType <= 23)); + + CHECK_EQ(parseGASpecificConfig( + bits, audioObjectType, channelConfiguration), (status_t)OK); + + if (audioObjectType == 17 + || (audioObjectType >= 19 && audioObjectType <= 27)) { + unsigned epConfig = bits->getBits(2); + if (epConfig == 2 || epConfig == 3) { + // ErrorProtectionSpecificConfig + return ERROR_UNSUPPORTED; // XXX to be implemented + + if (epConfig == 3) { + unsigned directMapping = bits->getBits(1); + CHECK_EQ(directMapping, 1u); + } + } + } + + if (extensionAudioObjectType != 5 && bits->numBitsLeft() >= 16) { + size_t numBitsLeftAtStart = bits->numBitsLeft(); + + unsigned syncExtensionType = bits->getBits(11); + if (syncExtensionType == 0x2b7) { + ALOGI("found syncExtension"); + + CHECK_EQ(parseAudioObjectType(bits, &extensionAudioObjectType), + (status_t)OK); + + sbrPresent = bits->getBits(1); + + if (sbrPresent == 1) { + unsigned extensionSamplingFreqIndex = bits->getBits(4); + if (extensionSamplingFreqIndex == 0x0f) { + /* unsigned extensionSamplingFrequency = */bits->getBits(24); + } + } + + size_t numBitsInExtension = + numBitsLeftAtStart - bits->numBitsLeft(); + + if (numBitsInExtension & 7) { + // Apparently an extension is always considered an even + // multiple of 8 bits long. + + ALOGI("Skipping %d bits after sync extension", + 8 - (numBitsInExtension & 7)); + + bits->skipBits(8 - (numBitsInExtension & 7)); + } + } else { + bits->putBits(syncExtensionType, 11); + } + } + + if (asc != NULL) { + size_t bitpos = totalNumBits & 7; + + ABitReader bs(dataStart, (totalNumBits + 7) / 8); + + totalNumBits -= bits->numBitsLeft(); + + size_t numBytes = (totalNumBits + 7) / 8; + + *asc = new ABuffer(numBytes); + + if (bitpos & 7) { + bs.skipBits(8 - (bitpos & 7)); + } + + uint8_t *dstPtr = (*asc)->data(); + while (numBytes > 0) { + *dstPtr++ = bs.getBits(8); + --numBytes; + } + } + + return OK; +} + +static status_t parseStreamMuxConfig( + ABitReader *bits, + unsigned *numSubFrames, + unsigned *frameLengthType, + ssize_t *fixedFrameLength, + bool *otherDataPresent, + unsigned *otherDataLenBits) { + unsigned audioMuxVersion = bits->getBits(1); + + unsigned audioMuxVersionA = 0; + if (audioMuxVersion == 1) { + audioMuxVersionA = bits->getBits(1); + } + + CHECK_EQ(audioMuxVersionA, 0u); // otherwise future spec + + if (audioMuxVersion != 0) { + return ERROR_UNSUPPORTED; // XXX to be implemented; + } + CHECK_EQ(audioMuxVersion, 0u); // XXX to be implemented + + unsigned allStreamsSameTimeFraming = bits->getBits(1); + CHECK_EQ(allStreamsSameTimeFraming, 1u); // There's only one stream. + + *numSubFrames = bits->getBits(6); + unsigned numProgram = bits->getBits(4); + CHECK_EQ(numProgram, 0u); // disabled in RTP LATM + + unsigned numLayer = bits->getBits(3); + CHECK_EQ(numLayer, 0u); // disabled in RTP LATM + + if (audioMuxVersion == 0) { + // AudioSpecificConfig + CHECK_EQ(parseAudioSpecificConfig(bits, NULL /* asc */), (status_t)OK); + } else { + TRESPASS(); // XXX to be implemented + } + + *frameLengthType = bits->getBits(3); + *fixedFrameLength = -1; + + switch (*frameLengthType) { + case 0: + { + /* unsigned bufferFullness = */bits->getBits(8); + + // The "coreFrameOffset" does not apply since there's only + // a single layer. + break; + } + + case 1: + { + *fixedFrameLength = bits->getBits(9); + break; + } + + case 2: + { + // reserved + TRESPASS(); + break; + } + + case 3: + case 4: + case 5: + { + /* unsigned CELPframeLengthTableIndex = */bits->getBits(6); + break; + } + + case 6: + case 7: + { + /* unsigned HVXCframeLengthTableIndex = */bits->getBits(1); + break; + } + + default: + break; + } + + *otherDataPresent = bits->getBits(1); + *otherDataLenBits = 0; + if (*otherDataPresent) { + if (audioMuxVersion == 1) { + TRESPASS(); // XXX to be implemented + } else { + *otherDataLenBits = 0; + + unsigned otherDataLenEsc; + do { + (*otherDataLenBits) <<= 8; + otherDataLenEsc = bits->getBits(1); + unsigned otherDataLenTmp = bits->getBits(8); + (*otherDataLenBits) += otherDataLenTmp; + } while (otherDataLenEsc); + } + } + + unsigned crcCheckPresent = bits->getBits(1); + if (crcCheckPresent) { + /* unsigned crcCheckSum = */bits->getBits(8); + } + + return OK; +} + +sp<ABuffer> AMPEG4AudioAssembler::removeLATMFraming(const sp<ABuffer> &buffer) { + CHECK(!mMuxConfigPresent); // XXX to be implemented + + sp<ABuffer> out = new ABuffer(buffer->size()); + out->setRange(0, 0); + + size_t offset = 0; + uint8_t *ptr = buffer->data(); + + for (size_t i = 0; i <= mNumSubFrames; ++i) { + // parse PayloadLengthInfo + + unsigned payloadLength = 0; + + switch (mFrameLengthType) { + case 0: + { + unsigned muxSlotLengthBytes = 0; + unsigned tmp; + do { + CHECK_LT(offset, buffer->size()); + tmp = ptr[offset++]; + muxSlotLengthBytes += tmp; + } while (tmp == 0xff); + + payloadLength = muxSlotLengthBytes; + break; + } + + case 2: + { + // reserved + + TRESPASS(); + break; + } + + default: + { + CHECK_GE(mFixedFrameLength, 0); + + payloadLength = mFixedFrameLength; + break; + } + } + + CHECK_LE(offset + payloadLength, buffer->size()); + + memcpy(out->data() + out->size(), &ptr[offset], payloadLength); + out->setRange(0, out->size() + payloadLength); + + offset += payloadLength; + + if (mOtherDataPresent) { + // We want to stay byte-aligned. + + CHECK((mOtherDataLenBits % 8) == 0); + CHECK_LE(offset + (mOtherDataLenBits / 8), buffer->size()); + offset += mOtherDataLenBits / 8; + } + } + + if (offset < buffer->size()) { + ALOGI("ignoring %d bytes of trailing data", buffer->size() - offset); + } + CHECK_LE(offset, buffer->size()); + + return out; +} + +AMPEG4AudioAssembler::AMPEG4AudioAssembler( + const sp<AMessage> ¬ify, const AString ¶ms) + : mNotifyMsg(notify), + mMuxConfigPresent(false), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { + AString val; + if (!GetAttribute(params.c_str(), "cpresent", &val)) { + mMuxConfigPresent = true; + } else if (val == "0") { + mMuxConfigPresent = false; + } else { + CHECK(val == "1"); + mMuxConfigPresent = true; + } + + CHECK(GetAttribute(params.c_str(), "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + + ABitReader bits(config->data(), config->size()); + status_t err = parseStreamMuxConfig( + &bits, &mNumSubFrames, &mFrameLengthType, + &mFixedFrameLength, + &mOtherDataPresent, &mOtherDataLenBits); + + CHECK_EQ(err, (status_t)NO_ERROR); +} + +AMPEG4AudioAssembler::~AMPEG4AudioAssembler() { +} + +ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addPacket(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { +#if VERBOSE + LOG(VERBOSE) << "Not the sequence number I expected"; +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + mPackets.push_back(buffer); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AMPEG4AudioAssembler::submitAccessUnit() { + CHECK(!mPackets.empty()); + +#if VERBOSE + LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; +#endif + + size_t totalSize = 0; + List<sp<ABuffer> >::iterator it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + totalSize += unit->size(); + ++it; + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + memcpy((uint8_t *)accessUnit->data() + offset, + unit->data(), unit->size()); + + ++it; + } + + accessUnit = removeLATMFraming(accessUnit); + CopyTimes(accessUnit, *mPackets.begin()); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mPackets.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", accessUnit); + msg->post(); +} + +void AMPEG4AudioAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +void AMPEG4AudioAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.h b/media/libstagefright/rtsp/AMPEG4AudioAssembler.h new file mode 100644 index 0000000..1361cd2 --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.h @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_MPEG4_AUDIO_ASSEMBLER_H_ + +#define A_MPEG4_AUDIO_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> + +#include <stdint.h> + +namespace android { + +struct AMessage; +struct AString; + +struct AMPEG4AudioAssembler : public ARTPAssembler { + AMPEG4AudioAssembler( + const sp<AMessage> ¬ify, const AString ¶ms); + +protected: + virtual ~AMPEG4AudioAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + + bool mMuxConfigPresent; + unsigned mNumSubFrames; + unsigned mFrameLengthType; + ssize_t mFixedFrameLength; + bool mOtherDataPresent; + unsigned mOtherDataLenBits; + + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mPackets; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + void submitAccessUnit(); + + sp<ABuffer> removeLATMFraming(const sp<ABuffer> &buffer); + + DISALLOW_EVIL_CONSTRUCTORS(AMPEG4AudioAssembler); +}; + +} // namespace android + +#endif // A_MPEG4_AUDIO_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp new file mode 100644 index 0000000..687d72b --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp @@ -0,0 +1,399 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "AMPEG4ElementaryAssembler" +#include <utils/Log.h> + +#include "AMPEG4ElementaryAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABitReader.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/Utils.h> + +#include <ctype.h> +#include <stdint.h> + +namespace android { + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' + && !strncasecmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +static bool GetIntegerAttribute( + const char *s, const char *key, unsigned *x) { + *x = 0; + + AString val; + if (!GetAttribute(s, key, &val)) { + return false; + } + + s = val.c_str(); + char *end; + unsigned y = strtoul(s, &end, 10); + + if (end == s || *end != '\0') { + return false; + } + + *x = y; + + return true; +} + +// static +AMPEG4ElementaryAssembler::AMPEG4ElementaryAssembler( + const sp<AMessage> ¬ify, const AString &desc, const AString ¶ms) + : mNotifyMsg(notify), + mIsGeneric(false), + mParams(params), + mSizeLength(0), + mIndexLength(0), + mIndexDeltaLength(0), + mCTSDeltaLength(0), + mDTSDeltaLength(0), + mRandomAccessIndication(false), + mStreamStateIndication(0), + mAuxiliaryDataSizeLength(0), + mHasAUHeader(false), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { + mIsGeneric = !strncasecmp(desc.c_str(),"mpeg4-generic/", 14); + + if (mIsGeneric) { + AString value; + CHECK(GetAttribute(params.c_str(), "mode", &value)); + + if (!GetIntegerAttribute(params.c_str(), "sizeLength", &mSizeLength)) { + mSizeLength = 0; + } + + if (!GetIntegerAttribute( + params.c_str(), "indexLength", &mIndexLength)) { + mIndexLength = 0; + } + + if (!GetIntegerAttribute( + params.c_str(), "indexDeltaLength", &mIndexDeltaLength)) { + mIndexDeltaLength = 0; + } + + if (!GetIntegerAttribute( + params.c_str(), "CTSDeltaLength", &mCTSDeltaLength)) { + mCTSDeltaLength = 0; + } + + if (!GetIntegerAttribute( + params.c_str(), "DTSDeltaLength", &mDTSDeltaLength)) { + mDTSDeltaLength = 0; + } + + unsigned x; + if (!GetIntegerAttribute( + params.c_str(), "randomAccessIndication", &x)) { + mRandomAccessIndication = false; + } else { + CHECK(x == 0 || x == 1); + mRandomAccessIndication = (x != 0); + } + + if (!GetIntegerAttribute( + params.c_str(), "streamStateIndication", + &mStreamStateIndication)) { + mStreamStateIndication = 0; + } + + if (!GetIntegerAttribute( + params.c_str(), "auxiliaryDataSizeLength", + &mAuxiliaryDataSizeLength)) { + mAuxiliaryDataSizeLength = 0; + } + + mHasAUHeader = + mSizeLength > 0 + || mIndexLength > 0 + || mIndexDeltaLength > 0 + || mCTSDeltaLength > 0 + || mDTSDeltaLength > 0 + || mRandomAccessIndication + || mStreamStateIndication > 0; + } +} + +AMPEG4ElementaryAssembler::~AMPEG4ElementaryAssembler() { +} + +struct AUHeader { + unsigned mSize; + unsigned mSerial; +}; + +ARTPAssembler::AssemblyStatus AMPEG4ElementaryAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { + ALOGV("Not the sequence number I expected"); + + return WRONG_SEQUENCE_NUMBER; + } + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + if (!mIsGeneric) { + mPackets.push_back(buffer); + } else { + // hexdump(buffer->data(), buffer->size()); + + CHECK_GE(buffer->size(), 2u); + unsigned AU_headers_length = U16_AT(buffer->data()); // in bits + + CHECK_GE(buffer->size(), 2 + (AU_headers_length + 7) / 8); + + List<AUHeader> headers; + + ABitReader bits(buffer->data() + 2, buffer->size() - 2); + unsigned numBitsLeft = AU_headers_length; + + unsigned AU_serial = 0; + for (;;) { + if (numBitsLeft < mSizeLength) { break; } + + unsigned AU_size = bits.getBits(mSizeLength); + numBitsLeft -= mSizeLength; + + size_t n = headers.empty() ? mIndexLength : mIndexDeltaLength; + if (numBitsLeft < n) { break; } + + unsigned AU_index = bits.getBits(n); + numBitsLeft -= n; + + if (headers.empty()) { + AU_serial = AU_index; + } else { + AU_serial += 1 + AU_index; + } + + if (mCTSDeltaLength > 0) { + if (numBitsLeft < 1) { + break; + } + --numBitsLeft; + if (bits.getBits(1)) { + if (numBitsLeft < mCTSDeltaLength) { + break; + } + bits.skipBits(mCTSDeltaLength); + numBitsLeft -= mCTSDeltaLength; + } + } + + if (mDTSDeltaLength > 0) { + if (numBitsLeft < 1) { + break; + } + --numBitsLeft; + if (bits.getBits(1)) { + if (numBitsLeft < mDTSDeltaLength) { + break; + } + bits.skipBits(mDTSDeltaLength); + numBitsLeft -= mDTSDeltaLength; + } + } + + if (mRandomAccessIndication) { + if (numBitsLeft < 1) { + break; + } + bits.skipBits(1); + --numBitsLeft; + } + + if (mStreamStateIndication > 0) { + if (numBitsLeft < mStreamStateIndication) { + break; + } + bits.skipBits(mStreamStateIndication); + } + + AUHeader header; + header.mSize = AU_size; + header.mSerial = AU_serial; + headers.push_back(header); + } + + size_t offset = 2 + (AU_headers_length + 7) / 8; + + if (mAuxiliaryDataSizeLength > 0) { + ABitReader bits(buffer->data() + offset, buffer->size() - offset); + + unsigned auxSize = bits.getBits(mAuxiliaryDataSizeLength); + + offset += (mAuxiliaryDataSizeLength + auxSize + 7) / 8; + } + + for (List<AUHeader>::iterator it = headers.begin(); + it != headers.end(); ++it) { + const AUHeader &header = *it; + + CHECK_LE(offset + header.mSize, buffer->size()); + + sp<ABuffer> accessUnit = new ABuffer(header.mSize); + memcpy(accessUnit->data(), buffer->data() + offset, header.mSize); + + offset += header.mSize; + + CopyTimes(accessUnit, buffer); + mPackets.push_back(accessUnit); + } + + CHECK_EQ(offset, buffer->size()); + } + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AMPEG4ElementaryAssembler::submitAccessUnit() { + CHECK(!mPackets.empty()); + + ALOGV("Access unit complete (%d nal units)", mPackets.size()); + + size_t totalSize = 0; + for (List<sp<ABuffer> >::iterator it = mPackets.begin(); + it != mPackets.end(); ++it) { + totalSize += (*it)->size(); + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + for (List<sp<ABuffer> >::iterator it = mPackets.begin(); + it != mPackets.end(); ++it) { + sp<ABuffer> nal = *it; + memcpy(accessUnit->data() + offset, nal->data(), nal->size()); + offset += nal->size(); + } + + CopyTimes(accessUnit, *mPackets.begin()); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mPackets.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", accessUnit); + msg->post(); +} + +ARTPAssembler::AssemblyStatus AMPEG4ElementaryAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addPacket(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +void AMPEG4ElementaryAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ALOGV("packetLost (expected %d)", mNextExpectedSeqNo); + + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +void AMPEG4ElementaryAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h new file mode 100644 index 0000000..794bbcc --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.h @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_MPEG4_ELEM_ASSEMBLER_H_ + +#define A_MPEG4_ELEM_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <media/stagefright/foundation/AString.h> + +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; + +struct AMPEG4ElementaryAssembler : public ARTPAssembler { + AMPEG4ElementaryAssembler( + const sp<AMessage> ¬ify, const AString &desc, + const AString ¶ms); + +protected: + virtual ~AMPEG4ElementaryAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + bool mIsGeneric; + AString mParams; + + unsigned mSizeLength; + unsigned mIndexLength; + unsigned mIndexDeltaLength; + unsigned mCTSDeltaLength; + unsigned mDTSDeltaLength; + bool mRandomAccessIndication; + unsigned mStreamStateIndication; + unsigned mAuxiliaryDataSizeLength; + bool mHasAUHeader; + + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mPackets; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AMPEG4ElementaryAssembler); +}; + +} // namespace android + +#endif // A_MPEG4_ELEM_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/APacketSource.cpp b/media/libstagefright/rtsp/APacketSource.cpp new file mode 100644 index 0000000..6cf1301 --- /dev/null +++ b/media/libstagefright/rtsp/APacketSource.cpp @@ -0,0 +1,579 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "APacketSource" +#include <utils/Log.h> + +#include "APacketSource.h" + +#include "ARawAudioAssembler.h" +#include "ASessionDescription.h" + +#include "avc_utils.h" + +#include <ctype.h> + +#include <media/stagefright/foundation/ABitReader.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/AString.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MediaErrors.h> +#include <media/stagefright/MetaData.h> +#include <utils/Vector.h> + +namespace android { + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +static sp<ABuffer> decodeHex(const AString &s) { + if ((s.size() % 2) != 0) { + return NULL; + } + + size_t outLen = s.size() / 2; + sp<ABuffer> buffer = new ABuffer(outLen); + uint8_t *out = buffer->data(); + + uint8_t accum = 0; + for (size_t i = 0; i < s.size(); ++i) { + char c = s.c_str()[i]; + unsigned value; + if (c >= '0' && c <= '9') { + value = c - '0'; + } else if (c >= 'a' && c <= 'f') { + value = c - 'a' + 10; + } else if (c >= 'A' && c <= 'F') { + value = c - 'A' + 10; + } else { + return NULL; + } + + accum = (accum << 4) | value; + + if (i & 1) { + *out++ = accum; + + accum = 0; + } + } + + return buffer; +} + +static sp<ABuffer> MakeAVCCodecSpecificData( + const char *params, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + + AString val; + if (!GetAttribute(params, "profile-level-id", &val)) { + return NULL; + } + + sp<ABuffer> profileLevelID = decodeHex(val); + CHECK(profileLevelID != NULL); + CHECK_EQ(profileLevelID->size(), 3u); + + Vector<sp<ABuffer> > paramSets; + + size_t numSeqParameterSets = 0; + size_t totalSeqParameterSetSize = 0; + size_t numPicParameterSets = 0; + size_t totalPicParameterSetSize = 0; + + if (!GetAttribute(params, "sprop-parameter-sets", &val)) { + return NULL; + } + + size_t start = 0; + for (;;) { + ssize_t commaPos = val.find(",", start); + size_t end = (commaPos < 0) ? val.size() : commaPos; + + AString nalString(val, start, end - start); + sp<ABuffer> nal = decodeBase64(nalString); + CHECK(nal != NULL); + CHECK_GT(nal->size(), 0u); + CHECK_LE(nal->size(), 65535u); + + uint8_t nalType = nal->data()[0] & 0x1f; + if (numSeqParameterSets == 0) { + CHECK_EQ((unsigned)nalType, 7u); + } else if (numPicParameterSets > 0) { + CHECK_EQ((unsigned)nalType, 8u); + } + if (nalType == 7) { + ++numSeqParameterSets; + totalSeqParameterSetSize += nal->size(); + } else { + CHECK_EQ((unsigned)nalType, 8u); + ++numPicParameterSets; + totalPicParameterSetSize += nal->size(); + } + + paramSets.push(nal); + + if (commaPos < 0) { + break; + } + + start = commaPos + 1; + } + + CHECK_LT(numSeqParameterSets, 32u); + CHECK_LE(numPicParameterSets, 255u); + + size_t csdSize = + 1 + 3 + 1 + 1 + + 2 * numSeqParameterSets + totalSeqParameterSetSize + + 1 + 2 * numPicParameterSets + totalPicParameterSetSize; + + sp<ABuffer> csd = new ABuffer(csdSize); + uint8_t *out = csd->data(); + + *out++ = 0x01; // configurationVersion + memcpy(out, profileLevelID->data(), 3); + out += 3; + *out++ = (0x3f << 2) | 1; // lengthSize == 2 bytes + *out++ = 0xe0 | numSeqParameterSets; + + for (size_t i = 0; i < numSeqParameterSets; ++i) { + sp<ABuffer> nal = paramSets.editItemAt(i); + + *out++ = nal->size() >> 8; + *out++ = nal->size() & 0xff; + + memcpy(out, nal->data(), nal->size()); + + out += nal->size(); + + if (i == 0) { + FindAVCDimensions(nal, width, height); + ALOGI("dimensions %dx%d", *width, *height); + } + } + + *out++ = numPicParameterSets; + + for (size_t i = 0; i < numPicParameterSets; ++i) { + sp<ABuffer> nal = paramSets.editItemAt(i + numSeqParameterSets); + + *out++ = nal->size() >> 8; + *out++ = nal->size() & 0xff; + + memcpy(out, nal->data(), nal->size()); + + out += nal->size(); + } + + // hexdump(csd->data(), csd->size()); + + return csd; +} + +sp<ABuffer> MakeAACCodecSpecificData(const char *params) { + AString val; + CHECK(GetAttribute(params, "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + CHECK_GE(config->size(), 4u); + + const uint8_t *data = config->data(); + uint32_t x = data[0] << 24 | data[1] << 16 | data[2] << 8 | data[3]; + x = (x >> 1) & 0xffff; + + static const uint8_t kStaticESDS[] = { + 0x03, 22, + 0x00, 0x00, // ES_ID + 0x00, // streamDependenceFlag, URL_Flag, OCRstreamFlag + + 0x04, 17, + 0x40, // Audio ISO/IEC 14496-3 + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + + 0x05, 2, + // AudioSpecificInfo follows + }; + + sp<ABuffer> csd = new ABuffer(sizeof(kStaticESDS) + 2); + memcpy(csd->data(), kStaticESDS, sizeof(kStaticESDS)); + csd->data()[sizeof(kStaticESDS)] = (x >> 8) & 0xff; + csd->data()[sizeof(kStaticESDS) + 1] = x & 0xff; + + // hexdump(csd->data(), csd->size()); + + return csd; +} + +// From mpeg4-generic configuration data. +sp<ABuffer> MakeAACCodecSpecificData2(const char *params) { + AString val; + unsigned long objectType; + if (GetAttribute(params, "objectType", &val)) { + const char *s = val.c_str(); + char *end; + objectType = strtoul(s, &end, 10); + CHECK(end > s && *end == '\0'); + } else { + objectType = 0x40; // Audio ISO/IEC 14496-3 + } + + CHECK(GetAttribute(params, "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + + // Make sure size fits into a single byte and doesn't have to + // be encoded. + CHECK_LT(20 + config->size(), 128u); + + const uint8_t *data = config->data(); + + static const uint8_t kStaticESDS[] = { + 0x03, 22, + 0x00, 0x00, // ES_ID + 0x00, // streamDependenceFlag, URL_Flag, OCRstreamFlag + + 0x04, 17, + 0x40, // Audio ISO/IEC 14496-3 + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + + 0x05, 2, + // AudioSpecificInfo follows + }; + + sp<ABuffer> csd = new ABuffer(sizeof(kStaticESDS) + config->size()); + uint8_t *dst = csd->data(); + *dst++ = 0x03; + *dst++ = 20 + config->size(); + *dst++ = 0x00; // ES_ID + *dst++ = 0x00; + *dst++ = 0x00; // streamDependenceFlag, URL_Flag, OCRstreamFlag + *dst++ = 0x04; + *dst++ = 15 + config->size(); + *dst++ = objectType; + for (int i = 0; i < 12; ++i) { *dst++ = 0x00; } + *dst++ = 0x05; + *dst++ = config->size(); + memcpy(dst, config->data(), config->size()); + + // hexdump(csd->data(), csd->size()); + + return csd; +} + +static size_t GetSizeWidth(size_t x) { + size_t n = 1; + while (x > 127) { + ++n; + x >>= 7; + } + return n; +} + +static uint8_t *EncodeSize(uint8_t *dst, size_t x) { + while (x > 127) { + *dst++ = (x & 0x7f) | 0x80; + x >>= 7; + } + *dst++ = x; + return dst; +} + +static bool ExtractDimensionsMPEG4Config( + const sp<ABuffer> &config, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + + const uint8_t *ptr = config->data(); + size_t offset = 0; + bool foundVOL = false; + while (offset + 3 < config->size()) { + if (memcmp("\x00\x00\x01", &ptr[offset], 3) + || (ptr[offset + 3] & 0xf0) != 0x20) { + ++offset; + continue; + } + + foundVOL = true; + break; + } + + if (!foundVOL) { + return false; + } + + return ExtractDimensionsFromVOLHeader( + &ptr[offset], config->size() - offset, width, height); +} + +static sp<ABuffer> MakeMPEG4VideoCodecSpecificData( + const char *params, int32_t *width, int32_t *height) { + *width = 0; + *height = 0; + + AString val; + CHECK(GetAttribute(params, "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + + if (!ExtractDimensionsMPEG4Config(config, width, height)) { + return NULL; + } + + ALOGI("VOL dimensions = %dx%d", *width, *height); + + size_t len1 = config->size() + GetSizeWidth(config->size()) + 1; + size_t len2 = len1 + GetSizeWidth(len1) + 1 + 13; + size_t len3 = len2 + GetSizeWidth(len2) + 1 + 3; + + sp<ABuffer> csd = new ABuffer(len3); + uint8_t *dst = csd->data(); + *dst++ = 0x03; + dst = EncodeSize(dst, len2 + 3); + *dst++ = 0x00; // ES_ID + *dst++ = 0x00; + *dst++ = 0x00; // streamDependenceFlag, URL_Flag, OCRstreamFlag + + *dst++ = 0x04; + dst = EncodeSize(dst, len1 + 13); + *dst++ = 0x01; // Video ISO/IEC 14496-2 Simple Profile + for (size_t i = 0; i < 12; ++i) { + *dst++ = 0x00; + } + + *dst++ = 0x05; + dst = EncodeSize(dst, config->size()); + memcpy(dst, config->data(), config->size()); + dst += config->size(); + + // hexdump(csd->data(), csd->size()); + + return csd; +} + +APacketSource::APacketSource( + const sp<ASessionDescription> &sessionDesc, size_t index) + : mInitCheck(NO_INIT), + mFormat(new MetaData) { + unsigned long PT; + AString desc; + AString params; + sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + + int64_t durationUs; + if (sessionDesc->getDurationUs(&durationUs)) { + mFormat->setInt64(kKeyDuration, durationUs); + } else { + mFormat->setInt64(kKeyDuration, 60 * 60 * 1000000ll); + } + + mInitCheck = OK; + if (!strncmp(desc.c_str(), "H264/", 5)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_AVC); + + int32_t width, height; + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + width = -1; + height = -1; + } + + int32_t encWidth, encHeight; + sp<ABuffer> codecSpecificData = + MakeAVCCodecSpecificData(params.c_str(), &encWidth, &encHeight); + + if (codecSpecificData != NULL) { + if (width < 0) { + // If no explicit width/height given in the sdp, use the dimensions + // extracted from the first sequence parameter set. + width = encWidth; + height = encHeight; + } + + mFormat->setData( + kKeyAVCC, 0, + codecSpecificData->data(), codecSpecificData->size()); + } else if (width < 0) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); + } else if (!strncmp(desc.c_str(), "H263-2000/", 10) + || !strncmp(desc.c_str(), "H263-1998/", 10)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_H263); + + int32_t width, height; + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); + } else if (!strncmp(desc.c_str(), "MP4A-LATM/", 10)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_AAC); + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc.c_str(), &sampleRate, &numChannels); + + mFormat->setInt32(kKeySampleRate, sampleRate); + mFormat->setInt32(kKeyChannelCount, numChannels); + + sp<ABuffer> codecSpecificData = + MakeAACCodecSpecificData(params.c_str()); + + mFormat->setData( + kKeyESDS, 0, + codecSpecificData->data(), codecSpecificData->size()); + } else if (!strncmp(desc.c_str(), "AMR/", 4)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_AMR_NB); + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc.c_str(), &sampleRate, &numChannels); + + mFormat->setInt32(kKeySampleRate, sampleRate); + mFormat->setInt32(kKeyChannelCount, numChannels); + + if (sampleRate != 8000 || numChannels != 1) { + mInitCheck = ERROR_UNSUPPORTED; + } + } else if (!strncmp(desc.c_str(), "AMR-WB/", 7)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_AMR_WB); + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc.c_str(), &sampleRate, &numChannels); + + mFormat->setInt32(kKeySampleRate, sampleRate); + mFormat->setInt32(kKeyChannelCount, numChannels); + + if (sampleRate != 16000 || numChannels != 1) { + mInitCheck = ERROR_UNSUPPORTED; + } + } else if (!strncmp(desc.c_str(), "MP4V-ES/", 8)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_MPEG4); + + int32_t width, height; + if (!sessionDesc->getDimensions(index, PT, &width, &height)) { + width = -1; + height = -1; + } + + int32_t encWidth, encHeight; + sp<ABuffer> codecSpecificData = + MakeMPEG4VideoCodecSpecificData( + params.c_str(), &encWidth, &encHeight); + + if (codecSpecificData != NULL) { + mFormat->setData( + kKeyESDS, 0, + codecSpecificData->data(), codecSpecificData->size()); + + if (width < 0) { + width = encWidth; + height = encHeight; + } + } else if (width < 0) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); + } else if (!strncasecmp(desc.c_str(), "mpeg4-generic/", 14)) { + AString val; + if (!GetAttribute(params.c_str(), "mode", &val) + || (strcasecmp(val.c_str(), "AAC-lbr") + && strcasecmp(val.c_str(), "AAC-hbr"))) { + mInitCheck = ERROR_UNSUPPORTED; + return; + } + + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_AAC); + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc.c_str(), &sampleRate, &numChannels); + + mFormat->setInt32(kKeySampleRate, sampleRate); + mFormat->setInt32(kKeyChannelCount, numChannels); + + sp<ABuffer> codecSpecificData = + MakeAACCodecSpecificData2(params.c_str()); + + mFormat->setData( + kKeyESDS, 0, + codecSpecificData->data(), codecSpecificData->size()); + } else if (ARawAudioAssembler::Supports(desc.c_str())) { + ARawAudioAssembler::MakeFormat(desc.c_str(), mFormat); + } else { + mInitCheck = ERROR_UNSUPPORTED; + } +} + +APacketSource::~APacketSource() { +} + +status_t APacketSource::initCheck() const { + return mInitCheck; +} + +sp<MetaData> APacketSource::getFormat() { + return mFormat; +} + +} // namespace android diff --git a/media/libstagefright/rtsp/APacketSource.h b/media/libstagefright/rtsp/APacketSource.h new file mode 100644 index 0000000..530e537 --- /dev/null +++ b/media/libstagefright/rtsp/APacketSource.h @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_PACKET_SOURCE_H_ + +#define A_PACKET_SOURCE_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <media/stagefright/MetaData.h> +#include <utils/RefBase.h> + +namespace android { + +struct ASessionDescription; + +struct APacketSource : public RefBase { + APacketSource(const sp<ASessionDescription> &sessionDesc, size_t index); + + status_t initCheck() const; + + virtual sp<MetaData> getFormat(); + +protected: + virtual ~APacketSource(); + +private: + status_t mInitCheck; + + sp<MetaData> mFormat; + + DISALLOW_EVIL_CONSTRUCTORS(APacketSource); +}; + + +} // namespace android + +#endif // A_PACKET_SOURCE_H_ diff --git a/media/libstagefright/rtsp/ARTPAssembler.cpp b/media/libstagefright/rtsp/ARTPAssembler.cpp new file mode 100644 index 0000000..a897c10 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPAssembler.cpp @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTPAssembler.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +#include <stdint.h> + +namespace android { + +static int64_t getNowUs() { + struct timeval tv; + gettimeofday(&tv, NULL); + + return (int64_t)tv.tv_usec + tv.tv_sec * 1000000ll; +} + +ARTPAssembler::ARTPAssembler() + : mFirstFailureTimeUs(-1) { +} + +void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) { + AssemblyStatus status; + for (;;) { + status = assembleMore(source); + + if (status == WRONG_SEQUENCE_NUMBER) { + if (mFirstFailureTimeUs >= 0) { + if (getNowUs() - mFirstFailureTimeUs > 10000ll) { + mFirstFailureTimeUs = -1; + + // LOG(VERBOSE) << "waited too long for packet."; + packetLost(); + continue; + } + } else { + mFirstFailureTimeUs = getNowUs(); + } + break; + } else { + mFirstFailureTimeUs = -1; + + if (status == NOT_ENOUGH_DATA) { + break; + } + } + } +} + +// static +void ARTPAssembler::CopyTimes(const sp<ABuffer> &to, const sp<ABuffer> &from) { + uint32_t rtpTime; + CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + to->meta()->setInt32("rtp-time", rtpTime); + + // Copy the seq number. + to->setInt32Data(from->int32Data()); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ARTPAssembler.h b/media/libstagefright/rtsp/ARTPAssembler.h new file mode 100644 index 0000000..70ea186 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPAssembler.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_ASSEMBLER_H_ + +#define A_RTP_ASSEMBLER_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct ARTPSource; + +struct ARTPAssembler : public RefBase { + enum AssemblyStatus { + MALFORMED_PACKET, + WRONG_SEQUENCE_NUMBER, + NOT_ENOUGH_DATA, + OK + }; + + ARTPAssembler(); + + void onPacketReceived(const sp<ARTPSource> &source); + virtual void onByeReceived() = 0; + +protected: + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source) = 0; + virtual void packetLost() = 0; + + static void CopyTimes(const sp<ABuffer> &to, const sp<ABuffer> &from); + +private: + int64_t mFirstFailureTimeUs; + + DISALLOW_EVIL_CONSTRUCTORS(ARTPAssembler); +}; + +} // namespace android + +#endif // A_RTP_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/ARTPConnection.cpp b/media/libstagefright/rtsp/ARTPConnection.cpp new file mode 100644 index 0000000..44988a3 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPConnection.cpp @@ -0,0 +1,674 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARTPConnection" +#include <utils/Log.h> + +#include "ARTPConnection.h" + +#include "ARTPSource.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/AString.h> +#include <media/stagefright/foundation/hexdump.h> + +#include <arpa/inet.h> +#include <sys/socket.h> + +namespace android { + +static const size_t kMaxUDPSize = 1500; + +static uint16_t u16at(const uint8_t *data) { + return data[0] << 8 | data[1]; +} + +static uint32_t u32at(const uint8_t *data) { + return u16at(data) << 16 | u16at(&data[2]); +} + +static uint64_t u64at(const uint8_t *data) { + return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); +} + +// static +const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; + +struct ARTPConnection::StreamInfo { + int mRTPSocket; + int mRTCPSocket; + sp<ASessionDescription> mSessionDesc; + size_t mIndex; + sp<AMessage> mNotifyMsg; + KeyedVector<uint32_t, sp<ARTPSource> > mSources; + + int64_t mNumRTCPPacketsReceived; + int64_t mNumRTPPacketsReceived; + struct sockaddr_in mRemoteRTCPAddr; + + bool mIsInjected; +}; + +ARTPConnection::ARTPConnection(uint32_t flags) + : mFlags(flags), + mPollEventPending(false), + mLastReceiverReportTimeUs(-1) { +} + +ARTPConnection::~ARTPConnection() { +} + +void ARTPConnection::addStream( + int rtpSocket, int rtcpSocket, + const sp<ASessionDescription> &sessionDesc, + size_t index, + const sp<AMessage> ¬ify, + bool injected) { + sp<AMessage> msg = new AMessage(kWhatAddStream, id()); + msg->setInt32("rtp-socket", rtpSocket); + msg->setInt32("rtcp-socket", rtcpSocket); + msg->setObject("session-desc", sessionDesc); + msg->setSize("index", index); + msg->setMessage("notify", notify); + msg->setInt32("injected", injected); + msg->post(); +} + +void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { + sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); + msg->setInt32("rtp-socket", rtpSocket); + msg->setInt32("rtcp-socket", rtcpSocket); + msg->post(); +} + +static void bumpSocketBufferSize(int s) { + int size = 256 * 1024; + CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); +} + +// static +void ARTPConnection::MakePortPair( + int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { + *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(*rtpSocket, 0); + + bumpSocketBufferSize(*rtpSocket); + + *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(*rtcpSocket, 0); + + bumpSocketBufferSize(*rtcpSocket); + + unsigned start = (rand() * 1000)/ RAND_MAX + 15550; + start &= ~1; + + for (unsigned port = start; port < 65536; port += 2) { + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(port); + + if (bind(*rtpSocket, + (const struct sockaddr *)&addr, sizeof(addr)) < 0) { + continue; + } + + addr.sin_port = htons(port + 1); + + if (bind(*rtcpSocket, + (const struct sockaddr *)&addr, sizeof(addr)) == 0) { + *rtpPort = port; + return; + } + } + + TRESPASS(); +} + +void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatAddStream: + { + onAddStream(msg); + break; + } + + case kWhatRemoveStream: + { + onRemoveStream(msg); + break; + } + + case kWhatPollStreams: + { + onPollStreams(); + break; + } + + case kWhatInjectPacket: + { + onInjectPacket(msg); + break; + } + + default: + { + TRESPASS(); + break; + } + } +} + +void ARTPConnection::onAddStream(const sp<AMessage> &msg) { + mStreams.push_back(StreamInfo()); + StreamInfo *info = &*--mStreams.end(); + + int32_t s; + CHECK(msg->findInt32("rtp-socket", &s)); + info->mRTPSocket = s; + CHECK(msg->findInt32("rtcp-socket", &s)); + info->mRTCPSocket = s; + + int32_t injected; + CHECK(msg->findInt32("injected", &injected)); + + info->mIsInjected = injected; + + sp<RefBase> obj; + CHECK(msg->findObject("session-desc", &obj)); + info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); + + CHECK(msg->findSize("index", &info->mIndex)); + CHECK(msg->findMessage("notify", &info->mNotifyMsg)); + + info->mNumRTCPPacketsReceived = 0; + info->mNumRTPPacketsReceived = 0; + memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); + + if (!injected) { + postPollEvent(); + } +} + +void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { + int32_t rtpSocket, rtcpSocket; + CHECK(msg->findInt32("rtp-socket", &rtpSocket)); + CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); + + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end() + && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { + ++it; + } + + if (it == mStreams.end()) { + return; + } + + mStreams.erase(it); +} + +void ARTPConnection::postPollEvent() { + if (mPollEventPending) { + return; + } + + sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); + msg->post(); + + mPollEventPending = true; +} + +void ARTPConnection::onPollStreams() { + mPollEventPending = false; + + if (mStreams.empty()) { + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set rs; + FD_ZERO(&rs); + + int maxSocket = -1; + for (List<StreamInfo>::iterator it = mStreams.begin(); + it != mStreams.end(); ++it) { + if ((*it).mIsInjected) { + continue; + } + + FD_SET(it->mRTPSocket, &rs); + FD_SET(it->mRTCPSocket, &rs); + + if (it->mRTPSocket > maxSocket) { + maxSocket = it->mRTPSocket; + } + if (it->mRTCPSocket > maxSocket) { + maxSocket = it->mRTCPSocket; + } + } + + if (maxSocket == -1) { + return; + } + + int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); + + if (res > 0) { + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end()) { + if ((*it).mIsInjected) { + ++it; + continue; + } + + status_t err = OK; + if (FD_ISSET(it->mRTPSocket, &rs)) { + err = receive(&*it, true); + } + if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { + err = receive(&*it, false); + } + + if (err == -ECONNRESET) { + // socket failure, this stream is dead, Jim. + + ALOGW("failed to receive RTP/RTCP datagram."); + it = mStreams.erase(it); + continue; + } + + ++it; + } + } + + int64_t nowUs = ALooper::GetNowUs(); + if (mLastReceiverReportTimeUs <= 0 + || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { + sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end()) { + StreamInfo *s = &*it; + + if (s->mIsInjected) { + ++it; + continue; + } + + if (s->mNumRTCPPacketsReceived == 0) { + // We have never received any RTCP packets on this stream, + // we don't even know where to send a report. + ++it; + continue; + } + + buffer->setRange(0, 0); + + for (size_t i = 0; i < s->mSources.size(); ++i) { + sp<ARTPSource> source = s->mSources.valueAt(i); + + source->addReceiverReport(buffer); + + if (mFlags & kRegularlyRequestFIR) { + source->addFIR(buffer); + } + } + + if (buffer->size() > 0) { + ALOGV("Sending RR..."); + + ssize_t n; + do { + n = sendto( + s->mRTCPSocket, buffer->data(), buffer->size(), 0, + (const struct sockaddr *)&s->mRemoteRTCPAddr, + sizeof(s->mRemoteRTCPAddr)); + } while (n < 0 && errno == EINTR); + + if (n <= 0) { + ALOGW("failed to send RTCP receiver report (%s).", + n == 0 ? "connection gone" : strerror(errno)); + + it = mStreams.erase(it); + continue; + } + + CHECK_EQ(n, (ssize_t)buffer->size()); + + mLastReceiverReportTimeUs = nowUs; + } + + ++it; + } + } + + if (!mStreams.empty()) { + postPollEvent(); + } +} + +status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { + ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP"); + + CHECK(!s->mIsInjected); + + sp<ABuffer> buffer = new ABuffer(65536); + + socklen_t remoteAddrLen = + (!receiveRTP && s->mNumRTCPPacketsReceived == 0) + ? sizeof(s->mRemoteRTCPAddr) : 0; + + ssize_t nbytes; + do { + nbytes = recvfrom( + receiveRTP ? s->mRTPSocket : s->mRTCPSocket, + buffer->data(), + buffer->capacity(), + 0, + remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, + remoteAddrLen > 0 ? &remoteAddrLen : NULL); + } while (nbytes < 0 && errno == EINTR); + + if (nbytes <= 0) { + return -ECONNRESET; + } + + buffer->setRange(0, nbytes); + + // ALOGI("received %d bytes.", buffer->size()); + + status_t err; + if (receiveRTP) { + err = parseRTP(s, buffer); + } else { + err = parseRTCP(s, buffer); + } + + return err; +} + +status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { + if (s->mNumRTPPacketsReceived++ == 0) { + sp<AMessage> notify = s->mNotifyMsg->dup(); + notify->setInt32("first-rtp", true); + notify->post(); + } + + size_t size = buffer->size(); + + if (size < 12) { + // Too short to be a valid RTP header. + return -1; + } + + const uint8_t *data = buffer->data(); + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return -1; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return -1; + } + + size -= paddingLength; + } + + int numCSRCs = data[0] & 0x0f; + + size_t payloadOffset = 12 + 4 * numCSRCs; + + if (size < payloadOffset) { + // Not enough data to fit the basic header and all the CSRC entries. + return -1; + } + + if (data[0] & 0x10) { + // Header eXtension present. + + if (size < payloadOffset + 4) { + // Not enough data to fit the basic header, all CSRC entries + // and the first 4 bytes of the extension header. + + return -1; + } + + const uint8_t *extensionData = &data[payloadOffset]; + + size_t extensionLength = + 4 * (extensionData[2] << 8 | extensionData[3]); + + if (size < payloadOffset + 4 + extensionLength) { + return -1; + } + + payloadOffset += 4 + extensionLength; + } + + uint32_t srcId = u32at(&data[8]); + + sp<ARTPSource> source = findSource(s, srcId); + + uint32_t rtpTime = u32at(&data[4]); + + sp<AMessage> meta = buffer->meta(); + meta->setInt32("ssrc", srcId); + meta->setInt32("rtp-time", rtpTime); + meta->setInt32("PT", data[1] & 0x7f); + meta->setInt32("M", data[1] >> 7); + + buffer->setInt32Data(u16at(&data[2])); + buffer->setRange(payloadOffset, size - payloadOffset); + + source->processRTPPacket(buffer); + + return OK; +} + +status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { + if (s->mNumRTCPPacketsReceived++ == 0) { + sp<AMessage> notify = s->mNotifyMsg->dup(); + notify->setInt32("first-rtcp", true); + notify->post(); + } + + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + while (size > 0) { + if (size < 8) { + // Too short to be a valid RTCP header + return -1; + } + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return -1; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return -1; + } + + size -= paddingLength; + } + + size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; + + if (size < headerLength) { + // Only received a partial packet? + return -1; + } + + switch (data[1]) { + case 200: + { + parseSR(s, data, headerLength); + break; + } + + case 201: // RR + case 202: // SDES + case 204: // APP + break; + + case 205: // TSFB (transport layer specific feedback) + case 206: // PSFB (payload specific feedback) + // hexdump(data, headerLength); + break; + + case 203: + { + parseBYE(s, data, headerLength); + break; + } + + default: + { + ALOGW("Unknown RTCP packet type %u of size %d", + (unsigned)data[1], headerLength); + break; + } + } + + data += headerLength; + size -= headerLength; + } + + return OK; +} + +status_t ARTPConnection::parseBYE( + StreamInfo *s, const uint8_t *data, size_t size) { + size_t SC = data[0] & 0x3f; + + if (SC == 0 || size < (4 + SC * 4)) { + // Packet too short for the minimal BYE header. + return -1; + } + + uint32_t id = u32at(&data[4]); + + sp<ARTPSource> source = findSource(s, id); + + source->byeReceived(); + + return OK; +} + +status_t ARTPConnection::parseSR( + StreamInfo *s, const uint8_t *data, size_t size) { + size_t RC = data[0] & 0x1f; + + if (size < (7 + RC * 6) * 4) { + // Packet too short for the minimal SR header. + return -1; + } + + uint32_t id = u32at(&data[4]); + uint64_t ntpTime = u64at(&data[8]); + uint32_t rtpTime = u32at(&data[16]); + +#if 0 + ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", + id, + rtpTime, + (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); +#endif + + sp<ARTPSource> source = findSource(s, id); + + source->timeUpdate(rtpTime, ntpTime); + + return 0; +} + +sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { + sp<ARTPSource> source; + ssize_t index = info->mSources.indexOfKey(srcId); + if (index < 0) { + index = info->mSources.size(); + + source = new ARTPSource( + srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); + + info->mSources.add(srcId, source); + } else { + source = info->mSources.valueAt(index); + } + + return source; +} + +void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { + sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); + msg->setInt32("index", index); + msg->setBuffer("buffer", buffer); + msg->post(); +} + +void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { + int32_t index; + CHECK(msg->findInt32("index", &index)); + + sp<ABuffer> buffer; + CHECK(msg->findBuffer("buffer", &buffer)); + + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end() + && it->mRTPSocket != index && it->mRTCPSocket != index) { + ++it; + } + + if (it == mStreams.end()) { + TRESPASS(); + } + + StreamInfo *s = &*it; + + status_t err; + if (it->mRTPSocket == index) { + err = parseRTP(s, buffer); + } else { + err = parseRTCP(s, buffer); + } +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ARTPConnection.h b/media/libstagefright/rtsp/ARTPConnection.h new file mode 100644 index 0000000..edbcc35 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPConnection.h @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_CONNECTION_H_ + +#define A_RTP_CONNECTION_H_ + +#include <media/stagefright/foundation/AHandler.h> +#include <utils/List.h> + +namespace android { + +struct ABuffer; +struct ARTPSource; +struct ASessionDescription; + +struct ARTPConnection : public AHandler { + enum Flags { + kRegularlyRequestFIR = 2, + }; + + ARTPConnection(uint32_t flags = 0); + + void addStream( + int rtpSocket, int rtcpSocket, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify, + bool injected); + + void removeStream(int rtpSocket, int rtcpSocket); + + void injectPacket(int index, const sp<ABuffer> &buffer); + + // Creates a pair of UDP datagram sockets bound to adjacent ports + // (the rtpSocket is bound to an even port, the rtcpSocket to the + // next higher port). + static void MakePortPair( + int *rtpSocket, int *rtcpSocket, unsigned *rtpPort); + +protected: + virtual ~ARTPConnection(); + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum { + kWhatAddStream, + kWhatRemoveStream, + kWhatPollStreams, + kWhatInjectPacket, + }; + + static const int64_t kSelectTimeoutUs; + + uint32_t mFlags; + + struct StreamInfo; + List<StreamInfo> mStreams; + + bool mPollEventPending; + int64_t mLastReceiverReportTimeUs; + + void onAddStream(const sp<AMessage> &msg); + void onRemoveStream(const sp<AMessage> &msg); + void onPollStreams(); + void onInjectPacket(const sp<AMessage> &msg); + void onSendReceiverReports(); + + status_t receive(StreamInfo *info, bool receiveRTP); + + status_t parseRTP(StreamInfo *info, const sp<ABuffer> &buffer); + status_t parseRTCP(StreamInfo *info, const sp<ABuffer> &buffer); + status_t parseSR(StreamInfo *info, const uint8_t *data, size_t size); + status_t parseBYE(StreamInfo *info, const uint8_t *data, size_t size); + + sp<ARTPSource> findSource(StreamInfo *info, uint32_t id); + + void postPollEvent(); + + DISALLOW_EVIL_CONSTRUCTORS(ARTPConnection); +}; + +} // namespace android + +#endif // A_RTP_CONNECTION_H_ diff --git a/media/libstagefright/rtsp/ARTPSession.cpp b/media/libstagefright/rtsp/ARTPSession.cpp new file mode 100644 index 0000000..ba4e33c --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSession.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARTPSession" +#include <utils/Log.h> + +#include "ARTPSession.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> + +#include <ctype.h> +#include <arpa/inet.h> +#include <sys/socket.h> + +#include "APacketSource.h" +#include "ARTPConnection.h" +#include "ASessionDescription.h" + +namespace android { + +ARTPSession::ARTPSession() + : mInitCheck(NO_INIT) { +} + +status_t ARTPSession::setup(const sp<ASessionDescription> &desc) { + CHECK_EQ(mInitCheck, (status_t)NO_INIT); + + mDesc = desc; + + mRTPConn = new ARTPConnection(ARTPConnection::kRegularlyRequestFIR); + + looper()->registerHandler(mRTPConn); + + for (size_t i = 1; i < mDesc->countTracks(); ++i) { + AString connection; + if (!mDesc->findAttribute(i, "c=", &connection)) { + // No per-stream connection information, try global fallback. + if (!mDesc->findAttribute(0, "c=", &connection)) { + ALOGE("Unable to find connection attribute."); + return mInitCheck; + } + } + if (!(connection == "IN IP4 127.0.0.1")) { + ALOGE("We only support localhost connections for now."); + return mInitCheck; + } + + unsigned port; + if (!validateMediaFormat(i, &port) || (port & 1) != 0) { + ALOGE("Invalid media format."); + return mInitCheck; + } + + sp<APacketSource> source = new APacketSource(mDesc, i); + if (source->initCheck() != OK) { + ALOGE("Unsupported format."); + return mInitCheck; + } + + int rtpSocket = MakeUDPSocket(port); + int rtcpSocket = MakeUDPSocket(port + 1); + + mTracks.push(TrackInfo()); + TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + info->mRTPSocket = rtpSocket; + info->mRTCPSocket = rtcpSocket; + + sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id()); + notify->setSize("track-index", mTracks.size() - 1); + + mRTPConn->addStream( + rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */); + + info->mPacketSource = source; + } + + mInitCheck = OK; + + return OK; +} + +// static +int ARTPSession::MakeUDPSocket(unsigned port) { + int s = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(s, 0); + + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + CHECK_EQ(0, bind(s, (const struct sockaddr *)&addr, sizeof(addr))); + + return s; +} + +ARTPSession::~ARTPSession() { + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *info = &mTracks.editItemAt(i); + + info->mPacketSource->signalEOS(UNKNOWN_ERROR); + + close(info->mRTPSocket); + close(info->mRTCPSocket); + } +} + +void ARTPSession::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatAccessUnitComplete: + { + int32_t firstRTCP; + if (msg->findInt32("first-rtcp", &firstRTCP)) { + // There won't be an access unit here, it's just a notification + // that the data communication worked since we got the first + // rtcp packet. + break; + } + + size_t trackIndex; + CHECK(msg->findSize("track-index", &trackIndex)); + + int32_t eos; + if (msg->findInt32("eos", &eos) && eos) { + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); + break; + } + + sp<ABuffer> accessUnit; + CHECK(msg->findBuffer("access-unit", &accessUnit)); + + uint64_t ntpTime; + CHECK(accessUnit->meta()->findInt64( + "ntp-time", (int64_t *)&ntpTime)); + +#if 0 +#if 0 + printf("access unit complete size=%d\tntp-time=0x%016llx\n", + accessUnit->size(), ntpTime); +#else + ALOGI("access unit complete, size=%d, ntp-time=%llu", + accessUnit->size(), ntpTime); + hexdump(accessUnit->data(), accessUnit->size()); +#endif +#endif + +#if 0 + CHECK_GE(accessUnit->size(), 5u); + CHECK(!memcmp("\x00\x00\x00\x01", accessUnit->data(), 4)); + unsigned x = accessUnit->data()[4]; + + ALOGI("access unit complete: nalType=0x%02x, nalRefIdc=0x%02x", + x & 0x1f, (x & 0x60) >> 5); +#endif + + accessUnit->meta()->setInt64("ntp-time", ntpTime); + accessUnit->meta()->setInt64("timeUs", 0); + +#if 0 + int32_t damaged; + if (accessUnit->meta()->findInt32("damaged", &damaged) + && damaged != 0) { + ALOGI("ignoring damaged AU"); + } else +#endif + { + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mPacketSource->queueAccessUnit(accessUnit); + } + break; + } + + default: + TRESPASS(); + break; + } +} + +bool ARTPSession::validateMediaFormat(size_t index, unsigned *port) const { + AString format; + mDesc->getFormat(index, &format); + + ssize_t i = format.find(" "); + if (i < 0) { + return false; + } + + ++i; + size_t j = i; + while (isdigit(format.c_str()[j])) { + ++j; + } + if (format.c_str()[j] != ' ') { + return false; + } + + AString portString(format, i, j - i); + + char *end; + unsigned long x = strtoul(portString.c_str(), &end, 10); + if (end == portString.c_str() || *end != '\0') { + return false; + } + + if (x == 0 || x > 65535) { + return false; + } + + *port = x; + + return true; +} + +size_t ARTPSession::countTracks() { + return mTracks.size(); +} + +sp<MediaSource> ARTPSession::trackAt(size_t index) { + CHECK_LT(index, mTracks.size()); + return mTracks.editItemAt(index).mPacketSource; +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ARTPSession.h b/media/libstagefright/rtsp/ARTPSession.h new file mode 100644 index 0000000..9bff74c --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSession.h @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_SESSION_H_ + +#define A_RTP_SESSION_H_ + +#include <media/stagefright/foundation/AHandler.h> + +namespace android { + +struct APacketSource; +struct ARTPConnection; +struct ASessionDescription; +struct MediaSource; + +struct ARTPSession : public AHandler { + ARTPSession(); + + status_t setup(const sp<ASessionDescription> &desc); + + size_t countTracks(); + sp<MediaSource> trackAt(size_t index); + +protected: + virtual void onMessageReceived(const sp<AMessage> &msg); + + virtual ~ARTPSession(); + +private: + enum { + kWhatAccessUnitComplete = 'accu' + }; + + struct TrackInfo { + int mRTPSocket; + int mRTCPSocket; + + sp<APacketSource> mPacketSource; + }; + + status_t mInitCheck; + sp<ASessionDescription> mDesc; + sp<ARTPConnection> mRTPConn; + + Vector<TrackInfo> mTracks; + + bool validateMediaFormat(size_t index, unsigned *port) const; + static int MakeUDPSocket(unsigned port); + + DISALLOW_EVIL_CONSTRUCTORS(ARTPSession); +}; + +} // namespace android + +#endif // A_RTP_SESSION_H_ diff --git a/media/libstagefright/rtsp/ARTPSource.cpp b/media/libstagefright/rtsp/ARTPSource.cpp new file mode 100644 index 0000000..ed68790 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSource.cpp @@ -0,0 +1,275 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARTPSource" +#include <utils/Log.h> + +#include "ARTPSource.h" + +#include "AAMRAssembler.h" +#include "AAVCAssembler.h" +#include "AH263Assembler.h" +#include "AMPEG4AudioAssembler.h" +#include "AMPEG4ElementaryAssembler.h" +#include "ARawAudioAssembler.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +namespace android { + +static const uint32_t kSourceID = 0xdeadbeef; + +ARTPSource::ARTPSource( + uint32_t id, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify) + : mID(id), + mHighestSeqNumber(0), + mNumBuffersReceived(0), + mLastNTPTime(0), + mLastNTPTimeUpdateUs(0), + mIssueFIRRequests(false), + mLastFIRRequestUs(-1), + mNextFIRSeqNo((rand() * 256.0) / RAND_MAX), + mNotify(notify) { + unsigned long PT; + AString desc; + AString params; + sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + + if (!strncmp(desc.c_str(), "H264/", 5)) { + mAssembler = new AAVCAssembler(notify); + mIssueFIRRequests = true; + } else if (!strncmp(desc.c_str(), "MP4A-LATM/", 10)) { + mAssembler = new AMPEG4AudioAssembler(notify, params); + } else if (!strncmp(desc.c_str(), "H263-1998/", 10) + || !strncmp(desc.c_str(), "H263-2000/", 10)) { + mAssembler = new AH263Assembler(notify); + mIssueFIRRequests = true; + } else if (!strncmp(desc.c_str(), "AMR/", 4)) { + mAssembler = new AAMRAssembler(notify, false /* isWide */, params); + } else if (!strncmp(desc.c_str(), "AMR-WB/", 7)) { + mAssembler = new AAMRAssembler(notify, true /* isWide */, params); + } else if (!strncmp(desc.c_str(), "MP4V-ES/", 8) + || !strncasecmp(desc.c_str(), "mpeg4-generic/", 14)) { + mAssembler = new AMPEG4ElementaryAssembler(notify, desc, params); + mIssueFIRRequests = true; + } else if (ARawAudioAssembler::Supports(desc.c_str())) { + mAssembler = new ARawAudioAssembler(notify, desc.c_str(), params); + } else { + TRESPASS(); + } +} + +static uint32_t AbsDiff(uint32_t seq1, uint32_t seq2) { + return seq1 > seq2 ? seq1 - seq2 : seq2 - seq1; +} + +void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer) { + if (queuePacket(buffer) && mAssembler != NULL) { + mAssembler->onPacketReceived(this); + } +} + +void ARTPSource::timeUpdate(uint32_t rtpTime, uint64_t ntpTime) { + mLastNTPTime = ntpTime; + mLastNTPTimeUpdateUs = ALooper::GetNowUs(); + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("time-update", true); + notify->setInt32("rtp-time", rtpTime); + notify->setInt64("ntp-time", ntpTime); + notify->post(); +} + +bool ARTPSource::queuePacket(const sp<ABuffer> &buffer) { + uint32_t seqNum = (uint32_t)buffer->int32Data(); + + if (mNumBuffersReceived++ == 0) { + mHighestSeqNumber = seqNum; + mQueue.push_back(buffer); + return true; + } + + // Only the lower 16-bit of the sequence numbers are transmitted, + // derive the high-order bits by choosing the candidate closest + // to the highest sequence number (extended to 32 bits) received so far. + + uint32_t seq1 = seqNum | (mHighestSeqNumber & 0xffff0000); + uint32_t seq2 = seqNum | ((mHighestSeqNumber & 0xffff0000) + 0x10000); + uint32_t seq3 = seqNum | ((mHighestSeqNumber & 0xffff0000) - 0x10000); + uint32_t diff1 = AbsDiff(seq1, mHighestSeqNumber); + uint32_t diff2 = AbsDiff(seq2, mHighestSeqNumber); + uint32_t diff3 = AbsDiff(seq3, mHighestSeqNumber); + + if (diff1 < diff2) { + if (diff1 < diff3) { + // diff1 < diff2 ^ diff1 < diff3 + seqNum = seq1; + } else { + // diff3 <= diff1 < diff2 + seqNum = seq3; + } + } else if (diff2 < diff3) { + // diff2 <= diff1 ^ diff2 < diff3 + seqNum = seq2; + } else { + // diff3 <= diff2 <= diff1 + seqNum = seq3; + } + + if (seqNum > mHighestSeqNumber) { + mHighestSeqNumber = seqNum; + } + + buffer->setInt32Data(seqNum); + + List<sp<ABuffer> >::iterator it = mQueue.begin(); + while (it != mQueue.end() && (uint32_t)(*it)->int32Data() < seqNum) { + ++it; + } + + if (it != mQueue.end() && (uint32_t)(*it)->int32Data() == seqNum) { + ALOGW("Discarding duplicate buffer"); + return false; + } + + mQueue.insert(it, buffer); + + return true; +} + +void ARTPSource::byeReceived() { + mAssembler->onByeReceived(); +} + +void ARTPSource::addFIR(const sp<ABuffer> &buffer) { + if (!mIssueFIRRequests) { + return; + } + + int64_t nowUs = ALooper::GetNowUs(); + if (mLastFIRRequestUs >= 0 && mLastFIRRequestUs + 5000000ll > nowUs) { + // Send FIR requests at most every 5 secs. + return; + } + + mLastFIRRequestUs = nowUs; + + if (buffer->size() + 20 > buffer->capacity()) { + ALOGW("RTCP buffer too small to accomodate FIR."); + return; + } + + uint8_t *data = buffer->data() + buffer->size(); + + data[0] = 0x80 | 4; + data[1] = 206; // PSFB + data[2] = 0; + data[3] = 4; + data[4] = kSourceID >> 24; + data[5] = (kSourceID >> 16) & 0xff; + data[6] = (kSourceID >> 8) & 0xff; + data[7] = kSourceID & 0xff; + + data[8] = 0x00; // SSRC of media source (unused) + data[9] = 0x00; + data[10] = 0x00; + data[11] = 0x00; + + data[12] = mID >> 24; + data[13] = (mID >> 16) & 0xff; + data[14] = (mID >> 8) & 0xff; + data[15] = mID & 0xff; + + data[16] = mNextFIRSeqNo++; // Seq Nr. + + data[17] = 0x00; // Reserved + data[18] = 0x00; + data[19] = 0x00; + + buffer->setRange(buffer->offset(), buffer->size() + 20); + + ALOGV("Added FIR request."); +} + +void ARTPSource::addReceiverReport(const sp<ABuffer> &buffer) { + if (buffer->size() + 32 > buffer->capacity()) { + ALOGW("RTCP buffer too small to accomodate RR."); + return; + } + + uint8_t *data = buffer->data() + buffer->size(); + + data[0] = 0x80 | 1; + data[1] = 201; // RR + data[2] = 0; + data[3] = 7; + data[4] = kSourceID >> 24; + data[5] = (kSourceID >> 16) & 0xff; + data[6] = (kSourceID >> 8) & 0xff; + data[7] = kSourceID & 0xff; + + data[8] = mID >> 24; + data[9] = (mID >> 16) & 0xff; + data[10] = (mID >> 8) & 0xff; + data[11] = mID & 0xff; + + data[12] = 0x00; // fraction lost + + data[13] = 0x00; // cumulative lost + data[14] = 0x00; + data[15] = 0x00; + + data[16] = mHighestSeqNumber >> 24; + data[17] = (mHighestSeqNumber >> 16) & 0xff; + data[18] = (mHighestSeqNumber >> 8) & 0xff; + data[19] = mHighestSeqNumber & 0xff; + + data[20] = 0x00; // Interarrival jitter + data[21] = 0x00; + data[22] = 0x00; + data[23] = 0x00; + + uint32_t LSR = 0; + uint32_t DLSR = 0; + if (mLastNTPTime != 0) { + LSR = (mLastNTPTime >> 16) & 0xffffffff; + + DLSR = (uint32_t) + ((ALooper::GetNowUs() - mLastNTPTimeUpdateUs) * 65536.0 / 1E6); + } + + data[24] = LSR >> 24; + data[25] = (LSR >> 16) & 0xff; + data[26] = (LSR >> 8) & 0xff; + data[27] = LSR & 0xff; + + data[28] = DLSR >> 24; + data[29] = (DLSR >> 16) & 0xff; + data[30] = (DLSR >> 8) & 0xff; + data[31] = DLSR & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + 32); +} + +} // namespace android + + diff --git a/media/libstagefright/rtsp/ARTPSource.h b/media/libstagefright/rtsp/ARTPSource.h new file mode 100644 index 0000000..b70f94e --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSource.h @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_SOURCE_H_ + +#define A_RTP_SOURCE_H_ + +#include <stdint.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; +struct ARTPAssembler; +struct ASessionDescription; + +struct ARTPSource : public RefBase { + ARTPSource( + uint32_t id, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify); + + void processRTPPacket(const sp<ABuffer> &buffer); + void timeUpdate(uint32_t rtpTime, uint64_t ntpTime); + void byeReceived(); + + List<sp<ABuffer> > *queue() { return &mQueue; } + + void addReceiverReport(const sp<ABuffer> &buffer); + void addFIR(const sp<ABuffer> &buffer); + +private: + uint32_t mID; + uint32_t mHighestSeqNumber; + int32_t mNumBuffersReceived; + + List<sp<ABuffer> > mQueue; + sp<ARTPAssembler> mAssembler; + + uint64_t mLastNTPTime; + int64_t mLastNTPTimeUpdateUs; + + bool mIssueFIRRequests; + int64_t mLastFIRRequestUs; + uint8_t mNextFIRSeqNo; + + sp<AMessage> mNotify; + + bool queuePacket(const sp<ABuffer> &buffer); + + DISALLOW_EVIL_CONSTRUCTORS(ARTPSource); +}; + +} // namespace android + +#endif // A_RTP_SOURCE_H_ diff --git a/media/libstagefright/rtsp/ARTPWriter.cpp b/media/libstagefright/rtsp/ARTPWriter.cpp new file mode 100644 index 0000000..0d07043 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPWriter.cpp @@ -0,0 +1,836 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARTPWriter" +#include <utils/Log.h> + +#include "ARTPWriter.h" + +#include <fcntl.h> + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/MediaBuffer.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MediaSource.h> +#include <media/stagefright/MetaData.h> +#include <utils/ByteOrder.h> + +#define PT 97 +#define PT_STR "97" + +namespace android { + +// static const size_t kMaxPacketSize = 65507; // maximum payload in UDP over IP +static const size_t kMaxPacketSize = 1500; + +static int UniformRand(int limit) { + return ((double)rand() * limit) / RAND_MAX; +} + +ARTPWriter::ARTPWriter(int fd) + : mFlags(0), + mFd(dup(fd)), + mLooper(new ALooper), + mReflector(new AHandlerReflector<ARTPWriter>(this)) { + CHECK_GE(fd, 0); + + mLooper->setName("rtp writer"); + mLooper->registerHandler(mReflector); + mLooper->start(); + + mSocket = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(mSocket, 0); + + memset(mRTPAddr.sin_zero, 0, sizeof(mRTPAddr.sin_zero)); + mRTPAddr.sin_family = AF_INET; + +#if 1 + mRTPAddr.sin_addr.s_addr = INADDR_ANY; +#else + mRTPAddr.sin_addr.s_addr = inet_addr("172.19.18.246"); +#endif + + mRTPAddr.sin_port = htons(5634); + CHECK_EQ(0, ntohs(mRTPAddr.sin_port) & 1); + + mRTCPAddr = mRTPAddr; + mRTCPAddr.sin_port = htons(ntohs(mRTPAddr.sin_port) | 1); + +#if LOG_TO_FILES + mRTPFd = open( + "/data/misc/rtpout.bin", + O_WRONLY | O_CREAT | O_TRUNC, + 0644); + CHECK_GE(mRTPFd, 0); + + mRTCPFd = open( + "/data/misc/rtcpout.bin", + O_WRONLY | O_CREAT | O_TRUNC, + 0644); + CHECK_GE(mRTCPFd, 0); +#endif +} + +ARTPWriter::~ARTPWriter() { +#if LOG_TO_FILES + close(mRTCPFd); + mRTCPFd = -1; + + close(mRTPFd); + mRTPFd = -1; +#endif + + close(mSocket); + mSocket = -1; + + close(mFd); + mFd = -1; +} + +status_t ARTPWriter::addSource(const sp<MediaSource> &source) { + mSource = source; + return OK; +} + +bool ARTPWriter::reachedEOS() { + Mutex::Autolock autoLock(mLock); + return (mFlags & kFlagEOS) != 0; +} + +status_t ARTPWriter::start(MetaData *params) { + Mutex::Autolock autoLock(mLock); + if (mFlags & kFlagStarted) { + return INVALID_OPERATION; + } + + mFlags &= ~kFlagEOS; + mSourceID = rand(); + mSeqNo = UniformRand(65536); + mRTPTimeBase = rand(); + mNumRTPSent = 0; + mNumRTPOctetsSent = 0; + mLastRTPTime = 0; + mLastNTPTime = 0; + mNumSRsSent = 0; + + const char *mime; + CHECK(mSource->getFormat()->findCString(kKeyMIMEType, &mime)); + + mMode = INVALID; + if (!strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { + mMode = H264; + } else if (!strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_H263)) { + mMode = H263; + } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_NB)) { + mMode = AMR_NB; + } else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_WB)) { + mMode = AMR_WB; + } else { + TRESPASS(); + } + + (new AMessage(kWhatStart, mReflector->id()))->post(); + + while (!(mFlags & kFlagStarted)) { + mCondition.wait(mLock); + } + + return OK; +} + +status_t ARTPWriter::stop() { + Mutex::Autolock autoLock(mLock); + if (!(mFlags & kFlagStarted)) { + return OK; + } + + (new AMessage(kWhatStop, mReflector->id()))->post(); + + while (mFlags & kFlagStarted) { + mCondition.wait(mLock); + } + return OK; +} + +status_t ARTPWriter::pause() { + return OK; +} + +static void StripStartcode(MediaBuffer *buffer) { + if (buffer->range_length() < 4) { + return; + } + + const uint8_t *ptr = + (const uint8_t *)buffer->data() + buffer->range_offset(); + + if (!memcmp(ptr, "\x00\x00\x00\x01", 4)) { + buffer->set_range( + buffer->range_offset() + 4, buffer->range_length() - 4); + } +} + +void ARTPWriter::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatStart: + { + CHECK_EQ(mSource->start(), (status_t)OK); + +#if 0 + if (mMode == H264) { + MediaBuffer *buffer; + CHECK_EQ(mSource->read(&buffer), (status_t)OK); + + StripStartcode(buffer); + makeH264SPropParamSets(buffer); + buffer->release(); + buffer = NULL; + } + + dumpSessionDesc(); +#endif + + { + Mutex::Autolock autoLock(mLock); + mFlags |= kFlagStarted; + mCondition.signal(); + } + + (new AMessage(kWhatRead, mReflector->id()))->post(); + (new AMessage(kWhatSendSR, mReflector->id()))->post(); + break; + } + + case kWhatStop: + { + CHECK_EQ(mSource->stop(), (status_t)OK); + + sendBye(); + + { + Mutex::Autolock autoLock(mLock); + mFlags &= ~kFlagStarted; + mCondition.signal(); + } + break; + } + + case kWhatRead: + { + { + Mutex::Autolock autoLock(mLock); + if (!(mFlags & kFlagStarted)) { + break; + } + } + + onRead(msg); + break; + } + + case kWhatSendSR: + { + { + Mutex::Autolock autoLock(mLock); + if (!(mFlags & kFlagStarted)) { + break; + } + } + + onSendSR(msg); + break; + } + + default: + TRESPASS(); + break; + } +} + +void ARTPWriter::onRead(const sp<AMessage> &msg) { + MediaBuffer *mediaBuf; + status_t err = mSource->read(&mediaBuf); + + if (err != OK) { + ALOGI("reached EOS."); + + Mutex::Autolock autoLock(mLock); + mFlags |= kFlagEOS; + return; + } + + if (mediaBuf->range_length() > 0) { + ALOGV("read buffer of size %d", mediaBuf->range_length()); + + if (mMode == H264) { + StripStartcode(mediaBuf); + sendAVCData(mediaBuf); + } else if (mMode == H263) { + sendH263Data(mediaBuf); + } else if (mMode == AMR_NB || mMode == AMR_WB) { + sendAMRData(mediaBuf); + } + } + + mediaBuf->release(); + mediaBuf = NULL; + + msg->post(); +} + +void ARTPWriter::onSendSR(const sp<AMessage> &msg) { + sp<ABuffer> buffer = new ABuffer(65536); + buffer->setRange(0, 0); + + addSR(buffer); + addSDES(buffer); + + send(buffer, true /* isRTCP */); + + ++mNumSRsSent; + msg->post(3000000); +} + +void ARTPWriter::send(const sp<ABuffer> &buffer, bool isRTCP) { + ssize_t n = sendto( + mSocket, buffer->data(), buffer->size(), 0, + (const struct sockaddr *)(isRTCP ? &mRTCPAddr : &mRTPAddr), + sizeof(mRTCPAddr)); + + CHECK_EQ(n, (ssize_t)buffer->size()); + +#if LOG_TO_FILES + int fd = isRTCP ? mRTCPFd : mRTPFd; + + uint32_t ms = tolel(ALooper::GetNowUs() / 1000ll); + uint32_t length = tolel(buffer->size()); + write(fd, &ms, sizeof(ms)); + write(fd, &length, sizeof(length)); + write(fd, buffer->data(), buffer->size()); +#endif +} + +void ARTPWriter::addSR(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + + data[0] = 0x80 | 0; + data[1] = 200; // SR + data[2] = 0; + data[3] = 6; + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + data[8] = mLastNTPTime >> (64 - 8); + data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; + data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; + data[11] = (mLastNTPTime >> 32) & 0xff; + data[12] = (mLastNTPTime >> 24) & 0xff; + data[13] = (mLastNTPTime >> 16) & 0xff; + data[14] = (mLastNTPTime >> 8) & 0xff; + data[15] = mLastNTPTime & 0xff; + + data[16] = (mLastRTPTime >> 24) & 0xff; + data[17] = (mLastRTPTime >> 16) & 0xff; + data[18] = (mLastRTPTime >> 8) & 0xff; + data[19] = mLastRTPTime & 0xff; + + data[20] = mNumRTPSent >> 24; + data[21] = (mNumRTPSent >> 16) & 0xff; + data[22] = (mNumRTPSent >> 8) & 0xff; + data[23] = mNumRTPSent & 0xff; + + data[24] = mNumRTPOctetsSent >> 24; + data[25] = (mNumRTPOctetsSent >> 16) & 0xff; + data[26] = (mNumRTPOctetsSent >> 8) & 0xff; + data[27] = mNumRTPOctetsSent & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + 28); +} + +void ARTPWriter::addSDES(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + static const char *kCNAME = "someone@somewhere"; + data[offset++] = strlen(kCNAME); + + memcpy(&data[offset], kCNAME, strlen(kCNAME)); + offset += strlen(kCNAME); + + data[offset++] = 7; // NOTE + + static const char *kNOTE = "Hell's frozen over."; + data[offset++] = strlen(kNOTE); + + memcpy(&data[offset], kNOTE, strlen(kNOTE)); + offset += strlen(kNOTE); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); +} + +// static +uint64_t ARTPWriter::GetNowNTP() { + uint64_t nowUs = ALooper::GetNowUs(); + + nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; + + uint64_t hi = nowUs / 1000000ll; + uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; + + return (hi << 32) | lo; +} + +void ARTPWriter::dumpSessionDesc() { + AString sdp; + sdp = "v=0\r\n"; + + sdp.append("o=- "); + + uint64_t ntp = GetNowNTP(); + sdp.append(ntp); + sdp.append(" "); + sdp.append(ntp); + sdp.append(" IN IP4 127.0.0.0\r\n"); + + sdp.append( + "s=Sample\r\n" + "i=Playing around\r\n" + "c=IN IP4 "); + + struct in_addr addr; + addr.s_addr = ntohl(INADDR_LOOPBACK); + + sdp.append(inet_ntoa(addr)); + + sdp.append( + "\r\n" + "t=0 0\r\n" + "a=range:npt=now-\r\n"); + + sp<MetaData> meta = mSource->getFormat(); + + if (mMode == H264 || mMode == H263) { + sdp.append("m=video "); + } else { + sdp.append("m=audio "); + } + + sdp.append(StringPrintf("%d", ntohs(mRTPAddr.sin_port))); + sdp.append( + " RTP/AVP " PT_STR "\r\n" + "b=AS 320000\r\n" + "a=rtpmap:" PT_STR " "); + + if (mMode == H264) { + sdp.append("H264/90000"); + } else if (mMode == H263) { + sdp.append("H263-1998/90000"); + } else if (mMode == AMR_NB || mMode == AMR_WB) { + int32_t sampleRate, numChannels; + CHECK(mSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); + CHECK(mSource->getFormat()->findInt32(kKeyChannelCount, &numChannels)); + + CHECK_EQ(numChannels, 1); + CHECK_EQ(sampleRate, (mMode == AMR_NB) ? 8000 : 16000); + + sdp.append(mMode == AMR_NB ? "AMR" : "AMR-WB"); + sdp.append(StringPrintf("/%d/%d", sampleRate, numChannels)); + } else { + TRESPASS(); + } + + sdp.append("\r\n"); + + if (mMode == H264 || mMode == H263) { + int32_t width, height; + CHECK(meta->findInt32(kKeyWidth, &width)); + CHECK(meta->findInt32(kKeyHeight, &height)); + + sdp.append("a=cliprect 0,0,"); + sdp.append(height); + sdp.append(","); + sdp.append(width); + sdp.append("\r\n"); + + sdp.append( + "a=framesize:" PT_STR " "); + sdp.append(width); + sdp.append("-"); + sdp.append(height); + sdp.append("\r\n"); + } + + if (mMode == H264) { + sdp.append( + "a=fmtp:" PT_STR " profile-level-id="); + sdp.append(mProfileLevel); + sdp.append(";sprop-parameter-sets="); + + sdp.append(mSeqParamSet); + sdp.append(","); + sdp.append(mPicParamSet); + sdp.append(";packetization-mode=1\r\n"); + } else if (mMode == AMR_NB || mMode == AMR_WB) { + sdp.append("a=fmtp:" PT_STR " octed-align\r\n"); + } + + ALOGI("%s", sdp.c_str()); +} + +void ARTPWriter::makeH264SPropParamSets(MediaBuffer *buffer) { + static const char kStartCode[] = "\x00\x00\x00\x01"; + + const uint8_t *data = + (const uint8_t *)buffer->data() + buffer->range_offset(); + size_t size = buffer->range_length(); + + CHECK_GE(size, 0u); + + size_t startCodePos = 0; + while (startCodePos + 3 < size + && memcmp(kStartCode, &data[startCodePos], 4)) { + ++startCodePos; + } + + CHECK_LT(startCodePos + 3, size); + + CHECK_EQ((unsigned)data[0], 0x67u); + + mProfileLevel = + StringPrintf("%02X%02X%02X", data[1], data[2], data[3]); + + encodeBase64(data, startCodePos, &mSeqParamSet); + + encodeBase64(&data[startCodePos + 4], size - startCodePos - 4, + &mPicParamSet); +} + +void ARTPWriter::sendBye() { + sp<ABuffer> buffer = new ABuffer(8); + uint8_t *data = buffer->data(); + *data++ = (2 << 6) | 1; + *data++ = 203; + *data++ = 0; + *data++ = 1; + *data++ = mSourceID >> 24; + *data++ = (mSourceID >> 16) & 0xff; + *data++ = (mSourceID >> 8) & 0xff; + *data++ = mSourceID & 0xff; + buffer->setRange(0, 8); + + send(buffer, true /* isRTCP */); +} + +void ARTPWriter::sendAVCData(MediaBuffer *mediaBuf) { + // 12 bytes RTP header + 2 bytes for the FU-indicator and FU-header. + CHECK_GE(kMaxPacketSize, 12u + 2u); + + int64_t timeUs; + CHECK(mediaBuf->meta_data()->findInt64(kKeyTime, &timeUs)); + + uint32_t rtpTime = mRTPTimeBase + (timeUs * 9 / 100ll); + + const uint8_t *mediaData = + (const uint8_t *)mediaBuf->data() + mediaBuf->range_offset(); + + sp<ABuffer> buffer = new ABuffer(kMaxPacketSize); + if (mediaBuf->range_length() + 12 <= buffer->capacity()) { + // The data fits into a single packet + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = (1 << 7) | PT; // M-bit + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + + memcpy(&data[12], + mediaData, mediaBuf->range_length()); + + buffer->setRange(0, mediaBuf->range_length() + 12); + + send(buffer, false /* isRTCP */); + + ++mSeqNo; + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + } else { + // FU-A + + unsigned nalType = mediaData[0]; + size_t offset = 1; + + bool firstPacket = true; + while (offset < mediaBuf->range_length()) { + size_t size = mediaBuf->range_length() - offset; + bool lastPacket = true; + if (size + 12 + 2 > buffer->capacity()) { + lastPacket = false; + size = buffer->capacity() - 12 - 2; + } + + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = (lastPacket ? (1 << 7) : 0x00) | PT; // M-bit + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + + data[12] = 28 | (nalType & 0xe0); + + CHECK(!firstPacket || !lastPacket); + + data[13] = + (firstPacket ? 0x80 : 0x00) + | (lastPacket ? 0x40 : 0x00) + | (nalType & 0x1f); + + memcpy(&data[14], &mediaData[offset], size); + + buffer->setRange(0, 14 + size); + + send(buffer, false /* isRTCP */); + + ++mSeqNo; + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + + firstPacket = false; + offset += size; + } + } + + mLastRTPTime = rtpTime; + mLastNTPTime = GetNowNTP(); +} + +void ARTPWriter::sendH263Data(MediaBuffer *mediaBuf) { + CHECK_GE(kMaxPacketSize, 12u + 2u); + + int64_t timeUs; + CHECK(mediaBuf->meta_data()->findInt64(kKeyTime, &timeUs)); + + uint32_t rtpTime = mRTPTimeBase + (timeUs * 9 / 100ll); + + const uint8_t *mediaData = + (const uint8_t *)mediaBuf->data() + mediaBuf->range_offset(); + + // hexdump(mediaData, mediaBuf->range_length()); + + CHECK_EQ((unsigned)mediaData[0], 0u); + CHECK_EQ((unsigned)mediaData[1], 0u); + + size_t offset = 2; + size_t size = mediaBuf->range_length(); + + while (offset < size) { + sp<ABuffer> buffer = new ABuffer(kMaxPacketSize); + // CHECK_LE(mediaBuf->range_length() -2 + 14, buffer->capacity()); + + size_t remaining = size - offset; + bool lastPacket = (remaining + 14 <= buffer->capacity()); + if (!lastPacket) { + remaining = buffer->capacity() - 14; + } + + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = (lastPacket ? 0x80 : 0x00) | PT; // M-bit + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + + data[12] = (offset == 2) ? 0x04 : 0x00; // P=?, V=0 + data[13] = 0x00; // PLEN = PEBIT = 0 + + memcpy(&data[14], &mediaData[offset], remaining); + offset += remaining; + + buffer->setRange(0, remaining + 14); + + send(buffer, false /* isRTCP */); + + ++mSeqNo; + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + } + + mLastRTPTime = rtpTime; + mLastNTPTime = GetNowNTP(); +} + +static size_t getFrameSize(bool isWide, unsigned FT) { + static const size_t kFrameSizeNB[8] = { + 95, 103, 118, 134, 148, 159, 204, 244 + }; + static const size_t kFrameSizeWB[9] = { + 132, 177, 253, 285, 317, 365, 397, 461, 477 + }; + + size_t frameSize = isWide ? kFrameSizeWB[FT] : kFrameSizeNB[FT]; + + // Round up bits to bytes and add 1 for the header byte. + frameSize = (frameSize + 7) / 8 + 1; + + return frameSize; +} + +void ARTPWriter::sendAMRData(MediaBuffer *mediaBuf) { + const uint8_t *mediaData = + (const uint8_t *)mediaBuf->data() + mediaBuf->range_offset(); + + size_t mediaLength = mediaBuf->range_length(); + + CHECK_GE(kMaxPacketSize, 12u + 1u + mediaLength); + + const bool isWide = (mMode == AMR_WB); + + int64_t timeUs; + CHECK(mediaBuf->meta_data()->findInt64(kKeyTime, &timeUs)); + uint32_t rtpTime = mRTPTimeBase + (timeUs / (isWide ? 250 : 125)); + + // hexdump(mediaData, mediaLength); + + Vector<uint8_t> tableOfContents; + size_t srcOffset = 0; + while (srcOffset < mediaLength) { + uint8_t toc = mediaData[srcOffset]; + + unsigned FT = (toc >> 3) & 0x0f; + CHECK((isWide && FT <= 8) || (!isWide && FT <= 7)); + + tableOfContents.push(toc); + srcOffset += getFrameSize(isWide, FT); + } + CHECK_EQ(srcOffset, mediaLength); + + sp<ABuffer> buffer = new ABuffer(kMaxPacketSize); + CHECK_LE(mediaLength + 12 + 1, buffer->capacity()); + + // The data fits into a single packet + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = PT; + if (mNumRTPSent == 0) { + // Signal start of talk-spurt. + data[1] |= 0x80; // M-bit + } + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + + data[12] = 0xf0; // CMR=15, RR=0 + + size_t dstOffset = 13; + + for (size_t i = 0; i < tableOfContents.size(); ++i) { + uint8_t toc = tableOfContents[i]; + + if (i + 1 < tableOfContents.size()) { + toc |= 0x80; + } else { + toc &= ~0x80; + } + + data[dstOffset++] = toc; + } + + srcOffset = 0; + for (size_t i = 0; i < tableOfContents.size(); ++i) { + uint8_t toc = tableOfContents[i]; + unsigned FT = (toc >> 3) & 0x0f; + size_t frameSize = getFrameSize(isWide, FT); + + ++srcOffset; // skip toc + memcpy(&data[dstOffset], &mediaData[srcOffset], frameSize - 1); + srcOffset += frameSize - 1; + dstOffset += frameSize - 1; + } + + buffer->setRange(0, dstOffset); + + send(buffer, false /* isRTCP */); + + ++mSeqNo; + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + + mLastRTPTime = rtpTime; + mLastNTPTime = GetNowNTP(); +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ARTPWriter.h b/media/libstagefright/rtsp/ARTPWriter.h new file mode 100644 index 0000000..fdc8d23 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPWriter.h @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_WRITER_H_ + +#define A_RTP_WRITER_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <media/stagefright/foundation/AHandlerReflector.h> +#include <media/stagefright/foundation/AString.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/MediaWriter.h> + +#include <arpa/inet.h> +#include <sys/socket.h> + +#define LOG_TO_FILES 0 + +namespace android { + +struct ABuffer; +struct MediaBuffer; + +struct ARTPWriter : public MediaWriter { + ARTPWriter(int fd); + + virtual status_t addSource(const sp<MediaSource> &source); + virtual bool reachedEOS(); + virtual status_t start(MetaData *params); + virtual status_t stop(); + virtual status_t pause(); + + virtual void onMessageReceived(const sp<AMessage> &msg); + +protected: + virtual ~ARTPWriter(); + +private: + enum { + kWhatStart = 'strt', + kWhatStop = 'stop', + kWhatRead = 'read', + kWhatSendSR = 'sr ', + }; + + enum { + kFlagStarted = 1, + kFlagEOS = 2, + }; + + Mutex mLock; + Condition mCondition; + uint32_t mFlags; + + int mFd; + +#if LOG_TO_FILES + int mRTPFd; + int mRTCPFd; +#endif + + sp<MediaSource> mSource; + sp<ALooper> mLooper; + sp<AHandlerReflector<ARTPWriter> > mReflector; + + int mSocket; + struct sockaddr_in mRTPAddr; + struct sockaddr_in mRTCPAddr; + + AString mProfileLevel; + AString mSeqParamSet; + AString mPicParamSet; + + uint32_t mSourceID; + uint32_t mSeqNo; + uint32_t mRTPTimeBase; + uint32_t mNumRTPSent; + uint32_t mNumRTPOctetsSent; + uint32_t mLastRTPTime; + uint64_t mLastNTPTime; + + int32_t mNumSRsSent; + + enum { + INVALID, + H264, + H263, + AMR_NB, + AMR_WB, + } mMode; + + static uint64_t GetNowNTP(); + + void onRead(const sp<AMessage> &msg); + void onSendSR(const sp<AMessage> &msg); + + void addSR(const sp<ABuffer> &buffer); + void addSDES(const sp<ABuffer> &buffer); + + void makeH264SPropParamSets(MediaBuffer *buffer); + void dumpSessionDesc(); + + void sendBye(); + void sendAVCData(MediaBuffer *mediaBuf); + void sendH263Data(MediaBuffer *mediaBuf); + void sendAMRData(MediaBuffer *mediaBuf); + + void send(const sp<ABuffer> &buffer, bool isRTCP); + + DISALLOW_EVIL_CONSTRUCTORS(ARTPWriter); +}; + +} // namespace android + +#endif // A_RTP_WRITER_H_ diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp new file mode 100644 index 0000000..539a888 --- /dev/null +++ b/media/libstagefright/rtsp/ARTSPConnection.cpp @@ -0,0 +1,1057 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARTSPConnection" +#include <utils/Log.h> + +#include "ARTSPConnection.h" + +#include <cutils/properties.h> + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/MediaErrors.h> + +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <openssl/md5.h> +#include <sys/socket.h> + +#include "HTTPBase.h" + +namespace android { + +// static +const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll; + +ARTSPConnection::ARTSPConnection(bool uidValid, uid_t uid) + : mUIDValid(uidValid), + mUID(uid), + mState(DISCONNECTED), + mAuthType(NONE), + mSocket(-1), + mConnectionID(0), + mNextCSeq(0), + mReceiveResponseEventPending(false) { + MakeUserAgent(&mUserAgent); +} + +ARTSPConnection::~ARTSPConnection() { + if (mSocket >= 0) { + ALOGE("Connection is still open, closing the socket."); + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(mSocket); + } + close(mSocket); + mSocket = -1; + } +} + +void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatConnect, id()); + msg->setString("url", url); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::disconnect(const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::sendRequest( + const char *request, const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatSendRequest, id()); + msg->setString("request", request); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id()); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatConnect: + onConnect(msg); + break; + + case kWhatDisconnect: + onDisconnect(msg); + break; + + case kWhatCompleteConnection: + onCompleteConnection(msg); + break; + + case kWhatSendRequest: + onSendRequest(msg); + break; + + case kWhatReceiveResponse: + onReceiveResponse(); + break; + + case kWhatObserveBinaryData: + { + CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); + break; + } + + default: + TRESPASS(); + break; + } +} + +// static +bool ARTSPConnection::ParseURL( + const char *url, AString *host, unsigned *port, AString *path, + AString *user, AString *pass) { + host->clear(); + *port = 0; + path->clear(); + user->clear(); + pass->clear(); + + if (strncasecmp("rtsp://", url, 7)) { + return false; + } + + const char *slashPos = strchr(&url[7], '/'); + + if (slashPos == NULL) { + host->setTo(&url[7]); + path->setTo("/"); + } else { + host->setTo(&url[7], slashPos - &url[7]); + path->setTo(slashPos); + } + + ssize_t atPos = host->find("@"); + + if (atPos >= 0) { + // Split of user:pass@ from hostname. + + AString userPass(*host, 0, atPos); + host->erase(0, atPos + 1); + + ssize_t colonPos = userPass.find(":"); + + if (colonPos < 0) { + *user = userPass; + } else { + user->setTo(userPass, 0, colonPos); + pass->setTo(userPass, colonPos + 1, userPass.size() - colonPos - 1); + } + } + + const char *colonPos = strchr(host->c_str(), ':'); + + if (colonPos != NULL) { + unsigned long x; + if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) { + return false; + } + + *port = x; + + size_t colonOffset = colonPos - host->c_str(); + size_t trailing = host->size() - colonOffset; + host->erase(colonOffset, trailing); + } else { + *port = 554; + } + + return true; +} + +static status_t MakeSocketBlocking(int s, bool blocking) { + // Make socket non-blocking. + int flags = fcntl(s, F_GETFL, 0); + + if (flags == -1) { + return UNKNOWN_ERROR; + } + + if (blocking) { + flags &= ~O_NONBLOCK; + } else { + flags |= O_NONBLOCK; + } + + flags = fcntl(s, F_SETFL, flags); + + return flags == -1 ? UNKNOWN_ERROR : OK; +} + +void ARTSPConnection::onConnect(const sp<AMessage> &msg) { + ++mConnectionID; + + if (mState != DISCONNECTED) { + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(mSocket); + } + close(mSocket); + mSocket = -1; + + flushPendingRequests(); + } + + mState = CONNECTING; + + AString url; + CHECK(msg->findString("url", &url)); + + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + AString host, path; + unsigned port; + if (!ParseURL(url.c_str(), &host, &port, &path, &mUser, &mPass) + || (mUser.size() > 0 && mPass.size() == 0)) { + // If we have a user name but no password we have to give up + // right here, since we currently have no way of asking the user + // for this information. + + ALOGE("Malformed rtsp url %s", url.c_str()); + + reply->setInt32("result", ERROR_MALFORMED); + reply->post(); + + mState = DISCONNECTED; + return; + } + + if (mUser.size() > 0) { + ALOGV("user = '%s', pass = '%s'", mUser.c_str(), mPass.c_str()); + } + + struct hostent *ent = gethostbyname(host.c_str()); + if (ent == NULL) { + ALOGE("Unknown host %s", host.c_str()); + + reply->setInt32("result", -ENOENT); + reply->post(); + + mState = DISCONNECTED; + return; + } + + mSocket = socket(AF_INET, SOCK_STREAM, 0); + + if (mUIDValid) { + HTTPBase::RegisterSocketUserTag(mSocket, mUID, + (uint32_t)*(uint32_t*) "RTSP"); + } + + MakeSocketBlocking(mSocket, false); + + struct sockaddr_in remote; + memset(remote.sin_zero, 0, sizeof(remote.sin_zero)); + remote.sin_family = AF_INET; + remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + remote.sin_port = htons(port); + + int err = ::connect( + mSocket, (const struct sockaddr *)&remote, sizeof(remote)); + + reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr)); + + if (err < 0) { + if (errno == EINPROGRESS) { + sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id()); + msg->setMessage("reply", reply); + msg->setInt32("connection-id", mConnectionID); + msg->post(); + return; + } + + reply->setInt32("result", -errno); + mState = DISCONNECTED; + + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(mSocket); + } + close(mSocket); + mSocket = -1; + } else { + reply->setInt32("result", OK); + mState = CONNECTED; + mNextCSeq = 1; + + postReceiveReponseEvent(); + } + + reply->post(); +} + +void ARTSPConnection::performDisconnect() { + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(mSocket); + } + close(mSocket); + mSocket = -1; + + flushPendingRequests(); + + mUser.clear(); + mPass.clear(); + mAuthType = NONE; + mNonce.clear(); + + mState = DISCONNECTED; +} + +void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) { + if (mState == CONNECTED || mState == CONNECTING) { + performDisconnect(); + } + + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + reply->setInt32("result", OK); + + reply->post(); +} + +void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) { + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + int32_t connectionID; + CHECK(msg->findInt32("connection-id", &connectionID)); + + if ((connectionID != mConnectionID) || mState != CONNECTING) { + // While we were attempting to connect, the attempt was + // cancelled. + reply->setInt32("result", -ECONNABORTED); + reply->post(); + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set ws; + FD_ZERO(&ws); + FD_SET(mSocket, &ws); + + int res = select(mSocket + 1, NULL, &ws, NULL, &tv); + CHECK_GE(res, 0); + + if (res == 0) { + // Timed out. Not yet connected. + + msg->post(); + return; + } + + int err; + socklen_t optionLen = sizeof(err); + CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); + CHECK_EQ(optionLen, (socklen_t)sizeof(err)); + + if (err != 0) { + ALOGE("err = %d (%s)", err, strerror(err)); + + reply->setInt32("result", -err); + + mState = DISCONNECTED; + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(mSocket); + } + close(mSocket); + mSocket = -1; + } else { + reply->setInt32("result", OK); + mState = CONNECTED; + mNextCSeq = 1; + + postReceiveReponseEvent(); + } + + reply->post(); +} + +void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) { + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + if (mState != CONNECTED) { + reply->setInt32("result", -ENOTCONN); + reply->post(); + return; + } + + AString request; + CHECK(msg->findString("request", &request)); + + // Just in case we need to re-issue the request with proper authentication + // later, stash it away. + reply->setString("original-request", request.c_str(), request.size()); + + addAuthentication(&request); + addUserAgent(&request); + + // Find the boundary between headers and the body. + ssize_t i = request.find("\r\n\r\n"); + CHECK_GE(i, 0); + + int32_t cseq = mNextCSeq++; + + AString cseqHeader = "CSeq: "; + cseqHeader.append(cseq); + cseqHeader.append("\r\n"); + + request.insert(cseqHeader, i + 2); + + ALOGV("request: '%s'", request.c_str()); + + size_t numBytesSent = 0; + while (numBytesSent < request.size()) { + ssize_t n = + send(mSocket, request.c_str() + numBytesSent, + request.size() - numBytesSent, 0); + + if (n < 0 && errno == EINTR) { + continue; + } + + if (n <= 0) { + performDisconnect(); + + if (n == 0) { + // Server closed the connection. + ALOGE("Server unexpectedly closed the connection."); + + reply->setInt32("result", ERROR_IO); + reply->post(); + } else { + ALOGE("Error sending rtsp request. (%s)", strerror(errno)); + reply->setInt32("result", -errno); + reply->post(); + } + + return; + } + + numBytesSent += (size_t)n; + } + + mPendingRequests.add(cseq, reply); +} + +void ARTSPConnection::onReceiveResponse() { + mReceiveResponseEventPending = false; + + if (mState != CONNECTED) { + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set rs; + FD_ZERO(&rs); + FD_SET(mSocket, &rs); + + int res = select(mSocket + 1, &rs, NULL, NULL, &tv); + CHECK_GE(res, 0); + + if (res == 1) { + MakeSocketBlocking(mSocket, true); + + bool success = receiveRTSPReponse(); + + MakeSocketBlocking(mSocket, false); + + if (!success) { + // Something horrible, irreparable has happened. + flushPendingRequests(); + return; + } + } + + postReceiveReponseEvent(); +} + +void ARTSPConnection::flushPendingRequests() { + for (size_t i = 0; i < mPendingRequests.size(); ++i) { + sp<AMessage> reply = mPendingRequests.valueAt(i); + + reply->setInt32("result", -ECONNABORTED); + reply->post(); + } + + mPendingRequests.clear(); +} + +void ARTSPConnection::postReceiveReponseEvent() { + if (mReceiveResponseEventPending) { + return; + } + + sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id()); + msg->post(); + + mReceiveResponseEventPending = true; +} + +status_t ARTSPConnection::receive(void *data, size_t size) { + size_t offset = 0; + while (offset < size) { + ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0); + + if (n < 0 && errno == EINTR) { + continue; + } + + if (n <= 0) { + performDisconnect(); + + if (n == 0) { + // Server closed the connection. + ALOGE("Server unexpectedly closed the connection."); + return ERROR_IO; + } else { + ALOGE("Error reading rtsp response. (%s)", strerror(errno)); + return -errno; + } + } + + offset += (size_t)n; + } + + return OK; +} + +bool ARTSPConnection::receiveLine(AString *line) { + line->clear(); + + bool sawCR = false; + for (;;) { + char c; + if (receive(&c, 1) != OK) { + return false; + } + + if (sawCR && c == '\n') { + line->erase(line->size() - 1, 1); + return true; + } + + line->append(&c, 1); + + if (c == '$' && line->size() == 1) { + // Special-case for interleaved binary data. + return true; + } + + sawCR = (c == '\r'); + } +} + +sp<ABuffer> ARTSPConnection::receiveBinaryData() { + uint8_t x[3]; + if (receive(x, 3) != OK) { + return NULL; + } + + sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]); + if (receive(buffer->data(), buffer->size()) != OK) { + return NULL; + } + + buffer->meta()->setInt32("index", (int32_t)x[0]); + + return buffer; +} + +static bool IsRTSPVersion(const AString &s) { + return s == "RTSP/1.0"; +} + +bool ARTSPConnection::receiveRTSPReponse() { + AString statusLine; + + if (!receiveLine(&statusLine)) { + return false; + } + + if (statusLine == "$") { + sp<ABuffer> buffer = receiveBinaryData(); + + if (buffer == NULL) { + return false; + } + + if (mObserveBinaryMessage != NULL) { + sp<AMessage> notify = mObserveBinaryMessage->dup(); + notify->setBuffer("buffer", buffer); + notify->post(); + } else { + ALOGW("received binary data, but no one cares."); + } + + return true; + } + + sp<ARTSPResponse> response = new ARTSPResponse; + response->mStatusLine = statusLine; + + ALOGI("status: %s", response->mStatusLine.c_str()); + + ssize_t space1 = response->mStatusLine.find(" "); + if (space1 < 0) { + return false; + } + ssize_t space2 = response->mStatusLine.find(" ", space1 + 1); + if (space2 < 0) { + return false; + } + + bool isRequest = false; + + if (!IsRTSPVersion(AString(response->mStatusLine, 0, space1))) { + CHECK(IsRTSPVersion( + AString( + response->mStatusLine, + space2 + 1, + response->mStatusLine.size() - space2 - 1))); + + isRequest = true; + + response->mStatusCode = 0; + } else { + AString statusCodeStr( + response->mStatusLine, space1 + 1, space2 - space1 - 1); + + if (!ParseSingleUnsignedLong( + statusCodeStr.c_str(), &response->mStatusCode) + || response->mStatusCode < 100 || response->mStatusCode > 999) { + return false; + } + } + + AString line; + ssize_t lastDictIndex = -1; + for (;;) { + if (!receiveLine(&line)) { + break; + } + + if (line.empty()) { + break; + } + + ALOGV("line: '%s'", line.c_str()); + + if (line.c_str()[0] == ' ' || line.c_str()[0] == '\t') { + // Support for folded header values. + + if (lastDictIndex < 0) { + // First line cannot be a continuation of the previous one. + return false; + } + + AString &value = response->mHeaders.editValueAt(lastDictIndex); + value.append(line); + + continue; + } + + ssize_t colonPos = line.find(":"); + if (colonPos < 0) { + // Malformed header line. + return false; + } + + AString key(line, 0, colonPos); + key.trim(); + key.tolower(); + + line.erase(0, colonPos + 1); + + lastDictIndex = response->mHeaders.add(key, line); + } + + for (size_t i = 0; i < response->mHeaders.size(); ++i) { + response->mHeaders.editValueAt(i).trim(); + } + + unsigned long contentLength = 0; + + ssize_t i = response->mHeaders.indexOfKey("content-length"); + + if (i >= 0) { + AString value = response->mHeaders.valueAt(i); + if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { + return false; + } + } + + if (contentLength > 0) { + response->mContent = new ABuffer(contentLength); + + if (receive(response->mContent->data(), contentLength) != OK) { + return false; + } + } + + if (response->mStatusCode == 401) { + if (mAuthType == NONE && mUser.size() > 0 + && parseAuthMethod(response)) { + ssize_t i; + CHECK_EQ((status_t)OK, findPendingRequest(response, &i)); + CHECK_GE(i, 0); + + sp<AMessage> reply = mPendingRequests.valueAt(i); + mPendingRequests.removeItemsAt(i); + + AString request; + CHECK(reply->findString("original-request", &request)); + + sp<AMessage> msg = new AMessage(kWhatSendRequest, id()); + msg->setMessage("reply", reply); + msg->setString("request", request.c_str(), request.size()); + + ALOGI("re-sending request with authentication headers..."); + onSendRequest(msg); + + return true; + } + } + + return isRequest + ? handleServerRequest(response) + : notifyResponseListener(response); +} + +bool ARTSPConnection::handleServerRequest(const sp<ARTSPResponse> &request) { + // Implementation of server->client requests is optional for all methods + // but we do need to respond, even if it's just to say that we don't + // support the method. + + ssize_t space1 = request->mStatusLine.find(" "); + CHECK_GE(space1, 0); + + AString response; + response.append("RTSP/1.0 501 Not Implemented\r\n"); + + ssize_t i = request->mHeaders.indexOfKey("cseq"); + + if (i >= 0) { + AString value = request->mHeaders.valueAt(i); + + unsigned long cseq; + if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { + return false; + } + + response.append("CSeq: "); + response.append(cseq); + response.append("\r\n"); + } + + response.append("\r\n"); + + size_t numBytesSent = 0; + while (numBytesSent < response.size()) { + ssize_t n = + send(mSocket, response.c_str() + numBytesSent, + response.size() - numBytesSent, 0); + + if (n < 0 && errno == EINTR) { + continue; + } + + if (n <= 0) { + if (n == 0) { + // Server closed the connection. + ALOGE("Server unexpectedly closed the connection."); + } else { + ALOGE("Error sending rtsp response (%s).", strerror(errno)); + } + + performDisconnect(); + + return false; + } + + numBytesSent += (size_t)n; + } + + return true; +} + +// static +bool ARTSPConnection::ParseSingleUnsignedLong( + const char *from, unsigned long *x) { + char *end; + *x = strtoul(from, &end, 10); + + if (end == from || *end != '\0') { + return false; + } + + return true; +} + +status_t ARTSPConnection::findPendingRequest( + const sp<ARTSPResponse> &response, ssize_t *index) const { + *index = 0; + + ssize_t i = response->mHeaders.indexOfKey("cseq"); + + if (i < 0) { + // This is an unsolicited server->client message. + return OK; + } + + AString value = response->mHeaders.valueAt(i); + + unsigned long cseq; + if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { + return ERROR_MALFORMED; + } + + i = mPendingRequests.indexOfKey(cseq); + + if (i < 0) { + return -ENOENT; + } + + *index = i; + + return OK; +} + +bool ARTSPConnection::notifyResponseListener( + const sp<ARTSPResponse> &response) { + ssize_t i; + status_t err = findPendingRequest(response, &i); + + if (err == OK && i < 0) { + // An unsolicited server response is not a problem. + return true; + } + + if (err != OK) { + return false; + } + + sp<AMessage> reply = mPendingRequests.valueAt(i); + mPendingRequests.removeItemsAt(i); + + reply->setInt32("result", OK); + reply->setObject("response", response); + reply->post(); + + return true; +} + +bool ARTSPConnection::parseAuthMethod(const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("www-authenticate"); + + if (i < 0) { + return false; + } + + AString value = response->mHeaders.valueAt(i); + + if (!strncmp(value.c_str(), "Basic", 5)) { + mAuthType = BASIC; + } else { +#if !defined(HAVE_ANDROID_OS) + // We don't have access to the MD5 implementation on the simulator, + // so we won't support digest authentication. + return false; +#endif + + CHECK(!strncmp(value.c_str(), "Digest", 6)); + mAuthType = DIGEST; + + i = value.find("nonce="); + CHECK_GE(i, 0); + CHECK_EQ(value.c_str()[i + 6], '\"'); + ssize_t j = value.find("\"", i + 7); + CHECK_GE(j, 0); + + mNonce.setTo(value, i + 7, j - i - 7); + } + + return true; +} + +#if defined(HAVE_ANDROID_OS) +static void H(const AString &s, AString *out) { + out->clear(); + + MD5_CTX m; + MD5_Init(&m); + MD5_Update(&m, s.c_str(), s.size()); + + uint8_t key[16]; + MD5_Final(key, &m); + + for (size_t i = 0; i < 16; ++i) { + char nibble = key[i] >> 4; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + + nibble = key[i] & 0x0f; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + } +} +#endif + +static void GetMethodAndURL( + const AString &request, AString *method, AString *url) { + ssize_t space1 = request.find(" "); + CHECK_GE(space1, 0); + + ssize_t space2 = request.find(" ", space1 + 1); + CHECK_GE(space2, 0); + + method->setTo(request, 0, space1); + url->setTo(request, space1 + 1, space2 - space1); +} + +void ARTSPConnection::addAuthentication(AString *request) { + if (mAuthType == NONE) { + return; + } + + // Find the boundary between headers and the body. + ssize_t i = request->find("\r\n\r\n"); + CHECK_GE(i, 0); + + if (mAuthType == BASIC) { + AString tmp; + tmp.append(mUser); + tmp.append(":"); + tmp.append(mPass); + + AString out; + encodeBase64(tmp.c_str(), tmp.size(), &out); + + AString fragment; + fragment.append("Authorization: Basic "); + fragment.append(out); + fragment.append("\r\n"); + + request->insert(fragment, i + 2); + + return; + } + +#if defined(HAVE_ANDROID_OS) + CHECK_EQ((int)mAuthType, (int)DIGEST); + + AString method, url; + GetMethodAndURL(*request, &method, &url); + + AString A1; + A1.append(mUser); + A1.append(":"); + A1.append("Streaming Server"); + A1.append(":"); + A1.append(mPass); + + AString A2; + A2.append(method); + A2.append(":"); + A2.append(url); + + AString HA1, HA2; + H(A1, &HA1); + H(A2, &HA2); + + AString tmp; + tmp.append(HA1); + tmp.append(":"); + tmp.append(mNonce); + tmp.append(":"); + tmp.append(HA2); + + AString digest; + H(tmp, &digest); + + AString fragment; + fragment.append("Authorization: Digest "); + fragment.append("nonce=\""); + fragment.append(mNonce); + fragment.append("\", "); + fragment.append("username=\""); + fragment.append(mUser); + fragment.append("\", "); + fragment.append("uri=\""); + fragment.append(url); + fragment.append("\", "); + fragment.append("response=\""); + fragment.append(digest); + fragment.append("\""); + fragment.append("\r\n"); + + request->insert(fragment, i + 2); +#endif +} + +// static +void ARTSPConnection::MakeUserAgent(AString *userAgent) { + userAgent->clear(); + userAgent->setTo("User-Agent: stagefright/1.1 (Linux;Android "); + +#if (PROPERTY_VALUE_MAX < 8) +#error "PROPERTY_VALUE_MAX must be at least 8" +#endif + + char value[PROPERTY_VALUE_MAX]; + property_get("ro.build.version.release", value, "Unknown"); + userAgent->append(value); + userAgent->append(")\r\n"); +} + +void ARTSPConnection::addUserAgent(AString *request) const { + // Find the boundary between headers and the body. + ssize_t i = request->find("\r\n\r\n"); + CHECK_GE(i, 0); + + request->insert(mUserAgent, i + 2); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ARTSPConnection.h b/media/libstagefright/rtsp/ARTSPConnection.h new file mode 100644 index 0000000..68f2d59 --- /dev/null +++ b/media/libstagefright/rtsp/ARTSPConnection.h @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTSP_CONNECTION_H_ + +#define A_RTSP_CONNECTION_H_ + +#include <media/stagefright/foundation/AHandler.h> +#include <media/stagefright/foundation/AString.h> + +namespace android { + +struct ABuffer; + +struct ARTSPResponse : public RefBase { + unsigned long mStatusCode; + AString mStatusLine; + KeyedVector<AString,AString> mHeaders; + sp<ABuffer> mContent; +}; + +struct ARTSPConnection : public AHandler { + ARTSPConnection(bool uidValid = false, uid_t uid = 0); + + void connect(const char *url, const sp<AMessage> &reply); + void disconnect(const sp<AMessage> &reply); + + void sendRequest(const char *request, const sp<AMessage> &reply); + + void observeBinaryData(const sp<AMessage> &reply); + + static bool ParseURL( + const char *url, AString *host, unsigned *port, AString *path, + AString *user, AString *pass); + +protected: + virtual ~ARTSPConnection(); + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum State { + DISCONNECTED, + CONNECTING, + CONNECTED, + }; + + enum { + kWhatConnect = 'conn', + kWhatDisconnect = 'disc', + kWhatCompleteConnection = 'comc', + kWhatSendRequest = 'sreq', + kWhatReceiveResponse = 'rres', + kWhatObserveBinaryData = 'obin', + }; + + enum AuthType { + NONE, + BASIC, + DIGEST + }; + + static const int64_t kSelectTimeoutUs; + + bool mUIDValid; + uid_t mUID; + State mState; + AString mUser, mPass; + AuthType mAuthType; + AString mNonce; + int mSocket; + int32_t mConnectionID; + int32_t mNextCSeq; + bool mReceiveResponseEventPending; + + KeyedVector<int32_t, sp<AMessage> > mPendingRequests; + + sp<AMessage> mObserveBinaryMessage; + + AString mUserAgent; + + void performDisconnect(); + + void onConnect(const sp<AMessage> &msg); + void onDisconnect(const sp<AMessage> &msg); + void onCompleteConnection(const sp<AMessage> &msg); + void onSendRequest(const sp<AMessage> &msg); + void onReceiveResponse(); + + void flushPendingRequests(); + void postReceiveReponseEvent(); + + // Return false iff something went unrecoverably wrong. + bool receiveRTSPReponse(); + status_t receive(void *data, size_t size); + bool receiveLine(AString *line); + sp<ABuffer> receiveBinaryData(); + bool notifyResponseListener(const sp<ARTSPResponse> &response); + + bool parseAuthMethod(const sp<ARTSPResponse> &response); + void addAuthentication(AString *request); + + void addUserAgent(AString *request) const; + + status_t findPendingRequest( + const sp<ARTSPResponse> &response, ssize_t *index) const; + + bool handleServerRequest(const sp<ARTSPResponse> &request); + + static bool ParseSingleUnsignedLong( + const char *from, unsigned long *x); + + static void MakeUserAgent(AString *userAgent); + + DISALLOW_EVIL_CONSTRUCTORS(ARTSPConnection); +}; + +} // namespace android + +#endif // A_RTSP_CONNECTION_H_ diff --git a/media/libstagefright/rtsp/ARawAudioAssembler.cpp b/media/libstagefright/rtsp/ARawAudioAssembler.cpp new file mode 100644 index 0000000..0da5dd2 --- /dev/null +++ b/media/libstagefright/rtsp/ARawAudioAssembler.cpp @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ARawAudioAssembler" +#include <utils/Log.h> + +#include "ARawAudioAssembler.h" + +#include "ARTPSource.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MetaData.h> +#include <media/stagefright/Utils.h> + +namespace android { + +ARawAudioAssembler::ARawAudioAssembler( + const sp<AMessage> ¬ify, const char *desc, const AString ¶ms) + : mNotifyMsg(notify), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0) { +} + +ARawAudioAssembler::~ARawAudioAssembler() { +} + +ARTPAssembler::AssemblyStatus ARawAudioAssembler::assembleMore( + const sp<ARTPSource> &source) { + return addPacket(source); +} + +ARTPAssembler::AssemblyStatus ARawAudioAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { + ALOGV("Not the sequence number I expected"); + + return WRONG_SEQUENCE_NUMBER; + } + + // hexdump(buffer->data(), buffer->size()); + + if (buffer->size() < 1) { + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + ALOGV("raw audio packet too short."); + + return MALFORMED_PACKET; + } + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setBuffer("access-unit", buffer); + msg->post(); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void ARawAudioAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; +} + +void ARawAudioAssembler::onByeReceived() { + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setInt32("eos", true); + msg->post(); +} + +// static +bool ARawAudioAssembler::Supports(const char *desc) { + return !strncmp(desc, "PCMU/", 5) + || !strncmp(desc, "PCMA/", 5); +} + +// static +void ARawAudioAssembler::MakeFormat( + const char *desc, const sp<MetaData> &format) { + if (!strncmp(desc, "PCMU/", 5)) { + format->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_G711_MLAW); + } else if (!strncmp(desc, "PCMA/", 5)) { + format->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_G711_ALAW); + } else { + TRESPASS(); + } + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc, &sampleRate, &numChannels); + + format->setInt32(kKeySampleRate, sampleRate); + format->setInt32(kKeyChannelCount, numChannels); +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ARawAudioAssembler.h b/media/libstagefright/rtsp/ARawAudioAssembler.h new file mode 100644 index 0000000..ed7af08 --- /dev/null +++ b/media/libstagefright/rtsp/ARawAudioAssembler.h @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RAW_AUDIO_ASSEMBLER_H_ + +#define A_RAW_AUDIO_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +namespace android { + +struct AMessage; +struct AString; +struct MetaData; + +struct ARawAudioAssembler : public ARTPAssembler { + ARawAudioAssembler( + const sp<AMessage> ¬ify, + const char *desc, const AString ¶ms); + + static bool Supports(const char *desc); + + static void MakeFormat( + const char *desc, const sp<MetaData> &format); + +protected: + virtual ~ARawAudioAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void onByeReceived(); + virtual void packetLost(); + +private: + bool mIsWide; + + sp<AMessage> mNotifyMsg; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + + DISALLOW_EVIL_CONSTRUCTORS(ARawAudioAssembler); +}; + +} // namespace android + +#endif // A_RAW_AUDIO_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/ASessionDescription.cpp b/media/libstagefright/rtsp/ASessionDescription.cpp new file mode 100644 index 0000000..a9b3330 --- /dev/null +++ b/media/libstagefright/rtsp/ASessionDescription.cpp @@ -0,0 +1,336 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "ASessionDescription" +#include <utils/Log.h> + +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AString.h> + +#include <stdlib.h> + +namespace android { + +ASessionDescription::ASessionDescription() + : mIsValid(false) { +} + +ASessionDescription::~ASessionDescription() { +} + +bool ASessionDescription::setTo(const void *data, size_t size) { + mIsValid = parse(data, size); + + if (!mIsValid) { + mTracks.clear(); + mFormats.clear(); + } + + return mIsValid; +} + +bool ASessionDescription::parse(const void *data, size_t size) { + mTracks.clear(); + mFormats.clear(); + + mTracks.push(Attribs()); + mFormats.push(AString("[root]")); + + AString desc((const char *)data, size); + + size_t i = 0; + for (;;) { + ssize_t eolPos = desc.find("\n", i); + + if (eolPos < 0) { + break; + } + + AString line; + if ((size_t)eolPos > i && desc.c_str()[eolPos - 1] == '\r') { + // We accept both '\n' and '\r\n' line endings, if it's + // the latter, strip the '\r' as well. + line.setTo(desc, i, eolPos - i - 1); + } else { + line.setTo(desc, i, eolPos - i); + } + + if (line.empty()) { + i = eolPos + 1; + continue; + } + + if (line.size() < 2 || line.c_str()[1] != '=') { + return false; + } + + ALOGI("%s", line.c_str()); + + switch (line.c_str()[0]) { + case 'v': + { + if (strcmp(line.c_str(), "v=0")) { + return false; + } + break; + } + + case 'a': + case 'b': + { + AString key, value; + + ssize_t colonPos = line.find(":", 2); + if (colonPos < 0) { + key = line; + } else { + key.setTo(line, 0, colonPos); + + if (key == "a=fmtp" || key == "a=rtpmap" + || key == "a=framesize") { + ssize_t spacePos = line.find(" ", colonPos + 1); + if (spacePos < 0) { + return false; + } + + key.setTo(line, 0, spacePos); + + colonPos = spacePos; + } + + value.setTo(line, colonPos + 1, line.size() - colonPos - 1); + } + + key.trim(); + value.trim(); + + ALOGV("adding '%s' => '%s'", key.c_str(), value.c_str()); + + mTracks.editItemAt(mTracks.size() - 1).add(key, value); + break; + } + + case 'm': + { + ALOGV("new section '%s'", + AString(line, 2, line.size() - 2).c_str()); + + mTracks.push(Attribs()); + mFormats.push(AString(line, 2, line.size() - 2)); + break; + } + + default: + { + AString key, value; + + ssize_t equalPos = line.find("="); + + key = AString(line, 0, equalPos + 1); + value = AString(line, equalPos + 1, line.size() - equalPos - 1); + + key.trim(); + value.trim(); + + ALOGV("adding '%s' => '%s'", key.c_str(), value.c_str()); + + mTracks.editItemAt(mTracks.size() - 1).add(key, value); + break; + } + } + + i = eolPos + 1; + } + + return true; +} + +bool ASessionDescription::isValid() const { + return mIsValid; +} + +size_t ASessionDescription::countTracks() const { + return mTracks.size(); +} + +void ASessionDescription::getFormat(size_t index, AString *value) const { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + *value = mFormats.itemAt(index); +} + +bool ASessionDescription::findAttribute( + size_t index, const char *key, AString *value) const { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + value->clear(); + + const Attribs &track = mTracks.itemAt(index); + ssize_t i = track.indexOfKey(AString(key)); + + if (i < 0) { + return false; + } + + *value = track.valueAt(i); + + return true; +} + +void ASessionDescription::getFormatType( + size_t index, unsigned long *PT, + AString *desc, AString *params) const { + AString format; + getFormat(index, &format); + + const char *lastSpacePos = strrchr(format.c_str(), ' '); + CHECK(lastSpacePos != NULL); + + char *end; + unsigned long x = strtoul(lastSpacePos + 1, &end, 10); + CHECK_GT(end, lastSpacePos + 1); + CHECK_EQ(*end, '\0'); + + *PT = x; + + char key[20]; + sprintf(key, "a=rtpmap:%lu", x); + + CHECK(findAttribute(index, key, desc)); + + sprintf(key, "a=fmtp:%lu", x); + if (!findAttribute(index, key, params)) { + params->clear(); + } +} + +bool ASessionDescription::getDimensions( + size_t index, unsigned long PT, + int32_t *width, int32_t *height) const { + *width = 0; + *height = 0; + + char key[20]; + sprintf(key, "a=framesize:%lu", PT); + AString value; + if (!findAttribute(index, key, &value)) { + return false; + } + + const char *s = value.c_str(); + char *end; + *width = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '-'); + + s = end + 1; + *height = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '\0'); + + return true; +} + +bool ASessionDescription::getDurationUs(int64_t *durationUs) const { + *durationUs = 0; + + CHECK(mIsValid); + + AString value; + if (!findAttribute(0, "a=range", &value)) { + return false; + } + + if (strncmp(value.c_str(), "npt=", 4)) { + return false; + } + + float from, to; + if (!parseNTPRange(value.c_str() + 4, &from, &to)) { + return false; + } + + *durationUs = (int64_t)((to - from) * 1E6); + + return true; +} + +// static +void ASessionDescription::ParseFormatDesc( + const char *desc, int32_t *timescale, int32_t *numChannels) { + const char *slash1 = strchr(desc, '/'); + CHECK(slash1 != NULL); + + const char *s = slash1 + 1; + char *end; + unsigned long x = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK(*end == '\0' || *end == '/'); + + *timescale = x; + *numChannels = 1; + + if (*end == '/') { + s = end + 1; + unsigned long x = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '\0'); + + *numChannels = x; + } +} + +// static +bool ASessionDescription::parseNTPRange( + const char *s, float *npt1, float *npt2) { + if (s[0] == '-') { + return false; // no start time available. + } + + if (!strncmp("now", s, 3)) { + return false; // no absolute start time available + } + + char *end; + *npt1 = strtof(s, &end); + + if (end == s || *end != '-') { + // Failed to parse float or trailing "dash". + return false; + } + + s = end + 1; // skip the dash. + + if (!strncmp("now", s, 3)) { + return false; // no absolute end time available + } + + *npt2 = strtof(s, &end); + + if (end == s || *end != '\0') { + return false; + } + + return *npt2 > *npt1; +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ASessionDescription.h b/media/libstagefright/rtsp/ASessionDescription.h new file mode 100644 index 0000000..b462983 --- /dev/null +++ b/media/libstagefright/rtsp/ASessionDescription.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_SESSION_DESCRIPTION_H_ + +#define A_SESSION_DESCRIPTION_H_ + +#include <sys/types.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/KeyedVector.h> +#include <utils/RefBase.h> +#include <utils/Vector.h> + +namespace android { + +struct AString; + +struct ASessionDescription : public RefBase { + ASessionDescription(); + + bool setTo(const void *data, size_t size); + bool isValid() const; + + // Actually, 1 + number of tracks, as index 0 is reserved for the + // session description root-level attributes. + size_t countTracks() const; + void getFormat(size_t index, AString *value) const; + + void getFormatType( + size_t index, unsigned long *PT, + AString *desc, AString *params) const; + + bool getDimensions( + size_t index, unsigned long PT, + int32_t *width, int32_t *height) const; + + bool getDurationUs(int64_t *durationUs) const; + + static void ParseFormatDesc( + const char *desc, int32_t *timescale, int32_t *numChannels); + + bool findAttribute(size_t index, const char *key, AString *value) const; + + // parses strings of the form + // npt := npt-time "-" npt-time? | "-" npt-time + // npt-time := "now" | [0-9]+("." [0-9]*)? + // + // Returns true iff both "npt1" and "npt2" times were available, + // i.e. we have a fixed duration, otherwise this is live streaming. + static bool parseNTPRange(const char *s, float *npt1, float *npt2); + +protected: + virtual ~ASessionDescription(); + +private: + typedef KeyedVector<AString,AString> Attribs; + + bool mIsValid; + Vector<Attribs> mTracks; + Vector<AString> mFormats; + + bool parse(const void *data, size_t size); + + DISALLOW_EVIL_CONSTRUCTORS(ASessionDescription); +}; + +} // namespace android + +#endif // A_SESSION_DESCRIPTION_H_ diff --git a/media/libstagefright/rtsp/Android.mk b/media/libstagefright/rtsp/Android.mk new file mode 100644 index 0000000..e0fe59b --- /dev/null +++ b/media/libstagefright/rtsp/Android.mk @@ -0,0 +1,56 @@ +LOCAL_PATH:= $(call my-dir) + +include $(CLEAR_VARS) + +LOCAL_SRC_FILES:= \ + AAMRAssembler.cpp \ + AAVCAssembler.cpp \ + AH263Assembler.cpp \ + AMPEG4AudioAssembler.cpp \ + AMPEG4ElementaryAssembler.cpp \ + APacketSource.cpp \ + ARawAudioAssembler.cpp \ + ARTPAssembler.cpp \ + ARTPConnection.cpp \ + ARTPSource.cpp \ + ARTPWriter.cpp \ + ARTSPConnection.cpp \ + ASessionDescription.cpp \ + +LOCAL_C_INCLUDES:= \ + $(TOP)/frameworks/base/media/libstagefright/include \ + $(TOP)/frameworks/native/include/media/openmax \ + $(TOP)/external/openssl/include + +LOCAL_MODULE:= libstagefright_rtsp + +ifeq ($(TARGET_ARCH),arm) + LOCAL_CFLAGS += -Wno-psabi +endif + +include $(BUILD_STATIC_LIBRARY) + +################################################################################ + +include $(CLEAR_VARS) + +LOCAL_SRC_FILES:= \ + rtp_test.cpp + +LOCAL_SHARED_LIBRARIES := \ + libstagefright liblog libutils libbinder libstagefright_foundation + +LOCAL_STATIC_LIBRARIES := \ + libstagefright_rtsp + +LOCAL_C_INCLUDES:= \ + frameworks/base/media/libstagefright \ + $(TOP)/frameworks/native/include/media/openmax + +LOCAL_CFLAGS += -Wno-multichar + +LOCAL_MODULE_TAGS := debug + +LOCAL_MODULE:= rtp_test + +# include $(BUILD_EXECUTABLE) diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h new file mode 100644 index 0000000..deee30f --- /dev/null +++ b/media/libstagefright/rtsp/MyHandler.h @@ -0,0 +1,1522 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MY_HANDLER_H_ + +#define MY_HANDLER_H_ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "MyHandler" +#include <utils/Log.h> + +#include "APacketSource.h" +#include "ARTPConnection.h" +#include "ARTSPConnection.h" +#include "ASessionDescription.h" + +#include <ctype.h> +#include <cutils/properties.h> + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/ALooper.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaErrors.h> + +#include <arpa/inet.h> +#include <sys/socket.h> +#include <netdb.h> + +#include "HTTPBase.h" + +// If no access units are received within 5 secs, assume that the rtp +// stream has ended and signal end of stream. +static int64_t kAccessUnitTimeoutUs = 10000000ll; + +// If no access units arrive for the first 10 secs after starting the +// stream, assume none ever will and signal EOS or switch transports. +static int64_t kStartupTimeoutUs = 10000000ll; + +static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; + +namespace android { + +static void MakeUserAgentString(AString *s) { + s->setTo("stagefright/1.1 (Linux;Android "); + +#if (PROPERTY_VALUE_MAX < 8) +#error "PROPERTY_VALUE_MAX must be at least 8" +#endif + + char value[PROPERTY_VALUE_MAX]; + property_get("ro.build.version.release", value, "Unknown"); + s->append(value); + s->append(")"); +} + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +struct MyHandler : public AHandler { + enum { + kWhatConnected = 'conn', + kWhatDisconnected = 'disc', + kWhatSeekDone = 'sdon', + + kWhatAccessUnit = 'accU', + kWhatEOS = 'eos!', + kWhatSeekDiscontinuity = 'seeD', + kWhatNormalPlayTimeMapping = 'nptM', + }; + + MyHandler( + const char *url, + const sp<AMessage> ¬ify, + bool uidValid = false, uid_t uid = 0) + : mNotify(notify), + mUIDValid(uidValid), + mUID(uid), + mNetLooper(new ALooper), + mConn(new ARTSPConnection(mUIDValid, mUID)), + mRTPConn(new ARTPConnection), + mOriginalSessionURL(url), + mSessionURL(url), + mSetupTracksSuccessful(false), + mSeekPending(false), + mFirstAccessUnit(true), + mAllTracksHaveTime(false), + mNTPAnchorUs(-1), + mMediaAnchorUs(-1), + mLastMediaTimeUs(0), + mNumAccessUnitsReceived(0), + mCheckPending(false), + mCheckGeneration(0), + mTryTCPInterleaving(false), + mTryFakeRTCP(false), + mReceivedFirstRTCPPacket(false), + mReceivedFirstRTPPacket(false), + mSeekable(false), + mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), + mKeepAliveGeneration(0) { + mNetLooper->setName("rtsp net"); + mNetLooper->start(false /* runOnCallingThread */, + false /* canCallJava */, + PRIORITY_HIGHEST); + + // Strip any authentication info from the session url, we don't + // want to transmit user/pass in cleartext. + AString host, path, user, pass; + unsigned port; + CHECK(ARTSPConnection::ParseURL( + mSessionURL.c_str(), &host, &port, &path, &user, &pass)); + + if (user.size() > 0) { + mSessionURL.clear(); + mSessionURL.append("rtsp://"); + mSessionURL.append(host); + mSessionURL.append(":"); + mSessionURL.append(StringPrintf("%u", port)); + mSessionURL.append(path); + + ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); + } + + mSessionHost = host; + } + + void connect() { + looper()->registerHandler(mConn); + (1 ? mNetLooper : looper())->registerHandler(mRTPConn); + + sp<AMessage> notify = new AMessage('biny', id()); + mConn->observeBinaryData(notify); + + sp<AMessage> reply = new AMessage('conn', id()); + mConn->connect(mOriginalSessionURL.c_str(), reply); + } + + void disconnect() { + (new AMessage('abor', id()))->post(); + } + + void seek(int64_t timeUs) { + sp<AMessage> msg = new AMessage('seek', id()); + msg->setInt64("time", timeUs); + msg->post(); + } + + static void addRR(const sp<ABuffer> &buf) { + uint8_t *ptr = buf->data() + buf->size(); + ptr[0] = 0x80 | 0; + ptr[1] = 201; // RR + ptr[2] = 0; + ptr[3] = 1; + ptr[4] = 0xde; // SSRC + ptr[5] = 0xad; + ptr[6] = 0xbe; + ptr[7] = 0xef; + + buf->setRange(0, buf->size() + 8); + } + + static void addSDES(int s, const sp<ABuffer> &buffer) { + struct sockaddr_in addr; + socklen_t addrSize = sizeof(addr); + CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); + + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = 0xde; // SSRC + data[5] = 0xad; + data[6] = 0xbe; + data[7] = 0xef; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + AString cname = "stagefright@"; + cname.append(inet_ntoa(addr.sin_addr)); + data[offset++] = cname.size(); + + memcpy(&data[offset], cname.c_str(), cname.size()); + offset += cname.size(); + + data[offset++] = 6; // TOOL + + AString tool; + MakeUserAgentString(&tool); + + data[offset++] = tool.size(); + + memcpy(&data[offset], tool.c_str(), tool.size()); + offset += tool.size(); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); + } + + // In case we're behind NAT, fire off two UDP packets to the remote + // rtp/rtcp ports to poke a hole into the firewall for future incoming + // packets. We're going to send an RR/SDES RTCP packet to both of them. + bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + + AString source; + AString server_port; + if (!GetAttribute(transport.c_str(), + "source", + &source)) { + ALOGW("Missing 'source' field in Transport response. Using " + "RTSP endpoint address."); + + struct hostent *ent = gethostbyname(mSessionHost.c_str()); + if (ent == NULL) { + ALOGE("Failed to look up address of session host '%s'", + mSessionHost.c_str()); + + return false; + } + + addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + } else { + addr.sin_addr.s_addr = inet_addr(source.c_str()); + } + + if (!GetAttribute(transport.c_str(), + "server_port", + &server_port)) { + ALOGI("Missing 'server_port' field in Transport response."); + return false; + } + + int rtpPort, rtcpPort; + if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 + || rtpPort <= 0 || rtpPort > 65535 + || rtcpPort <=0 || rtcpPort > 65535 + || rtcpPort != rtpPort + 1) { + ALOGE("Server picked invalid RTP/RTCP port pair %s," + " RTP port must be even, RTCP port must be one higher.", + server_port.c_str()); + + return false; + } + + if (rtpPort & 1) { + ALOGW("Server picked an odd RTP port, it should've picked an " + "even one, we'll let it pass for now, but this may break " + "in the future."); + } + + if (addr.sin_addr.s_addr == INADDR_NONE) { + return true; + } + + if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { + // No firewalls to traverse on the loopback interface. + return true; + } + + // Make up an RR/SDES RTCP packet. + sp<ABuffer> buf = new ABuffer(65536); + buf->setRange(0, 0); + addRR(buf); + addSDES(rtpSocket, buf); + + addr.sin_port = htons(rtpPort); + + ssize_t n = sendto( + rtpSocket, buf->data(), buf->size(), 0, + (const sockaddr *)&addr, sizeof(addr)); + + if (n < (ssize_t)buf->size()) { + ALOGE("failed to poke a hole for RTP packets"); + return false; + } + + addr.sin_port = htons(rtcpPort); + + n = sendto( + rtcpSocket, buf->data(), buf->size(), 0, + (const sockaddr *)&addr, sizeof(addr)); + + if (n < (ssize_t)buf->size()) { + ALOGE("failed to poke a hole for RTCP packets"); + return false; + } + + ALOGV("successfully poked holes."); + + return true; + } + + virtual void onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case 'conn': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("connection request completed with result %d (%s)", + result, strerror(-result)); + + if (result == OK) { + AString request; + request = "DESCRIBE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + request.append("Accept: application/sdp\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('desc', id()); + mConn->sendRequest(request.c_str(), reply); + } else { + (new AMessage('disc', id()))->post(); + } + break; + } + + case 'disc': + { + ++mKeepAliveGeneration; + + int32_t reconnect; + if (msg->findInt32("reconnect", &reconnect) && reconnect) { + sp<AMessage> reply = new AMessage('conn', id()); + mConn->connect(mOriginalSessionURL.c_str(), reply); + } else { + (new AMessage('quit', id()))->post(); + } + break; + } + + case 'desc': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("DESCRIBE completed with result %d (%s)", + result, strerror(-result)); + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + if (response->mStatusCode == 302) { + ssize_t i = response->mHeaders.indexOfKey("location"); + CHECK_GE(i, 0); + + mSessionURL = response->mHeaders.valueAt(i); + + AString request; + request = "DESCRIBE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + request.append("Accept: application/sdp\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('desc', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + if (response->mStatusCode != 200) { + result = UNKNOWN_ERROR; + } else { + mSessionDesc = new ASessionDescription; + + mSessionDesc->setTo( + response->mContent->data(), + response->mContent->size()); + + if (!mSessionDesc->isValid()) { + ALOGE("Failed to parse session description."); + result = ERROR_MALFORMED; + } else { + ssize_t i = response->mHeaders.indexOfKey("content-base"); + if (i >= 0) { + mBaseURL = response->mHeaders.valueAt(i); + } else { + i = response->mHeaders.indexOfKey("content-location"); + if (i >= 0) { + mBaseURL = response->mHeaders.valueAt(i); + } else { + mBaseURL = mSessionURL; + } + } + + if (!mBaseURL.startsWith("rtsp://")) { + // Some misbehaving servers specify a relative + // URL in one of the locations above, combine + // it with the absolute session URL to get + // something usable... + + ALOGW("Server specified a non-absolute base URL" + ", combining it with the session URL to " + "get something usable..."); + + AString tmp; + CHECK(MakeURL( + mSessionURL.c_str(), + mBaseURL.c_str(), + &tmp)); + + mBaseURL = tmp; + } + + if (mSessionDesc->countTracks() < 2) { + // There's no actual tracks in this session. + // The first "track" is merely session meta + // data. + + ALOGW("Session doesn't contain any playable " + "tracks. Aborting."); + result = ERROR_UNSUPPORTED; + } else { + setupTrack(1); + } + } + } + } + + if (result != OK) { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + break; + } + + case 'setu': + { + size_t index; + CHECK(msg->findSize("index", &index)); + + TrackInfo *track = NULL; + size_t trackIndex; + if (msg->findSize("track-index", &trackIndex)) { + track = &mTracks.editItemAt(trackIndex); + } + + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("SETUP(%d) completed with result %d (%s)", + index, result, strerror(-result)); + + if (result == OK) { + CHECK(track != NULL); + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + if (response->mStatusCode != 200) { + result = UNKNOWN_ERROR; + } else { + ssize_t i = response->mHeaders.indexOfKey("session"); + CHECK_GE(i, 0); + + mSessionID = response->mHeaders.valueAt(i); + + mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; + AString timeoutStr; + if (GetAttribute( + mSessionID.c_str(), "timeout", &timeoutStr)) { + char *end; + unsigned long timeoutSecs = + strtoul(timeoutStr.c_str(), &end, 10); + + if (end == timeoutStr.c_str() || *end != '\0') { + ALOGW("server specified malformed timeout '%s'", + timeoutStr.c_str()); + + mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; + } else if (timeoutSecs < 15) { + ALOGW("server specified too short a timeout " + "(%lu secs), using default.", + timeoutSecs); + + mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; + } else { + mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; + + ALOGI("server specified timeout of %lu secs.", + timeoutSecs); + } + } + + i = mSessionID.find(";"); + if (i >= 0) { + // Remove options, i.e. ";timeout=90" + mSessionID.erase(i, mSessionID.size() - i); + } + + sp<AMessage> notify = new AMessage('accu', id()); + notify->setSize("track-index", trackIndex); + + i = response->mHeaders.indexOfKey("transport"); + CHECK_GE(i, 0); + + if (!track->mUsingInterleavedTCP) { + AString transport = response->mHeaders.valueAt(i); + + // We are going to continue even if we were + // unable to poke a hole into the firewall... + pokeAHole( + track->mRTPSocket, + track->mRTCPSocket, + transport); + } + + mRTPConn->addStream( + track->mRTPSocket, track->mRTCPSocket, + mSessionDesc, index, + notify, track->mUsingInterleavedTCP); + + mSetupTracksSuccessful = true; + } + } + + if (result != OK) { + if (track) { + if (!track->mUsingInterleavedTCP) { + // Clear the tag + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); + HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); + } + + close(track->mRTPSocket); + close(track->mRTCPSocket); + } + + mTracks.removeItemsAt(trackIndex); + } + } + + ++index; + if (index < mSessionDesc->countTracks()) { + setupTrack(index); + } else if (mSetupTracksSuccessful) { + ++mKeepAliveGeneration; + postKeepAlive(); + + AString request = "PLAY "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('play', id()); + mConn->sendRequest(request.c_str(), reply); + } else { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + break; + } + + case 'play': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("PLAY completed with result %d (%s)", + result, strerror(-result)); + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + if (response->mStatusCode != 200) { + result = UNKNOWN_ERROR; + } else { + parsePlayResponse(response); + + sp<AMessage> timeout = new AMessage('tiou', id()); + timeout->post(kStartupTimeoutUs); + } + } + + if (result != OK) { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + + break; + } + + case 'aliv': + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + + if (generation != mKeepAliveGeneration) { + // obsolete event. + break; + } + + AString request; + request.append("OPTIONS "); + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('opts', id()); + reply->setInt32("generation", mKeepAliveGeneration); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'opts': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("OPTIONS completed with result %d (%s)", + result, strerror(-result)); + + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + + if (generation != mKeepAliveGeneration) { + // obsolete event. + break; + } + + postKeepAlive(); + break; + } + + case 'abor': + { + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *info = &mTracks.editItemAt(i); + + if (!mFirstAccessUnit) { + postQueueEOS(i, ERROR_END_OF_STREAM); + } + + if (!info->mUsingInterleavedTCP) { + mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); + + // Clear the tag + if (mUIDValid) { + HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); + HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); + } + + close(info->mRTPSocket); + close(info->mRTCPSocket); + } + } + mTracks.clear(); + mSetupTracksSuccessful = false; + mSeekPending = false; + mFirstAccessUnit = true; + mAllTracksHaveTime = false; + mNTPAnchorUs = -1; + mMediaAnchorUs = -1; + mNumAccessUnitsReceived = 0; + mReceivedFirstRTCPPacket = false; + mReceivedFirstRTPPacket = false; + mSeekable = false; + + sp<AMessage> reply = new AMessage('tear', id()); + + int32_t reconnect; + if (msg->findInt32("reconnect", &reconnect) && reconnect) { + reply->setInt32("reconnect", true); + } + + AString request; + request = "TEARDOWN "; + + // XXX should use aggregate url from SDP here... + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'tear': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("TEARDOWN completed with result %d (%s)", + result, strerror(-result)); + + sp<AMessage> reply = new AMessage('disc', id()); + + int32_t reconnect; + if (msg->findInt32("reconnect", &reconnect) && reconnect) { + reply->setInt32("reconnect", true); + } + + mConn->disconnect(reply); + break; + } + + case 'quit': + { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatDisconnected); + msg->setInt32("result", UNKNOWN_ERROR); + msg->post(); + break; + } + + case 'chek': + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + if (generation != mCheckGeneration) { + // This is an outdated message. Ignore. + break; + } + + if (mNumAccessUnitsReceived == 0) { +#if 1 + ALOGI("stream ended? aborting."); + (new AMessage('abor', id()))->post(); + break; +#else + ALOGI("haven't seen an AU in a looong time."); +#endif + } + + mNumAccessUnitsReceived = 0; + msg->post(kAccessUnitTimeoutUs); + break; + } + + case 'accu': + { + int32_t timeUpdate; + if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { + size_t trackIndex; + CHECK(msg->findSize("track-index", &trackIndex)); + + uint32_t rtpTime; + uint64_t ntpTime; + CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); + CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); + + onTimeUpdate(trackIndex, rtpTime, ntpTime); + break; + } + + int32_t first; + if (msg->findInt32("first-rtcp", &first)) { + mReceivedFirstRTCPPacket = true; + break; + } + + if (msg->findInt32("first-rtp", &first)) { + mReceivedFirstRTPPacket = true; + break; + } + + ++mNumAccessUnitsReceived; + postAccessUnitTimeoutCheck(); + + size_t trackIndex; + CHECK(msg->findSize("track-index", &trackIndex)); + + if (trackIndex >= mTracks.size()) { + ALOGV("late packets ignored."); + break; + } + + TrackInfo *track = &mTracks.editItemAt(trackIndex); + + int32_t eos; + if (msg->findInt32("eos", &eos)) { + ALOGI("received BYE on track index %d", trackIndex); +#if 0 + track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); +#endif + return; + } + + sp<ABuffer> accessUnit; + CHECK(msg->findBuffer("access-unit", &accessUnit)); + + uint32_t seqNum = (uint32_t)accessUnit->int32Data(); + + if (mSeekPending) { + ALOGV("we're seeking, dropping stale packet."); + break; + } + + if (seqNum < track->mFirstSeqNumInSegment) { + ALOGV("dropping stale access-unit (%d < %d)", + seqNum, track->mFirstSeqNumInSegment); + break; + } + + if (track->mNewSegment) { + track->mNewSegment = false; + } + + onAccessUnitComplete(trackIndex, accessUnit); + break; + } + + case 'seek': + { + if (!mSeekable) { + ALOGW("This is a live stream, ignoring seek request."); + + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDone); + msg->post(); + break; + } + + int64_t timeUs; + CHECK(msg->findInt64("time", &timeUs)); + + mSeekPending = true; + + // Disable the access unit timeout until we resumed + // playback again. + mCheckPending = true; + ++mCheckGeneration; + + AString request = "PAUSE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('see1', id()); + reply->setInt64("time", timeUs); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'see1': + { + // Session is paused now. + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *info = &mTracks.editItemAt(i); + + postQueueSeekDiscontinuity(i); + + info->mRTPAnchor = 0; + info->mNTPAnchorUs = -1; + } + + mAllTracksHaveTime = false; + mNTPAnchorUs = -1; + + int64_t timeUs; + CHECK(msg->findInt64("time", &timeUs)); + + AString request = "PLAY "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append( + StringPrintf( + "Range: npt=%lld-\r\n", timeUs / 1000000ll)); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('see2', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'see2': + { + CHECK(mSeekPending); + + int32_t result; + CHECK(msg->findInt32("result", &result)); + + ALOGI("PLAY completed with result %d (%s)", + result, strerror(-result)); + + mCheckPending = false; + postAccessUnitTimeoutCheck(); + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + if (response->mStatusCode != 200) { + result = UNKNOWN_ERROR; + } else { + parsePlayResponse(response); + + ssize_t i = response->mHeaders.indexOfKey("rtp-info"); + CHECK_GE(i, 0); + + ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); + + ALOGI("seek completed."); + } + } + + if (result != OK) { + ALOGE("seek failed, aborting."); + (new AMessage('abor', id()))->post(); + } + + mSeekPending = false; + + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDone); + msg->post(); + break; + } + + case 'biny': + { + sp<ABuffer> buffer; + CHECK(msg->findBuffer("buffer", &buffer)); + + int32_t index; + CHECK(buffer->meta()->findInt32("index", &index)); + + mRTPConn->injectPacket(index, buffer); + break; + } + + case 'tiou': + { + if (!mReceivedFirstRTCPPacket) { + if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { + ALOGW("We received RTP packets but no RTCP packets, " + "using fake timestamps."); + + mTryFakeRTCP = true; + + mReceivedFirstRTCPPacket = true; + + fakeTimestamps(); + } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { + ALOGW("Never received any data, switching transports."); + + mTryTCPInterleaving = true; + + sp<AMessage> msg = new AMessage('abor', id()); + msg->setInt32("reconnect", true); + msg->post(); + } else { + ALOGW("Never received any data, disconnecting."); + (new AMessage('abor', id()))->post(); + } + } else { + if (!mAllTracksHaveTime) { + ALOGW("We received some RTCP packets, but time " + "could not be established on all tracks, now " + "using fake timestamps"); + + fakeTimestamps(); + } + } + break; + } + + default: + TRESPASS(); + break; + } + } + + void postKeepAlive() { + sp<AMessage> msg = new AMessage('aliv', id()); + msg->setInt32("generation", mKeepAliveGeneration); + msg->post((mKeepAliveTimeoutUs * 9) / 10); + } + + void postAccessUnitTimeoutCheck() { + if (mCheckPending) { + return; + } + + mCheckPending = true; + sp<AMessage> check = new AMessage('chek', id()); + check->setInt32("generation", mCheckGeneration); + check->post(kAccessUnitTimeoutUs); + } + + static void SplitString( + const AString &s, const char *separator, List<AString> *items) { + items->clear(); + size_t start = 0; + while (start < s.size()) { + ssize_t offset = s.find(separator, start); + + if (offset < 0) { + items->push_back(AString(s, start, s.size() - start)); + break; + } + + items->push_back(AString(s, start, offset - start)); + start = offset + strlen(separator); + } + } + + void parsePlayResponse(const sp<ARTSPResponse> &response) { + mSeekable = false; + + ssize_t i = response->mHeaders.indexOfKey("range"); + if (i < 0) { + // Server doesn't even tell use what range it is going to + // play, therefore we won't support seeking. + return; + } + + AString range = response->mHeaders.valueAt(i); + ALOGV("Range: %s", range.c_str()); + + AString val; + CHECK(GetAttribute(range.c_str(), "npt", &val)); + + float npt1, npt2; + if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { + // This is a live stream and therefore not seekable. + + ALOGI("This is a live stream"); + return; + } + + i = response->mHeaders.indexOfKey("rtp-info"); + CHECK_GE(i, 0); + + AString rtpInfo = response->mHeaders.valueAt(i); + List<AString> streamInfos; + SplitString(rtpInfo, ",", &streamInfos); + + int n = 1; + for (List<AString>::iterator it = streamInfos.begin(); + it != streamInfos.end(); ++it) { + (*it).trim(); + ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); + + CHECK(GetAttribute((*it).c_str(), "url", &val)); + + size_t trackIndex = 0; + while (trackIndex < mTracks.size() + && !(val == mTracks.editItemAt(trackIndex).mURL)) { + ++trackIndex; + } + CHECK_LT(trackIndex, mTracks.size()); + + CHECK(GetAttribute((*it).c_str(), "seq", &val)); + + char *end; + unsigned long seq = strtoul(val.c_str(), &end, 10); + + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mFirstSeqNumInSegment = seq; + info->mNewSegment = true; + + CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); + + uint32_t rtpTime = strtoul(val.c_str(), &end, 10); + + ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); + + info->mNormalPlayTimeRTP = rtpTime; + info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); + + if (!mFirstAccessUnit) { + postNormalPlayTimeMapping( + trackIndex, + info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); + } + + ++n; + } + + mSeekable = true; + } + + sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + const TrackInfo &info = mTracks.itemAt(index); + + *timeScale = info.mTimeScale; + + return info.mPacketSource->getFormat(); + } + + size_t countTracks() const { + return mTracks.size(); + } + +private: + struct TrackInfo { + AString mURL; + int mRTPSocket; + int mRTCPSocket; + bool mUsingInterleavedTCP; + uint32_t mFirstSeqNumInSegment; + bool mNewSegment; + + uint32_t mRTPAnchor; + int64_t mNTPAnchorUs; + int32_t mTimeScale; + + uint32_t mNormalPlayTimeRTP; + int64_t mNormalPlayTimeUs; + + sp<APacketSource> mPacketSource; + + // Stores packets temporarily while no notion of time + // has been established yet. + List<sp<ABuffer> > mPackets; + }; + + sp<AMessage> mNotify; + bool mUIDValid; + uid_t mUID; + sp<ALooper> mNetLooper; + sp<ARTSPConnection> mConn; + sp<ARTPConnection> mRTPConn; + sp<ASessionDescription> mSessionDesc; + AString mOriginalSessionURL; // This one still has user:pass@ + AString mSessionURL; + AString mSessionHost; + AString mBaseURL; + AString mSessionID; + bool mSetupTracksSuccessful; + bool mSeekPending; + bool mFirstAccessUnit; + + bool mAllTracksHaveTime; + int64_t mNTPAnchorUs; + int64_t mMediaAnchorUs; + int64_t mLastMediaTimeUs; + + int64_t mNumAccessUnitsReceived; + bool mCheckPending; + int32_t mCheckGeneration; + bool mTryTCPInterleaving; + bool mTryFakeRTCP; + bool mReceivedFirstRTCPPacket; + bool mReceivedFirstRTPPacket; + bool mSeekable; + int64_t mKeepAliveTimeoutUs; + int32_t mKeepAliveGeneration; + + Vector<TrackInfo> mTracks; + + void setupTrack(size_t index) { + sp<APacketSource> source = + new APacketSource(mSessionDesc, index); + + if (source->initCheck() != OK) { + ALOGW("Unsupported format. Ignoring track #%d.", index); + + sp<AMessage> reply = new AMessage('setu', id()); + reply->setSize("index", index); + reply->setInt32("result", ERROR_UNSUPPORTED); + reply->post(); + return; + } + + AString url; + CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); + + AString trackURL; + CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); + + mTracks.push(TrackInfo()); + TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + info->mURL = trackURL; + info->mPacketSource = source; + info->mUsingInterleavedTCP = false; + info->mFirstSeqNumInSegment = 0; + info->mNewSegment = true; + info->mRTPAnchor = 0; + info->mNTPAnchorUs = -1; + info->mNormalPlayTimeRTP = 0; + info->mNormalPlayTimeUs = 0ll; + + unsigned long PT; + AString formatDesc; + AString formatParams; + mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); + + int32_t timescale; + int32_t numChannels; + ASessionDescription::ParseFormatDesc( + formatDesc.c_str(), ×cale, &numChannels); + + info->mTimeScale = timescale; + + ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); + + AString request = "SETUP "; + request.append(trackURL); + request.append(" RTSP/1.0\r\n"); + + if (mTryTCPInterleaving) { + size_t interleaveIndex = 2 * (mTracks.size() - 1); + info->mUsingInterleavedTCP = true; + info->mRTPSocket = interleaveIndex; + info->mRTCPSocket = interleaveIndex + 1; + + request.append("Transport: RTP/AVP/TCP;interleaved="); + request.append(interleaveIndex); + request.append("-"); + request.append(interleaveIndex + 1); + } else { + unsigned rtpPort; + ARTPConnection::MakePortPair( + &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + + if (mUIDValid) { + HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, + (uint32_t)*(uint32_t*) "RTP_"); + HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, + (uint32_t)*(uint32_t*) "RTP_"); + } + + request.append("Transport: RTP/AVP/UDP;unicast;client_port="); + request.append(rtpPort); + request.append("-"); + request.append(rtpPort + 1); + } + + request.append("\r\n"); + + if (index > 1) { + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + } + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('setu', id()); + reply->setSize("index", index); + reply->setSize("track-index", mTracks.size() - 1); + mConn->sendRequest(request.c_str(), reply); + } + + static bool MakeURL(const char *baseURL, const char *url, AString *out) { + out->clear(); + + if (strncasecmp("rtsp://", baseURL, 7)) { + // Base URL must be absolute + return false; + } + + if (!strncasecmp("rtsp://", url, 7)) { + // "url" is already an absolute URL, ignore base URL. + out->setTo(url); + return true; + } + + size_t n = strlen(baseURL); + if (baseURL[n - 1] == '/') { + out->setTo(baseURL); + out->append(url); + } else { + const char *slashPos = strrchr(baseURL, '/'); + + if (slashPos > &baseURL[6]) { + out->setTo(baseURL, slashPos - baseURL); + } else { + out->setTo(baseURL); + } + + out->append("/"); + out->append(url); + } + + return true; + } + + void fakeTimestamps() { + mNTPAnchorUs = -1ll; + for (size_t i = 0; i < mTracks.size(); ++i) { + onTimeUpdate(i, 0, 0ll); + } + } + + void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { + ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", + trackIndex, rtpTime, ntpTime); + + int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); + + TrackInfo *track = &mTracks.editItemAt(trackIndex); + + track->mRTPAnchor = rtpTime; + track->mNTPAnchorUs = ntpTimeUs; + + if (mNTPAnchorUs < 0) { + mNTPAnchorUs = ntpTimeUs; + mMediaAnchorUs = mLastMediaTimeUs; + } + + if (!mAllTracksHaveTime) { + bool allTracksHaveTime = true; + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *track = &mTracks.editItemAt(i); + if (track->mNTPAnchorUs < 0) { + allTracksHaveTime = false; + break; + } + } + if (allTracksHaveTime) { + mAllTracksHaveTime = true; + ALOGI("Time now established for all tracks."); + } + } + } + + void onAccessUnitComplete( + int32_t trackIndex, const sp<ABuffer> &accessUnit) { + ALOGV("onAccessUnitComplete track %d", trackIndex); + + if (mFirstAccessUnit) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatConnected); + msg->post(); + + if (mSeekable) { + for (size_t i = 0; i < mTracks.size(); ++i) { + TrackInfo *info = &mTracks.editItemAt(i); + + postNormalPlayTimeMapping( + i, + info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); + } + } + + mFirstAccessUnit = false; + } + + TrackInfo *track = &mTracks.editItemAt(trackIndex); + + if (!mAllTracksHaveTime) { + ALOGV("storing accessUnit, no time established yet"); + track->mPackets.push_back(accessUnit); + return; + } + + while (!track->mPackets.empty()) { + sp<ABuffer> accessUnit = *track->mPackets.begin(); + track->mPackets.erase(track->mPackets.begin()); + + if (addMediaTimestamp(trackIndex, track, accessUnit)) { + postQueueAccessUnit(trackIndex, accessUnit); + } + } + + if (addMediaTimestamp(trackIndex, track, accessUnit)) { + postQueueAccessUnit(trackIndex, accessUnit); + } + } + + bool addMediaTimestamp( + int32_t trackIndex, const TrackInfo *track, + const sp<ABuffer> &accessUnit) { + uint32_t rtpTime; + CHECK(accessUnit->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); + + int64_t relRtpTimeUs = + (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) + / track->mTimeScale; + + int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; + + int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; + + if (mediaTimeUs > mLastMediaTimeUs) { + mLastMediaTimeUs = mediaTimeUs; + } + + if (mediaTimeUs < 0) { + ALOGV("dropping early accessUnit."); + return false; + } + + ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", + trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); + + accessUnit->meta()->setInt64("timeUs", mediaTimeUs); + + return true; + } + + void postQueueAccessUnit( + size_t trackIndex, const sp<ABuffer> &accessUnit) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatAccessUnit); + msg->setSize("trackIndex", trackIndex); + msg->setBuffer("accessUnit", accessUnit); + msg->post(); + } + + void postQueueEOS(size_t trackIndex, status_t finalResult) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatEOS); + msg->setSize("trackIndex", trackIndex); + msg->setInt32("finalResult", finalResult); + msg->post(); + } + + void postQueueSeekDiscontinuity(size_t trackIndex) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatSeekDiscontinuity); + msg->setSize("trackIndex", trackIndex); + msg->post(); + } + + void postNormalPlayTimeMapping( + size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatNormalPlayTimeMapping); + msg->setSize("trackIndex", trackIndex); + msg->setInt32("rtpTime", rtpTime); + msg->setInt64("nptUs", nptUs); + msg->post(); + } + + DISALLOW_EVIL_CONSTRUCTORS(MyHandler); +}; + +} // namespace android + +#endif // MY_HANDLER_H_ diff --git a/media/libstagefright/rtsp/MyTransmitter.h b/media/libstagefright/rtsp/MyTransmitter.h new file mode 100644 index 0000000..009a3b1 --- /dev/null +++ b/media/libstagefright/rtsp/MyTransmitter.h @@ -0,0 +1,981 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MY_TRANSMITTER_H_ + +#define MY_TRANSMITTER_H_ + +#include "ARTPConnection.h" + +#include <arpa/inet.h> +#include <sys/socket.h> + +#include <openssl/md5.h> + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/foundation/hexdump.h> + +#ifdef ANDROID +#include "VideoSource.h" + +#include <media/stagefright/OMXClient.h> +#include <media/stagefright/OMXCodec.h> +#endif + +namespace android { + +#define TRACK_SUFFIX "trackid=1" +#define PT 96 +#define PT_STR "96" + +#define USERNAME "bcast" +#define PASSWORD "test" + +static int uniformRand(int limit) { + return ((double)rand() * limit) / RAND_MAX; +} + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +struct MyTransmitter : public AHandler { + MyTransmitter(const char *url, const sp<ALooper> &looper) + : mServerURL(url), + mLooper(looper), + mConn(new ARTSPConnection), + mConnected(false), + mAuthType(NONE), + mRTPSocket(-1), + mRTCPSocket(-1), + mSourceID(rand()), + mSeqNo(uniformRand(65536)), + mRTPTimeBase(rand()), + mNumSamplesSent(0), + mNumRTPSent(0), + mNumRTPOctetsSent(0), + mLastRTPTime(0), + mLastNTPTime(0) { + mStreamURL = mServerURL; + mStreamURL.append("/bazong.sdp"); + + mTrackURL = mStreamURL; + mTrackURL.append("/"); + mTrackURL.append(TRACK_SUFFIX); + + mLooper->registerHandler(this); + mLooper->registerHandler(mConn); + + sp<AMessage> reply = new AMessage('conn', id()); + mConn->connect(mServerURL.c_str(), reply); + +#ifdef ANDROID + int width = 640; + int height = 480; + + sp<MediaSource> source = new VideoSource(width, height); + + sp<MetaData> encMeta = new MetaData; + encMeta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_AVC); + encMeta->setInt32(kKeyWidth, width); + encMeta->setInt32(kKeyHeight, height); + + OMXClient client; + client.connect(); + + mEncoder = OMXCodec::Create( + client.interface(), encMeta, + true /* createEncoder */, source); + + mEncoder->start(); + + MediaBuffer *buffer; + CHECK_EQ(mEncoder->read(&buffer), (status_t)OK); + CHECK(buffer != NULL); + + makeH264SPropParamSets(buffer); + + buffer->release(); + buffer = NULL; +#endif + } + + uint64_t ntpTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + + uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; + + nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; + + uint64_t hi = nowUs / 1000000ll; + uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; + + return (hi << 32) | lo; + } + + void issueAnnounce() { + AString sdp; + sdp = "v=0\r\n"; + + sdp.append("o=- "); + + uint64_t ntp = ntpTime(); + sdp.append(ntp); + sdp.append(" "); + sdp.append(ntp); + sdp.append(" IN IP4 127.0.0.0\r\n"); + + sdp.append( + "s=Sample\r\n" + "i=Playing around with ANNOUNCE\r\n" + "c=IN IP4 "); + + struct in_addr addr; + addr.s_addr = htonl(mServerIP); + + sdp.append(inet_ntoa(addr)); + + sdp.append( + "\r\n" + "t=0 0\r\n" + "a=range:npt=now-\r\n"); + +#ifdef ANDROID + sp<MetaData> meta = mEncoder->getFormat(); + int32_t width, height; + CHECK(meta->findInt32(kKeyWidth, &width)); + CHECK(meta->findInt32(kKeyHeight, &height)); + + sdp.append( + "m=video 0 RTP/AVP " PT_STR "\r\n" + "b=AS 320000\r\n" + "a=rtpmap:" PT_STR " H264/90000\r\n"); + + sdp.append("a=cliprect 0,0,"); + sdp.append(height); + sdp.append(","); + sdp.append(width); + sdp.append("\r\n"); + + sdp.append( + "a=framesize:" PT_STR " "); + sdp.append(width); + sdp.append("-"); + sdp.append(height); + sdp.append("\r\n"); + + sdp.append( + "a=fmtp:" PT_STR " profile-level-id=42C015;sprop-parameter-sets="); + + sdp.append(mSeqParamSet); + sdp.append(","); + sdp.append(mPicParamSet); + sdp.append(";packetization-mode=1\r\n"); +#else + sdp.append( + "m=audio 0 RTP/AVP " PT_STR "\r\n" + "a=rtpmap:" PT_STR " L8/8000/1\r\n"); +#endif + + sdp.append("a=control:" TRACK_SUFFIX "\r\n"); + + AString request; + request.append("ANNOUNCE "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "ANNOUNCE", mStreamURL.c_str()); + + request.append("Content-Type: application/sdp\r\n"); + request.append("Content-Length: "); + request.append(sdp.size()); + request.append("\r\n"); + + request.append("\r\n"); + request.append(sdp); + + sp<AMessage> reply = new AMessage('anno', id()); + mConn->sendRequest(request.c_str(), reply); + } + + void H(const AString &s, AString *out) { + out->clear(); + + MD5_CTX m; + MD5_Init(&m); + MD5_Update(&m, s.c_str(), s.size()); + + uint8_t key[16]; + MD5_Final(key, &m); + + for (size_t i = 0; i < 16; ++i) { + char nibble = key[i] >> 4; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + + nibble = key[i] & 0x0f; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + } + } + + void authenticate(const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("www-authenticate"); + CHECK_GE(i, 0); + + AString value = response->mHeaders.valueAt(i); + + if (!strncmp(value.c_str(), "Basic", 5)) { + mAuthType = BASIC; + } else { + CHECK(!strncmp(value.c_str(), "Digest", 6)); + mAuthType = DIGEST; + + i = value.find("nonce="); + CHECK_GE(i, 0); + CHECK_EQ(value.c_str()[i + 6], '\"'); + ssize_t j = value.find("\"", i + 7); + CHECK_GE(j, 0); + + mNonce.setTo(value, i + 7, j - i - 7); + } + + issueAnnounce(); + } + + void addAuthentication( + AString *request, const char *method, const char *url) { + if (mAuthType == NONE) { + return; + } + + if (mAuthType == BASIC) { + request->append("Authorization: Basic YmNhc3Q6dGVzdAo=\r\n"); + return; + } + + CHECK_EQ((int)mAuthType, (int)DIGEST); + + AString A1; + A1.append(USERNAME); + A1.append(":"); + A1.append("Streaming Server"); + A1.append(":"); + A1.append(PASSWORD); + + AString A2; + A2.append(method); + A2.append(":"); + A2.append(url); + + AString HA1, HA2; + H(A1, &HA1); + H(A2, &HA2); + + AString tmp; + tmp.append(HA1); + tmp.append(":"); + tmp.append(mNonce); + tmp.append(":"); + tmp.append(HA2); + + AString digest; + H(tmp, &digest); + + request->append("Authorization: Digest "); + request->append("nonce=\""); + request->append(mNonce); + request->append("\", "); + request->append("username=\"" USERNAME "\", "); + request->append("uri=\""); + request->append(url); + request->append("\", "); + request->append("response=\""); + request->append(digest); + request->append("\""); + request->append("\r\n"); + } + + virtual void onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case 'conn': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "connection request completed with result " + << result << " (" << strerror(-result) << ")"; + + if (result != OK) { + (new AMessage('quit', id()))->post(); + break; + } + + mConnected = true; + + CHECK(msg->findInt32("server-ip", (int32_t *)&mServerIP)); + + issueAnnounce(); + break; + } + + case 'anno': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "ANNOUNCE completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + + if (response->mStatusCode == 401) { + if (mAuthType != NONE) { + LOG(INFO) << "FAILED to authenticate"; + (new AMessage('quit', id()))->post(); + break; + } + + authenticate(response); + break; + } + } + + if (result != OK || response->mStatusCode != 200) { + (new AMessage('quit', id()))->post(); + break; + } + + unsigned rtpPort; + ARTPConnection::MakePortPair(&mRTPSocket, &mRTCPSocket, &rtpPort); + + // (new AMessage('poll', id()))->post(); + + AString request; + request.append("SETUP "); + request.append(mTrackURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "SETUP", mTrackURL.c_str()); + + request.append("Transport: RTP/AVP;unicast;client_port="); + request.append(rtpPort); + request.append("-"); + request.append(rtpPort + 1); + request.append(";mode=record\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('setu', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + +#if 0 + case 'poll': + { + fd_set rs; + FD_ZERO(&rs); + FD_SET(mRTCPSocket, &rs); + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + int res = select(mRTCPSocket + 1, &rs, NULL, NULL, &tv); + + if (res == 1) { + sp<ABuffer> buffer = new ABuffer(65536); + ssize_t n = recv(mRTCPSocket, buffer->data(), buffer->size(), 0); + + if (n <= 0) { + LOG(ERROR) << "recv returned " << n; + } else { + LOG(INFO) << "recv returned " << n << " bytes of data."; + + hexdump(buffer->data(), n); + } + } + + msg->post(50000); + break; + } +#endif + + case 'setu': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "SETUP completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + if (result != OK || response->mStatusCode != 200) { + (new AMessage('quit', id()))->post(); + break; + } + + ssize_t i = response->mHeaders.indexOfKey("session"); + CHECK_GE(i, 0); + mSessionID = response->mHeaders.valueAt(i); + i = mSessionID.find(";"); + if (i >= 0) { + // Remove options, i.e. ";timeout=90" + mSessionID.erase(i, mSessionID.size() - i); + } + + i = response->mHeaders.indexOfKey("transport"); + CHECK_GE(i, 0); + AString transport = response->mHeaders.valueAt(i); + + LOG(INFO) << "transport = '" << transport << "'"; + + AString value; + CHECK(GetAttribute(transport.c_str(), "server_port", &value)); + + unsigned rtpPort, rtcpPort; + CHECK_EQ(sscanf(value.c_str(), "%u-%u", &rtpPort, &rtcpPort), 2); + + CHECK(GetAttribute(transport.c_str(), "source", &value)); + + memset(mRemoteAddr.sin_zero, 0, sizeof(mRemoteAddr.sin_zero)); + mRemoteAddr.sin_family = AF_INET; + mRemoteAddr.sin_addr.s_addr = inet_addr(value.c_str()); + mRemoteAddr.sin_port = htons(rtpPort); + + mRemoteRTCPAddr = mRemoteAddr; + mRemoteRTCPAddr.sin_port = htons(rtpPort + 1); + + CHECK_EQ(0, connect(mRTPSocket, + (const struct sockaddr *)&mRemoteAddr, + sizeof(mRemoteAddr))); + + CHECK_EQ(0, connect(mRTCPSocket, + (const struct sockaddr *)&mRemoteRTCPAddr, + sizeof(mRemoteRTCPAddr))); + + uint32_t x = ntohl(mRemoteAddr.sin_addr.s_addr); + LOG(INFO) << "sending data to " + << (x >> 24) + << "." + << ((x >> 16) & 0xff) + << "." + << ((x >> 8) & 0xff) + << "." + << (x & 0xff) + << ":" + << rtpPort; + + AString request; + request.append("RECORD "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "RECORD", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('reco', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'reco': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "RECORD completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + if (result != OK) { + (new AMessage('quit', id()))->post(); + break; + } + + (new AMessage('more', id()))->post(); + (new AMessage('sr ', id()))->post(); + (new AMessage('aliv', id()))->post(30000000ll); + break; + } + + case 'aliv': + { + if (!mConnected) { + break; + } + + AString request; + request.append("OPTIONS "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "RECORD", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('opts', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'opts': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "OPTIONS completed with result " + << result << " (" << strerror(-result) << ")"; + + if (!mConnected) { + break; + } + + (new AMessage('aliv', id()))->post(30000000ll); + break; + } + + case 'more': + { + if (!mConnected) { + break; + } + + sp<ABuffer> buffer = new ABuffer(65536); + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = (1 << 7) | PT; // M-bit + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + +#ifdef ANDROID + MediaBuffer *mediaBuf = NULL; + for (;;) { + CHECK_EQ(mEncoder->read(&mediaBuf), (status_t)OK); + if (mediaBuf->range_length() > 0) { + break; + } + mediaBuf->release(); + mediaBuf = NULL; + } + + int64_t timeUs; + CHECK(mediaBuf->meta_data()->findInt64(kKeyTime, &timeUs)); + + uint32_t rtpTime = mRTPTimeBase + (timeUs * 9 / 100ll); + + const uint8_t *mediaData = + (const uint8_t *)mediaBuf->data() + mediaBuf->range_offset(); + + CHECK(!memcmp("\x00\x00\x00\x01", mediaData, 4)); + + CHECK_LE(mediaBuf->range_length() - 4 + 12, buffer->size()); + + memcpy(&data[12], + mediaData + 4, mediaBuf->range_length() - 4); + + buffer->setRange(0, mediaBuf->range_length() - 4 + 12); + + mediaBuf->release(); + mediaBuf = NULL; +#else + uint32_t rtpTime = mRTPTimeBase + mNumRTPSent * 128; + memset(&data[12], 0, 128); + buffer->setRange(0, 12 + 128); +#endif + + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + + ssize_t n = send( + mRTPSocket, data, buffer->size(), 0); + if (n < 0) { + LOG(ERROR) << "send failed (" << strerror(errno) << ")"; + } + CHECK_EQ(n, (ssize_t)buffer->size()); + + ++mSeqNo; + + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + + mLastRTPTime = rtpTime; + mLastNTPTime = ntpTime(); + +#ifdef ANDROID + if (mNumRTPSent < 60 * 25) { // 60 secs worth + msg->post(40000); +#else + if (mNumRTPOctetsSent < 8000 * 60) { + msg->post(1000000ll * 128 / 8000); +#endif + } else { + LOG(INFO) << "That's enough, pausing."; + + AString request; + request.append("PAUSE "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "PAUSE", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('paus', id()); + mConn->sendRequest(request.c_str(), reply); + } + break; + } + + case 'sr ': + { + if (!mConnected) { + break; + } + + sp<ABuffer> buffer = new ABuffer(65536); + buffer->setRange(0, 0); + + addSR(buffer); + addSDES(buffer); + + uint8_t *data = buffer->data(); + ssize_t n = send( + mRTCPSocket, data, buffer->size(), 0); + CHECK_EQ(n, (ssize_t)buffer->size()); + + msg->post(3000000); + break; + } + + case 'paus': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "PAUSE completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + AString request; + request.append("TEARDOWN "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "TEARDOWN", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('tear', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'tear': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "TEARDOWN completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + (new AMessage('quit', id()))->post(); + break; + } + + case 'disc': + { + LOG(INFO) << "disconnect completed"; + + mConnected = false; + (new AMessage('quit', id()))->post(); + break; + } + + case 'quit': + { + if (mConnected) { + mConn->disconnect(new AMessage('disc', id())); + break; + } + + if (mRTPSocket >= 0) { + close(mRTPSocket); + mRTPSocket = -1; + } + + if (mRTCPSocket >= 0) { + close(mRTCPSocket); + mRTCPSocket = -1; + } + +#ifdef ANDROID + mEncoder->stop(); + mEncoder.clear(); +#endif + + mLooper->stop(); + break; + } + + default: + TRESPASS(); + } + } + +protected: + virtual ~MyTransmitter() { + } + +private: + enum AuthType { + NONE, + BASIC, + DIGEST + }; + + AString mServerURL; + AString mTrackURL; + AString mStreamURL; + + sp<ALooper> mLooper; + sp<ARTSPConnection> mConn; + bool mConnected; + uint32_t mServerIP; + AuthType mAuthType; + AString mNonce; + AString mSessionID; + int mRTPSocket, mRTCPSocket; + uint32_t mSourceID; + uint32_t mSeqNo; + uint32_t mRTPTimeBase; + struct sockaddr_in mRemoteAddr; + struct sockaddr_in mRemoteRTCPAddr; + size_t mNumSamplesSent; + uint32_t mNumRTPSent; + uint32_t mNumRTPOctetsSent; + uint32_t mLastRTPTime; + uint64_t mLastNTPTime; + +#ifdef ANDROID + sp<MediaSource> mEncoder; + AString mSeqParamSet; + AString mPicParamSet; + + void makeH264SPropParamSets(MediaBuffer *buffer) { + static const char kStartCode[] = "\x00\x00\x00\x01"; + + const uint8_t *data = + (const uint8_t *)buffer->data() + buffer->range_offset(); + size_t size = buffer->range_length(); + + CHECK_GE(size, 0u); + CHECK(!memcmp(kStartCode, data, 4)); + + data += 4; + size -= 4; + + size_t startCodePos = 0; + while (startCodePos + 3 < size + && memcmp(kStartCode, &data[startCodePos], 4)) { + ++startCodePos; + } + + CHECK_LT(startCodePos + 3, size); + + encodeBase64(data, startCodePos, &mSeqParamSet); + + encodeBase64(&data[startCodePos + 4], size - startCodePos - 4, + &mPicParamSet); + } +#endif + + void addSR(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + + data[0] = 0x80 | 0; + data[1] = 200; // SR + data[2] = 0; + data[3] = 6; + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + data[8] = mLastNTPTime >> (64 - 8); + data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; + data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; + data[11] = (mLastNTPTime >> 32) & 0xff; + data[12] = (mLastNTPTime >> 24) & 0xff; + data[13] = (mLastNTPTime >> 16) & 0xff; + data[14] = (mLastNTPTime >> 8) & 0xff; + data[15] = mLastNTPTime & 0xff; + + data[16] = (mLastRTPTime >> 24) & 0xff; + data[17] = (mLastRTPTime >> 16) & 0xff; + data[18] = (mLastRTPTime >> 8) & 0xff; + data[19] = mLastRTPTime & 0xff; + + data[20] = mNumRTPSent >> 24; + data[21] = (mNumRTPSent >> 16) & 0xff; + data[22] = (mNumRTPSent >> 8) & 0xff; + data[23] = mNumRTPSent & 0xff; + + data[24] = mNumRTPOctetsSent >> 24; + data[25] = (mNumRTPOctetsSent >> 16) & 0xff; + data[26] = (mNumRTPOctetsSent >> 8) & 0xff; + data[27] = mNumRTPOctetsSent & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + 28); + } + + void addSDES(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + static const char *kCNAME = "andih@laptop"; + data[offset++] = strlen(kCNAME); + + memcpy(&data[offset], kCNAME, strlen(kCNAME)); + offset += strlen(kCNAME); + + data[offset++] = 7; // NOTE + + static const char *kNOTE = "Hell's frozen over."; + data[offset++] = strlen(kNOTE); + + memcpy(&data[offset], kNOTE, strlen(kNOTE)); + offset += strlen(kNOTE); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); + } + + DISALLOW_EVIL_CONSTRUCTORS(MyTransmitter); +}; + +} // namespace android + +#endif // MY_TRANSMITTER_H_ diff --git a/media/libstagefright/rtsp/UDPPusher.cpp b/media/libstagefright/rtsp/UDPPusher.cpp new file mode 100644 index 0000000..47ea6f1 --- /dev/null +++ b/media/libstagefright/rtsp/UDPPusher.cpp @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "UDPPusher" +#include <utils/Log.h> + +#include "UDPPusher.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <utils/ByteOrder.h> + +#include <sys/socket.h> + +namespace android { + +UDPPusher::UDPPusher(const char *filename, unsigned port) + : mFile(fopen(filename, "rb")), + mFirstTimeMs(0), + mFirstTimeUs(0) { + CHECK(mFile != NULL); + + mSocket = socket(AF_INET, SOCK_DGRAM, 0); + + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = 0; + + CHECK_EQ(0, bind(mSocket, (const struct sockaddr *)&addr, sizeof(addr))); + + memset(mRemoteAddr.sin_zero, 0, sizeof(mRemoteAddr.sin_zero)); + mRemoteAddr.sin_family = AF_INET; + mRemoteAddr.sin_addr.s_addr = INADDR_ANY; + mRemoteAddr.sin_port = htons(port); +} + +UDPPusher::~UDPPusher() { + close(mSocket); + mSocket = -1; + + fclose(mFile); + mFile = NULL; +} + +void UDPPusher::start() { + uint32_t timeMs; + CHECK_EQ(fread(&timeMs, 1, sizeof(timeMs), mFile), sizeof(timeMs)); + mFirstTimeMs = fromlel(timeMs); + mFirstTimeUs = ALooper::GetNowUs(); + + (new AMessage(kWhatPush, id()))->post(); +} + +bool UDPPusher::onPush() { + uint32_t length; + if (fread(&length, 1, sizeof(length), mFile) < sizeof(length)) { + ALOGI("No more data to push."); + return false; + } + + length = fromlel(length); + + CHECK_GT(length, 0u); + + sp<ABuffer> buffer = new ABuffer(length); + if (fread(buffer->data(), 1, length, mFile) < length) { + ALOGE("File truncated?."); + return false; + } + + ssize_t n = sendto( + mSocket, buffer->data(), buffer->size(), 0, + (const struct sockaddr *)&mRemoteAddr, sizeof(mRemoteAddr)); + + CHECK_EQ(n, (ssize_t)buffer->size()); + + uint32_t timeMs; + if (fread(&timeMs, 1, sizeof(timeMs), mFile) < sizeof(timeMs)) { + ALOGI("No more data to push."); + return false; + } + + timeMs = fromlel(timeMs); + CHECK_GE(timeMs, mFirstTimeMs); + + timeMs -= mFirstTimeMs; + int64_t whenUs = mFirstTimeUs + timeMs * 1000ll; + int64_t nowUs = ALooper::GetNowUs(); + (new AMessage(kWhatPush, id()))->post(whenUs - nowUs); + + return true; +} + +void UDPPusher::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatPush: + { + if (!onPush() && !(ntohs(mRemoteAddr.sin_port) & 1)) { + ALOGI("emulating BYE packet"); + + sp<ABuffer> buffer = new ABuffer(8); + uint8_t *data = buffer->data(); + *data++ = (2 << 6) | 1; + *data++ = 203; + *data++ = 0; + *data++ = 1; + *data++ = 0x8f; + *data++ = 0x49; + *data++ = 0xc0; + *data++ = 0xd0; + buffer->setRange(0, 8); + + struct sockaddr_in tmp = mRemoteAddr; + tmp.sin_port = htons(ntohs(mRemoteAddr.sin_port) | 1); + + ssize_t n = sendto( + mSocket, buffer->data(), buffer->size(), 0, + (const struct sockaddr *)&tmp, + sizeof(tmp)); + + CHECK_EQ(n, (ssize_t)buffer->size()); + } + break; + } + + default: + TRESPASS(); + break; + } +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/UDPPusher.h b/media/libstagefright/rtsp/UDPPusher.h new file mode 100644 index 0000000..2bde533 --- /dev/null +++ b/media/libstagefright/rtsp/UDPPusher.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef UDP_PUSHER_H_ + +#define UDP_PUSHER_H_ + +#include <media/stagefright/foundation/AHandler.h> + +#include <stdio.h> +#include <arpa/inet.h> + +namespace android { + +struct UDPPusher : public AHandler { + UDPPusher(const char *filename, unsigned port); + + void start(); + +protected: + virtual ~UDPPusher(); + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum { + kWhatPush = 'push' + }; + + FILE *mFile; + int mSocket; + struct sockaddr_in mRemoteAddr; + + uint32_t mFirstTimeMs; + int64_t mFirstTimeUs; + + bool onPush(); + + DISALLOW_EVIL_CONSTRUCTORS(UDPPusher); +}; + +} // namespace android + +#endif // UDP_PUSHER_H_ diff --git a/media/libstagefright/rtsp/VideoSource.h b/media/libstagefright/rtsp/VideoSource.h new file mode 100644 index 0000000..ae0c85b --- /dev/null +++ b/media/libstagefright/rtsp/VideoSource.h @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VIDEO_SOURCE_H_ + +#define VIDEO_SOURCE_H_ + +#include <media/stagefright/MediaBufferGroup.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MediaSource.h> +#include <media/stagefright/MetaData.h> + +namespace android { + +class VideoSource : public MediaSource { + static const int32_t kFramerate = 24; // fps + +public: + VideoSource(int width, int height) + : mWidth(width), + mHeight(height), + mSize((width * height * 3) / 2) { + mGroup.add_buffer(new MediaBuffer(mSize)); + } + + virtual sp<MetaData> getFormat() { + sp<MetaData> meta = new MetaData; + meta->setInt32(kKeyWidth, mWidth); + meta->setInt32(kKeyHeight, mHeight); + meta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_RAW); + + return meta; + } + + virtual status_t start(MetaData *params) { + mNumFramesOutput = 0; + return OK; + } + + virtual status_t stop() { + return OK; + } + + virtual status_t read( + MediaBuffer **buffer, const MediaSource::ReadOptions *options) { + if (mNumFramesOutput == kFramerate * 100) { + // Stop returning data after 10 secs. + return ERROR_END_OF_STREAM; + } + + // printf("VideoSource::read\n"); + status_t err = mGroup.acquire_buffer(buffer); + if (err != OK) { + return err; + } + + char x = (char)((double)rand() / RAND_MAX * 255); + memset((*buffer)->data(), x, mSize); + (*buffer)->set_range(0, mSize); + (*buffer)->meta_data()->clear(); + (*buffer)->meta_data()->setInt64( + kKeyTime, (mNumFramesOutput * 1000000) / kFramerate); + ++mNumFramesOutput; + + // printf("VideoSource::read - returning buffer\n"); + // LOG(INFO)("VideoSource::read - returning buffer"); + return OK; + } + +protected: + virtual ~VideoSource() {} + +private: + MediaBufferGroup mGroup; + int mWidth, mHeight; + size_t mSize; + int64_t mNumFramesOutput;; + + VideoSource(const VideoSource &); + VideoSource &operator=(const VideoSource &); +}; + +} // namespace android + +#endif // VIDEO_SOURCE_H_ diff --git a/media/libstagefright/rtsp/rtp_test.cpp b/media/libstagefright/rtsp/rtp_test.cpp new file mode 100644 index 0000000..d43cd2a --- /dev/null +++ b/media/libstagefright/rtsp/rtp_test.cpp @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "rtp_test" +#include <utils/Log.h> + +#include <binder/ProcessState.h> + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/ALooper.h> +#include <media/stagefright/DataSource.h> +#include <media/stagefright/MetaData.h> +#include <media/stagefright/OMXClient.h> +#include <media/stagefright/OMXCodec.h> +#include <media/stagefright/foundation/base64.h> + +#include "ARTPSession.h" +#include "ASessionDescription.h" +#include "UDPPusher.h" + +using namespace android; + +int main(int argc, char **argv) { + android::ProcessState::self()->startThreadPool(); + + DataSource::RegisterDefaultSniffers(); + + const char *rtpFilename = NULL; + const char *rtcpFilename = NULL; + + if (argc == 3) { + rtpFilename = argv[1]; + rtcpFilename = argv[2]; + } else if (argc != 1) { + fprintf(stderr, "usage: %s [ rtpFilename rtcpFilename ]\n", argv[0]); + return 1; + } + +#if 0 + static const uint8_t kSPS[] = { + 0x67, 0x42, 0x80, 0x0a, 0xe9, 0x02, 0x83, 0xe4, 0x20, 0x00, 0x00, 0x7d, 0x00, 0x00, 0x0e, 0xa6, 0x00, 0x80 + }; + static const uint8_t kPPS[] = { + 0x68, 0xce, 0x3c, 0x80 + }; + AString out1, out2; + encodeBase64(kSPS, sizeof(kSPS), &out1); + encodeBase64(kPPS, sizeof(kPPS), &out2); + printf("params=%s,%s\n", out1.c_str(), out2.c_str()); +#endif + + sp<ALooper> looper = new ALooper; + + sp<UDPPusher> rtp_pusher; + sp<UDPPusher> rtcp_pusher; + + if (rtpFilename != NULL) { + rtp_pusher = new UDPPusher(rtpFilename, 5434); + looper->registerHandler(rtp_pusher); + + rtcp_pusher = new UDPPusher(rtcpFilename, 5435); + looper->registerHandler(rtcp_pusher); + } + + sp<ARTPSession> session = new ARTPSession; + looper->registerHandler(session); + +#if 0 + // My H264 SDP + static const char *raw = + "v=0\r\n" + "o=- 64 233572944 IN IP4 127.0.0.0\r\n" + "s=QuickTime\r\n" + "t=0 0\r\n" + "a=range:npt=0-315\r\n" + "a=isma-compliance:2,2.0,2\r\n" + "m=video 5434 RTP/AVP 97\r\n" + "c=IN IP4 127.0.0.1\r\n" + "b=AS:30\r\n" + "a=rtpmap:97 H264/90000\r\n" + "a=fmtp:97 packetization-mode=1;profile-level-id=42000C;" + "sprop-parameter-sets=Z0IADJZUCg+I,aM44gA==\r\n" + "a=mpeg4-esid:201\r\n" + "a=cliprect:0,0,240,320\r\n" + "a=framesize:97 320-240\r\n"; +#elif 0 + // My H263 SDP + static const char *raw = + "v=0\r\n" + "o=- 64 233572944 IN IP4 127.0.0.0\r\n" + "s=QuickTime\r\n" + "t=0 0\r\n" + "a=range:npt=0-315\r\n" + "a=isma-compliance:2,2.0,2\r\n" + "m=video 5434 RTP/AVP 97\r\n" + "c=IN IP4 127.0.0.1\r\n" + "b=AS:30\r\n" + "a=rtpmap:97 H263-1998/90000\r\n" + "a=cliprect:0,0,240,320\r\n" + "a=framesize:97 320-240\r\n"; +#elif 0 + // My AMR SDP + static const char *raw = + "v=0\r\n" + "o=- 64 233572944 IN IP4 127.0.0.0\r\n" + "s=QuickTime\r\n" + "t=0 0\r\n" + "a=range:npt=0-315\r\n" + "a=isma-compliance:2,2.0,2\r\n" + "m=audio 5434 RTP/AVP 97\r\n" + "c=IN IP4 127.0.0.1\r\n" + "b=AS:30\r\n" + "a=rtpmap:97 AMR/8000/1\r\n" + "a=fmtp:97 octet-align\r\n"; +#elif 1 + // GTalk's H264 SDP + static const char *raw = + "v=0\r\n" + "o=- 64 233572944 IN IP4 127.0.0.0\r\n" + "s=QuickTime\r\n" + "t=0 0\r\n" + "a=range:npt=now-\r\n" + "m=video 5434 RTP/AVP 96\r\n" + "c=IN IP4 127.0.0.1\r\n" + "b=AS:320000\r\n" + "a=rtpmap:96 H264/90000\r\n" + "a=fmtp:96 packetization-mode=1;profile-level-id=42001E;" + "sprop-parameter-sets=Z0IAHpZUBaHogA==,aM44gA==\r\n" + "a=cliprect:0,0,480,270\r\n" + "a=framesize:96 720-480\r\n"; +#else + // sholes H264 SDP + static const char *raw = + "v=0\r\n" + "o=- 64 233572944 IN IP4 127.0.0.0\r\n" + "s=QuickTime\r\n" + "t=0 0\r\n" + "a=range:npt=now-\r\n" + "m=video 5434 RTP/AVP 96\r\n" + "c=IN IP4 127.0.0.1\r\n" + "b=AS:320000\r\n" + "a=rtpmap:96 H264/90000\r\n" + "a=fmtp:96 packetization-mode=1;profile-level-id=42001E;" + "sprop-parameter-sets=Z0KACukCg+QgAAB9AAAOpgCA,aM48gA==\r\n" + "a=cliprect:0,0,240,320\r\n" + "a=framesize:96 320-240\r\n"; +#endif + + sp<ASessionDescription> desc = new ASessionDescription; + CHECK(desc->setTo(raw, strlen(raw))); + + CHECK_EQ(session->setup(desc), (status_t)OK); + + if (rtp_pusher != NULL) { + rtp_pusher->start(); + } + + if (rtcp_pusher != NULL) { + rtcp_pusher->start(); + } + + looper->start(false /* runOnCallingThread */); + + CHECK_EQ(session->countTracks(), 1u); + sp<MediaSource> source = session->trackAt(0); + + OMXClient client; + CHECK_EQ(client.connect(), (status_t)OK); + + sp<MediaSource> decoder = OMXCodec::Create( + client.interface(), + source->getFormat(), false /* createEncoder */, + source, + NULL, + 0); // OMXCodec::kPreferSoftwareCodecs); + CHECK(decoder != NULL); + + CHECK_EQ(decoder->start(), (status_t)OK); + + for (;;) { + MediaBuffer *buffer; + status_t err = decoder->read(&buffer); + + if (err != OK) { + if (err == INFO_FORMAT_CHANGED) { + int32_t width, height; + CHECK(decoder->getFormat()->findInt32(kKeyWidth, &width)); + CHECK(decoder->getFormat()->findInt32(kKeyHeight, &height)); + printf("INFO_FORMAT_CHANGED %d x %d\n", width, height); + continue; + } + + ALOGE("decoder returned error 0x%08x", err); + break; + } + +#if 1 + if (buffer->range_length() != 0) { + int64_t timeUs; + CHECK(buffer->meta_data()->findInt64(kKeyTime, &timeUs)); + + printf("decoder returned frame of size %d at time %.2f secs\n", + buffer->range_length(), timeUs / 1E6); + } +#endif + + buffer->release(); + buffer = NULL; + } + + CHECK_EQ(decoder->stop(), (status_t)OK); + + looper->stop(); + + return 0; +} |