/* * Copyright 2012, The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "RTPSink" #include #include "RTPSink.h" #include "ANetworkSession.h" #if USE_TUNNEL_RENDERER #include "TunnelRenderer.h" #define RENDERER_CLASS TunnelRenderer #else #include "DirectRenderer.h" #define RENDERER_CLASS DirectRenderer #endif #include #include #include #include #include #include namespace android { struct RTPSink::Source : public RefBase { Source(uint16_t seq, const sp &buffer, const sp queueBufferMsg); bool updateSeq(uint16_t seq, const sp &buffer); void addReportBlock(uint32_t ssrc, const sp &buf); protected: virtual ~Source(); private: static const uint32_t kMinSequential = 2; static const uint32_t kMaxDropout = 3000; static const uint32_t kMaxMisorder = 100; static const uint32_t kRTPSeqMod = 1u << 16; sp mQueueBufferMsg; uint16_t mMaxSeq; uint32_t mCycles; uint32_t mBaseSeq; uint32_t mBadSeq; uint32_t mProbation; uint32_t mReceived; uint32_t mExpectedPrior; uint32_t mReceivedPrior; void initSeq(uint16_t seq); void queuePacket(const sp &buffer); DISALLOW_EVIL_CONSTRUCTORS(Source); }; //////////////////////////////////////////////////////////////////////////////// RTPSink::Source::Source( uint16_t seq, const sp &buffer, const sp queueBufferMsg) : mQueueBufferMsg(queueBufferMsg), mProbation(kMinSequential) { initSeq(seq); mMaxSeq = seq - 1; buffer->setInt32Data(mCycles | seq); queuePacket(buffer); } RTPSink::Source::~Source() { } void RTPSink::Source::initSeq(uint16_t seq) { mMaxSeq = seq; mCycles = 0; mBaseSeq = seq; mBadSeq = kRTPSeqMod + 1; mReceived = 0; mExpectedPrior = 0; mReceivedPrior = 0; } bool RTPSink::Source::updateSeq(uint16_t seq, const sp &buffer) { uint16_t udelta = seq - mMaxSeq; if (mProbation) { // Startup phase if (seq == mMaxSeq + 1) { buffer->setInt32Data(mCycles | seq); queuePacket(buffer); --mProbation; mMaxSeq = seq; if (mProbation == 0) { initSeq(seq); ++mReceived; return true; } } else { // Packet out of sequence, restart startup phase mProbation = kMinSequential - 1; mMaxSeq = seq; #if 0 mPackets.clear(); mTotalBytesQueued = 0; ALOGI("XXX cleared packets"); #endif buffer->setInt32Data(mCycles | seq); queuePacket(buffer); } return false; } if (udelta < kMaxDropout) { // In order, with permissible gap. if (seq < mMaxSeq) { // Sequence number wrapped - count another 64K cycle mCycles += kRTPSeqMod; } mMaxSeq = seq; } else if (udelta <= kRTPSeqMod - kMaxMisorder) { // The sequence number made a very large jump if (seq == mBadSeq) { // Two sequential packets -- assume that the other side // restarted without telling us so just re-sync // (i.e. pretend this was the first packet) initSeq(seq); } else { mBadSeq = (seq + 1) & (kRTPSeqMod - 1); return false; } } else { // Duplicate or reordered packet. } ++mReceived; buffer->setInt32Data(mCycles | seq); queuePacket(buffer); return true; } void RTPSink::Source::queuePacket(const sp &buffer) { sp msg = mQueueBufferMsg->dup(); msg->setBuffer("buffer", buffer); msg->post(); } void RTPSink::Source::addReportBlock( uint32_t ssrc, const sp &buf) { uint32_t extMaxSeq = mMaxSeq | mCycles; uint32_t expected = extMaxSeq - mBaseSeq + 1; int64_t lost = (int64_t)expected - (int64_t)mReceived; if (lost > 0x7fffff) { lost = 0x7fffff; } else if (lost < -0x800000) { lost = -0x800000; } uint32_t expectedInterval = expected - mExpectedPrior; mExpectedPrior = expected; uint32_t receivedInterval = mReceived - mReceivedPrior; mReceivedPrior = mReceived; int64_t lostInterval = expectedInterval - receivedInterval; uint8_t fractionLost; if (expectedInterval == 0 || lostInterval <=0) { fractionLost = 0; } else { fractionLost = (lostInterval << 8) / expectedInterval; } uint8_t *ptr = buf->data() + buf->size(); ptr[0] = ssrc >> 24; ptr[1] = (ssrc >> 16) & 0xff; ptr[2] = (ssrc >> 8) & 0xff; ptr[3] = ssrc & 0xff; ptr[4] = fractionLost; ptr[5] = (lost >> 16) & 0xff; ptr[6] = (lost >> 8) & 0xff; ptr[7] = lost & 0xff; ptr[8] = extMaxSeq >> 24; ptr[9] = (extMaxSeq >> 16) & 0xff; ptr[10] = (extMaxSeq >> 8) & 0xff; ptr[11] = extMaxSeq & 0xff; // XXX TODO: ptr[12] = 0x00; // interarrival jitter ptr[13] = 0x00; ptr[14] = 0x00; ptr[15] = 0x00; ptr[16] = 0x00; // last SR ptr[17] = 0x00; ptr[18] = 0x00; ptr[19] = 0x00; ptr[20] = 0x00; // delay since last SR ptr[21] = 0x00; ptr[22] = 0x00; ptr[23] = 0x00; } //////////////////////////////////////////////////////////////////////////////// RTPSink::RTPSink( const sp &netSession, const sp &bufferProducer, const sp ¬ify) : mNetSession(netSession), mSurfaceTex(bufferProducer), mNotify(notify), mRTPPort(0), mRTPSessionID(0), mRTCPSessionID(0), mFirstArrivalTimeUs(-1ll), mNumPacketsReceived(0ll), mRegression(1000), mMaxDelayMs(-1ll) { } RTPSink::~RTPSink() { if (mRTCPSessionID != 0) { mNetSession->destroySession(mRTCPSessionID); } if (mRTPSessionID != 0) { mNetSession->destroySession(mRTPSessionID); } } status_t RTPSink::init(bool useTCPInterleaving) { if (useTCPInterleaving) { return OK; } int clientRtp; sp rtpNotify = new AMessage(kWhatRTPNotify, id()); sp rtcpNotify = new AMessage(kWhatRTCPNotify, id()); for (clientRtp = 15550;; clientRtp += 2) { int32_t rtpSession; status_t err = mNetSession->createUDPSession( clientRtp, rtpNotify, &rtpSession); if (err != OK) { ALOGI("failed to create RTP socket on port %d", clientRtp); continue; } int32_t rtcpSession; err = mNetSession->createUDPSession( clientRtp + 1, rtcpNotify, &rtcpSession); if (err == OK) { mRTPPort = clientRtp; mRTPSessionID = rtpSession; mRTCPSessionID = rtcpSession; break; } ALOGI("failed to create RTCP socket on port %d", clientRtp + 1); mNetSession->destroySession(rtpSession); } if (mRTPPort == 0) { return UNKNOWN_ERROR; } return OK; } int32_t RTPSink::getRTPPort() const { return mRTPPort; } void RTPSink::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatRTPNotify: case kWhatRTCPNotify: { int32_t reason; CHECK(msg->findInt32("reason", &reason)); switch (reason) { case ANetworkSession::kWhatError: { int32_t sessionID; CHECK(msg->findInt32("sessionID", &sessionID)); int32_t err; CHECK(msg->findInt32("err", &err)); AString detail; CHECK(msg->findString("detail", &detail)); ALOGE("An error occurred in session %d (%d, '%s/%s').", sessionID, err, detail.c_str(), strerror(-err)); mNetSession->destroySession(sessionID); if (sessionID == mRTPSessionID) { mRTPSessionID = 0; } else if (sessionID == mRTCPSessionID) { mRTCPSessionID = 0; } break; } case ANetworkSession::kWhatDatagram: { int32_t sessionID; CHECK(msg->findInt32("sessionID", &sessionID)); sp data; CHECK(msg->findBuffer("data", &data)); status_t err; if (msg->what() == kWhatRTPNotify) { err = parseRTP(data); } else { err = parseRTCP(data); } break; } default: TRESPASS(); } break; } case kWhatSendRR: { onSendRR(); break; } case kWhatPacketLost: { onPacketLost(msg); break; } case kWhatInject: { int32_t isRTP; CHECK(msg->findInt32("isRTP", &isRTP)); sp buffer; CHECK(msg->findBuffer("buffer", &buffer)); status_t err; if (isRTP) { err = parseRTP(buffer); } else { err = parseRTCP(buffer); } break; } default: TRESPASS(); } } status_t RTPSink::injectPacket(bool isRTP, const sp &buffer) { sp msg = new AMessage(kWhatInject, id()); msg->setInt32("isRTP", isRTP); msg->setBuffer("buffer", buffer); msg->post(); return OK; } status_t RTPSink::parseRTP(const sp &buffer) { size_t size = buffer->size(); if (size < 12) { // Too short to be a valid RTP header. return ERROR_MALFORMED; } const uint8_t *data = buffer->data(); if ((data[0] >> 6) != 2) { // Unsupported version. return ERROR_UNSUPPORTED; } if (data[0] & 0x20) { // Padding present. size_t paddingLength = data[size - 1]; if (paddingLength + 12 > size) { // If we removed this much padding we'd end up with something // that's too short to be a valid RTP header. return ERROR_MALFORMED; } size -= paddingLength; } int numCSRCs = data[0] & 0x0f; size_t payloadOffset = 12 + 4 * numCSRCs; if (size < payloadOffset) { // Not enough data to fit the basic header and all the CSRC entries. return ERROR_MALFORMED; } if (data[0] & 0x10) { // Header eXtension present. if (size < payloadOffset + 4) { // Not enough data to fit the basic header, all CSRC entries // and the first 4 bytes of the extension header. return ERROR_MALFORMED; } const uint8_t *extensionData = &data[payloadOffset]; size_t extensionLength = 4 * (extensionData[2] << 8 | extensionData[3]); if (size < payloadOffset + 4 + extensionLength) { return ERROR_MALFORMED; } payloadOffset += 4 + extensionLength; } uint32_t srcId = U32_AT(&data[8]); uint32_t rtpTime = U32_AT(&data[4]); uint16_t seqNo = U16_AT(&data[2]); #if 0 int64_t arrivalTimeUs; CHECK(buffer->meta()->findInt64("arrivalTimeUs", &arrivalTimeUs)); if (mFirstArrivalTimeUs < 0ll) { mFirstArrivalTimeUs = arrivalTimeUs; } arrivalTimeUs -= mFirstArrivalTimeUs; int64_t arrivalTimeMedia = (arrivalTimeUs * 9ll) / 100ll; ALOGV("seqNo: %d, SSRC 0x%08x, diff %lld", seqNo, srcId, rtpTime - arrivalTimeMedia); mRegression.addPoint((float)rtpTime, (float)arrivalTimeMedia); ++mNumPacketsReceived; float n1, n2, b; if (mRegression.approxLine(&n1, &n2, &b)) { ALOGV("Line %lld: %.2f %.2f %.2f, slope %.2f", mNumPacketsReceived, n1, n2, b, -n1 / n2); float expectedArrivalTimeMedia = (b - n1 * (float)rtpTime) / n2; float latenessMs = (arrivalTimeMedia - expectedArrivalTimeMedia) / 90.0; if (mMaxDelayMs < 0ll || latenessMs > mMaxDelayMs) { mMaxDelayMs = latenessMs; ALOGI("packet was %.2f ms late", latenessMs); } } #endif sp meta = buffer->meta(); meta->setInt32("ssrc", srcId); meta->setInt32("rtp-time", rtpTime); meta->setInt32("PT", data[1] & 0x7f); meta->setInt32("M", data[1] >> 7); buffer->setRange(payloadOffset, size - payloadOffset); ssize_t index = mSources.indexOfKey(srcId); if (index < 0) { if (mRenderer == NULL) { sp notifyLost = new AMessage(kWhatPacketLost, id()); notifyLost->setInt32("ssrc", srcId); mRenderer = new RENDERER_CLASS(notifyLost, mSurfaceTex); looper()->registerHandler(mRenderer); } sp queueBufferMsg = new AMessage(RENDERER_CLASS::kWhatQueueBuffer, mRenderer->id()); sp source = new Source(seqNo, buffer, queueBufferMsg); mSources.add(srcId, source); } else { mSources.valueAt(index)->updateSeq(seqNo, buffer); } return OK; } status_t RTPSink::parseRTCP(const sp &buffer) { const uint8_t *data = buffer->data(); size_t size = buffer->size(); while (size > 0) { if (size < 8) { // Too short to be a valid RTCP header return ERROR_MALFORMED; } if ((data[0] >> 6) != 2) { // Unsupported version. return ERROR_UNSUPPORTED; } if (data[0] & 0x20) { // Padding present. size_t paddingLength = data[size - 1]; if (paddingLength + 12 > size) { // If we removed this much padding we'd end up with something // that's too short to be a valid RTP header. return ERROR_MALFORMED; } size -= paddingLength; } size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; if (size < headerLength) { // Only received a partial packet? return ERROR_MALFORMED; } switch (data[1]) { case 200: { parseSR(data, headerLength); break; } case 201: // RR case 202: // SDES case 204: // APP break; case 205: // TSFB (transport layer specific feedback) case 206: // PSFB (payload specific feedback) // hexdump(data, headerLength); break; case 203: { parseBYE(data, headerLength); break; } default: { ALOGW("Unknown RTCP packet type %u of size %d", (unsigned)data[1], headerLength); break; } } data += headerLength; size -= headerLength; } return OK; } status_t RTPSink::parseBYE(const uint8_t *data, size_t size) { size_t SC = data[0] & 0x3f; if (SC == 0 || size < (4 + SC * 4)) { // Packet too short for the minimal BYE header. return ERROR_MALFORMED; } uint32_t id = U32_AT(&data[4]); return OK; } status_t RTPSink::parseSR(const uint8_t *data, size_t size) { size_t RC = data[0] & 0x1f; if (size < (7 + RC * 6) * 4) { // Packet too short for the minimal SR header. return ERROR_MALFORMED; } uint32_t id = U32_AT(&data[4]); uint64_t ntpTime = U64_AT(&data[8]); uint32_t rtpTime = U32_AT(&data[16]); ALOGV("SR: ssrc 0x%08x, ntpTime 0x%016llx, rtpTime 0x%08x", id, ntpTime, rtpTime); return OK; } status_t RTPSink::connect( const char *host, int32_t remoteRtpPort, int32_t remoteRtcpPort) { ALOGI("connecting RTP/RTCP sockets to %s:{%d,%d}", host, remoteRtpPort, remoteRtcpPort); status_t err = mNetSession->connectUDPSession(mRTPSessionID, host, remoteRtpPort); if (err != OK) { return err; } err = mNetSession->connectUDPSession(mRTCPSessionID, host, remoteRtcpPort); if (err != OK) { return err; } #if 0 sp buf = new ABuffer(1500); memset(buf->data(), 0, buf->size()); mNetSession->sendRequest( mRTPSessionID, buf->data(), buf->size()); mNetSession->sendRequest( mRTCPSessionID, buf->data(), buf->size()); #endif scheduleSendRR(); return OK; } void RTPSink::scheduleSendRR() { (new AMessage(kWhatSendRR, id()))->post(2000000ll); } void RTPSink::addSDES(const sp &buffer) { uint8_t *data = buffer->data() + buffer->size(); data[0] = 0x80 | 1; data[1] = 202; // SDES data[4] = 0xde; // SSRC data[5] = 0xad; data[6] = 0xbe; data[7] = 0xef; size_t offset = 8; data[offset++] = 1; // CNAME AString cname = "stagefright@somewhere"; data[offset++] = cname.size(); memcpy(&data[offset], cname.c_str(), cname.size()); offset += cname.size(); data[offset++] = 6; // TOOL AString tool = "stagefright/1.0"; data[offset++] = tool.size(); memcpy(&data[offset], tool.c_str(), tool.size()); offset += tool.size(); data[offset++] = 0; if ((offset % 4) > 0) { size_t count = 4 - (offset % 4); switch (count) { case 3: data[offset++] = 0; case 2: data[offset++] = 0; case 1: data[offset++] = 0; } } size_t numWords = (offset / 4) - 1; data[2] = numWords >> 8; data[3] = numWords & 0xff; buffer->setRange(buffer->offset(), buffer->size() + offset); } void RTPSink::onSendRR() { sp buf = new ABuffer(1500); buf->setRange(0, 0); uint8_t *ptr = buf->data(); ptr[0] = 0x80 | 0; ptr[1] = 201; // RR ptr[2] = 0; ptr[3] = 1; ptr[4] = 0xde; // SSRC ptr[5] = 0xad; ptr[6] = 0xbe; ptr[7] = 0xef; buf->setRange(0, 8); size_t numReportBlocks = 0; for (size_t i = 0; i < mSources.size(); ++i) { uint32_t ssrc = mSources.keyAt(i); sp source = mSources.valueAt(i); if (numReportBlocks > 31 || buf->size() + 24 > buf->capacity()) { // Cannot fit another report block. break; } source->addReportBlock(ssrc, buf); ++numReportBlocks; } ptr[0] |= numReportBlocks; // 5 bit size_t sizeInWordsMinus1 = 1 + 6 * numReportBlocks; ptr[2] = sizeInWordsMinus1 >> 8; ptr[3] = sizeInWordsMinus1 & 0xff; buf->setRange(0, (sizeInWordsMinus1 + 1) * 4); addSDES(buf); mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size()); scheduleSendRR(); } void RTPSink::onPacketLost(const sp &msg) { uint32_t srcId; CHECK(msg->findInt32("ssrc", (int32_t *)&srcId)); int32_t seqNo; CHECK(msg->findInt32("seqNo", &seqNo)); int32_t blp = 0; sp buf = new ABuffer(16); buf->setRange(0, 0); uint8_t *ptr = buf->data(); ptr[0] = 0x80 | 1; // generic NACK ptr[1] = 205; // TSFB ptr[2] = 0; ptr[3] = 3; ptr[4] = 0xde; // sender SSRC ptr[5] = 0xad; ptr[6] = 0xbe; ptr[7] = 0xef; ptr[8] = (srcId >> 24) & 0xff; ptr[9] = (srcId >> 16) & 0xff; ptr[10] = (srcId >> 8) & 0xff; ptr[11] = (srcId & 0xff); ptr[12] = (seqNo >> 8) & 0xff; ptr[13] = (seqNo & 0xff); ptr[14] = (blp >> 8) & 0xff; ptr[15] = (blp & 0xff); buf->setRange(0, 16); mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size()); } } // namespace android