From a556c4822fc205db0d27834ba5b637c351d73ffa Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 5 Mar 2013 10:56:27 -0800 Subject: Squashed commit of the following: commit e5919b1f57ea61fa1d380dfdb4e3e832ce73d79d Author: Andreas Huber Date: Wed Feb 27 16:38:48 2013 -0800 Configure TCP datagram sockets to be TCP_NODELAY. Change-Id: Ia724a81e6e27dccd00ac84603e712d69ca77a0cd commit 1b52b393183db8a6dc000a7c31baac544ccfc50c Author: Andreas Huber Date: Wed Feb 27 14:26:01 2013 -0800 Send IDR frame requests on packet loss. Change-Id: I53b7fb85cbd6923491113b93ec3e2175726d654a commit 68d76b4b3a0181b30abc57cd2915273210530a6d Author: Andreas Huber Date: Tue Feb 26 15:12:34 2013 -0800 Revive TunnelRenderer Change-Id: I8c5a9d982793b1c5b841c828227b354f1dab618c commit 3df28a8e9d8bcdc1430016bb088d097eca653b56 Author: Andreas Huber Date: Tue Feb 26 13:53:14 2013 -0800 Disable suspension of video updates. Change-Id: I7e3a16b8d7dd7a55d9f962a2236388931f664106 commit 2ec7a79de019a26ec415016c1478afd762f069cd Author: Andreas Huber Date: Tue Feb 26 08:54:40 2013 -0800 Adds an SNTP client to wfd. Change-Id: Icd7d6104e951e1443e4c1b81ccf6b3731d79d3ec commit c81c3bb5725bb4079a4d7fb02151ad0bb540632f Author: Andreas Huber Date: Mon Feb 25 10:00:58 2013 -0800 Squashed commit of the following: commit b83a4ec96659ef6f6b7c2090fdd866abe3ab78ba Author: Andreas Huber Date: Mon Feb 25 09:28:11 2013 -0800 Some reorganization of the rtp code, renamed StreamHub -> MediaSender Change-Id: I8cf67444960e60426bf74880af1acce41e8b2fef commit 7769cbd739f2a67c58e0c6a7b1a21a12210c7c4d Author: Andreas Huber Date: Fri Feb 22 16:12:18 2013 -0800 Choose a smaller MTU to avoid fragmented IPv4 packets, fix AVC assembler. Change-Id: I274b3cc1483c4e9f4d146dbf9f3d9f7557ef7ef9 commit 1f687ee80a88b56d614c2cf408ff729114ff86a0 Author: Andreas Huber Date: Fri Feb 22 11:38:31 2013 -0800 better reporting. Change-Id: I67f0bb51f106ea77f5cc75938b053c8e8e8f688e commit 7950c1cd59213eb5f281fcde44a772ecffae473d Author: Andreas Huber Date: Fri Feb 22 09:07:41 2013 -0800 stuff Change-Id: Ib99416366d3eec6e6ad69b4d791a8a9408410f3b commit 33c09045b0f86fcaa4619cbd679b47a074f71231 Author: Andreas Huber Date: Thu Feb 21 15:54:01 2013 -0800 Render frames according to their timestamps. Change-Id: I8143a95cffe775799d6a4bb093558bd7abb1f063 commit d8b6daae2160bf1c016d7c6251256b46bb89db42 Author: Andreas Huber Date: Thu Feb 21 15:01:27 2013 -0800 Better packet-lost logic. Change-Id: I611eee5a42bd089638cf45b0e16f628ff2a955ab commit 782c6b15717e2d062d96665a089d06c0577733d0 Author: Andreas Huber Date: Wed Feb 20 15:06:47 2013 -0800 Add a dedicated looper for the MediaReceiver Change-Id: I3b79cad367fb69c9a160a8d009af8c5f5142b98e commit 4c7b8b10861674b773270103bcabd1a99486a691 Author: Andreas Huber Date: Wed Feb 20 14:30:28 2013 -0800 Tweaks to RTPSender and RTPReceiver Change-Id: Ib535552f289a26cfead6df8c63e4c63d3987d4e9 commit 39226b28177a816cda5c67b321745d396b18277d Author: Andreas Huber Date: Tue Feb 19 08:48:25 2013 -0800 Playing around with non muxed delivery Change-Id: I845375f6938d04bc30502840c2ceb7688dc9b237 commit c16d21de75d8ecdbcd9abce14934afe484970061 Author: Andreas Huber Date: Wed Feb 13 14:43:35 2013 -0800 A more solid base for RTP communication. Change-Id: I52033eeb0feba0ff029d61553a821c82f2fa1c3f Change-Id: I57e3bcfc1c59a012b15aaaa42ed81f09c34c26bb Change-Id: I4b09db4a44d0eeded7a1658f6dc6c97d4b8be720 --- .../wifi-display/rtp/RTPAssembler.cpp | 324 ++++++++ .../libstagefright/wifi-display/rtp/RTPAssembler.h | 92 +++ media/libstagefright/wifi-display/rtp/RTPBase.h | 49 ++ .../wifi-display/rtp/RTPReceiver.cpp | 899 +++++++++++++++++++++ .../libstagefright/wifi-display/rtp/RTPReceiver.h | 110 +++ .../libstagefright/wifi-display/rtp/RTPSender.cpp | 701 ++++++++++++++++ media/libstagefright/wifi-display/rtp/RTPSender.h | 112 +++ 7 files changed, 2287 insertions(+) create mode 100644 media/libstagefright/wifi-display/rtp/RTPAssembler.cpp create mode 100644 media/libstagefright/wifi-display/rtp/RTPAssembler.h create mode 100644 media/libstagefright/wifi-display/rtp/RTPBase.h create mode 100644 media/libstagefright/wifi-display/rtp/RTPReceiver.cpp create mode 100644 media/libstagefright/wifi-display/rtp/RTPReceiver.h create mode 100644 media/libstagefright/wifi-display/rtp/RTPSender.cpp create mode 100644 media/libstagefright/wifi-display/rtp/RTPSender.h (limited to 'media/libstagefright/wifi-display/rtp') diff --git a/media/libstagefright/wifi-display/rtp/RTPAssembler.cpp b/media/libstagefright/wifi-display/rtp/RTPAssembler.cpp new file mode 100644 index 0000000..d0ab60d --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPAssembler.cpp @@ -0,0 +1,324 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "RTPAssembler" +#include + +#include "RTPAssembler.h" + +#include +#include +#include +#include +#include + +namespace android { + +RTPReceiver::Assembler::Assembler(const sp ¬ify) + : mNotify(notify) { +} + +void RTPReceiver::Assembler::postAccessUnit( + const sp &accessUnit, bool followsDiscontinuity) { + sp notify = mNotify->dup(); + notify->setInt32("what", RTPReceiver::kWhatAccessUnit); + notify->setBuffer("accessUnit", accessUnit); + notify->setInt32("followsDiscontinuity", followsDiscontinuity); + notify->post(); +} + +//////////////////////////////////////////////////////////////////////////////// + +RTPReceiver::TSAssembler::TSAssembler(const sp ¬ify) + : Assembler(notify), + mSawDiscontinuity(false) { +} + +void RTPReceiver::TSAssembler::signalDiscontinuity() { + mSawDiscontinuity = true; +} + +status_t RTPReceiver::TSAssembler::processPacket(const sp &packet) { + postAccessUnit(packet, mSawDiscontinuity); + + if (mSawDiscontinuity) { + mSawDiscontinuity = false; + } + + return OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +RTPReceiver::H264Assembler::H264Assembler(const sp ¬ify) + : Assembler(notify), + mState(0), + mIndicator(0), + mNALType(0), + mAccessUnitRTPTime(0) { +} + +void RTPReceiver::H264Assembler::signalDiscontinuity() { + reset(); +} + +status_t RTPReceiver::H264Assembler::processPacket(const sp &packet) { + status_t err = internalProcessPacket(packet); + + if (err != OK) { + reset(); + } + + return err; +} + +status_t RTPReceiver::H264Assembler::internalProcessPacket( + const sp &packet) { + const uint8_t *data = packet->data(); + size_t size = packet->size(); + + switch (mState) { + case 0: + { + if (size < 1 || (data[0] & 0x80)) { + ALOGV("Malformed H264 RTP packet (empty or F-bit set)"); + return ERROR_MALFORMED; + } + + unsigned nalType = data[0] & 0x1f; + if (nalType >= 1 && nalType <= 23) { + addSingleNALUnit(packet); + ALOGV("added single NAL packet"); + } else if (nalType == 28) { + // FU-A + unsigned indicator = data[0]; + CHECK((indicator & 0x1f) == 28); + + if (size < 2) { + ALOGV("Malformed H264 FU-A packet (single byte)"); + return ERROR_MALFORMED; + } + + if (!(data[1] & 0x80)) { + ALOGV("Malformed H264 FU-A packet (no start bit)"); + return ERROR_MALFORMED; + } + + mIndicator = data[0]; + mNALType = data[1] & 0x1f; + uint32_t nri = (data[0] >> 5) & 3; + + clearAccumulator(); + + uint8_t byte = mNALType | (nri << 5); + appendToAccumulator(&byte, 1); + appendToAccumulator(data + 2, size - 2); + + int32_t rtpTime; + CHECK(packet->meta()->findInt32("rtp-time", &rtpTime)); + mAccumulator->meta()->setInt32("rtp-time", rtpTime); + + if (data[1] & 0x40) { + // Huh? End bit also set on the first buffer. + addSingleNALUnit(mAccumulator); + clearAccumulator(); + + ALOGV("added FU-A"); + break; + } + + mState = 1; + } else if (nalType == 24) { + // STAP-A + + status_t err = addSingleTimeAggregationPacket(packet); + if (err != OK) { + return err; + } + } else { + ALOGV("Malformed H264 packet (unknown type %d)", nalType); + return ERROR_UNSUPPORTED; + } + break; + } + + case 1: + { + if (size < 2 + || data[0] != mIndicator + || (data[1] & 0x1f) != mNALType + || (data[1] & 0x80)) { + ALOGV("Malformed H264 FU-A packet (indicator, " + "type or start bit mismatch)"); + + return ERROR_MALFORMED; + } + + appendToAccumulator(data + 2, size - 2); + + if (data[1] & 0x40) { + addSingleNALUnit(mAccumulator); + + clearAccumulator(); + mState = 0; + + ALOGV("added FU-A"); + } + break; + } + + default: + TRESPASS(); + } + + int32_t marker; + CHECK(packet->meta()->findInt32("M", &marker)); + + if (marker) { + flushAccessUnit(); + } + + return OK; +} + +void RTPReceiver::H264Assembler::reset() { + mNALUnits.clear(); + + clearAccumulator(); + mState = 0; +} + +void RTPReceiver::H264Assembler::clearAccumulator() { + if (mAccumulator != NULL) { + // XXX Too expensive. + mAccumulator.clear(); + } +} + +void RTPReceiver::H264Assembler::appendToAccumulator( + const void *data, size_t size) { + if (mAccumulator == NULL) { + mAccumulator = new ABuffer(size); + memcpy(mAccumulator->data(), data, size); + return; + } + + if (mAccumulator->size() + size > mAccumulator->capacity()) { + sp buf = new ABuffer(mAccumulator->size() + size); + memcpy(buf->data(), mAccumulator->data(), mAccumulator->size()); + buf->setRange(0, mAccumulator->size()); + + int32_t rtpTime; + if (mAccumulator->meta()->findInt32("rtp-time", &rtpTime)) { + buf->meta()->setInt32("rtp-time", rtpTime); + } + + mAccumulator = buf; + } + + memcpy(mAccumulator->data() + mAccumulator->size(), data, size); + mAccumulator->setRange(0, mAccumulator->size() + size); +} + +void RTPReceiver::H264Assembler::addSingleNALUnit(const sp &packet) { + if (mNALUnits.empty()) { + int32_t rtpTime; + CHECK(packet->meta()->findInt32("rtp-time", &rtpTime)); + + mAccessUnitRTPTime = rtpTime; + } + + mNALUnits.push_back(packet); +} + +void RTPReceiver::H264Assembler::flushAccessUnit() { + if (mNALUnits.empty()) { + return; + } + + size_t totalSize = 0; + for (List >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + totalSize += 4 + (*it)->size(); + } + + sp accessUnit = new ABuffer(totalSize); + size_t offset = 0; + for (List >::iterator it = mNALUnits.begin(); + it != mNALUnits.end(); ++it) { + const sp nalUnit = *it; + + memcpy(accessUnit->data() + offset, "\x00\x00\x00\x01", 4); + + memcpy(accessUnit->data() + offset + 4, + nalUnit->data(), + nalUnit->size()); + + offset += 4 + nalUnit->size(); + } + + mNALUnits.clear(); + + accessUnit->meta()->setInt64("timeUs", mAccessUnitRTPTime * 100ll / 9ll); + postAccessUnit(accessUnit, false /* followsDiscontinuity */); +} + +status_t RTPReceiver::H264Assembler::addSingleTimeAggregationPacket( + const sp &packet) { + const uint8_t *data = packet->data(); + size_t size = packet->size(); + + if (size < 3) { + ALOGV("Malformed H264 STAP-A packet (too small)"); + return ERROR_MALFORMED; + } + + int32_t rtpTime; + CHECK(packet->meta()->findInt32("rtp-time", &rtpTime)); + + ++data; + --size; + while (size >= 2) { + size_t nalSize = (data[0] << 8) | data[1]; + + if (size < nalSize + 2) { + ALOGV("Malformed H264 STAP-A packet (incomplete NAL unit)"); + return ERROR_MALFORMED; + } + + sp unit = new ABuffer(nalSize); + memcpy(unit->data(), &data[2], nalSize); + + unit->meta()->setInt32("rtp-time", rtpTime); + + addSingleNALUnit(unit); + + data += 2 + nalSize; + size -= 2 + nalSize; + } + + if (size != 0) { + ALOGV("Unexpected padding at end of STAP-A packet."); + } + + ALOGV("added STAP-A"); + + return OK; +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/rtp/RTPAssembler.h b/media/libstagefright/wifi-display/rtp/RTPAssembler.h new file mode 100644 index 0000000..e456d32 --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPAssembler.h @@ -0,0 +1,92 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RTP_ASSEMBLER_H_ + +#define RTP_ASSEMBLER_H_ + +#include "RTPReceiver.h" + +namespace android { + +// A helper class to reassemble the payload of RTP packets into access +// units depending on the packetization scheme. +struct RTPReceiver::Assembler : public RefBase { + Assembler(const sp ¬ify); + + virtual void signalDiscontinuity() = 0; + virtual status_t processPacket(const sp &packet) = 0; + +protected: + virtual ~Assembler() {} + + void postAccessUnit( + const sp &accessUnit, bool followsDiscontinuity); + +private: + sp mNotify; + + DISALLOW_EVIL_CONSTRUCTORS(Assembler); +}; + +struct RTPReceiver::TSAssembler : public RTPReceiver::Assembler { + TSAssembler(const sp ¬ify); + + virtual void signalDiscontinuity(); + virtual status_t processPacket(const sp &packet); + +private: + bool mSawDiscontinuity; + + DISALLOW_EVIL_CONSTRUCTORS(TSAssembler); +}; + +struct RTPReceiver::H264Assembler : public RTPReceiver::Assembler { + H264Assembler(const sp ¬ify); + + virtual void signalDiscontinuity(); + virtual status_t processPacket(const sp &packet); + +private: + int32_t mState; + + uint8_t mIndicator; + uint8_t mNALType; + + sp mAccumulator; + + List > mNALUnits; + int32_t mAccessUnitRTPTime; + + status_t internalProcessPacket(const sp &packet); + + void addSingleNALUnit(const sp &packet); + status_t addSingleTimeAggregationPacket(const sp &packet); + + void flushAccessUnit(); + + void clearAccumulator(); + void appendToAccumulator(const void *data, size_t size); + + void reset(); + + DISALLOW_EVIL_CONSTRUCTORS(H264Assembler); +}; + +} // namespace android + +#endif // RTP_ASSEMBLER_H_ + diff --git a/media/libstagefright/wifi-display/rtp/RTPBase.h b/media/libstagefright/wifi-display/rtp/RTPBase.h new file mode 100644 index 0000000..6507a6f --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPBase.h @@ -0,0 +1,49 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RTP_BASE_H_ + +#define RTP_BASE_H_ + +namespace android { + +struct RTPBase { + enum PacketizationMode { + PACKETIZATION_TRANSPORT_STREAM, + PACKETIZATION_H264, + PACKETIZATION_AAC, + }; + + enum TransportMode { + TRANSPORT_UNDEFINED, + TRANSPORT_UDP, + TRANSPORT_TCP, + TRANSPORT_TCP_INTERLEAVED, + }; + + enum { + // Really UDP _payload_ size + kMaxUDPPacketSize = 1472, // 1472 good, 1473 bad on Android@Home + }; + + static int32_t PickRandomRTPPort(); +}; + +} // namespace android + +#endif // RTP_BASE_H_ + + diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp new file mode 100644 index 0000000..29482af --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp @@ -0,0 +1,899 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "RTPReceiver" +#include + +#include "RTPAssembler.h" +#include "RTPReceiver.h" + +#include "ANetworkSession.h" + +#include +#include +#include +#include +#include +#include + +namespace android { + +//////////////////////////////////////////////////////////////////////////////// + +struct RTPReceiver::Source : public RefBase { + Source(RTPReceiver *receiver, uint32_t ssrc); + + void onPacketReceived(uint16_t seq, const sp &buffer); + + void addReportBlock(uint32_t ssrc, const sp &buf); + +protected: + virtual ~Source(); + +private: + static const uint32_t kMinSequential = 2; + static const uint32_t kMaxDropout = 3000; + static const uint32_t kMaxMisorder = 100; + static const uint32_t kRTPSeqMod = 1u << 16; + static const int64_t kReportIntervalUs = 10000000ll; + + RTPReceiver *mReceiver; + uint32_t mSSRC; + bool mFirst; + uint16_t mMaxSeq; + uint32_t mCycles; + uint32_t mBaseSeq; + uint32_t mReceived; + uint32_t mExpectedPrior; + uint32_t mReceivedPrior; + + int64_t mFirstArrivalTimeUs; + int64_t mFirstRTPTimeUs; + + // Ordered by extended seq number. + List > mPackets; + + int32_t mAwaitingExtSeqNo; + bool mRequestedRetransmission; + + int32_t mActivePacketType; + sp mActiveAssembler; + + int64_t mNextReportTimeUs; + + int32_t mNumDeclaredLost; + int32_t mNumDeclaredLostPrior; + + void queuePacket(const sp &packet); + void dequeueMore(); + + sp getNextPacket(); + void resync(); + + DISALLOW_EVIL_CONSTRUCTORS(Source); +}; + +//////////////////////////////////////////////////////////////////////////////// + +RTPReceiver::Source::Source(RTPReceiver *receiver, uint32_t ssrc) + : mReceiver(receiver), + mSSRC(ssrc), + mFirst(true), + mMaxSeq(0), + mCycles(0), + mBaseSeq(0), + mReceived(0), + mExpectedPrior(0), + mReceivedPrior(0), + mFirstArrivalTimeUs(-1ll), + mFirstRTPTimeUs(-1ll), + mAwaitingExtSeqNo(-1), + mRequestedRetransmission(false), + mActivePacketType(-1), + mNextReportTimeUs(-1ll), + mNumDeclaredLost(0), + mNumDeclaredLostPrior(0) { +} + +RTPReceiver::Source::~Source() { +} + +void RTPReceiver::Source::onPacketReceived( + uint16_t seq, const sp &buffer) { + if (mFirst) { + buffer->setInt32Data(mCycles | seq); + queuePacket(buffer); + + mFirst = false; + mBaseSeq = seq; + mMaxSeq = seq; + ++mReceived; + return; + } + + uint16_t udelta = seq - mMaxSeq; + + if (udelta < kMaxDropout) { + // In order, with permissible gap. + + if (seq < mMaxSeq) { + // Sequence number wrapped - count another 64K cycle + mCycles += kRTPSeqMod; + } + + mMaxSeq = seq; + + ++mReceived; + } else if (udelta <= kRTPSeqMod - kMaxMisorder) { + // The sequence number made a very large jump + return; + } else { + // Duplicate or reordered packet. + } + + buffer->setInt32Data(mCycles | seq); + queuePacket(buffer); +} + +void RTPReceiver::Source::queuePacket(const sp &packet) { + int32_t newExtendedSeqNo = packet->int32Data(); + + if (mFirstArrivalTimeUs < 0ll) { + mFirstArrivalTimeUs = ALooper::GetNowUs(); + + uint32_t rtpTime; + CHECK(packet->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + mFirstRTPTimeUs = (rtpTime * 100ll) / 9ll; + } + + if (mAwaitingExtSeqNo >= 0 && newExtendedSeqNo < mAwaitingExtSeqNo) { + // We're no longer interested in these. They're old. + ALOGV("dropping stale extSeqNo %d", newExtendedSeqNo); + return; + } + + if (mPackets.empty()) { + mPackets.push_back(packet); + dequeueMore(); + return; + } + + List >::iterator firstIt = mPackets.begin(); + List >::iterator it = --mPackets.end(); + for (;;) { + int32_t extendedSeqNo = (*it)->int32Data(); + + if (extendedSeqNo == newExtendedSeqNo) { + // Duplicate packet. + return; + } + + if (extendedSeqNo < newExtendedSeqNo) { + // Insert new packet after the one at "it". + mPackets.insert(++it, packet); + break; + } + + if (it == firstIt) { + // Insert new packet before the first existing one. + mPackets.insert(it, packet); + break; + } + + --it; + } + + dequeueMore(); +} + +void RTPReceiver::Source::dequeueMore() { + int64_t nowUs = ALooper::GetNowUs(); + if (mNextReportTimeUs < 0ll || nowUs >= mNextReportTimeUs) { + if (mNextReportTimeUs >= 0ll) { + uint32_t expected = (mMaxSeq | mCycles) - mBaseSeq + 1; + + uint32_t expectedInterval = expected - mExpectedPrior; + mExpectedPrior = expected; + + uint32_t receivedInterval = mReceived - mReceivedPrior; + mReceivedPrior = mReceived; + + int64_t lostInterval = + (int64_t)expectedInterval - (int64_t)receivedInterval; + + int32_t declaredLostInterval = + mNumDeclaredLost - mNumDeclaredLostPrior; + + mNumDeclaredLostPrior = mNumDeclaredLost; + + ALOGI("lost %lld packets (%.2f %%), declared %d lost\n", + lostInterval, + 100.0f * lostInterval / expectedInterval, + declaredLostInterval); + } + + mNextReportTimeUs = nowUs + kReportIntervalUs; + } + + for (;;) { + sp packet = getNextPacket(); + + if (packet == NULL) { + if (mPackets.empty()) { + break; + } + + CHECK_GE(mAwaitingExtSeqNo, 0); + + const sp &firstPacket = *mPackets.begin(); + + uint32_t rtpTime; + CHECK(firstPacket->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); + + + int64_t rtpUs = (rtpTime * 100ll) / 9ll; + + int64_t maxArrivalTimeUs = + mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs; + + int64_t nowUs = ALooper::GetNowUs(); + + CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data()); + + ALOGV("waiting for %d, comparing against %d, %lld us left", + mAwaitingExtSeqNo, + firstPacket->int32Data(), + maxArrivalTimeUs - nowUs); + + if (maxArrivalTimeUs + kPacketLostAfterUs <= nowUs) { + ALOGV("Lost packet extSeqNo %d %s", + mAwaitingExtSeqNo, + mRequestedRetransmission ? "*" : ""); + + mRequestedRetransmission = false; + if (mActiveAssembler != NULL) { + mActiveAssembler->signalDiscontinuity(); + } + + // resync(); + ++mAwaitingExtSeqNo; + ++mNumDeclaredLost; + + mReceiver->notifyPacketLost(); + continue; + } else if (kRequestRetransmissionAfterUs > 0 + && maxArrivalTimeUs + kRequestRetransmissionAfterUs <= nowUs + && !mRequestedRetransmission + && mAwaitingExtSeqNo >= 0) { + mRequestedRetransmission = true; + mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo); + break; + } else { + break; + } + } + + mRequestedRetransmission = false; + + int32_t packetType; + CHECK(packet->meta()->findInt32("PT", &packetType)); + + if (packetType != mActivePacketType) { + mActiveAssembler = mReceiver->makeAssembler(packetType); + mActivePacketType = packetType; + } + + if (mActiveAssembler == NULL) { + continue; + } + + status_t err = mActiveAssembler->processPacket(packet); + if (err != OK) { + ALOGV("assembler returned error %d", err); + } + } +} + +sp RTPReceiver::Source::getNextPacket() { + if (mPackets.empty()) { + return NULL; + } + + int32_t extSeqNo = (*mPackets.begin())->int32Data(); + + if (mAwaitingExtSeqNo < 0) { + mAwaitingExtSeqNo = extSeqNo; + } else if (extSeqNo != mAwaitingExtSeqNo) { + return NULL; + } + + sp packet = *mPackets.begin(); + mPackets.erase(mPackets.begin()); + + ++mAwaitingExtSeqNo; + + return packet; +} + +void RTPReceiver::Source::resync() { + mAwaitingExtSeqNo = -1; +} + +void RTPReceiver::Source::addReportBlock( + uint32_t ssrc, const sp &buf) { + uint32_t extMaxSeq = mMaxSeq | mCycles; + uint32_t expected = extMaxSeq - mBaseSeq + 1; + + int64_t lost = (int64_t)expected - (int64_t)mReceived; + if (lost > 0x7fffff) { + lost = 0x7fffff; + } else if (lost < -0x800000) { + lost = -0x800000; + } + + uint32_t expectedInterval = expected - mExpectedPrior; + mExpectedPrior = expected; + + uint32_t receivedInterval = mReceived - mReceivedPrior; + mReceivedPrior = mReceived; + + int64_t lostInterval = expectedInterval - receivedInterval; + + uint8_t fractionLost; + if (expectedInterval == 0 || lostInterval <=0) { + fractionLost = 0; + } else { + fractionLost = (lostInterval << 8) / expectedInterval; + } + + uint8_t *ptr = buf->data() + buf->size(); + + ptr[0] = ssrc >> 24; + ptr[1] = (ssrc >> 16) & 0xff; + ptr[2] = (ssrc >> 8) & 0xff; + ptr[3] = ssrc & 0xff; + + ptr[4] = fractionLost; + + ptr[5] = (lost >> 16) & 0xff; + ptr[6] = (lost >> 8) & 0xff; + ptr[7] = lost & 0xff; + + ptr[8] = extMaxSeq >> 24; + ptr[9] = (extMaxSeq >> 16) & 0xff; + ptr[10] = (extMaxSeq >> 8) & 0xff; + ptr[11] = extMaxSeq & 0xff; + + // XXX TODO: + + ptr[12] = 0x00; // interarrival jitter + ptr[13] = 0x00; + ptr[14] = 0x00; + ptr[15] = 0x00; + + ptr[16] = 0x00; // last SR + ptr[17] = 0x00; + ptr[18] = 0x00; + ptr[19] = 0x00; + + ptr[20] = 0x00; // delay since last SR + ptr[21] = 0x00; + ptr[22] = 0x00; + ptr[23] = 0x00; +} + +//////////////////////////////////////////////////////////////////////////////// + +RTPReceiver::RTPReceiver( + const sp &netSession, + const sp ¬ify) + : mNetSession(netSession), + mNotify(notify), + mMode(TRANSPORT_UNDEFINED), + mRTPSessionID(0), + mRTCPSessionID(0), + mRTPClientSessionID(0) { +} + +RTPReceiver::~RTPReceiver() { + if (mRTPClientSessionID != 0) { + mNetSession->destroySession(mRTPClientSessionID); + mRTPClientSessionID = 0; + } + + if (mRTCPSessionID != 0) { + mNetSession->destroySession(mRTCPSessionID); + mRTCPSessionID = 0; + } + + if (mRTPSessionID != 0) { + mNetSession->destroySession(mRTPSessionID); + mRTPSessionID = 0; + } +} + +status_t RTPReceiver::initAsync(TransportMode mode, int32_t *outLocalRTPPort) { + if (mMode != TRANSPORT_UNDEFINED || mode == TRANSPORT_UNDEFINED) { + return INVALID_OPERATION; + } + + CHECK_NE(mMode, TRANSPORT_TCP_INTERLEAVED); + + sp rtpNotify = new AMessage(kWhatRTPNotify, id()); + + sp rtcpNotify; + if (mode == TRANSPORT_UDP) { + rtcpNotify = new AMessage(kWhatRTCPNotify, id()); + } + + CHECK_EQ(mRTPSessionID, 0); + CHECK_EQ(mRTCPSessionID, 0); + + int32_t localRTPPort; + + struct in_addr ifaceAddr; + ifaceAddr.s_addr = INADDR_ANY; + + for (;;) { + localRTPPort = PickRandomRTPPort(); + + status_t err; + if (mode == TRANSPORT_UDP) { + err = mNetSession->createUDPSession( + localRTPPort, + rtpNotify, + &mRTPSessionID); + } else { + CHECK_EQ(mode, TRANSPORT_TCP); + err = mNetSession->createTCPDatagramSession( + ifaceAddr, + localRTPPort, + rtpNotify, + &mRTPSessionID); + } + + if (err != OK) { + continue; + } + + if (mode == TRANSPORT_TCP) { + break; + } + + err = mNetSession->createUDPSession( + localRTPPort + 1, + rtcpNotify, + &mRTCPSessionID); + + if (err == OK) { + break; + } + + mNetSession->destroySession(mRTPSessionID); + mRTPSessionID = 0; + } + + mMode = mode; + *outLocalRTPPort = localRTPPort; + + return OK; +} + +status_t RTPReceiver::connect( + const char *remoteHost, int32_t remoteRTPPort, int32_t remoteRTCPPort) { + if (mMode == TRANSPORT_TCP) { + return OK; + } + + status_t err = mNetSession->connectUDPSession( + mRTPSessionID, remoteHost, remoteRTPPort); + + if (err != OK) { + notifyInitDone(err); + return err; + } + + ALOGI("connectUDPSession RTP successful."); + + if (remoteRTCPPort >= 0) { + err = mNetSession->connectUDPSession( + mRTCPSessionID, remoteHost, remoteRTCPPort); + + if (err != OK) { + ALOGI("connect failed w/ err %d", err); + + notifyInitDone(err); + return err; + } + + scheduleSendRR(); + } + + notifyInitDone(OK); + + return OK; +} + +void RTPReceiver::onMessageReceived(const sp &msg) { + switch (msg->what()) { + case kWhatRTPNotify: + case kWhatRTCPNotify: + onNetNotify(msg->what() == kWhatRTPNotify, msg); + break; + + case kWhatSendRR: + { + onSendRR(); + break; + } + + default: + TRESPASS(); + } +} + +void RTPReceiver::onNetNotify(bool isRTP, const sp &msg) { + int32_t reason; + CHECK(msg->findInt32("reason", &reason)); + + switch (reason) { + case ANetworkSession::kWhatError: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + int32_t err; + CHECK(msg->findInt32("err", &err)); + + int32_t errorOccuredDuringSend; + CHECK(msg->findInt32("send", &errorOccuredDuringSend)); + + AString detail; + CHECK(msg->findString("detail", &detail)); + + ALOGE("An error occurred during %s in session %d " + "(%d, '%s' (%s)).", + errorOccuredDuringSend ? "send" : "receive", + sessionID, + err, + detail.c_str(), + strerror(-err)); + + mNetSession->destroySession(sessionID); + + if (sessionID == mRTPSessionID) { + mRTPSessionID = 0; + + if (mMode == TRANSPORT_TCP && mRTPClientSessionID == 0) { + notifyInitDone(err); + break; + } + } else if (sessionID == mRTCPSessionID) { + mRTCPSessionID = 0; + } else if (sessionID == mRTPClientSessionID) { + mRTPClientSessionID = 0; + } + + notifyError(err); + break; + } + + case ANetworkSession::kWhatDatagram: + { + sp data; + CHECK(msg->findBuffer("data", &data)); + + if (isRTP) { + onRTPData(data); + } else { + onRTCPData(data); + } + break; + } + + case ANetworkSession::kWhatClientConnected: + { + CHECK_EQ(mMode, TRANSPORT_TCP); + CHECK(isRTP); + + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + if (mRTPClientSessionID != 0) { + // We only allow a single client connection. + mNetSession->destroySession(sessionID); + sessionID = 0; + break; + } + + mRTPClientSessionID = sessionID; + + notifyInitDone(OK); + break; + } + } +} + +void RTPReceiver::notifyInitDone(status_t err) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatInitDone); + notify->setInt32("err", err); + notify->post(); +} + +void RTPReceiver::notifyError(status_t err) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatError); + notify->setInt32("err", err); + notify->post(); +} + +void RTPReceiver::notifyPacketLost() { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatPacketLost); + notify->post(); +} + +status_t RTPReceiver::onRTPData(const sp &buffer) { + size_t size = buffer->size(); + if (size < 12) { + // Too short to be a valid RTP header. + return ERROR_MALFORMED; + } + + const uint8_t *data = buffer->data(); + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return ERROR_UNSUPPORTED; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return ERROR_MALFORMED; + } + + size -= paddingLength; + } + + int numCSRCs = data[0] & 0x0f; + + size_t payloadOffset = 12 + 4 * numCSRCs; + + if (size < payloadOffset) { + // Not enough data to fit the basic header and all the CSRC entries. + return ERROR_MALFORMED; + } + + if (data[0] & 0x10) { + // Header eXtension present. + + if (size < payloadOffset + 4) { + // Not enough data to fit the basic header, all CSRC entries + // and the first 4 bytes of the extension header. + + return ERROR_MALFORMED; + } + + const uint8_t *extensionData = &data[payloadOffset]; + + size_t extensionLength = + 4 * (extensionData[2] << 8 | extensionData[3]); + + if (size < payloadOffset + 4 + extensionLength) { + return ERROR_MALFORMED; + } + + payloadOffset += 4 + extensionLength; + } + + uint32_t srcId = U32_AT(&data[8]); + uint32_t rtpTime = U32_AT(&data[4]); + uint16_t seqNo = U16_AT(&data[2]); + + sp meta = buffer->meta(); + meta->setInt32("ssrc", srcId); + meta->setInt32("rtp-time", rtpTime); + meta->setInt32("PT", data[1] & 0x7f); + meta->setInt32("M", data[1] >> 7); + + buffer->setRange(payloadOffset, size - payloadOffset); + + ssize_t index = mSources.indexOfKey(srcId); + sp source; + if (index < 0) { + source = new Source(this, srcId); + mSources.add(srcId, source); + } else { + source = mSources.valueAt(index); + } + + source->onPacketReceived(seqNo, buffer); + + return OK; +} + +status_t RTPReceiver::onRTCPData(const sp &data) { + ALOGI("onRTCPData"); + return OK; +} + +void RTPReceiver::addSDES(const sp &buffer) { + uint8_t *data = buffer->data() + buffer->size(); + data[0] = 0x80 | 1; + data[1] = 202; // SDES + data[4] = kSourceID >> 24; // SSRC + data[5] = (kSourceID >> 16) & 0xff; + data[6] = (kSourceID >> 8) & 0xff; + data[7] = kSourceID & 0xff; + + size_t offset = 8; + + data[offset++] = 1; // CNAME + + AString cname = "stagefright@somewhere"; + data[offset++] = cname.size(); + + memcpy(&data[offset], cname.c_str(), cname.size()); + offset += cname.size(); + + data[offset++] = 6; // TOOL + + AString tool = "stagefright/1.0"; + data[offset++] = tool.size(); + + memcpy(&data[offset], tool.c_str(), tool.size()); + offset += tool.size(); + + data[offset++] = 0; + + if ((offset % 4) > 0) { + size_t count = 4 - (offset % 4); + switch (count) { + case 3: + data[offset++] = 0; + case 2: + data[offset++] = 0; + case 1: + data[offset++] = 0; + } + } + + size_t numWords = (offset / 4) - 1; + data[2] = numWords >> 8; + data[3] = numWords & 0xff; + + buffer->setRange(buffer->offset(), buffer->size() + offset); +} + +void RTPReceiver::scheduleSendRR() { + (new AMessage(kWhatSendRR, id()))->post(5000000ll); +} + +void RTPReceiver::onSendRR() { +#if 0 + sp buf = new ABuffer(kMaxUDPPacketSize); + buf->setRange(0, 0); + + uint8_t *ptr = buf->data(); + ptr[0] = 0x80 | 0; + ptr[1] = 201; // RR + ptr[2] = 0; + ptr[3] = 1; + ptr[4] = kSourceID >> 24; // SSRC + ptr[5] = (kSourceID >> 16) & 0xff; + ptr[6] = (kSourceID >> 8) & 0xff; + ptr[7] = kSourceID & 0xff; + + buf->setRange(0, 8); + + size_t numReportBlocks = 0; + for (size_t i = 0; i < mSources.size(); ++i) { + uint32_t ssrc = mSources.keyAt(i); + sp source = mSources.valueAt(i); + + if (numReportBlocks > 31 || buf->size() + 24 > buf->capacity()) { + // Cannot fit another report block. + break; + } + + source->addReportBlock(ssrc, buf); + ++numReportBlocks; + } + + ptr[0] |= numReportBlocks; // 5 bit + + size_t sizeInWordsMinus1 = 1 + 6 * numReportBlocks; + ptr[2] = sizeInWordsMinus1 >> 8; + ptr[3] = sizeInWordsMinus1 & 0xff; + + buf->setRange(0, (sizeInWordsMinus1 + 1) * 4); + + addSDES(buf); + + mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size()); +#endif + + scheduleSendRR(); +} + +status_t RTPReceiver::registerPacketType( + uint8_t packetType, PacketizationMode mode) { + mPacketTypes.add(packetType, mode); + + return OK; +} + +sp RTPReceiver::makeAssembler(uint8_t packetType) { + ssize_t index = mPacketTypes.indexOfKey(packetType); + if (index < 0) { + return NULL; + } + + PacketizationMode mode = mPacketTypes.valueAt(index); + + switch (mode) { + case PACKETIZATION_TRANSPORT_STREAM: + return new TSAssembler(mNotify); + + case PACKETIZATION_H264: + return new H264Assembler(mNotify); + + default: + return NULL; + } +} + +void RTPReceiver::requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo) { + int32_t blp = 0; + + sp buf = new ABuffer(16); + buf->setRange(0, 0); + + uint8_t *ptr = buf->data(); + ptr[0] = 0x80 | 1; // generic NACK + ptr[1] = 205; // TSFB + ptr[2] = 0; + ptr[3] = 3; + ptr[8] = (senderSSRC >> 24) & 0xff; + ptr[9] = (senderSSRC >> 16) & 0xff; + ptr[10] = (senderSSRC >> 8) & 0xff; + ptr[11] = (senderSSRC & 0xff); + ptr[8] = (kSourceID >> 24) & 0xff; + ptr[9] = (kSourceID >> 16) & 0xff; + ptr[10] = (kSourceID >> 8) & 0xff; + ptr[11] = (kSourceID & 0xff); + ptr[12] = (extSeqNo >> 8) & 0xff; + ptr[13] = (extSeqNo & 0xff); + ptr[14] = (blp >> 8) & 0xff; + ptr[15] = (blp & 0xff); + + buf->setRange(0, 16); + + mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size()); +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.h b/media/libstagefright/wifi-display/rtp/RTPReceiver.h new file mode 100644 index 0000000..2ae864a --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.h @@ -0,0 +1,110 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RTP_RECEIVER_H_ + +#define RTP_RECEIVER_H_ + +#include "RTPBase.h" + +#include + +namespace android { + +struct ABuffer; +struct ANetworkSession; + +// An object of this class facilitates receiving of media data on an RTP +// channel. The channel is established over a UDP or TCP connection depending +// on which "TransportMode" was chosen. In addition different RTP packetization +// schemes are supported such as "Transport Stream Packets over RTP", +// or "AVC/H.264 encapsulation as specified in RFC 3984 (non-interleaved mode)" +struct RTPReceiver : public RTPBase, public AHandler { + enum { + kWhatInitDone, + kWhatError, + kWhatAccessUnit, + kWhatPacketLost, + }; + RTPReceiver( + const sp &netSession, + const sp ¬ify); + + status_t registerPacketType( + uint8_t packetType, PacketizationMode mode); + + status_t initAsync(TransportMode mode, int32_t *outLocalRTPPort); + + status_t connect( + const char *remoteHost, + int32_t remoteRTPPort, + int32_t remoteRTCPPort); + +protected: + virtual ~RTPReceiver(); + virtual void onMessageReceived(const sp &msg); + +private: + enum { + kWhatRTPNotify, + kWhatRTCPNotify, + kWhatSendRR, + }; + + enum { + kSourceID = 0xdeadbeef, + kPacketLostAfterUs = 100000, + kRequestRetransmissionAfterUs = -1, + }; + + struct Assembler; + struct H264Assembler; + struct Source; + struct TSAssembler; + + sp mNetSession; + sp mNotify; + TransportMode mMode; + int32_t mRTPSessionID; + int32_t mRTCPSessionID; + + int32_t mRTPClientSessionID; // in TRANSPORT_TCP mode. + + KeyedVector mPacketTypes; + KeyedVector > mSources; + + void onNetNotify(bool isRTP, const sp &msg); + status_t onRTPData(const sp &data); + status_t onRTCPData(const sp &data); + void onSendRR(); + + void scheduleSendRR(); + void addSDES(const sp &buffer); + + void notifyInitDone(status_t err); + void notifyError(status_t err); + void notifyPacketLost(); + + sp makeAssembler(uint8_t packetType); + + void requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo); + + DISALLOW_EVIL_CONSTRUCTORS(RTPReceiver); +}; + +} // namespace android + +#endif // RTP_RECEIVER_H_ diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.cpp b/media/libstagefright/wifi-display/rtp/RTPSender.cpp new file mode 100644 index 0000000..85c5933 --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPSender.cpp @@ -0,0 +1,701 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "RTPSender" +#include + +#include "RTPSender.h" + +#include "ANetworkSession.h" + +#include +#include +#include +#include +#include +#include + +#include "include/avc_utils.h" + +namespace android { + +RTPSender::RTPSender( + const sp &netSession, + const sp ¬ify) + : mNetSession(netSession), + mNotify(notify), + mMode(TRANSPORT_UNDEFINED), + mRTPSessionID(0), + mRTCPSessionID(0), + mRTPConnected(false), + mRTCPConnected(false), + mLastNTPTime(0), + mLastRTPTime(0), + mNumRTPSent(0), + mNumRTPOctetsSent(0), + mNumSRsSent(0), + mRTPSeqNo(0), + mHistorySize(0) { +} + +RTPSender::~RTPSender() { + if (mRTCPSessionID != 0) { + mNetSession->destroySession(mRTCPSessionID); + mRTCPSessionID = 0; + } + + if (mRTPSessionID != 0) { + mNetSession->destroySession(mRTPSessionID); + mRTPSessionID = 0; + } +} + +// static +int32_t RTPBase::PickRandomRTPPort() { + // Pick an even integer in range [1024, 65534) + + static const size_t kRange = (65534 - 1024) / 2; + + return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024; +} + +status_t RTPSender::initAsync( + TransportMode mode, + const char *remoteHost, + int32_t remoteRTPPort, + int32_t remoteRTCPPort, + int32_t *outLocalRTPPort) { + if (mMode != TRANSPORT_UNDEFINED || mode == TRANSPORT_UNDEFINED) { + return INVALID_OPERATION; + } + + CHECK_NE(mMode, TRANSPORT_TCP_INTERLEAVED); + + if (mode == TRANSPORT_TCP && remoteRTCPPort >= 0) { + return INVALID_OPERATION; + } + + sp rtpNotify = new AMessage(kWhatRTPNotify, id()); + + sp rtcpNotify; + if (remoteRTCPPort >= 0) { + rtcpNotify = new AMessage(kWhatRTCPNotify, id()); + } + + CHECK_EQ(mRTPSessionID, 0); + CHECK_EQ(mRTCPSessionID, 0); + + int32_t localRTPPort; + + for (;;) { + localRTPPort = PickRandomRTPPort(); + + status_t err; + if (mode == TRANSPORT_UDP) { + err = mNetSession->createUDPSession( + localRTPPort, + remoteHost, + remoteRTPPort, + rtpNotify, + &mRTPSessionID); + } else { + CHECK_EQ(mode, TRANSPORT_TCP); + err = mNetSession->createTCPDatagramSession( + localRTPPort, + remoteHost, + remoteRTPPort, + rtpNotify, + &mRTPSessionID); + } + + if (err != OK) { + continue; + } + + if (remoteRTCPPort < 0) { + break; + } + + if (mode == TRANSPORT_UDP) { + err = mNetSession->createUDPSession( + localRTPPort + 1, + remoteHost, + remoteRTCPPort, + rtcpNotify, + &mRTCPSessionID); + } else { + CHECK_EQ(mode, TRANSPORT_TCP); + err = mNetSession->createTCPDatagramSession( + localRTPPort + 1, + remoteHost, + remoteRTCPPort, + rtcpNotify, + &mRTCPSessionID); + } + + if (err == OK) { + break; + } + + mNetSession->destroySession(mRTPSessionID); + mRTPSessionID = 0; + } + + if (mode == TRANSPORT_UDP) { + mRTPConnected = true; + mRTCPConnected = true; + } + + mMode = mode; + *outLocalRTPPort = localRTPPort; + + if (mMode == TRANSPORT_UDP) { + notifyInitDone(OK); + } + + return OK; +} + +status_t RTPSender::queueBuffer( + const sp &buffer, uint8_t packetType, PacketizationMode mode) { + status_t err; + + switch (mode) { + case PACKETIZATION_TRANSPORT_STREAM: + err = queueTSPackets(buffer, packetType); + break; + + case PACKETIZATION_H264: + err = queueAVCBuffer(buffer, packetType); + break; + + default: + TRESPASS(); + } + + return err; +} + +status_t RTPSender::queueTSPackets( + const sp &tsPackets, uint8_t packetType) { + CHECK_EQ(0, tsPackets->size() % 188); + + const size_t numTSPackets = tsPackets->size() / 188; + + size_t srcOffset = 0; + while (srcOffset < tsPackets->size()) { + sp udpPacket = + new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); + + udpPacket->setInt32Data(mRTPSeqNo); + + uint8_t *rtp = udpPacket->data(); + rtp[0] = 0x80; + rtp[1] = packetType; + + rtp[2] = (mRTPSeqNo >> 8) & 0xff; + rtp[3] = mRTPSeqNo & 0xff; + ++mRTPSeqNo; + + int64_t nowUs = ALooper::GetNowUs(); + uint32_t rtpTime = (nowUs * 9) / 100ll; + + rtp[4] = rtpTime >> 24; + rtp[5] = (rtpTime >> 16) & 0xff; + rtp[6] = (rtpTime >> 8) & 0xff; + rtp[7] = rtpTime & 0xff; + + rtp[8] = kSourceID >> 24; + rtp[9] = (kSourceID >> 16) & 0xff; + rtp[10] = (kSourceID >> 8) & 0xff; + rtp[11] = kSourceID & 0xff; + + size_t numTSPackets = (tsPackets->size() - srcOffset) / 188; + if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) { + numTSPackets = kMaxNumTSPacketsPerRTPPacket; + } + + memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188); + + udpPacket->setRange(0, 12 + numTSPackets * 188); + status_t err = sendRTPPacket(udpPacket, true /* storeInHistory */); + + if (err != OK) { + return err; + } + + srcOffset += numTSPackets * 188; + } + + return OK; +} + +status_t RTPSender::queueAVCBuffer( + const sp &accessUnit, uint8_t packetType) { + int64_t timeUs; + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + + uint32_t rtpTime = (timeUs * 9 / 100ll); + + List > packets; + + sp out = new ABuffer(kMaxUDPPacketSize); + size_t outBytesUsed = 12; // Placeholder for RTP header. + + const uint8_t *data = accessUnit->data(); + size_t size = accessUnit->size(); + const uint8_t *nalStart; + size_t nalSize; + while (getNextNALUnit( + &data, &size, &nalStart, &nalSize, + true /* startCodeFollows */) == OK) { + size_t bytesNeeded = nalSize + 2; + if (outBytesUsed == 12) { + ++bytesNeeded; + } + + if (outBytesUsed + bytesNeeded > out->capacity()) { + bool emitSingleNALPacket = false; + + if (outBytesUsed == 12 + && outBytesUsed + nalSize <= out->capacity()) { + // We haven't emitted anything into the current packet yet and + // this NAL unit fits into a single-NAL-unit-packet while + // it wouldn't have fit as part of a STAP-A packet. + + memcpy(out->data() + outBytesUsed, nalStart, nalSize); + outBytesUsed += nalSize; + + emitSingleNALPacket = true; + } + + if (outBytesUsed > 12) { + out->setRange(0, outBytesUsed); + packets.push_back(out); + out = new ABuffer(kMaxUDPPacketSize); + outBytesUsed = 12; // Placeholder for RTP header + } + + if (emitSingleNALPacket) { + continue; + } + } + + if (outBytesUsed + bytesNeeded <= out->capacity()) { + uint8_t *dst = out->data() + outBytesUsed; + + if (outBytesUsed == 12) { + *dst++ = 24; // STAP-A header + } + + *dst++ = (nalSize >> 8) & 0xff; + *dst++ = nalSize & 0xff; + memcpy(dst, nalStart, nalSize); + + outBytesUsed += bytesNeeded; + continue; + } + + // This single NAL unit does not fit into a single RTP packet, + // we need to emit an FU-A. + + CHECK_EQ(outBytesUsed, 12u); + + uint8_t nalType = nalStart[0] & 0x1f; + uint8_t nri = (nalStart[0] >> 5) & 3; + + size_t srcOffset = 1; + while (srcOffset < nalSize) { + size_t copy = out->capacity() - outBytesUsed - 2; + if (copy > nalSize - srcOffset) { + copy = nalSize - srcOffset; + } + + uint8_t *dst = out->data() + outBytesUsed; + dst[0] = (nri << 5) | 28; + + dst[1] = nalType; + + if (srcOffset == 1) { + dst[1] |= 0x80; + } + + if (srcOffset + copy == nalSize) { + dst[1] |= 0x40; + } + + memcpy(&dst[2], nalStart + srcOffset, copy); + srcOffset += copy; + + out->setRange(0, outBytesUsed + copy + 2); + + packets.push_back(out); + out = new ABuffer(kMaxUDPPacketSize); + outBytesUsed = 12; // Placeholder for RTP header + } + } + + if (outBytesUsed > 12) { + out->setRange(0, outBytesUsed); + packets.push_back(out); + } + + while (!packets.empty()) { + sp out = *packets.begin(); + packets.erase(packets.begin()); + + out->setInt32Data(mRTPSeqNo); + + bool last = packets.empty(); + + uint8_t *dst = out->data(); + + dst[0] = 0x80; + + dst[1] = packetType; + if (last) { + dst[1] |= 1 << 7; // M-bit + } + + dst[2] = (mRTPSeqNo >> 8) & 0xff; + dst[3] = mRTPSeqNo & 0xff; + ++mRTPSeqNo; + + dst[4] = rtpTime >> 24; + dst[5] = (rtpTime >> 16) & 0xff; + dst[6] = (rtpTime >> 8) & 0xff; + dst[7] = rtpTime & 0xff; + dst[8] = kSourceID >> 24; + dst[9] = (kSourceID >> 16) & 0xff; + dst[10] = (kSourceID >> 8) & 0xff; + dst[11] = kSourceID & 0xff; + + status_t err = sendRTPPacket(out, true /* storeInHistory */); + + if (err != OK) { + return err; + } + } + + return OK; +} + +status_t RTPSender::sendRTPPacket( + const sp &buffer, bool storeInHistory) { + CHECK(mRTPConnected); + + status_t err = mNetSession->sendRequest( + mRTPSessionID, buffer->data(), buffer->size()); + + if (err != OK) { + return err; + } + + mLastNTPTime = GetNowNTP(); + mLastRTPTime = U32_AT(buffer->data() + 4); + + ++mNumRTPSent; + mNumRTPOctetsSent += buffer->size() - 12; + + if (storeInHistory) { + if (mHistorySize == kMaxHistorySize) { + mHistory.erase(mHistory.begin()); + } else { + ++mHistorySize; + } + mHistory.push_back(buffer); + } + + return OK; +} + +// static +uint64_t RTPSender::GetNowNTP() { + struct timeval tv; + gettimeofday(&tv, NULL /* timezone */); + + uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; + + nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; + + uint64_t hi = nowUs / 1000000ll; + uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; + + return (hi << 32) | lo; +} + +void RTPSender::onMessageReceived(const sp &msg) { + switch (msg->what()) { + case kWhatRTPNotify: + case kWhatRTCPNotify: + onNetNotify(msg->what() == kWhatRTPNotify, msg); + break; + + default: + TRESPASS(); + } +} + +void RTPSender::onNetNotify(bool isRTP, const sp &msg) { + int32_t reason; + CHECK(msg->findInt32("reason", &reason)); + + switch (reason) { + case ANetworkSession::kWhatError: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + int32_t err; + CHECK(msg->findInt32("err", &err)); + + int32_t errorOccuredDuringSend; + CHECK(msg->findInt32("send", &errorOccuredDuringSend)); + + AString detail; + CHECK(msg->findString("detail", &detail)); + + ALOGE("An error occurred during %s in session %d " + "(%d, '%s' (%s)).", + errorOccuredDuringSend ? "send" : "receive", + sessionID, + err, + detail.c_str(), + strerror(-err)); + + mNetSession->destroySession(sessionID); + + if (sessionID == mRTPSessionID) { + mRTPSessionID = 0; + } else if (sessionID == mRTCPSessionID) { + mRTCPSessionID = 0; + } + + if (mMode == TRANSPORT_TCP) { + if (!mRTPConnected + || (mRTCPSessionID > 0 && !mRTCPConnected)) { + notifyInitDone(err); + break; + } + } + + notifyError(err); + break; + } + + case ANetworkSession::kWhatDatagram: + { + sp data; + CHECK(msg->findBuffer("data", &data)); + + if (isRTP) { + ALOGW("Huh? Received data on RTP connection..."); + } else { + onRTCPData(data); + } + break; + } + + case ANetworkSession::kWhatConnected: + { + CHECK_EQ(mMode, TRANSPORT_TCP); + + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + if (isRTP) { + CHECK_EQ(sessionID, mRTPSessionID); + mRTPConnected = true; + } else { + CHECK_EQ(sessionID, mRTCPSessionID); + mRTCPConnected = true; + } + + if (mRTPConnected && (mRTCPSessionID == 0 || mRTCPConnected)) { + notifyInitDone(OK); + } + break; + } + } +} + +status_t RTPSender::onRTCPData(const sp &buffer) { + const uint8_t *data = buffer->data(); + size_t size = buffer->size(); + + while (size > 0) { + if (size < 8) { + // Too short to be a valid RTCP header + return ERROR_MALFORMED; + } + + if ((data[0] >> 6) != 2) { + // Unsupported version. + return ERROR_UNSUPPORTED; + } + + if (data[0] & 0x20) { + // Padding present. + + size_t paddingLength = data[size - 1]; + + if (paddingLength + 12 > size) { + // If we removed this much padding we'd end up with something + // that's too short to be a valid RTP header. + return ERROR_MALFORMED; + } + + size -= paddingLength; + } + + size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; + + if (size < headerLength) { + // Only received a partial packet? + return ERROR_MALFORMED; + } + + switch (data[1]) { + case 200: + case 201: // RR + parseReceiverReport(data, headerLength); + break; + + case 202: // SDES + case 203: + case 204: // APP + break; + + case 205: // TSFB (transport layer specific feedback) + parseTSFB(data, headerLength); + break; + + case 206: // PSFB (payload specific feedback) + // hexdump(data, headerLength); + break; + + default: + { + ALOGW("Unknown RTCP packet type %u of size %d", + (unsigned)data[1], headerLength); + break; + } + } + + data += headerLength; + size -= headerLength; + } + + return OK; +} + +status_t RTPSender::parseReceiverReport(const uint8_t *data, size_t size) { + // hexdump(data, size); + + float fractionLost = data[12] / 256.0f; + + ALOGI("lost %.2f %% of packets during report interval.", + 100.0f * fractionLost); + + return OK; +} + +status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) { + if ((data[0] & 0x1f) != 1) { + return ERROR_UNSUPPORTED; // We only support NACK for now. + } + + uint32_t srcId = U32_AT(&data[8]); + if (srcId != kSourceID) { + return ERROR_MALFORMED; + } + + for (size_t i = 12; i < size; i += 4) { + uint16_t seqNo = U16_AT(&data[i]); + uint16_t blp = U16_AT(&data[i + 2]); + + List >::iterator it = mHistory.begin(); + bool foundSeqNo = false; + while (it != mHistory.end()) { + const sp &buffer = *it; + + uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; + + bool retransmit = false; + if (bufferSeqNo == seqNo) { + retransmit = true; + } else if (blp != 0) { + for (size_t i = 0; i < 16; ++i) { + if ((blp & (1 << i)) + && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { + blp &= ~(1 << i); + retransmit = true; + } + } + } + + if (retransmit) { + ALOGV("retransmitting seqNo %d", bufferSeqNo); + + CHECK_EQ((status_t)OK, + sendRTPPacket(buffer, false /* storeInHistory */)); + + if (bufferSeqNo == seqNo) { + foundSeqNo = true; + } + + if (foundSeqNo && blp == 0) { + break; + } + } + + ++it; + } + + if (!foundSeqNo || blp != 0) { + ALOGI("Some sequence numbers were no longer available for " + "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)", + seqNo, foundSeqNo, blp); + + if (!mHistory.empty()) { + int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff; + int32_t latest = (*--mHistory.end())->int32Data() & 0xffff; + + ALOGI("have seq numbers from %d - %d", earliest, latest); + } + } + } + + return OK; +} + +void RTPSender::notifyInitDone(status_t err) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatInitDone); + notify->setInt32("err", err); + notify->post(); +} + +void RTPSender::notifyError(status_t err) { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatError); + notify->setInt32("err", err); + notify->post(); +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.h b/media/libstagefright/wifi-display/rtp/RTPSender.h new file mode 100644 index 0000000..2b683a4 --- /dev/null +++ b/media/libstagefright/wifi-display/rtp/RTPSender.h @@ -0,0 +1,112 @@ +/* + * Copyright 2013, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RTP_SENDER_H_ + +#define RTP_SENDER_H_ + +#include "RTPBase.h" + +#include + +namespace android { + +struct ABuffer; +struct ANetworkSession; + +// An object of this class facilitates sending of media data over an RTP +// channel. The channel is established over a UDP or TCP connection depending +// on which "TransportMode" was chosen. In addition different RTP packetization +// schemes are supported such as "Transport Stream Packets over RTP", +// or "AVC/H.264 encapsulation as specified in RFC 3984 (non-interleaved mode)" +struct RTPSender : public RTPBase, public AHandler { + enum { + kWhatInitDone, + kWhatError, + }; + RTPSender( + const sp &netSession, + const sp ¬ify); + + status_t initAsync( + TransportMode mode, + const char *remoteHost, + int32_t remoteRTPPort, + int32_t remoteRTCPPort, + int32_t *outLocalRTPPort); + + status_t queueBuffer( + const sp &buffer, + uint8_t packetType, + PacketizationMode mode); + +protected: + virtual ~RTPSender(); + virtual void onMessageReceived(const sp &msg); + +private: + enum { + kWhatRTPNotify, + kWhatRTCPNotify, + }; + + enum { + kMaxNumTSPacketsPerRTPPacket = (kMaxUDPPacketSize - 12) / 188, + kMaxHistorySize = 1024, + kSourceID = 0xdeadbeef, + }; + + sp mNetSession; + sp mNotify; + TransportMode mMode; + int32_t mRTPSessionID; + int32_t mRTCPSessionID; + bool mRTPConnected; + bool mRTCPConnected; + + uint64_t mLastNTPTime; + uint32_t mLastRTPTime; + uint32_t mNumRTPSent; + uint32_t mNumRTPOctetsSent; + uint32_t mNumSRsSent; + + uint32_t mRTPSeqNo; + + List > mHistory; + size_t mHistorySize; + + static uint64_t GetNowNTP(); + + status_t queueTSPackets(const sp &tsPackets, uint8_t packetType); + status_t queueAVCBuffer(const sp &accessUnit, uint8_t packetType); + + status_t sendRTPPacket(const sp &packet, bool storeInHistory); + + void onNetNotify(bool isRTP, const sp &msg); + + status_t onRTCPData(const sp &data); + status_t parseReceiverReport(const uint8_t *data, size_t size); + status_t parseTSFB(const uint8_t *data, size_t size); + + void notifyInitDone(status_t err); + void notifyError(status_t err); + + DISALLOW_EVIL_CONSTRUCTORS(RTPSender); +}; + +} // namespace android + +#endif // RTP_SENDER_H_ -- cgit v1.1