diff options
26 files changed, 4809 insertions, 3 deletions
diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp index 11f3016..d7ca635 100644 --- a/media/libmediaplayerservice/MediaPlayerService.cpp +++ b/media/libmediaplayerservice/MediaPlayerService.cpp @@ -771,9 +771,14 @@ player_type getPlayerType(const char* url) } } - // Use PV_PLAYER for rtsp for now if (!strncasecmp(url, "rtsp://", 7)) { - return PV_PLAYER; + char value[PROPERTY_VALUE_MAX]; + if (!property_get("media.stagefright.enable-rtsp", value, NULL) + || (strcmp(value, "1") && strcasecmp(value, "true"))) { + // For now, we're going to use PV for rtsp-based playback + // by default until we can clear up a few more issues. + return PV_PLAYER; + } } return getDefaultPlayerType(); diff --git a/media/libstagefright/Android.mk b/media/libstagefright/Android.mk index f67826e..7608ec8 100644 --- a/media/libstagefright/Android.mk +++ b/media/libstagefright/Android.mk @@ -82,6 +82,7 @@ LOCAL_STATIC_LIBRARIES := \ libvpx \ libstagefright_mpeg2ts \ libstagefright_httplive \ + libstagefright_rtsp \ LOCAL_SHARED_LIBRARIES += \ libstagefright_amrnb_common \ diff --git a/media/libstagefright/AwesomePlayer.cpp b/media/libstagefright/AwesomePlayer.cpp index 88c8ee4..4c9856d 100644 --- a/media/libstagefright/AwesomePlayer.cpp +++ b/media/libstagefright/AwesomePlayer.cpp @@ -20,7 +20,9 @@ #include <dlfcn.h> +#include "include/ARTSPController.h" #include "include/AwesomePlayer.h" +#include "include/LiveSource.h" #include "include/Prefetcher.h" #include "include/SoftwareRenderer.h" @@ -39,7 +41,7 @@ #include <surfaceflinger/ISurface.h> -#include "include/LiveSource.h" +#include <media/stagefright/foundation/ALooper.h> namespace android { @@ -393,6 +395,8 @@ void AwesomePlayer::reset_l() { mVideoBuffer = NULL; } + mRTSPController.clear(); + if (mVideoSource != NULL) { mVideoSource->stop(); @@ -1148,7 +1152,22 @@ status_t AwesomePlayer::finishSetDataSource_l() { sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS); + } else if (!strncasecmp("rtsp://", mUri.string(), 7)) { + if (mLooper == NULL) { + mLooper = new ALooper; + mLooper->start(); + } + mRTSPController = new ARTSPController(mLooper); + status_t err = mRTSPController->connect(mUri.string()); + + LOGI("ARTSPController::connect returned %d", err); + + if (err != OK) { + mRTSPController.clear(); + return err; + } + sp<MediaExtractor> extractor = mRTSPController.get(); return setDataSource_l(extractor); } else { dataSource = DataSource::CreateFromURI(mUri.string(), &mUriHeaders); diff --git a/media/libstagefright/include/ARTSPController.h b/media/libstagefright/include/ARTSPController.h new file mode 100644 index 0000000..55efd41 --- /dev/null +++ b/media/libstagefright/include/ARTSPController.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTSP_CONTROLLER_H_ + +#define A_RTSP_CONTROLLER_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <media/stagefright/MediaExtractor.h> + +namespace android { + +struct ALooper; +struct MyHandler; + +struct ARTSPController : public MediaExtractor { + ARTSPController(const sp<ALooper> &looper); + + status_t connect(const char *url); + void disconnect(); + + virtual size_t countTracks(); + virtual sp<MediaSource> getTrack(size_t index); + + virtual sp<MetaData> getTrackMetaData( + size_t index, uint32_t flags); + +protected: + virtual ~ARTSPController(); + +private: + sp<ALooper> mLooper; + sp<MyHandler> mHandler; + + DISALLOW_EVIL_CONSTRUCTORS(ARTSPController); +}; + +} // namespace android + +#endif // A_RTSP_CONTROLLER_H_ diff --git a/media/libstagefright/include/AwesomePlayer.h b/media/libstagefright/include/AwesomePlayer.h index 9455743..182aa06 100644 --- a/media/libstagefright/include/AwesomePlayer.h +++ b/media/libstagefright/include/AwesomePlayer.h @@ -36,6 +36,9 @@ struct MediaSource; struct Prefetcher; struct TimeSource; +struct ALooper; +struct ARTSPController; + struct AwesomeRenderer : public RefBase { AwesomeRenderer() {} @@ -169,6 +172,9 @@ private: sp<Prefetcher> mPrefetcher; sp<HTTPDataSource> mConnectingDataSource; + sp<ALooper> mLooper; + sp<ARTSPController> mRTSPController; + struct SuspensionState { String8 mUri; KeyedVector<String8, String8> mUriHeaders; diff --git a/media/libstagefright/rtsp/AAVCAssembler.cpp b/media/libstagefright/rtsp/AAVCAssembler.cpp new file mode 100644 index 0000000..3dfb200 --- /dev/null +++ b/media/libstagefright/rtsp/AAVCAssembler.cpp @@ -0,0 +1,385 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AAVCAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> + +#include <stdint.h> + +#define BE_VERBOSE 0 + +namespace android { + +// static +AAVCAssembler::AAVCAssembler(const sp<AMessage> ¬ify) + : mNotifyMsg(notify), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { +} + +AAVCAssembler::~AAVCAssembler() { +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::addNALUnit( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { +#if BE_VERBOSE + LOG(VERBOSE) << "Not the sequence number I expected"; +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if (size < 1 || (data[0] & 0x80)) { + // Corrupt. + + LOG(ERROR) << "Ignoring corrupt buffer."; + queue->erase(queue->begin()); + + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + unsigned nalType = data[0] & 0x1f; + if (nalType >= 1 && nalType <= 23) { + addSingleNALUnit(buffer); + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return OK; + } else if (nalType == 28) { + // FU-A + return addFragmentedNALUnit(queue); + } else if (nalType == 24) { + // STAP-A + bool success = addSingleTimeAggregationPacket(buffer); + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return success ? OK : MALFORMED_PACKET; + } else { + LOG(ERROR) << "Ignoring unsupported buffer (nalType=" << nalType << ")"; + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return MALFORMED_PACKET; + } +} + +void AAVCAssembler::addSingleNALUnit(const sp<ABuffer> &buffer) { +#if BE_VERBOSE + LOG(VERBOSE) << "addSingleNALUnit of size " << buffer->size(); + hexdump(buffer->data(), buffer->size()); +#endif + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (!mNALUnits.empty() && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + mNALUnits.push_back(buffer); +} + +bool AAVCAssembler::addSingleTimeAggregationPacket(const sp<ABuffer> &buffer) { + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if (size < 3) { + LOG(ERROR) << "Discarding too small STAP-A packet."; + return false; + } + + ++data; + --size; + while (size >= 2) { + size_t nalSize = (data[0] << 8) | data[1]; + + if (size < nalSize + 2) { + LOG(ERROR) << "Discarding malformed STAP-A packet."; + return false; + } + + sp<ABuffer> unit = new ABuffer(nalSize); + memcpy(unit->data(), &data[2], nalSize); + + PropagateTimes(buffer, unit); + + addSingleNALUnit(unit); + + data += 2 + nalSize; + size -= 2 + nalSize; + } + + if (size != 0) { + LOG(WARNING) << "Unexpected padding at end of STAP-A packet."; + } + + return true; +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::addFragmentedNALUnit( + List<sp<ABuffer> > *queue) { + CHECK(!queue->empty()); + + sp<ABuffer> buffer = *queue->begin(); + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + CHECK(size > 0); + unsigned indicator = data[0]; + + CHECK((indicator & 0x1f) == 28); + + if (size < 2) { + LOG(ERROR) << "Ignoring malformed FU buffer (size = " << size << ")"; + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + if (!(data[1] & 0x80)) { + // Start bit not set on the first buffer. + +#if BE_VERBOSE + LOG(ERROR) << "Start bit not set on first buffer"; +#endif + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + return MALFORMED_PACKET; + } + + uint32_t nalType = data[1] & 0x1f; + uint32_t nri = (data[0] >> 5) & 3; + + uint32_t expectedSeqNo = (uint32_t)buffer->int32Data() + 1; + size_t totalSize = size - 2; + size_t totalCount = 1; + bool complete = false; + + if (data[1] & 0x40) { + // Huh? End bit also set on the first buffer. + +#if BE_VERBOSE + LOG(WARNING) << "Grrr. This isn't fragmented at all."; +#endif + + complete = true; + } else { + List<sp<ABuffer> >::iterator it = ++queue->begin(); + while (it != queue->end()) { +#if BE_VERBOSE + LOG(VERBOSE) << "sequence length " << totalCount; +#endif + + const sp<ABuffer> &buffer = *it; + + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + if ((uint32_t)buffer->int32Data() != expectedSeqNo) { +#if BE_VERBOSE + LOG(VERBOSE) << "sequence not complete, expected seqNo " + << expectedSeqNo << ", got " + << (uint32_t)buffer->int32Data(); +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + if (size < 2 + || data[0] != indicator + || (data[1] & 0x1f) != nalType + || (data[1] & 0x80)) { + LOG(ERROR) << "Ignoring malformed FU buffer.\n"; + + // Delete the whole start of the FU. + + it = queue->begin(); + for (size_t i = 0; i <= totalCount; ++i) { + it = queue->erase(it); + } + + mNextExpectedSeqNo = expectedSeqNo + 1; + + return MALFORMED_PACKET; + } + + totalSize += size - 2; + ++totalCount; + + expectedSeqNo = expectedSeqNo + 1; + + if (data[1] & 0x40) { + // This is the last fragment. + complete = true; + break; + } + + ++it; + } + } + + if (!complete) { + return NOT_ENOUGH_DATA; + } + + mNextExpectedSeqNo = expectedSeqNo; + + // We found all the fragments that make up the complete NAL unit. + + // Leave room for the header. So far totalSize did not include the + // header byte. + ++totalSize; + + sp<ABuffer> unit = new ABuffer(totalSize); + PropagateTimes(buffer, unit); + + unit->data()[0] = (nri << 5) | nalType; + + size_t offset = 1; + List<sp<ABuffer> >::iterator it = queue->begin(); + for (size_t i = 0; i < totalCount; ++i) { + const sp<ABuffer> &buffer = *it; + +#if BE_VERBOSE + LOG(VERBOSE) << "piece #" << (i + 1) << "/" << totalCount; + hexdump(buffer->data(), buffer->size()); +#endif + + memcpy(unit->data() + offset, buffer->data() + 2, buffer->size() - 2); + offset += buffer->size() - 2; + + it = queue->erase(it); + } + + unit->setRange(0, totalSize); + + addSingleNALUnit(unit); + +#if BE_VERBOSE + LOG(VERBOSE) << "successfully assembled a NAL unit from fragments."; +#endif + + return OK; +} + +void AAVCAssembler::submitAccessUnit() { + CHECK(!mNALUnits.empty()); + +#if BE_VERBOSE + LOG(VERBOSE) << "Access unit complete (" << mNALUnits.size() << " nal units)"; +#endif + + uint64_t ntpTime; + CHECK((*mNALUnits.begin())->meta()->findInt64( + "ntp-time", (int64_t *)&ntpTime)); + + size_t totalSize = 0; + for (List<sp<ABuffer> >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + totalSize += 4 + (*it)->size(); + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + for (List<sp<ABuffer> >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + memcpy(accessUnit->data() + offset, "\x00\x00\x00\x01", 4); + offset += 4; + + sp<ABuffer> nal = *it; + memcpy(accessUnit->data() + offset, nal->data(), nal->size()); + offset += nal->size(); + } + + accessUnit->meta()->setInt64("ntp-time", ntpTime); + +#if 0 + printf(mAccessUnitDamaged ? "X" : "."); + fflush(stdout); +#endif + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mNALUnits.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setObject("access-unit", accessUnit); + msg->post(); +} + +ARTPAssembler::AssemblyStatus AAVCAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addNALUnit(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +void AAVCAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AAVCAssembler.h b/media/libstagefright/rtsp/AAVCAssembler.h new file mode 100644 index 0000000..1e97520 --- /dev/null +++ b/media/libstagefright/rtsp/AAVCAssembler.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_AVC_ASSEMBLER_H_ + +#define A_AVC_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; + +struct AAVCAssembler : public ARTPAssembler { + AAVCAssembler(const sp<AMessage> ¬ify); + +protected: + virtual ~AAVCAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mNALUnits; + + AssemblyStatus addNALUnit(const sp<ARTPSource> &source); + void addSingleNALUnit(const sp<ABuffer> &buffer); + AssemblyStatus addFragmentedNALUnit(List<sp<ABuffer> > *queue); + bool addSingleTimeAggregationPacket(const sp<ABuffer> &buffer); + + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AAVCAssembler); +}; + +} // namespace android + +#endif // A_AVC_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp new file mode 100644 index 0000000..0549d84 --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AMPEG4AudioAssembler.h" + +#include "ARTPSource.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +namespace android { + +AMPEG4AudioAssembler::AMPEG4AudioAssembler(const sp<AMessage> ¬ify) + : mNotifyMsg(notify), + mAccessUnitRTPTime(0), + mNextExpectedSeqNoValid(false), + mNextExpectedSeqNo(0), + mAccessUnitDamaged(false) { +} + +AMPEG4AudioAssembler::~AMPEG4AudioAssembler() { +} + +ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::assembleMore( + const sp<ARTPSource> &source) { + AssemblyStatus status = addPacket(source); + if (status == MALFORMED_PACKET) { + mAccessUnitDamaged = true; + } + return status; +} + +ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::addPacket( + const sp<ARTPSource> &source) { + List<sp<ABuffer> > *queue = source->queue(); + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + + if (mNextExpectedSeqNoValid) { + List<sp<ABuffer> >::iterator it = queue->begin(); + while (it != queue->end()) { + if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) { + break; + } + + it = queue->erase(it); + } + + if (queue->empty()) { + return NOT_ENOUGH_DATA; + } + } + + sp<ABuffer> buffer = *queue->begin(); + + if (!mNextExpectedSeqNoValid) { + mNextExpectedSeqNoValid = true; + mNextExpectedSeqNo = (uint32_t)buffer->int32Data(); + } else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) { +#if VERBOSE + LOG(VERBOSE) << "Not the sequence number I expected"; +#endif + + return WRONG_SEQUENCE_NUMBER; + } + + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { + submitAccessUnit(); + } + mAccessUnitRTPTime = rtpTime; + + mPackets.push_back(buffer); + + queue->erase(queue->begin()); + ++mNextExpectedSeqNo; + + return OK; +} + +void AMPEG4AudioAssembler::submitAccessUnit() { + CHECK(!mPackets.empty()); + +#if VERBOSE + LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; +#endif + + uint64_t ntpTime; + CHECK((*mPackets.begin())->meta()->findInt64( + "ntp-time", (int64_t *)&ntpTime)); + + size_t totalSize = 0; + List<sp<ABuffer> >::iterator it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + size_t n = 0; + while (unit->data()[n] == 0xff) { + ++n; + } + ++n; + + totalSize += unit->size() - n; + ++it; + } + + sp<ABuffer> accessUnit = new ABuffer(totalSize); + size_t offset = 0; + it = mPackets.begin(); + while (it != mPackets.end()) { + const sp<ABuffer> &unit = *it; + + size_t n = 0; + while (unit->data()[n] == 0xff) { + ++n; + } + ++n; + + memcpy((uint8_t *)accessUnit->data() + offset, + unit->data() + n, unit->size() - n); + + offset += unit->size() - n; + + ++it; + } + + accessUnit->meta()->setInt64("ntp-time", ntpTime); + + if (mAccessUnitDamaged) { + accessUnit->meta()->setInt32("damaged", true); + } + + mPackets.clear(); + mAccessUnitDamaged = false; + + sp<AMessage> msg = mNotifyMsg->dup(); + msg->setObject("access-unit", accessUnit); + msg->post(); +} + +void AMPEG4AudioAssembler::packetLost() { + CHECK(mNextExpectedSeqNoValid); + ++mNextExpectedSeqNo; + + mAccessUnitDamaged = true; +} + +} // namespace android diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.h b/media/libstagefright/rtsp/AMPEG4AudioAssembler.h new file mode 100644 index 0000000..5c2a2dd --- /dev/null +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_MPEG4_AUDIO_ASSEMBLER_H_ + +#define A_MPEG4_AUDIO_ASSEMBLER_H_ + +#include "ARTPAssembler.h" + +#include <utils/List.h> + +#include <stdint.h> + +namespace android { + +struct AMessage; + +struct AMPEG4AudioAssembler : public ARTPAssembler { + AMPEG4AudioAssembler(const sp<AMessage> ¬ify); + +protected: + virtual ~AMPEG4AudioAssembler(); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source); + virtual void packetLost(); + +private: + sp<AMessage> mNotifyMsg; + uint32_t mAccessUnitRTPTime; + bool mNextExpectedSeqNoValid; + uint32_t mNextExpectedSeqNo; + bool mAccessUnitDamaged; + List<sp<ABuffer> > mPackets; + + AssemblyStatus addPacket(const sp<ARTPSource> &source); + void submitAccessUnit(); + + DISALLOW_EVIL_CONSTRUCTORS(AMPEG4AudioAssembler); +}; + +} // namespace android + +#endif // A_MPEG4_AUDIO_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/APacketSource.cpp b/media/libstagefright/rtsp/APacketSource.cpp new file mode 100644 index 0000000..2869d54 --- /dev/null +++ b/media/libstagefright/rtsp/APacketSource.cpp @@ -0,0 +1,345 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "APacketSource.h" + +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/AString.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/MediaBuffer.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MetaData.h> +#include <utils/Vector.h> + +namespace android { + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +static sp<ABuffer> decodeHex(const AString &s) { + if ((s.size() % 2) != 0) { + return NULL; + } + + size_t outLen = s.size() / 2; + sp<ABuffer> buffer = new ABuffer(outLen); + uint8_t *out = buffer->data(); + + uint8_t accum = 0; + for (size_t i = 0; i < s.size(); ++i) { + char c = s.c_str()[i]; + unsigned value; + if (c >= '0' && c <= '9') { + value = c - '0'; + } else if (c >= 'a' && c <= 'f') { + value = c - 'a' + 10; + } else if (c >= 'A' && c <= 'F') { + value = c - 'A' + 10; + } else { + return NULL; + } + + accum = (accum << 4) | value; + + if (i & 1) { + *out++ = accum; + + accum = 0; + } + } + + return buffer; +} + +static sp<ABuffer> MakeAVCCodecSpecificData(const char *params) { + AString val; + CHECK(GetAttribute(params, "profile-level-id", &val)); + + sp<ABuffer> profileLevelID = decodeHex(val); + CHECK(profileLevelID != NULL); + CHECK_EQ(profileLevelID->size(), 3u); + + Vector<sp<ABuffer> > paramSets; + + size_t numSeqParameterSets = 0; + size_t totalSeqParameterSetSize = 0; + size_t numPicParameterSets = 0; + size_t totalPicParameterSetSize = 0; + + CHECK(GetAttribute(params, "sprop-parameter-sets", &val)); + size_t start = 0; + for (;;) { + ssize_t commaPos = val.find(",", start); + size_t end = (commaPos < 0) ? val.size() : commaPos; + + AString nalString(val, start, end - start); + sp<ABuffer> nal = decodeBase64(nalString); + CHECK(nal != NULL); + CHECK_GT(nal->size(), 0u); + CHECK_LE(nal->size(), 65535u); + + uint8_t nalType = nal->data()[0] & 0x1f; + if (numSeqParameterSets == 0) { + CHECK_EQ((unsigned)nalType, 7u); + } else if (numPicParameterSets > 0) { + CHECK_EQ((unsigned)nalType, 8u); + } + if (nalType == 7) { + ++numSeqParameterSets; + totalSeqParameterSetSize += nal->size(); + } else { + CHECK_EQ((unsigned)nalType, 8u); + ++numPicParameterSets; + totalPicParameterSetSize += nal->size(); + } + + paramSets.push(nal); + + if (commaPos < 0) { + break; + } + + start = commaPos + 1; + } + + CHECK_LT(numSeqParameterSets, 32u); + CHECK_LE(numPicParameterSets, 255u); + + size_t csdSize = + 1 + 3 + 1 + 1 + + 2 * numSeqParameterSets + totalSeqParameterSetSize + + 1 + 2 * numPicParameterSets + totalPicParameterSetSize; + + sp<ABuffer> csd = new ABuffer(csdSize); + uint8_t *out = csd->data(); + + *out++ = 0x01; // configurationVersion + memcpy(out, profileLevelID->data(), 3); + out += 3; + *out++ = (0x3f << 2) | 1; // lengthSize == 2 bytes + *out++ = 0xe0 | numSeqParameterSets; + + for (size_t i = 0; i < numSeqParameterSets; ++i) { + sp<ABuffer> nal = paramSets.editItemAt(i); + + *out++ = nal->size() >> 8; + *out++ = nal->size() & 0xff; + + memcpy(out, nal->data(), nal->size()); + + out += nal->size(); + } + + *out++ = numPicParameterSets; + + for (size_t i = 0; i < numPicParameterSets; ++i) { + sp<ABuffer> nal = paramSets.editItemAt(i + numSeqParameterSets); + + *out++ = nal->size() >> 8; + *out++ = nal->size() & 0xff; + + memcpy(out, nal->data(), nal->size()); + + out += nal->size(); + } + + hexdump(csd->data(), csd->size()); + + return csd; +} + +sp<ABuffer> MakeAACCodecSpecificData(const char *params) { + AString val; + CHECK(GetAttribute(params, "config", &val)); + + sp<ABuffer> config = decodeHex(val); + CHECK(config != NULL); + CHECK_GE(config->size(), 4u); + + const uint8_t *data = config->data(); + uint32_t x = data[0] << 24 | data[1] << 16 | data[2] << 8 | data[3]; + x = (x >> 1) & 0xffff; + + static const uint8_t kStaticESDS[] = { + 0x03, 22, + 0x00, 0x00, // ES_ID + 0x00, // streamDependenceFlag, URL_Flag, OCRstreamFlag + + 0x04, 17, + 0x40, // Audio ISO/IEC 14496-3 + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + + 0x05, 2, + // AudioSpecificInfo follows + }; + + sp<ABuffer> csd = new ABuffer(sizeof(kStaticESDS) + 2); + memcpy(csd->data(), kStaticESDS, sizeof(kStaticESDS)); + csd->data()[sizeof(kStaticESDS)] = (x >> 8) & 0xff; + csd->data()[sizeof(kStaticESDS) + 1] = x & 0xff; + + hexdump(csd->data(), csd->size()); + + return csd; +} + +APacketSource::APacketSource( + const sp<ASessionDescription> &sessionDesc, size_t index) + : mFormat(new MetaData), + mEOSResult(OK) { + unsigned long PT; + AString desc; + AString params; + sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + + int64_t durationUs; + if (sessionDesc->getDurationUs(&durationUs)) { + mFormat->setInt64(kKeyDuration, durationUs); + } else { + mFormat->setInt64(kKeyDuration, 60 * 60 * 1000000ll); + } + + if (!strncmp(desc.c_str(), "H264/", 5)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_AVC); + + int32_t width, height; + sessionDesc->getDimensions(index, PT, &width, &height); + + mFormat->setInt32(kKeyWidth, width); + mFormat->setInt32(kKeyHeight, height); + + sp<ABuffer> codecSpecificData = + MakeAVCCodecSpecificData(params.c_str()); + + mFormat->setData( + kKeyAVCC, 0, + codecSpecificData->data(), codecSpecificData->size()); + + } else if (!strncmp(desc.c_str(), "MP4A-LATM", 9)) { + mFormat->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_AAC); + + int32_t sampleRate, numChannels; + ASessionDescription::ParseFormatDesc( + desc.c_str(), &sampleRate, &numChannels); + + mFormat->setInt32(kKeySampleRate, sampleRate); + mFormat->setInt32(kKeyChannelCount, numChannels); + + sp<ABuffer> codecSpecificData = + MakeAACCodecSpecificData(params.c_str()); + + mFormat->setData( + kKeyESDS, 0, + codecSpecificData->data(), codecSpecificData->size()); + } else { + TRESPASS(); + } +} + +APacketSource::~APacketSource() { +} + +status_t APacketSource::start(MetaData *params) { + return OK; +} + +status_t APacketSource::stop() { + return OK; +} + +sp<MetaData> APacketSource::getFormat() { + return mFormat; +} + +status_t APacketSource::read( + MediaBuffer **out, const ReadOptions *) { + *out = NULL; + + Mutex::Autolock autoLock(mLock); + while (mEOSResult == OK && mBuffers.empty()) { + mCondition.wait(mLock); + } + + if (!mBuffers.empty()) { + const sp<ABuffer> buffer = *mBuffers.begin(); + + uint64_t ntpTime; + CHECK(buffer->meta()->findInt64( + "ntp-time", (int64_t *)&ntpTime)); + + int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); + + MediaBuffer *mediaBuffer = new MediaBuffer(buffer->size()); + mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs); + memcpy(mediaBuffer->data(), buffer->data(), buffer->size()); + *out = mediaBuffer; + + mBuffers.erase(mBuffers.begin()); + return OK; + } + + return mEOSResult; +} + +void APacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { + int32_t damaged; + if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { + // LOG(VERBOSE) << "discarding damaged AU"; + return; + } + + Mutex::Autolock autoLock(mLock); + mBuffers.push_back(buffer); + mCondition.signal(); +} + +void APacketSource::signalEOS(status_t result) { + CHECK(result != OK); + + Mutex::Autolock autoLock(mLock); + mEOSResult = result; + mCondition.signal(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/APacketSource.h b/media/libstagefright/rtsp/APacketSource.h new file mode 100644 index 0000000..4040eee --- /dev/null +++ b/media/libstagefright/rtsp/APacketSource.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_PACKET_SOURCE_H_ + +#define A_PACKET_SOURCE_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <media/stagefright/MediaSource.h> +#include <utils/threads.h> +#include <utils/List.h> + +namespace android { + +struct ABuffer; +struct ASessionDescription; + +struct APacketSource : public MediaSource { + APacketSource(const sp<ASessionDescription> &sessionDesc, size_t index); + + virtual status_t start(MetaData *params = NULL); + virtual status_t stop(); + virtual sp<MetaData> getFormat(); + + virtual status_t read( + MediaBuffer **buffer, const ReadOptions *options = NULL); + + void queueAccessUnit(const sp<ABuffer> &buffer); + void signalEOS(status_t result); + +protected: + virtual ~APacketSource(); + +private: + Mutex mLock; + Condition mCondition; + + sp<MetaData> mFormat; + List<sp<ABuffer> > mBuffers; + status_t mEOSResult; + + DISALLOW_EVIL_CONSTRUCTORS(APacketSource); +}; + + +} // namespace android + +#endif // A_PACKET_SOURCE_H_ diff --git a/media/libstagefright/rtsp/ARTPAssembler.cpp b/media/libstagefright/rtsp/ARTPAssembler.cpp new file mode 100644 index 0000000..24225b8 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPAssembler.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTPAssembler.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +#include <stdint.h> + +namespace android { + +static int64_t getNowUs() { + struct timeval tv; + gettimeofday(&tv, NULL); + + return (int64_t)tv.tv_usec + tv.tv_sec * 1000000ll; +} + +ARTPAssembler::ARTPAssembler() + : mFirstFailureTimeUs(-1) { +} + +void ARTPAssembler::PropagateTimes( + const sp<ABuffer> &from, const sp<ABuffer> &to) { + uint32_t rtpTime; + CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + uint64_t ntpTime = 0; + CHECK(from->meta()->findInt64("ntp-time", (int64_t *)&ntpTime)); + + to->meta()->setInt32("rtp-time", rtpTime); + to->meta()->setInt64("ntp-time", ntpTime); +} + +void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) { + AssemblyStatus status; + for (;;) { + status = assembleMore(source); + + if (status == WRONG_SEQUENCE_NUMBER) { + if (mFirstFailureTimeUs >= 0) { + if (getNowUs() - mFirstFailureTimeUs > 10000ll) { + mFirstFailureTimeUs = -1; + + // LOG(VERBOSE) << "waited too long for packet."; + packetLost(); + continue; + } + } else { + mFirstFailureTimeUs = getNowUs(); + } + break; + } else { + mFirstFailureTimeUs = -1; + + if (status == NOT_ENOUGH_DATA) { + break; + } + } + } +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ARTPAssembler.h b/media/libstagefright/rtsp/ARTPAssembler.h new file mode 100644 index 0000000..892bd65 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPAssembler.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_ASSEMBLER_H_ + +#define A_RTP_ASSEMBLER_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct ARTPSource; + +struct ARTPAssembler : public RefBase { + enum AssemblyStatus { + MALFORMED_PACKET, + WRONG_SEQUENCE_NUMBER, + NOT_ENOUGH_DATA, + OK + }; + + ARTPAssembler(); + + void onPacketReceived(const sp<ARTPSource> &source); + +protected: + static void PropagateTimes( + const sp<ABuffer> &from, const sp<ABuffer> &to); + + virtual AssemblyStatus assembleMore(const sp<ARTPSource> &source) = 0; + virtual void packetLost() = 0; + +private: + int64_t mFirstFailureTimeUs; + + DISALLOW_EVIL_CONSTRUCTORS(ARTPAssembler); +}; + +} // namespace android + +#endif // A_RTP_ASSEMBLER_H_ diff --git a/media/libstagefright/rtsp/ARTPConnection.cpp b/media/libstagefright/rtsp/ARTPConnection.cpp new file mode 100644 index 0000000..a4413f0 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPConnection.cpp @@ -0,0 +1,499 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTPConnection.h" + +#include "ARTPSource.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/AString.h> + +#include <arpa/inet.h> +#include <sys/socket.h> + +#define VERBOSE 0 + +#if VERBOSE +#include "hexdump.h" +#endif + +namespace android { + +static uint16_t u16at(const uint8_t *data) { + return data[0] << 8 | data[1]; +} + +static uint32_t u32at(const uint8_t *data) { + return u16at(data) << 16 | u16at(&data[2]); +} + +static uint64_t u64at(const uint8_t *data) { + return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); +} + +// static +const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; + +struct ARTPConnection::StreamInfo { + int mRTPSocket; + int mRTCPSocket; + sp<ASessionDescription> mSessionDesc; + size_t mIndex; + sp<AMessage> mNotifyMsg; +}; + +ARTPConnection::ARTPConnection() + : mPollEventPending(false) { +} + +ARTPConnection::~ARTPConnection() { +} + +void ARTPConnection::addStream( + int rtpSocket, int rtcpSocket, + const sp<ASessionDescription> &sessionDesc, + size_t index, + const sp<AMessage> ¬ify) { + sp<AMessage> msg = new AMessage(kWhatAddStream, id()); + msg->setInt32("rtp-socket", rtpSocket); + msg->setInt32("rtcp-socket", rtcpSocket); + msg->setObject("session-desc", sessionDesc); + msg->setSize("index", index); + msg->setMessage("notify", notify); + msg->post(); +} + +void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { + sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); + msg->setInt32("rtp-socket", rtpSocket); + msg->setInt32("rtcp-socket", rtcpSocket); + msg->post(); +} + +static void bumpSocketBufferSize(int s) { + int size = 256 * 1024; + CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); +} + +// static +void ARTPConnection::MakePortPair( + int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { + *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(*rtpSocket, 0); + + bumpSocketBufferSize(*rtpSocket); + + *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); + CHECK_GE(*rtcpSocket, 0); + + bumpSocketBufferSize(*rtcpSocket); + + unsigned start = (rand() * 1000)/ RAND_MAX + 15550; + start &= ~1; + + for (unsigned port = start; port < 65536; port += 2) { + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + if (bind(*rtpSocket, + (const struct sockaddr *)&addr, sizeof(addr)) < 0) { + continue; + } + + addr.sin_port = htons(port + 1); + + if (bind(*rtcpSocket, + (const struct sockaddr *)&addr, sizeof(addr)) == 0) { + *rtpPort = port; + return; + } + } + + TRESPASS(); +} + +void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatAddStream: + { + onAddStream(msg); + break; + } + + case kWhatRemoveStream: + { + onRemoveStream(msg); + break; + } + + case kWhatPollStreams: + { + onPollStreams(); + break; + } + + default: + { + TRESPASS(); + break; + } + } +} + +void ARTPConnection::onAddStream(const sp<AMessage> &msg) { + mStreams.push_back(StreamInfo()); + StreamInfo *info = &*--mStreams.end(); + + int32_t s; + CHECK(msg->findInt32("rtp-socket", &s)); + info->mRTPSocket = s; + CHECK(msg->findInt32("rtcp-socket", &s)); + info->mRTCPSocket = s; + + sp<RefBase> obj; + CHECK(msg->findObject("session-desc", &obj)); + info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); + + CHECK(msg->findSize("index", &info->mIndex)); + CHECK(msg->findMessage("notify", &info->mNotifyMsg)); + + postPollEvent(); +} + +void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { + int32_t rtpSocket, rtcpSocket; + CHECK(msg->findInt32("rtp-socket", &rtpSocket)); + CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); + + List<StreamInfo>::iterator it = mStreams.begin(); + while (it != mStreams.end() + && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { + ++it; + } + + if (it == mStreams.end()) { + TRESPASS(); + } + + mStreams.erase(it); +} + +void ARTPConnection::postPollEvent() { + if (mPollEventPending) { + return; + } + + sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); + msg->post(); + + mPollEventPending = true; +} + +void ARTPConnection::onPollStreams() { + mPollEventPending = false; + + if (mStreams.empty()) { + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set rs; + FD_ZERO(&rs); + + int maxSocket = -1; + for (List<StreamInfo>::iterator it = mStreams.begin(); + it != mStreams.end(); ++it) { + FD_SET(it->mRTPSocket, &rs); + FD_SET(it->mRTCPSocket, &rs); + + if (it->mRTPSocket > maxSocket) { + maxSocket = it->mRTPSocket; + } + if (it->mRTCPSocket > maxSocket) { + maxSocket = it->mRTCPSocket; + } + } + + int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); + CHECK_GE(res, 0); + + if (res > 0) { + for (List<StreamInfo>::iterator it = mStreams.begin(); + it != mStreams.end(); ++it) { + if (FD_ISSET(it->mRTPSocket, &rs)) { + receive(&*it, true); + } + if (FD_ISSET(it->mRTCPSocket, &rs)) { + receive(&*it, false); + } + } + } + + postPollEvent(); +} + +status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { + sp<ABuffer> buffer = new ABuffer(65536); + + struct sockaddr_in from; + socklen_t fromSize = sizeof(from); + + ssize_t nbytes = recvfrom( + receiveRTP ? s->mRTPSocket : s->mRTCPSocket, + buffer->data(), + buffer->capacity(), + 0, + (struct sockaddr *)&from, + &fromSize); + + if (nbytes < 0) { + return -1; + } + + buffer->setRange(0, nbytes); + + status_t err; + if (receiveRTP) { + err = parseRTP(s, buffer); + } else { + err = parseRTCP(s, buffer); + } + + return err; +} + +status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { + size_t size = buffer->size(); + + if (size < 12) { + // Too short to be a valid RTP header. + return -1; + } + + const uint8_t *data = buffer->data(); + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return -1; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return -1; + } + + size -= paddingLength; + } + + int numCSRCs = data[0] & 0x0f; + + size_t payloadOffset = 12 + 4 * numCSRCs; + + if (size < payloadOffset) { + // Not enough data to fit the basic header and all the CSRC entries. + return -1; + } + + if (data[0] & 0x10) { + // Header eXtension present. + + if (size < payloadOffset + 4) { + // Not enough data to fit the basic header, all CSRC entries + // and the first 4 bytes of the extension header. + + return -1; + } + + const uint8_t *extensionData = &data[payloadOffset]; + + size_t extensionLength = + 4 * (extensionData[2] << 8 | extensionData[3]); + + if (size < payloadOffset + 4 + extensionLength) { + return -1; + } + + payloadOffset += 4 + extensionLength; + } + + uint32_t srcId = u32at(&data[8]); + + sp<ARTPSource> source; + ssize_t index = mSources.indexOfKey(srcId); + if (index < 0) { + index = mSources.size(); + + source = new ARTPSource( + srcId, s->mSessionDesc, s->mIndex, s->mNotifyMsg); + + mSources.add(srcId, source); + } else { + source = mSources.valueAt(index); + } + + uint32_t rtpTime = u32at(&data[4]); + + sp<AMessage> meta = buffer->meta(); + meta->setInt32("ssrc", srcId); + meta->setInt32("rtp-time", rtpTime); + meta->setInt32("PT", data[1] & 0x7f); + meta->setInt32("M", data[1] >> 7); + + buffer->setInt32Data(u16at(&data[2])); + +#if VERBOSE + printf("RTP = {\n" + " PT: %d\n" + " sequence number: %d\n" + " RTP-time: 0x%08x\n" + " M: %d\n" + " SSRC: 0x%08x\n" + "}\n", + data[1] & 0x7f, + u16at(&data[2]), + rtpTime, + data[1] >> 7, + srcId); + + // hexdump(&data[payloadOffset], size - payloadOffset); +#endif + + buffer->setRange(payloadOffset, size - payloadOffset); + + source->processRTPPacket(buffer); + + return OK; +} + +status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + while (size > 0) { + if (size < 8) { + // Too short to be a valid RTCP header + return -1; + } + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return -1; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return -1; + } + + size -= paddingLength; + } + + size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; + + if (size < headerLength) { + // Only received a partial packet? + return -1; + } + + switch (data[1]) { + case 200: + { + parseSR(s, data, headerLength); + break; + } + + default: + { +#if VERBOSE + printf("Unknown RTCP packet type %d of size %ld\n", + data[1], headerLength); + + hexdump(data, headerLength); +#endif + break; + } + } + + data += headerLength; + size -= headerLength; + } + + return OK; +} + +status_t ARTPConnection::parseSR( + StreamInfo *s, const uint8_t *data, size_t size) { + size_t RC = data[0] & 0x1f; + + if (size < (7 + RC * 6) * 4) { + // Packet too short for the minimal SR header. + return -1; + } + + uint32_t id = u32at(&data[4]); + uint64_t ntpTime = u64at(&data[8]); + uint32_t rtpTime = u32at(&data[16]); + +#if VERBOSE + printf("SR = {\n" + " SSRC: 0x%08x\n" + " NTP-time: 0x%016llx\n" + " RTP-time: 0x%08x\n" + "}\n", + id, ntpTime, rtpTime); +#endif + + sp<ARTPSource> source; + ssize_t index = mSources.indexOfKey(id); + if (index < 0) { + index = mSources.size(); + + source = new ARTPSource( + id, s->mSessionDesc, s->mIndex, s->mNotifyMsg); + + mSources.add(id, source); + } else { + source = mSources.valueAt(index); + } + + source->timeUpdate(rtpTime, ntpTime); + + return 0; +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ARTPConnection.h b/media/libstagefright/rtsp/ARTPConnection.h new file mode 100644 index 0000000..c77e3a4 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPConnection.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_CONNECTION_H_ + +#define A_RTP_CONNECTION_H_ + +#include <media/stagefright/foundation/AHandler.h> +#include <utils/List.h> + +namespace android { + +struct ABuffer; +struct ARTPSource; +struct ASessionDescription; + +struct ARTPConnection : public AHandler { + ARTPConnection(); + + void addStream( + int rtpSocket, int rtcpSocket, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify); + + void removeStream(int rtpSocket, int rtcpSocket); + + // Creates a pair of UDP datagram sockets bound to adjacent ports + // (the rtpSocket is bound to an even port, the rtcpSocket to the + // next higher port). + static void MakePortPair( + int *rtpSocket, int *rtcpSocket, unsigned *rtpPort); + +protected: + virtual ~ARTPConnection(); + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum { + kWhatAddStream, + kWhatRemoveStream, + kWhatPollStreams, + }; + + static const int64_t kSelectTimeoutUs; + + struct StreamInfo; + List<StreamInfo> mStreams; + + KeyedVector<uint32_t, sp<ARTPSource> > mSources; + + bool mPollEventPending; + + void onAddStream(const sp<AMessage> &msg); + void onRemoveStream(const sp<AMessage> &msg); + void onPollStreams(); + + status_t receive(StreamInfo *info, bool receiveRTP); + + status_t parseRTP(StreamInfo *info, const sp<ABuffer> &buffer); + status_t parseRTCP(StreamInfo *info, const sp<ABuffer> &buffer); + status_t parseSR(StreamInfo *info, const uint8_t *data, size_t size); + + void postPollEvent(); + + DISALLOW_EVIL_CONSTRUCTORS(ARTPConnection); +}; + +} // namespace android + +#endif // A_RTP_CONNECTION_H_ diff --git a/media/libstagefright/rtsp/ARTPSource.cpp b/media/libstagefright/rtsp/ARTPSource.cpp new file mode 100644 index 0000000..f05daa8 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSource.cpp @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTPSource.h" + +#include "AAVCAssembler.h" +#include "AMPEG4AudioAssembler.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +#define VERBOSE 0 + +namespace android { + +ARTPSource::ARTPSource( + uint32_t id, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify) + : mID(id), + mHighestSeqNumber(0), + mNumBuffersReceived(0), + mNumTimes(0) { + unsigned long PT; + AString desc; + AString params; + sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + + if (!strncmp(desc.c_str(), "H264/", 5)) { + mAssembler = new AAVCAssembler(notify); + } else if (!strncmp(desc.c_str(), "MP4A-LATM", 9)) { + mAssembler = new AMPEG4AudioAssembler(notify); + } else { + TRESPASS(); + } +} + +static uint32_t AbsDiff(uint32_t seq1, uint32_t seq2) { + return seq1 > seq2 ? seq1 - seq2 : seq2 - seq1; +} + +void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer) { + if (queuePacket(buffer) && mNumTimes == 2 && mAssembler != NULL) { + mAssembler->onPacketReceived(this); + } + + dump(); +} + +void ARTPSource::timeUpdate(uint32_t rtpTime, uint64_t ntpTime) { +#if VERBOSE + LOG(VERBOSE) << "timeUpdate"; +#endif + + if (mNumTimes == 2) { + mNTPTime[0] = mNTPTime[1]; + mRTPTime[0] = mRTPTime[1]; + mNumTimes = 1; + } + mNTPTime[mNumTimes] = ntpTime; + mRTPTime[mNumTimes++] = rtpTime; + + if (mNumTimes == 2) { + for (List<sp<ABuffer> >::iterator it = mQueue.begin(); + it != mQueue.end(); ++it) { + sp<AMessage> meta = (*it)->meta(); + + uint32_t rtpTime; + CHECK(meta->findInt32("rtp-time", (int32_t *)&rtpTime)); + + meta->setInt64("ntp-time", RTP2NTP(rtpTime)); + } + } +} + +bool ARTPSource::queuePacket(const sp<ABuffer> &buffer) { + uint32_t seqNum = (uint32_t)buffer->int32Data(); + + if (mNumTimes == 2) { + sp<AMessage> meta = buffer->meta(); + + uint32_t rtpTime; + CHECK(meta->findInt32("rtp-time", (int32_t *)&rtpTime)); + + meta->setInt64("ntp-time", RTP2NTP(rtpTime)); + } + + if (mNumBuffersReceived++ == 0) { + mHighestSeqNumber = seqNum; + mQueue.push_back(buffer); + return true; + } + + // Only the lower 16-bit of the sequence numbers are transmitted, + // derive the high-order bits by choosing the candidate closest + // to the highest sequence number (extended to 32 bits) received so far. + + uint32_t seq1 = seqNum | (mHighestSeqNumber & 0xffff0000); + uint32_t seq2 = seqNum | ((mHighestSeqNumber & 0xffff0000) + 0x10000); + uint32_t seq3 = seqNum | ((mHighestSeqNumber & 0xffff0000) - 0x10000); + uint32_t diff1 = AbsDiff(seq1, mHighestSeqNumber); + uint32_t diff2 = AbsDiff(seq2, mHighestSeqNumber); + uint32_t diff3 = AbsDiff(seq3, mHighestSeqNumber); + + if (diff1 < diff2) { + if (diff1 < diff3) { + // diff1 < diff2 ^ diff1 < diff3 + seqNum = seq1; + } else { + // diff3 <= diff1 < diff2 + seqNum = seq3; + } + } else if (diff2 < diff3) { + // diff2 <= diff1 ^ diff2 < diff3 + seqNum = seq2; + } else { + // diff3 <= diff2 <= diff1 + seqNum = seq3; + } + + if (seqNum > mHighestSeqNumber) { + mHighestSeqNumber = seqNum; + } + + buffer->setInt32Data(seqNum); + + List<sp<ABuffer> >::iterator it = mQueue.begin(); + while (it != mQueue.end() && (uint32_t)(*it)->int32Data() < seqNum) { + ++it; + } + + if (it != mQueue.end() && (uint32_t)(*it)->int32Data() == seqNum) { + LOG(WARNING) << "Discarding duplicate buffer"; + return false; + } + + mQueue.insert(it, buffer); + + return true; +} + +void ARTPSource::dump() const { + if ((mNumBuffersReceived % 128) != 0) { + return; + } + +#if 0 + if (mAssembler == NULL) { + char tmp[20]; + sprintf(tmp, "0x%08x", mID); + + int32_t numMissing = 0; + + if (!mQueue.empty()) { + List<sp<ABuffer> >::const_iterator it = mQueue.begin(); + uint32_t expectedSeqNum = (uint32_t)(*it)->int32Data(); + ++expectedSeqNum; + ++it; + + for (; it != mQueue.end(); ++it) { + uint32_t seqNum = (uint32_t)(*it)->int32Data(); + CHECK_GE(seqNum, expectedSeqNum); + + if (seqNum != expectedSeqNum) { + numMissing += seqNum - expectedSeqNum; + expectedSeqNum = seqNum; + } + + ++expectedSeqNum; + } + } + + LOG(VERBOSE) << "[" << tmp << "] Missing " << numMissing + << " / " << (mNumBuffersReceived + numMissing) << " packets. (" + << (100.0 * numMissing / (mNumBuffersReceived + numMissing)) + << " %%)"; + } +#endif + +#if 0 + AString out; + + out.append(tmp); + out.append(" ["); + + List<sp<ABuffer> >::const_iterator it = mQueue.begin(); + while (it != mQueue.end()) { + uint32_t start = (uint32_t)(*it)->int32Data(); + + out.append(start); + + ++it; + uint32_t expected = start + 1; + + while (it != mQueue.end()) { + uint32_t seqNum = (uint32_t)(*it)->int32Data(); + + if (seqNum != expected) { + if (expected > start + 1) { + out.append("-"); + out.append(expected - 1); + } + out.append(", "); + break; + } + + ++it; + ++expected; + } + + if (it == mQueue.end()) { + if (expected > start + 1) { + out.append("-"); + out.append(expected - 1); + } + } + } + + out.append("]"); + + LOG(VERBOSE) << out; +#endif +} + +uint64_t ARTPSource::RTP2NTP(uint32_t rtpTime) const { + CHECK_EQ(mNumTimes, 2u); + + return mNTPTime[0] + (double)(mNTPTime[1] - mNTPTime[0]) + * ((double)rtpTime - (double)mRTPTime[0]) + / (double)(mRTPTime[1] - mRTPTime[0]); +} + +} // namespace android + + diff --git a/media/libstagefright/rtsp/ARTPSource.h b/media/libstagefright/rtsp/ARTPSource.h new file mode 100644 index 0000000..b93cd56 --- /dev/null +++ b/media/libstagefright/rtsp/ARTPSource.h @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTP_SOURCE_H_ + +#define A_RTP_SOURCE_H_ + +#include <stdint.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/List.h> +#include <utils/RefBase.h> + +namespace android { + +struct ABuffer; +struct AMessage; +struct ARTPAssembler; +struct ASessionDescription; + +struct ARTPSource : public RefBase { + ARTPSource( + uint32_t id, + const sp<ASessionDescription> &sessionDesc, size_t index, + const sp<AMessage> ¬ify); + + void processRTPPacket(const sp<ABuffer> &buffer); + void timeUpdate(uint32_t rtpTime, uint64_t ntpTime); + + List<sp<ABuffer> > *queue() { return &mQueue; } + +private: + uint32_t mID; + uint32_t mHighestSeqNumber; + int32_t mNumBuffersReceived; + + List<sp<ABuffer> > mQueue; + sp<ARTPAssembler> mAssembler; + + size_t mNumTimes; + uint64_t mNTPTime[2]; + uint32_t mRTPTime[2]; + + uint64_t RTP2NTP(uint32_t rtpTime) const; + + bool queuePacket(const sp<ABuffer> &buffer); + void dump() const; + + DISALLOW_EVIL_CONSTRUCTORS(ARTPSource); +}; + +} // namespace android + +#endif // A_RTP_SOURCE_H_ diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp new file mode 100644 index 0000000..e9162c0 --- /dev/null +++ b/media/libstagefright/rtsp/ARTSPConnection.cpp @@ -0,0 +1,549 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTSPConnection.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> + +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <sys/socket.h> + +namespace android { + +// static +const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll; + +ARTSPConnection::ARTSPConnection() + : mState(DISCONNECTED), + mSocket(-1), + mConnectionID(0), + mNextCSeq(0), + mReceiveResponseEventPending(false) { +} + +ARTSPConnection::~ARTSPConnection() { + if (mSocket >= 0) { + LOG(ERROR) << "Connection is still open, closing the socket."; + close(mSocket); + mSocket = -1; + } +} + +void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatConnect, id()); + msg->setString("url", url); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::disconnect(const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::sendRequest( + const char *request, const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatSendRequest, id()); + msg->setString("request", request); + msg->setMessage("reply", reply); + msg->post(); +} + +void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatConnect: + onConnect(msg); + break; + + case kWhatDisconnect: + onDisconnect(msg); + break; + + case kWhatCompleteConnection: + onCompleteConnection(msg); + break; + + case kWhatSendRequest: + onSendRequest(msg); + break; + + case kWhatReceiveResponse: + onReceiveResponse(); + break; + + default: + TRESPASS(); + break; + } +} + +// static +bool ARTSPConnection::ParseURL( + const char *url, AString *host, unsigned *port, AString *path) { + host->clear(); + *port = 0; + path->clear(); + + if (strncasecmp("rtsp://", url, 7)) { + return false; + } + + const char *slashPos = strchr(&url[7], '/'); + + if (slashPos == NULL) { + host->setTo(&url[7]); + path->setTo("/"); + } else { + host->setTo(&url[7], slashPos - &url[7]); + path->setTo(slashPos); + } + + char *colonPos = strchr(host->c_str(), ':'); + + if (colonPos != NULL) { + unsigned long x; + if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) { + return false; + } + + *port = x; + + size_t colonOffset = colonPos - host->c_str(); + size_t trailing = host->size() - colonOffset; + host->erase(colonOffset, trailing); + } else { + *port = 554; + } + + return true; +} + +void ARTSPConnection::onConnect(const sp<AMessage> &msg) { + ++mConnectionID; + + if (mState != DISCONNECTED) { + close(mSocket); + mSocket = -1; + + flushPendingRequests(); + } + + mState = CONNECTING; + + mSocket = socket(AF_INET, SOCK_STREAM, 0); + + // Make socket non-blocking. + int flags = fcntl(mSocket, F_GETFL, 0); + CHECK_NE(flags, -1); + CHECK_NE(fcntl(mSocket, F_SETFL, flags | O_NONBLOCK), -1); + + AString url; + CHECK(msg->findString("url", &url)); + + AString host, path; + unsigned port; + CHECK(ParseURL(url.c_str(), &host, &port, &path)); + + struct hostent *ent = gethostbyname(host.c_str()); + CHECK(ent != NULL); + + struct sockaddr_in remote; + memset(remote.sin_zero, 0, sizeof(remote.sin_zero)); + remote.sin_family = AF_INET; + remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + remote.sin_port = htons(port); + + int err = ::connect( + mSocket, (const struct sockaddr *)&remote, sizeof(remote)); + + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr)); + + if (err < 0) { + if (errno == EINPROGRESS) { + sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id()); + msg->setMessage("reply", reply); + msg->setInt32("connection-id", mConnectionID); + msg->post(); + return; + } + + reply->setInt32("result", -errno); + mState = DISCONNECTED; + + close(mSocket); + mSocket = -1; + } else { + reply->setInt32("result", OK); + mState = CONNECTED; + mNextCSeq = 1; + + postReceiveReponseEvent(); + } + + reply->post(); +} + +void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) { + if (mState == CONNECTED || mState == CONNECTING) { + close(mSocket); + mSocket = -1; + + flushPendingRequests(); + } + + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + reply->setInt32("result", OK); + mState = DISCONNECTED; + + reply->post(); +} + +void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) { + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + int32_t connectionID; + CHECK(msg->findInt32("connection-id", &connectionID)); + + if ((connectionID != mConnectionID) || mState != CONNECTING) { + // While we were attempting to connect, the attempt was + // cancelled. + reply->setInt32("result", -ECONNABORTED); + reply->post(); + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set ws; + FD_ZERO(&ws); + FD_SET(mSocket, &ws); + + int res = select(mSocket + 1, NULL, &ws, NULL, &tv); + CHECK_GE(res, 0); + + if (res == 0) { + // Timed out. Not yet connected. + + msg->post(); + return; + } + + int err; + socklen_t optionLen = sizeof(err); + CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); + CHECK_EQ(optionLen, (socklen_t)sizeof(err)); + + if (err != 0) { + LOG(ERROR) << "err = " << err << " (" << strerror(err) << ")"; + + reply->setInt32("result", -err); + + mState = DISCONNECTED; + close(mSocket); + mSocket = -1; + } else { + reply->setInt32("result", OK); + mState = CONNECTED; + mNextCSeq = 1; + + postReceiveReponseEvent(); + } + + reply->post(); +} + +void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) { + sp<AMessage> reply; + CHECK(msg->findMessage("reply", &reply)); + + if (mState != CONNECTED) { + reply->setInt32("result", -ENOTCONN); + reply->post(); + return; + } + + AString request; + CHECK(msg->findString("request", &request)); + + // Find the boundary between headers and the body. + ssize_t i = request.find("\r\n\r\n"); + CHECK_GE(i, 0); + + int32_t cseq = mNextCSeq++; + + AString cseqHeader = "CSeq: "; + cseqHeader.append(cseq); + cseqHeader.append("\r\n"); + + request.insert(cseqHeader, i + 2); + + LOG(VERBOSE) << request; + + size_t numBytesSent = 0; + while (numBytesSent < request.size()) { + ssize_t n = + send(mSocket, request.c_str() + numBytesSent, + request.size() - numBytesSent, 0); + + if (n == 0) { + // Server closed the connection. + TRESPASS(); + } else if (n < 0) { + if (errno == EINTR) { + continue; + } + + TRESPASS(); + } + + numBytesSent += (size_t)n; + } + + mPendingRequests.add(cseq, reply); +} + +void ARTSPConnection::onReceiveResponse() { + mReceiveResponseEventPending = false; + + if (mState != CONNECTED) { + return; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = kSelectTimeoutUs; + + fd_set rs; + FD_ZERO(&rs); + FD_SET(mSocket, &rs); + + int res = select(mSocket + 1, &rs, NULL, NULL, &tv); + CHECK_GE(res, 0); + + if (res == 1) { + if (!receiveRTSPReponse()) { + // Something horrible, irreparable has happened. + flushPendingRequests(); + return; + } + } + + postReceiveReponseEvent(); +} + +void ARTSPConnection::flushPendingRequests() { + for (size_t i = 0; i < mPendingRequests.size(); ++i) { + sp<AMessage> reply = mPendingRequests.valueAt(i); + + reply->setInt32("result", -ECONNABORTED); + reply->post(); + } + + mPendingRequests.clear(); +} + +void ARTSPConnection::postReceiveReponseEvent() { + if (mReceiveResponseEventPending) { + return; + } + + sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id()); + msg->post(); + + mReceiveResponseEventPending = true; +} + +bool ARTSPConnection::receiveLine(AString *line) { + line->clear(); + + bool sawCR = false; + for (;;) { + char c; + ssize_t n = recv(mSocket, &c, 1, 0); + if (n == 0) { + // Server closed the connection. + return false; + } else if (n < 0) { + if (errno == EINTR) { + continue; + } + + TRESPASS(); + } + + if (sawCR && c == '\n') { + line->erase(line->size() - 1, 1); + return true; + } + + line->append(&c, 1); + + sawCR = (c == '\r'); + } +} + +bool ARTSPConnection::receiveRTSPReponse() { + sp<ARTSPResponse> response = new ARTSPResponse; + + if (!receiveLine(&response->mStatusLine)) { + return false; + } + + LOG(INFO) << "status: " << response->mStatusLine; + + ssize_t space1 = response->mStatusLine.find(" "); + if (space1 < 0) { + return false; + } + ssize_t space2 = response->mStatusLine.find(" ", space1 + 1); + if (space2 < 0) { + return false; + } + + AString statusCodeStr( + response->mStatusLine, space1 + 1, space2 - space1 - 1); + + if (!ParseSingleUnsignedLong( + statusCodeStr.c_str(), &response->mStatusCode) + || response->mStatusCode < 100 || response->mStatusCode > 999) { + return false; + } + + AString line; + for (;;) { + if (!receiveLine(&line)) { + break; + } + + if (line.empty()) { + break; + } + + LOG(VERBOSE) << "line: " << line; + + ssize_t colonPos = line.find(":"); + if (colonPos < 0) { + // Malformed header line. + return false; + } + + AString key(line, 0, colonPos); + key.trim(); + key.tolower(); + + line.erase(0, colonPos + 1); + line.trim(); + + response->mHeaders.add(key, line); + } + + unsigned long contentLength = 0; + + ssize_t i = response->mHeaders.indexOfKey("content-length"); + + if (i >= 0) { + AString value = response->mHeaders.valueAt(i); + if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { + return false; + } + } + + if (contentLength > 0) { + response->mContent = new ABuffer(contentLength); + + size_t numBytesRead = 0; + while (numBytesRead < contentLength) { + ssize_t n = recv( + mSocket, response->mContent->data() + numBytesRead, + contentLength - numBytesRead, 0); + + if (n == 0) { + // Server closed the connection. + TRESPASS(); + } else if (n < 0) { + if (errno == EINTR) { + continue; + } + + TRESPASS(); + } + + numBytesRead += (size_t)n; + } + } + + return notifyResponseListener(response); +} + +// static +bool ARTSPConnection::ParseSingleUnsignedLong( + const char *from, unsigned long *x) { + char *end; + *x = strtoul(from, &end, 10); + + if (end == from || *end != '\0') { + return false; + } + + return true; +} + +bool ARTSPConnection::notifyResponseListener( + const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("cseq"); + + if (i < 0) { + return true; + } + + AString value = response->mHeaders.valueAt(i); + + unsigned long cseq; + if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { + return false; + } + + i = mPendingRequests.indexOfKey(cseq); + + if (i < 0) { + // Unsolicited response? + TRESPASS(); + } + + sp<AMessage> reply = mPendingRequests.valueAt(i); + mPendingRequests.removeItemsAt(i); + + reply->setInt32("result", OK); + reply->setObject("response", response); + reply->post(); + + return true; +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ARTSPConnection.h b/media/libstagefright/rtsp/ARTSPConnection.h new file mode 100644 index 0000000..3577a2f --- /dev/null +++ b/media/libstagefright/rtsp/ARTSPConnection.h @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_RTSP_CONNECTION_H_ + +#define A_RTSP_CONNECTION_H_ + +#include <media/stagefright/foundation/AHandler.h> +#include <media/stagefright/foundation/AString.h> + +namespace android { + +struct ABuffer; + +struct ARTSPResponse : public RefBase { + unsigned long mStatusCode; + AString mStatusLine; + KeyedVector<AString,AString> mHeaders; + sp<ABuffer> mContent; +}; + +struct ARTSPConnection : public AHandler { + ARTSPConnection(); + + void connect(const char *url, const sp<AMessage> &reply); + void disconnect(const sp<AMessage> &reply); + + void sendRequest(const char *request, const sp<AMessage> &reply); + +protected: + virtual ~ARTSPConnection(); + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum State { + DISCONNECTED, + CONNECTING, + CONNECTED, + }; + + enum { + kWhatConnect = 'conn', + kWhatDisconnect = 'disc', + kWhatCompleteConnection = 'comc', + kWhatSendRequest = 'sreq', + kWhatReceiveResponse = 'rres', + }; + + static const int64_t kSelectTimeoutUs; + + State mState; + int mSocket; + int32_t mConnectionID; + int32_t mNextCSeq; + bool mReceiveResponseEventPending; + + KeyedVector<int32_t, sp<AMessage> > mPendingRequests; + + void onConnect(const sp<AMessage> &msg); + void onDisconnect(const sp<AMessage> &msg); + void onCompleteConnection(const sp<AMessage> &msg); + void onSendRequest(const sp<AMessage> &msg); + void onReceiveResponse(); + + void flushPendingRequests(); + void postReceiveReponseEvent(); + + // Return false iff something went unrecoverably wrong. + bool receiveRTSPReponse(); + bool receiveLine(AString *line); + bool notifyResponseListener(const sp<ARTSPResponse> &response); + + static bool ParseURL( + const char *url, AString *host, unsigned *port, AString *path); + + static bool ParseSingleUnsignedLong( + const char *from, unsigned long *x); + + DISALLOW_EVIL_CONSTRUCTORS(ARTSPConnection); +}; + +} // namespace android + +#endif // A_RTSP_CONNECTION_H_ diff --git a/media/libstagefright/rtsp/ARTSPController.cpp b/media/libstagefright/rtsp/ARTSPController.cpp new file mode 100644 index 0000000..7b87d42 --- /dev/null +++ b/media/libstagefright/rtsp/ARTSPController.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ARTSPController.h" + +#include "MyHandler.h" + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/MediaErrors.h> +#include <media/stagefright/MediaSource.h> +#include <media/stagefright/MetaData.h> + +namespace android { + +ARTSPController::ARTSPController(const sp<ALooper> &looper) + : mLooper(looper) { +} + +ARTSPController::~ARTSPController() { +} + +status_t ARTSPController::connect(const char *url) { + if (mHandler != NULL) { + return ERROR_ALREADY_CONNECTED; + } + + mHandler = new MyHandler(url, mLooper); + sleep(10); + + return OK; +} + +void ARTSPController::disconnect() { + if (mHandler == NULL) { + return; + } + + mHandler.clear(); +} + +size_t ARTSPController::countTracks() { + if (mHandler == NULL) { + return 0; + } + + return mHandler->countTracks(); +} + +sp<MediaSource> ARTSPController::getTrack(size_t index) { + CHECK(mHandler != NULL); + + return mHandler->getPacketSource(index); +} + +sp<MetaData> ARTSPController::getTrackMetaData( + size_t index, uint32_t flags) { + CHECK(mHandler != NULL); + + return mHandler->getPacketSource(index)->getFormat(); +} + +} // namespace android diff --git a/media/libstagefright/rtsp/ASessionDescription.cpp b/media/libstagefright/rtsp/ASessionDescription.cpp new file mode 100644 index 0000000..ca4c55e --- /dev/null +++ b/media/libstagefright/rtsp/ASessionDescription.cpp @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AString.h> + +#include <stdlib.h> + +namespace android { + +ASessionDescription::ASessionDescription() + : mIsValid(false) { +} + +ASessionDescription::~ASessionDescription() { +} + +bool ASessionDescription::setTo(const void *data, size_t size) { + mIsValid = parse(data, size); + + if (!mIsValid) { + mTracks.clear(); + mFormats.clear(); + } + + return mIsValid; +} + +bool ASessionDescription::parse(const void *data, size_t size) { + mTracks.clear(); + mFormats.clear(); + + mTracks.push(Attribs()); + mFormats.push(AString("[root]")); + + AString desc((const char *)data, size); + LOG(VERBOSE) << desc; + + size_t i = 0; + for (;;) { + ssize_t eolPos = desc.find("\r\n", i); + if (eolPos < 0) { + break; + } + + AString line(desc, i, eolPos - i); + + if (line.size() < 2 || line.c_str()[1] != '=') { + return false; + } + + switch (line.c_str()[0]) { + case 'v': + { + if (strcmp(line.c_str(), "v=0")) { + return false; + } + break; + } + + case 'a': + case 'b': + { + AString key, value; + + ssize_t colonPos = line.find(":", 2); + if (colonPos < 0) { + key = line; + } else { + key.setTo(line, 0, colonPos); + + if (key == "a=fmtp" || key == "a=rtpmap" + || key == "a=framesize") { + ssize_t spacePos = line.find(" ", colonPos + 1); + if (spacePos < 0) { + return false; + } + + key.setTo(line, 0, spacePos); + + colonPos = spacePos; + } + + value.setTo(line, colonPos + 1, line.size() - colonPos - 1); + } + + key.trim(); + value.trim(); + + LOG(VERBOSE) << "adding '" << key << "' => '" << value << "'"; + + mTracks.editItemAt(mTracks.size() - 1).add(key, value); + break; + } + + case 'm': + { + LOG(VERBOSE) << "new section '" << AString(line, 2, line.size() - 2) << "'"; + + mTracks.push(Attribs()); + mFormats.push(AString(line, 2, line.size() - 2)); + break; + } + } + + i = eolPos + 2; + } + + return true; +} + +bool ASessionDescription::isValid() const { + return mIsValid; +} + +size_t ASessionDescription::countTracks() const { + return mTracks.size(); +} + +void ASessionDescription::getFormat(size_t index, AString *value) const { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + *value = mFormats.itemAt(index); +} + +bool ASessionDescription::findAttribute( + size_t index, const char *key, AString *value) const { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + value->clear(); + + const Attribs &track = mTracks.itemAt(index); + ssize_t i = track.indexOfKey(AString(key)); + + if (i < 0) { + return false; + } + + *value = track.valueAt(i); + + return true; +} + +void ASessionDescription::getFormatType( + size_t index, unsigned long *PT, + AString *desc, AString *params) const { + AString format; + getFormat(index, &format); + + char *lastSpacePos = strrchr(format.c_str(), ' '); + CHECK(lastSpacePos != NULL); + + char *end; + unsigned long x = strtoul(lastSpacePos + 1, &end, 10); + CHECK_GT(end, lastSpacePos + 1); + CHECK_EQ(*end, '\0'); + + *PT = x; + + char key[20]; + sprintf(key, "a=rtpmap:%lu", x); + + CHECK(findAttribute(index, key, desc)); + + sprintf(key, "a=fmtp:%lu", x); + if (!findAttribute(index, key, params)) { + params->clear(); + } +} + +void ASessionDescription::getDimensions( + size_t index, unsigned long PT, + int32_t *width, int32_t *height) const { + char key[20]; + sprintf(key, "a=framesize:%lu", PT); + AString value; + CHECK(findAttribute(index, key, &value)); + + const char *s = value.c_str(); + char *end; + *width = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '-'); + + s = end + 1; + *height = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '\0'); +} + +bool ASessionDescription::getDurationUs(int64_t *durationUs) const { + *durationUs = 0; + + CHECK(mIsValid); + + AString value; + if (!findAttribute(0, "a=range", &value)) { + return false; + } + + if (value == "npt=now-") { + return false; + } + + if (strncmp(value.c_str(), "npt=", 4)) { + return false; + } + + const char *s = value.c_str() + 4; + char *end; + double from = strtod(s, &end); + CHECK_GT(end, s); + CHECK_EQ(*end, '-'); + + s = end + 1; + double to = strtod(s, &end); + CHECK_GT(end, s); + CHECK_EQ(*end, '\0'); + + CHECK_GE(to, from); + + *durationUs = (int64_t)((to - from) * 1E6); + + return true; +} + +// static +void ASessionDescription::ParseFormatDesc( + const char *desc, int32_t *timescale, int32_t *numChannels) { + const char *slash1 = strchr(desc, '/'); + CHECK(slash1 != NULL); + + const char *s = slash1 + 1; + char *end; + unsigned long x = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK(*end == '\0' || *end == '/'); + + *timescale = x; + *numChannels = 1; + + if (*end == '/') { + s = end + 1; + unsigned long x = strtoul(s, &end, 10); + CHECK_GT(end, s); + CHECK_EQ(*end, '\0'); + + *numChannels = x; + } +} + +} // namespace android + diff --git a/media/libstagefright/rtsp/ASessionDescription.h b/media/libstagefright/rtsp/ASessionDescription.h new file mode 100644 index 0000000..b26980f --- /dev/null +++ b/media/libstagefright/rtsp/ASessionDescription.h @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_SESSION_DESCRIPTION_H_ + +#define A_SESSION_DESCRIPTION_H_ + +#include <sys/types.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/KeyedVector.h> +#include <utils/RefBase.h> +#include <utils/Vector.h> + +namespace android { + +struct AString; + +struct ASessionDescription : public RefBase { + ASessionDescription(); + + bool setTo(const void *data, size_t size); + bool isValid() const; + + // Actually, 1 + number of tracks, as index 0 is reserved for the + // session description root-level attributes. + size_t countTracks() const; + void getFormat(size_t index, AString *value) const; + + void getFormatType( + size_t index, unsigned long *PT, + AString *desc, AString *params) const; + + void getDimensions( + size_t index, unsigned long PT, + int32_t *width, int32_t *height) const; + + bool getDurationUs(int64_t *durationUs) const; + + static void ParseFormatDesc( + const char *desc, int32_t *timescale, int32_t *numChannels); + + bool findAttribute(size_t index, const char *key, AString *value) const; + +protected: + virtual ~ASessionDescription(); + +private: + typedef KeyedVector<AString,AString> Attribs; + + bool mIsValid; + Vector<Attribs> mTracks; + Vector<AString> mFormats; + + bool parse(const void *data, size_t size); + + DISALLOW_EVIL_CONSTRUCTORS(ASessionDescription); +}; + +} // namespace android + +#endif // A_SESSION_DESCRIPTION_H_ diff --git a/media/libstagefright/rtsp/Android.mk b/media/libstagefright/rtsp/Android.mk new file mode 100644 index 0000000..4608fa0 --- /dev/null +++ b/media/libstagefright/rtsp/Android.mk @@ -0,0 +1,28 @@ +LOCAL_PATH:= $(call my-dir) + +include $(CLEAR_VARS) + +LOCAL_SRC_FILES:= \ + ARTSPController.cpp \ + AAVCAssembler.cpp \ + AMPEG4AudioAssembler.cpp \ + APacketSource.cpp \ + ARTPAssembler.cpp \ + ARTPConnection.cpp \ + ARTPSource.cpp \ + ARTSPConnection.cpp \ + ASessionDescription.cpp \ + +LOCAL_C_INCLUDES:= \ + $(JNI_H_INCLUDE) \ + $(TOP)/external/opencore/extern_libs_v2/khronos/openmax/include \ + $(TOP)/frameworks/base/media/libstagefright/include \ + +LOCAL_MODULE:= libstagefright_rtsp + +ifeq ($(TARGET_ARCH),arm) + LOCAL_CFLAGS += -Wno-psabi +endif + +include $(BUILD_STATIC_LIBRARY) + diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h new file mode 100644 index 0000000..74bb798 --- /dev/null +++ b/media/libstagefright/rtsp/MyHandler.h @@ -0,0 +1,442 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MY_HANDLER_H_ + +#define MY_HANDLER_H_ + +#include "APacketSource.h" +#include "ARTPConnection.h" +#include "ARTSPConnection.h" +#include "ASessionDescription.h" + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/ALooper.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaErrors.h> + +namespace android { + +struct MyHandler : public AHandler { + MyHandler(const char *url, const sp<ALooper> &looper) + : mLooper(looper), + mConn(new ARTSPConnection), + mRTPConn(new ARTPConnection), + mSessionURL(url), + mSetupTracksSuccessful(false), + mFirstAccessUnit(true), + mFirstAccessUnitNTP(-1) { + mLooper->registerHandler(this); + mLooper->registerHandler(mConn); + mLooper->registerHandler(mRTPConn); + sp<AMessage> reply = new AMessage('conn', id()); + mConn->connect(mSessionURL.c_str(), reply); + } + + virtual void onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case 'conn': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "connection request completed with result " + << result << " (" << strerror(-result) << ")"; + + if (result == OK) { + AString request; + request = "DESCRIBE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + request.append("Accept: application/sdp\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('desc', id()); + mConn->sendRequest(request.c_str(), reply); + } + break; + } + + case 'disc': + { + LOG(INFO) << "disconnect completed"; + + (new AMessage('quit', id()))->post(); + break; + } + + case 'desc': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "DESCRIBE completed with result " + << result << " (" << strerror(-result) << ")"; + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + if (response->mStatusCode == 302) { + ssize_t i = response->mHeaders.indexOfKey("location"); + CHECK_GE(i, 0); + + mSessionURL = response->mHeaders.valueAt(i); + + AString request; + request = "DESCRIBE "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + request.append("Accept: application/sdp\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('desc', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + CHECK_EQ(response->mStatusCode, 200u); + + mSessionDesc = new ASessionDescription; + + mSessionDesc->setTo( + response->mContent->data(), + response->mContent->size()); + + CHECK(mSessionDesc->isValid()); + + ssize_t i = response->mHeaders.indexOfKey("content-base"); + if (i >= 0) { + mBaseURL = response->mHeaders.valueAt(i); + } else { + i = response->mHeaders.indexOfKey("content-location"); + if (i >= 0) { + mBaseURL = response->mHeaders.valueAt(i); + } else { + mBaseURL = mSessionURL; + } + } + + CHECK_GT(mSessionDesc->countTracks(), 1u); + setupTrack(1); + } else { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + break; + } + + case 'setu': + { + size_t index; + CHECK(msg->findSize("index", &index)); + + size_t trackIndex; + CHECK(msg->findSize("track-index", &trackIndex)); + + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "SETUP(" << index << ") completed with result " + << result << " (" << strerror(-result) << ")"; + + TrackInfo *track = &mTracks.editItemAt(trackIndex); + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + CHECK_EQ(response->mStatusCode, 200u); + + ssize_t i = response->mHeaders.indexOfKey("session"); + CHECK_GE(i, 0); + + if (index == 1) { + mSessionID = response->mHeaders.valueAt(i); + i = mSessionID.find(";"); + if (i >= 0) { + // Remove options, i.e. ";timeout=90" + mSessionID.erase(i, mSessionID.size() - i); + } + } + + sp<AMessage> notify = new AMessage('accu', id()); + notify->setSize("track-index", trackIndex); + + mRTPConn->addStream( + track->mRTPSocket, track->mRTCPSocket, + mSessionDesc, index, + notify); + + track->mPacketSource = + new APacketSource(mSessionDesc, index); + + mSetupTracksSuccessful = true; + + ++index; + if (index < mSessionDesc->countTracks()) { + setupTrack(index); + break; + } + } else { + close(track->mRTPSocket); + close(track->mRTCPSocket); + + mTracks.removeItemsAt(mTracks.size() - 1); + } + + if (mSetupTracksSuccessful) { + AString request = "PLAY "; + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('play', id()); + mConn->sendRequest(request.c_str(), reply); + } else { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + break; + } + + case 'play': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "PLAY completed with result " + << result << " (" << strerror(-result) << ")"; + + if (result == OK) { + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response = + static_cast<ARTSPResponse *>(obj.get()); + + CHECK_EQ(response->mStatusCode, 200u); + + sp<AMessage> msg = new AMessage('abor', id()); + msg->post(60000000ll); + } else { + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + } + + break; + } + + case 'abor': + { + for (size_t i = 0; i < mTracks.size(); ++i) { + mTracks.editItemAt(i).mPacketSource->signalEOS( + ERROR_END_OF_STREAM); + } + + sp<AMessage> reply = new AMessage('tear', id()); + + AString request; + request = "TEARDOWN "; + + // XXX should use aggregate url from SDP here... + request.append(mSessionURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + + request.append("\r\n"); + + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'tear': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "TEARDOWN completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<AMessage> reply = new AMessage('disc', id()); + mConn->disconnect(reply); + break; + } + + case 'quit': + { + mLooper->stop(); + break; + } + + case 'accu': + { + size_t trackIndex; + CHECK(msg->findSize("track-index", &trackIndex)); + + sp<RefBase> obj; + CHECK(msg->findObject("access-unit", &obj)); + + sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); + + uint64_t ntpTime; + CHECK(accessUnit->meta()->findInt64( + "ntp-time", (int64_t *)&ntpTime)); + + if (mFirstAccessUnit) { + mFirstAccessUnit = false; + mFirstAccessUnitNTP = ntpTime; + } + if (ntpTime > mFirstAccessUnitNTP) { + ntpTime -= mFirstAccessUnitNTP; + } else { + ntpTime = 0; + } + + accessUnit->meta()->setInt64("ntp-time", ntpTime); + + TrackInfo *track = &mTracks.editItemAt(trackIndex); + track->mPacketSource->queueAccessUnit(accessUnit); + break; + } + + default: + TRESPASS(); + break; + } + } + + sp<APacketSource> getPacketSource(size_t index) { + CHECK_GE(index, 0u); + CHECK_LT(index, mTracks.size()); + + return mTracks.editItemAt(index).mPacketSource; + } + + size_t countTracks() const { + return mTracks.size(); + } + +private: + sp<ALooper> mLooper; + sp<ARTSPConnection> mConn; + sp<ARTPConnection> mRTPConn; + sp<ASessionDescription> mSessionDesc; + AString mSessionURL; + AString mBaseURL; + AString mSessionID; + bool mSetupTracksSuccessful; + bool mFirstAccessUnit; + uint64_t mFirstAccessUnitNTP; + + struct TrackInfo { + int mRTPSocket; + int mRTCPSocket; + + sp<APacketSource> mPacketSource; + }; + Vector<TrackInfo> mTracks; + + void setupTrack(size_t index) { + AString url; + CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); + + AString trackURL; + CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); + + mTracks.push(TrackInfo()); + TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + + unsigned rtpPort; + ARTPConnection::MakePortPair( + &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + + AString request = "SETUP "; + request.append(trackURL); + request.append(" RTSP/1.0\r\n"); + + request.append("Transport: RTP/AVP/UDP;unicast;client_port="); + request.append(rtpPort); + request.append("-"); + request.append(rtpPort + 1); + request.append("\r\n"); + + if (index > 1) { + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + } + + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('setu', id()); + reply->setSize("index", index); + reply->setSize("track-index", mTracks.size() - 1); + mConn->sendRequest(request.c_str(), reply); + } + + static bool MakeURL(const char *baseURL, const char *url, AString *out) { + out->clear(); + + if (strncasecmp("rtsp://", baseURL, 7)) { + // Base URL must be absolute + return false; + } + + if (!strncasecmp("rtsp://", url, 7)) { + // "url" is already an absolute URL, ignore base URL. + out->setTo(url); + return true; + } + + size_t n = strlen(baseURL); + if (baseURL[n - 1] == '/') { + out->setTo(baseURL); + out->append(url); + } else { + char *slashPos = strrchr(baseURL, '/'); + + if (slashPos > &baseURL[6]) { + out->setTo(baseURL, slashPos - baseURL); + } else { + out->setTo(baseURL); + } + + out->append("/"); + out->append(url); + } + + return true; + } + + DISALLOW_EVIL_CONSTRUCTORS(MyHandler); +}; + +} // namespace android + +#endif // MY_HANDLER_H_ diff --git a/media/libstagefright/rtsp/MyTransmitter.h b/media/libstagefright/rtsp/MyTransmitter.h new file mode 100644 index 0000000..009a3b1 --- /dev/null +++ b/media/libstagefright/rtsp/MyTransmitter.h @@ -0,0 +1,981 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MY_TRANSMITTER_H_ + +#define MY_TRANSMITTER_H_ + +#include "ARTPConnection.h" + +#include <arpa/inet.h> +#include <sys/socket.h> + +#include <openssl/md5.h> + +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/base64.h> +#include <media/stagefright/foundation/hexdump.h> + +#ifdef ANDROID +#include "VideoSource.h" + +#include <media/stagefright/OMXClient.h> +#include <media/stagefright/OMXCodec.h> +#endif + +namespace android { + +#define TRACK_SUFFIX "trackid=1" +#define PT 96 +#define PT_STR "96" + +#define USERNAME "bcast" +#define PASSWORD "test" + +static int uniformRand(int limit) { + return ((double)rand() * limit) / RAND_MAX; +} + +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +struct MyTransmitter : public AHandler { + MyTransmitter(const char *url, const sp<ALooper> &looper) + : mServerURL(url), + mLooper(looper), + mConn(new ARTSPConnection), + mConnected(false), + mAuthType(NONE), + mRTPSocket(-1), + mRTCPSocket(-1), + mSourceID(rand()), + mSeqNo(uniformRand(65536)), + mRTPTimeBase(rand()), + mNumSamplesSent(0), + mNumRTPSent(0), + mNumRTPOctetsSent(0), + mLastRTPTime(0), + mLastNTPTime(0) { + mStreamURL = mServerURL; + mStreamURL.append("/bazong.sdp"); + + mTrackURL = mStreamURL; + mTrackURL.append("/"); + mTrackURL.append(TRACK_SUFFIX); + + mLooper->registerHandler(this); + mLooper->registerHandler(mConn); + + sp<AMessage> reply = new AMessage('conn', id()); + mConn->connect(mServerURL.c_str(), reply); + +#ifdef ANDROID + int width = 640; + int height = 480; + + sp<MediaSource> source = new VideoSource(width, height); + + sp<MetaData> encMeta = new MetaData; + encMeta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_AVC); + encMeta->setInt32(kKeyWidth, width); + encMeta->setInt32(kKeyHeight, height); + + OMXClient client; + client.connect(); + + mEncoder = OMXCodec::Create( + client.interface(), encMeta, + true /* createEncoder */, source); + + mEncoder->start(); + + MediaBuffer *buffer; + CHECK_EQ(mEncoder->read(&buffer), (status_t)OK); + CHECK(buffer != NULL); + + makeH264SPropParamSets(buffer); + + buffer->release(); + buffer = NULL; +#endif + } + + uint64_t ntpTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + + uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; + + nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; + + uint64_t hi = nowUs / 1000000ll; + uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; + + return (hi << 32) | lo; + } + + void issueAnnounce() { + AString sdp; + sdp = "v=0\r\n"; + + sdp.append("o=- "); + + uint64_t ntp = ntpTime(); + sdp.append(ntp); + sdp.append(" "); + sdp.append(ntp); + sdp.append(" IN IP4 127.0.0.0\r\n"); + + sdp.append( + "s=Sample\r\n" + "i=Playing around with ANNOUNCE\r\n" + "c=IN IP4 "); + + struct in_addr addr; + addr.s_addr = htonl(mServerIP); + + sdp.append(inet_ntoa(addr)); + + sdp.append( + "\r\n" + "t=0 0\r\n" + "a=range:npt=now-\r\n"); + +#ifdef ANDROID + sp<MetaData> meta = mEncoder->getFormat(); + int32_t width, height; + CHECK(meta->findInt32(kKeyWidth, &width)); + CHECK(meta->findInt32(kKeyHeight, &height)); + + sdp.append( + "m=video 0 RTP/AVP " PT_STR "\r\n" + "b=AS 320000\r\n" + "a=rtpmap:" PT_STR " H264/90000\r\n"); + + sdp.append("a=cliprect 0,0,"); + sdp.append(height); + sdp.append(","); + sdp.append(width); + sdp.append("\r\n"); + + sdp.append( + "a=framesize:" PT_STR " "); + sdp.append(width); + sdp.append("-"); + sdp.append(height); + sdp.append("\r\n"); + + sdp.append( + "a=fmtp:" PT_STR " profile-level-id=42C015;sprop-parameter-sets="); + + sdp.append(mSeqParamSet); + sdp.append(","); + sdp.append(mPicParamSet); + sdp.append(";packetization-mode=1\r\n"); +#else + sdp.append( + "m=audio 0 RTP/AVP " PT_STR "\r\n" + "a=rtpmap:" PT_STR " L8/8000/1\r\n"); +#endif + + sdp.append("a=control:" TRACK_SUFFIX "\r\n"); + + AString request; + request.append("ANNOUNCE "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "ANNOUNCE", mStreamURL.c_str()); + + request.append("Content-Type: application/sdp\r\n"); + request.append("Content-Length: "); + request.append(sdp.size()); + request.append("\r\n"); + + request.append("\r\n"); + request.append(sdp); + + sp<AMessage> reply = new AMessage('anno', id()); + mConn->sendRequest(request.c_str(), reply); + } + + void H(const AString &s, AString *out) { + out->clear(); + + MD5_CTX m; + MD5_Init(&m); + MD5_Update(&m, s.c_str(), s.size()); + + uint8_t key[16]; + MD5_Final(key, &m); + + for (size_t i = 0; i < 16; ++i) { + char nibble = key[i] >> 4; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + + nibble = key[i] & 0x0f; + if (nibble <= 9) { + nibble += '0'; + } else { + nibble += 'a' - 10; + } + out->append(&nibble, 1); + } + } + + void authenticate(const sp<ARTSPResponse> &response) { + ssize_t i = response->mHeaders.indexOfKey("www-authenticate"); + CHECK_GE(i, 0); + + AString value = response->mHeaders.valueAt(i); + + if (!strncmp(value.c_str(), "Basic", 5)) { + mAuthType = BASIC; + } else { + CHECK(!strncmp(value.c_str(), "Digest", 6)); + mAuthType = DIGEST; + + i = value.find("nonce="); + CHECK_GE(i, 0); + CHECK_EQ(value.c_str()[i + 6], '\"'); + ssize_t j = value.find("\"", i + 7); + CHECK_GE(j, 0); + + mNonce.setTo(value, i + 7, j - i - 7); + } + + issueAnnounce(); + } + + void addAuthentication( + AString *request, const char *method, const char *url) { + if (mAuthType == NONE) { + return; + } + + if (mAuthType == BASIC) { + request->append("Authorization: Basic YmNhc3Q6dGVzdAo=\r\n"); + return; + } + + CHECK_EQ((int)mAuthType, (int)DIGEST); + + AString A1; + A1.append(USERNAME); + A1.append(":"); + A1.append("Streaming Server"); + A1.append(":"); + A1.append(PASSWORD); + + AString A2; + A2.append(method); + A2.append(":"); + A2.append(url); + + AString HA1, HA2; + H(A1, &HA1); + H(A2, &HA2); + + AString tmp; + tmp.append(HA1); + tmp.append(":"); + tmp.append(mNonce); + tmp.append(":"); + tmp.append(HA2); + + AString digest; + H(tmp, &digest); + + request->append("Authorization: Digest "); + request->append("nonce=\""); + request->append(mNonce); + request->append("\", "); + request->append("username=\"" USERNAME "\", "); + request->append("uri=\""); + request->append(url); + request->append("\", "); + request->append("response=\""); + request->append(digest); + request->append("\""); + request->append("\r\n"); + } + + virtual void onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case 'conn': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "connection request completed with result " + << result << " (" << strerror(-result) << ")"; + + if (result != OK) { + (new AMessage('quit', id()))->post(); + break; + } + + mConnected = true; + + CHECK(msg->findInt32("server-ip", (int32_t *)&mServerIP)); + + issueAnnounce(); + break; + } + + case 'anno': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "ANNOUNCE completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + + if (response->mStatusCode == 401) { + if (mAuthType != NONE) { + LOG(INFO) << "FAILED to authenticate"; + (new AMessage('quit', id()))->post(); + break; + } + + authenticate(response); + break; + } + } + + if (result != OK || response->mStatusCode != 200) { + (new AMessage('quit', id()))->post(); + break; + } + + unsigned rtpPort; + ARTPConnection::MakePortPair(&mRTPSocket, &mRTCPSocket, &rtpPort); + + // (new AMessage('poll', id()))->post(); + + AString request; + request.append("SETUP "); + request.append(mTrackURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "SETUP", mTrackURL.c_str()); + + request.append("Transport: RTP/AVP;unicast;client_port="); + request.append(rtpPort); + request.append("-"); + request.append(rtpPort + 1); + request.append(";mode=record\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('setu', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + +#if 0 + case 'poll': + { + fd_set rs; + FD_ZERO(&rs); + FD_SET(mRTCPSocket, &rs); + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + int res = select(mRTCPSocket + 1, &rs, NULL, NULL, &tv); + + if (res == 1) { + sp<ABuffer> buffer = new ABuffer(65536); + ssize_t n = recv(mRTCPSocket, buffer->data(), buffer->size(), 0); + + if (n <= 0) { + LOG(ERROR) << "recv returned " << n; + } else { + LOG(INFO) << "recv returned " << n << " bytes of data."; + + hexdump(buffer->data(), n); + } + } + + msg->post(50000); + break; + } +#endif + + case 'setu': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "SETUP completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + if (result != OK || response->mStatusCode != 200) { + (new AMessage('quit', id()))->post(); + break; + } + + ssize_t i = response->mHeaders.indexOfKey("session"); + CHECK_GE(i, 0); + mSessionID = response->mHeaders.valueAt(i); + i = mSessionID.find(";"); + if (i >= 0) { + // Remove options, i.e. ";timeout=90" + mSessionID.erase(i, mSessionID.size() - i); + } + + i = response->mHeaders.indexOfKey("transport"); + CHECK_GE(i, 0); + AString transport = response->mHeaders.valueAt(i); + + LOG(INFO) << "transport = '" << transport << "'"; + + AString value; + CHECK(GetAttribute(transport.c_str(), "server_port", &value)); + + unsigned rtpPort, rtcpPort; + CHECK_EQ(sscanf(value.c_str(), "%u-%u", &rtpPort, &rtcpPort), 2); + + CHECK(GetAttribute(transport.c_str(), "source", &value)); + + memset(mRemoteAddr.sin_zero, 0, sizeof(mRemoteAddr.sin_zero)); + mRemoteAddr.sin_family = AF_INET; + mRemoteAddr.sin_addr.s_addr = inet_addr(value.c_str()); + mRemoteAddr.sin_port = htons(rtpPort); + + mRemoteRTCPAddr = mRemoteAddr; + mRemoteRTCPAddr.sin_port = htons(rtpPort + 1); + + CHECK_EQ(0, connect(mRTPSocket, + (const struct sockaddr *)&mRemoteAddr, + sizeof(mRemoteAddr))); + + CHECK_EQ(0, connect(mRTCPSocket, + (const struct sockaddr *)&mRemoteRTCPAddr, + sizeof(mRemoteRTCPAddr))); + + uint32_t x = ntohl(mRemoteAddr.sin_addr.s_addr); + LOG(INFO) << "sending data to " + << (x >> 24) + << "." + << ((x >> 16) & 0xff) + << "." + << ((x >> 8) & 0xff) + << "." + << (x & 0xff) + << ":" + << rtpPort; + + AString request; + request.append("RECORD "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "RECORD", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('reco', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'reco': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "RECORD completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + if (result != OK) { + (new AMessage('quit', id()))->post(); + break; + } + + (new AMessage('more', id()))->post(); + (new AMessage('sr ', id()))->post(); + (new AMessage('aliv', id()))->post(30000000ll); + break; + } + + case 'aliv': + { + if (!mConnected) { + break; + } + + AString request; + request.append("OPTIONS "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "RECORD", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('opts', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'opts': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "OPTIONS completed with result " + << result << " (" << strerror(-result) << ")"; + + if (!mConnected) { + break; + } + + (new AMessage('aliv', id()))->post(30000000ll); + break; + } + + case 'more': + { + if (!mConnected) { + break; + } + + sp<ABuffer> buffer = new ABuffer(65536); + uint8_t *data = buffer->data(); + data[0] = 0x80; + data[1] = (1 << 7) | PT; // M-bit + data[2] = (mSeqNo >> 8) & 0xff; + data[3] = mSeqNo & 0xff; + data[8] = mSourceID >> 24; + data[9] = (mSourceID >> 16) & 0xff; + data[10] = (mSourceID >> 8) & 0xff; + data[11] = mSourceID & 0xff; + +#ifdef ANDROID + MediaBuffer *mediaBuf = NULL; + for (;;) { + CHECK_EQ(mEncoder->read(&mediaBuf), (status_t)OK); + if (mediaBuf->range_length() > 0) { + break; + } + mediaBuf->release(); + mediaBuf = NULL; + } + + int64_t timeUs; + CHECK(mediaBuf->meta_data()->findInt64(kKeyTime, &timeUs)); + + uint32_t rtpTime = mRTPTimeBase + (timeUs * 9 / 100ll); + + const uint8_t *mediaData = + (const uint8_t *)mediaBuf->data() + mediaBuf->range_offset(); + + CHECK(!memcmp("\x00\x00\x00\x01", mediaData, 4)); + + CHECK_LE(mediaBuf->range_length() - 4 + 12, buffer->size()); + + memcpy(&data[12], + mediaData + 4, mediaBuf->range_length() - 4); + + buffer->setRange(0, mediaBuf->range_length() - 4 + 12); + + mediaBuf->release(); + mediaBuf = NULL; +#else + uint32_t rtpTime = mRTPTimeBase + mNumRTPSent * 128; + memset(&data[12], 0, 128); + buffer->setRange(0, 12 + 128); +#endif + + data[4] = rtpTime >> 24; + data[5] = (rtpTime >> 16) & 0xff; + data[6] = (rtpTime >> 8) & 0xff; + data[7] = rtpTime & 0xff; + + ssize_t n = send( + mRTPSocket, data, buffer->size(), 0); + if (n < 0) { + LOG(ERROR) << "send failed (" << strerror(errno) << ")"; + } + CHECK_EQ(n, (ssize_t)buffer->size()); + + ++mSeqNo; + + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + + mLastRTPTime = rtpTime; + mLastNTPTime = ntpTime(); + +#ifdef ANDROID + if (mNumRTPSent < 60 * 25) { // 60 secs worth + msg->post(40000); +#else + if (mNumRTPOctetsSent < 8000 * 60) { + msg->post(1000000ll * 128 / 8000); +#endif + } else { + LOG(INFO) << "That's enough, pausing."; + + AString request; + request.append("PAUSE "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "PAUSE", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('paus', id()); + mConn->sendRequest(request.c_str(), reply); + } + break; + } + + case 'sr ': + { + if (!mConnected) { + break; + } + + sp<ABuffer> buffer = new ABuffer(65536); + buffer->setRange(0, 0); + + addSR(buffer); + addSDES(buffer); + + uint8_t *data = buffer->data(); + ssize_t n = send( + mRTCPSocket, data, buffer->size(), 0); + CHECK_EQ(n, (ssize_t)buffer->size()); + + msg->post(3000000); + break; + } + + case 'paus': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "PAUSE completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + AString request; + request.append("TEARDOWN "); + request.append(mStreamURL); + request.append(" RTSP/1.0\r\n"); + + addAuthentication(&request, "TEARDOWN", mStreamURL.c_str()); + + request.append("Session: "); + request.append(mSessionID); + request.append("\r\n"); + request.append("\r\n"); + + sp<AMessage> reply = new AMessage('tear', id()); + mConn->sendRequest(request.c_str(), reply); + break; + } + + case 'tear': + { + int32_t result; + CHECK(msg->findInt32("result", &result)); + + LOG(INFO) << "TEARDOWN completed with result " + << result << " (" << strerror(-result) << ")"; + + sp<RefBase> obj; + CHECK(msg->findObject("response", &obj)); + sp<ARTSPResponse> response; + + if (result == OK) { + response = static_cast<ARTSPResponse *>(obj.get()); + CHECK(response != NULL); + } + + (new AMessage('quit', id()))->post(); + break; + } + + case 'disc': + { + LOG(INFO) << "disconnect completed"; + + mConnected = false; + (new AMessage('quit', id()))->post(); + break; + } + + case 'quit': + { + if (mConnected) { + mConn->disconnect(new AMessage('disc', id())); + break; + } + + if (mRTPSocket >= 0) { + close(mRTPSocket); + mRTPSocket = -1; + } + + if (mRTCPSocket >= 0) { + close(mRTCPSocket); + mRTCPSocket = -1; + } + +#ifdef ANDROID + mEncoder->stop(); + mEncoder.clear(); +#endif + + mLooper->stop(); + break; + } + + default: + TRESPASS(); + } + } + +protected: + virtual ~MyTransmitter() { + } + +private: + enum AuthType { + NONE, + BASIC, + DIGEST + }; + + AString mServerURL; + AString mTrackURL; + AString mStreamURL; + + sp<ALooper> mLooper; + sp<ARTSPConnection> mConn; + bool mConnected; + uint32_t mServerIP; + AuthType mAuthType; + AString mNonce; + AString mSessionID; + int mRTPSocket, mRTCPSocket; + uint32_t mSourceID; + uint32_t mSeqNo; + uint32_t mRTPTimeBase; + struct sockaddr_in mRemoteAddr; + struct sockaddr_in mRemoteRTCPAddr; + size_t mNumSamplesSent; + uint32_t mNumRTPSent; + uint32_t mNumRTPOctetsSent; + uint32_t mLastRTPTime; + uint64_t mLastNTPTime; + +#ifdef ANDROID + sp<MediaSource> mEncoder; + AString mSeqParamSet; + AString mPicParamSet; + + void makeH264SPropParamSets(MediaBuffer *buffer) { + static const char kStartCode[] = "\x00\x00\x00\x01"; + + const uint8_t *data = + (const uint8_t *)buffer->data() + buffer->range_offset(); + size_t size = buffer->range_length(); + + CHECK_GE(size, 0u); + CHECK(!memcmp(kStartCode, data, 4)); + + data += 4; + size -= 4; + + size_t startCodePos = 0; + while (startCodePos + 3 < size + && memcmp(kStartCode, &data[startCodePos], 4)) { + ++startCodePos; + } + + CHECK_LT(startCodePos + 3, size); + + encodeBase64(data, startCodePos, &mSeqParamSet); + + encodeBase64(&data[startCodePos + 4], size - startCodePos - 4, + &mPicParamSet); + } +#endif + + void addSR(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + + data[0] = 0x80 | 0; + data[1] = 200; // SR + data[2] = 0; + data[3] = 6; + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + data[8] = mLastNTPTime >> (64 - 8); + data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; + data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; + data[11] = (mLastNTPTime >> 32) & 0xff; + data[12] = (mLastNTPTime >> 24) & 0xff; + data[13] = (mLastNTPTime >> 16) & 0xff; + data[14] = (mLastNTPTime >> 8) & 0xff; + data[15] = mLastNTPTime & 0xff; + + data[16] = (mLastRTPTime >> 24) & 0xff; + data[17] = (mLastRTPTime >> 16) & 0xff; + data[18] = (mLastRTPTime >> 8) & 0xff; + data[19] = mLastRTPTime & 0xff; + + data[20] = mNumRTPSent >> 24; + data[21] = (mNumRTPSent >> 16) & 0xff; + data[22] = (mNumRTPSent >> 8) & 0xff; + data[23] = mNumRTPSent & 0xff; + + data[24] = mNumRTPOctetsSent >> 24; + data[25] = (mNumRTPOctetsSent >> 16) & 0xff; + data[26] = (mNumRTPOctetsSent >> 8) & 0xff; + data[27] = mNumRTPOctetsSent & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + 28); + } + + void addSDES(const sp<ABuffer> &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = mSourceID >> 24; + data[5] = (mSourceID >> 16) & 0xff; + data[6] = (mSourceID >> 8) & 0xff; + data[7] = mSourceID & 0xff; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + static const char *kCNAME = "andih@laptop"; + data[offset++] = strlen(kCNAME); + + memcpy(&data[offset], kCNAME, strlen(kCNAME)); + offset += strlen(kCNAME); + + data[offset++] = 7; // NOTE + + static const char *kNOTE = "Hell's frozen over."; + data[offset++] = strlen(kNOTE); + + memcpy(&data[offset], kNOTE, strlen(kNOTE)); + offset += strlen(kNOTE); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); + } + + DISALLOW_EVIL_CONSTRUCTORS(MyTransmitter); +}; + +} // namespace android + +#endif // MY_TRANSMITTER_H_ diff --git a/media/libstagefright/rtsp/VideoSource.h b/media/libstagefright/rtsp/VideoSource.h new file mode 100644 index 0000000..ae0c85b --- /dev/null +++ b/media/libstagefright/rtsp/VideoSource.h @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VIDEO_SOURCE_H_ + +#define VIDEO_SOURCE_H_ + +#include <media/stagefright/MediaBufferGroup.h> +#include <media/stagefright/MediaDefs.h> +#include <media/stagefright/MediaSource.h> +#include <media/stagefright/MetaData.h> + +namespace android { + +class VideoSource : public MediaSource { + static const int32_t kFramerate = 24; // fps + +public: + VideoSource(int width, int height) + : mWidth(width), + mHeight(height), + mSize((width * height * 3) / 2) { + mGroup.add_buffer(new MediaBuffer(mSize)); + } + + virtual sp<MetaData> getFormat() { + sp<MetaData> meta = new MetaData; + meta->setInt32(kKeyWidth, mWidth); + meta->setInt32(kKeyHeight, mHeight); + meta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_VIDEO_RAW); + + return meta; + } + + virtual status_t start(MetaData *params) { + mNumFramesOutput = 0; + return OK; + } + + virtual status_t stop() { + return OK; + } + + virtual status_t read( + MediaBuffer **buffer, const MediaSource::ReadOptions *options) { + if (mNumFramesOutput == kFramerate * 100) { + // Stop returning data after 10 secs. + return ERROR_END_OF_STREAM; + } + + // printf("VideoSource::read\n"); + status_t err = mGroup.acquire_buffer(buffer); + if (err != OK) { + return err; + } + + char x = (char)((double)rand() / RAND_MAX * 255); + memset((*buffer)->data(), x, mSize); + (*buffer)->set_range(0, mSize); + (*buffer)->meta_data()->clear(); + (*buffer)->meta_data()->setInt64( + kKeyTime, (mNumFramesOutput * 1000000) / kFramerate); + ++mNumFramesOutput; + + // printf("VideoSource::read - returning buffer\n"); + // LOG(INFO)("VideoSource::read - returning buffer"); + return OK; + } + +protected: + virtual ~VideoSource() {} + +private: + MediaBufferGroup mGroup; + int mWidth, mHeight; + size_t mSize; + int64_t mNumFramesOutput;; + + VideoSource(const VideoSource &); + VideoSource &operator=(const VideoSource &); +}; + +} // namespace android + +#endif // VIDEO_SOURCE_H_ |