/* * Copyright (C) 2010 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "ARTPConnection" #include #include "ARTPAssembler.h" #include "ARTPConnection.h" #include "ARTPSource.h" #include "ASessionDescription.h" #include #include #include #include #include #include #include namespace android { static const size_t kMaxUDPSize = 1500; static uint16_t u16at(const uint8_t *data) { return data[0] << 8 | data[1]; } static uint32_t u32at(const uint8_t *data) { return u16at(data) << 16 | u16at(&data[2]); } static uint64_t u64at(const uint8_t *data) { return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); } // static const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; struct ARTPConnection::StreamInfo { int mRTPSocket; int mRTCPSocket; sp mSessionDesc; size_t mIndex; sp mNotifyMsg; KeyedVector > mSources; int64_t mNumRTCPPacketsReceived; int64_t mNumRTPPacketsReceived; struct sockaddr_in mRemoteRTCPAddr; bool mIsInjected; }; ARTPConnection::ARTPConnection(uint32_t flags) : mFlags(flags), mPollEventPending(false), mLastReceiverReportTimeUs(-1) { } ARTPConnection::~ARTPConnection() { } void ARTPConnection::addStream( int rtpSocket, int rtcpSocket, const sp &sessionDesc, size_t index, const sp ¬ify, bool injected) { sp msg = new AMessage(kWhatAddStream, id()); msg->setInt32("rtp-socket", rtpSocket); msg->setInt32("rtcp-socket", rtcpSocket); msg->setObject("session-desc", sessionDesc); msg->setSize("index", index); msg->setMessage("notify", notify); msg->setInt32("injected", injected); msg->post(); } void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { sp msg = new AMessage(kWhatRemoveStream, id()); msg->setInt32("rtp-socket", rtpSocket); msg->setInt32("rtcp-socket", rtcpSocket); msg->post(); } static void bumpSocketBufferSize(int s) { int size = 256 * 1024; CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); } // static void ARTPConnection::MakePortPair( int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); CHECK_GE(*rtpSocket, 0); bumpSocketBufferSize(*rtpSocket); *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); CHECK_GE(*rtcpSocket, 0); bumpSocketBufferSize(*rtcpSocket); /* rand() * 1000 may overflow int type, use long long */ unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550; start &= ~1; for (unsigned port = start; port < 65536; port += 2) { struct sockaddr_in addr; memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(port); if (bind(*rtpSocket, (const struct sockaddr *)&addr, sizeof(addr)) < 0) { continue; } addr.sin_port = htons(port + 1); if (bind(*rtcpSocket, (const struct sockaddr *)&addr, sizeof(addr)) == 0) { *rtpPort = port; return; } } TRESPASS(); } void ARTPConnection::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatAddStream: { onAddStream(msg); break; } case kWhatRemoveStream: { onRemoveStream(msg); break; } case kWhatPollStreams: { onPollStreams(); break; } case kWhatInjectPacket: { onInjectPacket(msg); break; } default: { TRESPASS(); break; } } } void ARTPConnection::onAddStream(const sp &msg) { mStreams.push_back(StreamInfo()); StreamInfo *info = &*--mStreams.end(); int32_t s; CHECK(msg->findInt32("rtp-socket", &s)); info->mRTPSocket = s; CHECK(msg->findInt32("rtcp-socket", &s)); info->mRTCPSocket = s; int32_t injected; CHECK(msg->findInt32("injected", &injected)); info->mIsInjected = injected; sp obj; CHECK(msg->findObject("session-desc", &obj)); info->mSessionDesc = static_cast(obj.get()); CHECK(msg->findSize("index", &info->mIndex)); CHECK(msg->findMessage("notify", &info->mNotifyMsg)); info->mNumRTCPPacketsReceived = 0; info->mNumRTPPacketsReceived = 0; memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); if (!injected) { postPollEvent(); } } void ARTPConnection::onRemoveStream(const sp &msg) { int32_t rtpSocket, rtcpSocket; CHECK(msg->findInt32("rtp-socket", &rtpSocket)); CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); List::iterator it = mStreams.begin(); while (it != mStreams.end() && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { ++it; } if (it == mStreams.end()) { return; } mStreams.erase(it); } void ARTPConnection::postPollEvent() { if (mPollEventPending) { return; } sp msg = new AMessage(kWhatPollStreams, id()); msg->post(); mPollEventPending = true; } void ARTPConnection::onPollStreams() { mPollEventPending = false; if (mStreams.empty()) { return; } struct timeval tv; tv.tv_sec = 0; tv.tv_usec = kSelectTimeoutUs; fd_set rs; FD_ZERO(&rs); int maxSocket = -1; for (List::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { if ((*it).mIsInjected) { continue; } FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTCPSocket, &rs); if (it->mRTPSocket > maxSocket) { maxSocket = it->mRTPSocket; } if (it->mRTCPSocket > maxSocket) { maxSocket = it->mRTCPSocket; } } if (maxSocket == -1) { return; } int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); if (res > 0) { List::iterator it = mStreams.begin(); while (it != mStreams.end()) { if ((*it).mIsInjected) { ++it; continue; } status_t err = OK; if (FD_ISSET(it->mRTPSocket, &rs)) { err = receive(&*it, true); } if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { err = receive(&*it, false); } if (err == -ECONNRESET) { // socket failure, this stream is dead, Jim. ALOGW("failed to receive RTP/RTCP datagram."); it = mStreams.erase(it); continue; } ++it; } } int64_t nowUs = ALooper::GetNowUs(); if (mLastReceiverReportTimeUs <= 0 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { sp buffer = new ABuffer(kMaxUDPSize); List::iterator it = mStreams.begin(); while (it != mStreams.end()) { StreamInfo *s = &*it; if (s->mIsInjected) { ++it; continue; } if (s->mNumRTCPPacketsReceived == 0) { // We have never received any RTCP packets on this stream, // we don't even know where to send a report. ++it; continue; } buffer->setRange(0, 0); for (size_t i = 0; i < s->mSources.size(); ++i) { sp source = s->mSources.valueAt(i); source->addReceiverReport(buffer); if (mFlags & kRegularlyRequestFIR) { source->addFIR(buffer); } } if (buffer->size() > 0) { ALOGV("Sending RR..."); ssize_t n; do { n = sendto( s->mRTCPSocket, buffer->data(), buffer->size(), 0, (const struct sockaddr *)&s->mRemoteRTCPAddr, sizeof(s->mRemoteRTCPAddr)); } while (n < 0 && errno == EINTR); if (n <= 0) { ALOGW("failed to send RTCP receiver report (%s).", n == 0 ? "connection gone" : strerror(errno)); it = mStreams.erase(it); continue; } CHECK_EQ(n, (ssize_t)buffer->size()); mLastReceiverReportTimeUs = nowUs; } ++it; } } if (!mStreams.empty()) { postPollEvent(); } } status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP"); CHECK(!s->mIsInjected); sp buffer = new ABuffer(65536); socklen_t remoteAddrLen = (!receiveRTP && s->mNumRTCPPacketsReceived == 0) ? sizeof(s->mRemoteRTCPAddr) : 0; ssize_t nbytes; do { nbytes = recvfrom( receiveRTP ? s->mRTPSocket : s->mRTCPSocket, buffer->data(), buffer->capacity(), 0, remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, remoteAddrLen > 0 ? &remoteAddrLen : NULL); } while (nbytes < 0 && errno == EINTR); if (nbytes <= 0) { return -ECONNRESET; } buffer->setRange(0, nbytes); // ALOGI("received %d bytes.", buffer->size()); status_t err; if (receiveRTP) { err = parseRTP(s, buffer); } else { err = parseRTCP(s, buffer); } return err; } status_t ARTPConnection::parseRTP(StreamInfo *s, const sp &buffer) { if (s->mNumRTPPacketsReceived++ == 0) { sp notify = s->mNotifyMsg->dup(); notify->setInt32("first-rtp", true); notify->post(); } size_t size = buffer->size(); if (size < 12) { // Too short to be a valid RTP header. return -1; } const uint8_t *data = buffer->data(); if ((data[0] >> 6) != 2) { // Unsupported version. return -1; } if (data[0] & 0x20) { // Padding present. size_t paddingLength = data[size - 1]; if (paddingLength + 12 > size) { // If we removed this much padding we'd end up with something // that's too short to be a valid RTP header. return -1; } size -= paddingLength; } int numCSRCs = data[0] & 0x0f; size_t payloadOffset = 12 + 4 * numCSRCs; if (size < payloadOffset) { // Not enough data to fit the basic header and all the CSRC entries. return -1; } if (data[0] & 0x10) { // Header eXtension present. if (size < payloadOffset + 4) { // Not enough data to fit the basic header, all CSRC entries // and the first 4 bytes of the extension header. return -1; } const uint8_t *extensionData = &data[payloadOffset]; size_t extensionLength = 4 * (extensionData[2] << 8 | extensionData[3]); if (size < payloadOffset + 4 + extensionLength) { return -1; } payloadOffset += 4 + extensionLength; } uint32_t srcId = u32at(&data[8]); sp source = findSource(s, srcId); uint32_t rtpTime = u32at(&data[4]); sp meta = buffer->meta(); meta->setInt32("ssrc", srcId); meta->setInt32("rtp-time", rtpTime); meta->setInt32("PT", data[1] & 0x7f); meta->setInt32("M", data[1] >> 7); buffer->setInt32Data(u16at(&data[2])); buffer->setRange(payloadOffset, size - payloadOffset); source->processRTPPacket(buffer); return OK; } status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp &buffer) { if (s->mNumRTCPPacketsReceived++ == 0) { sp notify = s->mNotifyMsg->dup(); notify->setInt32("first-rtcp", true); notify->post(); } const uint8_t *data = buffer->data(); size_t size = buffer->size(); while (size > 0) { if (size < 8) { // Too short to be a valid RTCP header return -1; } if ((data[0] >> 6) != 2) { // Unsupported version. return -1; } if (data[0] & 0x20) { // Padding present. size_t paddingLength = data[size - 1]; if (paddingLength + 12 > size) { // If we removed this much padding we'd end up with something // that's too short to be a valid RTP header. return -1; } size -= paddingLength; } size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; if (size < headerLength) { // Only received a partial packet? return -1; } switch (data[1]) { case 200: { parseSR(s, data, headerLength); break; } case 201: // RR case 202: // SDES case 204: // APP break; case 205: // TSFB (transport layer specific feedback) case 206: // PSFB (payload specific feedback) // hexdump(data, headerLength); break; case 203: { parseBYE(s, data, headerLength); break; } default: { ALOGW("Unknown RTCP packet type %u of size %zu", (unsigned)data[1], headerLength); break; } } data += headerLength; size -= headerLength; } return OK; } status_t ARTPConnection::parseBYE( StreamInfo *s, const uint8_t *data, size_t size) { size_t SC = data[0] & 0x3f; if (SC == 0 || size < (4 + SC * 4)) { // Packet too short for the minimal BYE header. return -1; } uint32_t id = u32at(&data[4]); sp source = findSource(s, id); source->byeReceived(); return OK; } status_t ARTPConnection::parseSR( StreamInfo *s, const uint8_t *data, size_t size) { size_t RC = data[0] & 0x1f; if (size < (7 + RC * 6) * 4) { // Packet too short for the minimal SR header. return -1; } uint32_t id = u32at(&data[4]); uint64_t ntpTime = u64at(&data[8]); uint32_t rtpTime = u32at(&data[16]); #if 0 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", id, rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); #endif sp source = findSource(s, id); source->timeUpdate(rtpTime, ntpTime); return 0; } sp ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { sp source; ssize_t index = info->mSources.indexOfKey(srcId); if (index < 0) { index = info->mSources.size(); source = new ARTPSource( srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); info->mSources.add(srcId, source); } else { source = info->mSources.valueAt(index); } return source; } void ARTPConnection::injectPacket(int index, const sp &buffer) { sp msg = new AMessage(kWhatInjectPacket, id()); msg->setInt32("index", index); msg->setBuffer("buffer", buffer); msg->post(); } void ARTPConnection::onInjectPacket(const sp &msg) { int32_t index; CHECK(msg->findInt32("index", &index)); sp buffer; CHECK(msg->findBuffer("buffer", &buffer)); List::iterator it = mStreams.begin(); while (it != mStreams.end() && it->mRTPSocket != index && it->mRTCPSocket != index) { ++it; } if (it == mStreams.end()) { TRESPASS(); } StreamInfo *s = &*it; status_t err; if (it->mRTPSocket == index) { err = parseRTP(s, buffer); } else { err = parseRTCP(s, buffer); } } } // namespace android