/* * Copyright (C) 2011 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "LibAAH_RTP" #include #include #include #include #include #include #include #include #include "aah_tx_player.h" #include "aah_tx_sender.h" namespace android { const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr"; const char* AAH_TXSender::kSendPacketPort = "port"; const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp"; const int AAH_TXSender::kRetryTrimIntervalUs = 100000; const int AAH_TXSender::kHeartbeatIntervalUs = 1000000; const int AAH_TXSender::kRetryBufferCapacity = 100; const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull; Mutex AAH_TXSender::sLock; wp AAH_TXSender::sInstance; uint32_t AAH_TXSender::sNextEpoch; bool AAH_TXSender::sNextEpochValid = false; AAH_TXSender::AAH_TXSender() : mSocket(-1) { mLastSentPacketTime = systemTime(); } sp AAH_TXSender::GetInstance() { Mutex::Autolock autoLock(sLock); sp sender = sInstance.promote(); if (sender == NULL) { sender = new AAH_TXSender(); if (sender == NULL) { return NULL; } sender->mLooper = new ALooper(); if (sender->mLooper == NULL) { return NULL; } sender->mReflector = new AHandlerReflector(sender.get()); if (sender->mReflector == NULL) { return NULL; } sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (sender->mSocket == -1) { ALOGW("%s unable to create socket", __PRETTY_FUNCTION__); return NULL; } struct sockaddr_in bind_addr; memset(&bind_addr, 0, sizeof(bind_addr)); bind_addr.sin_family = AF_INET; if (bind(sender->mSocket, reinterpret_cast(&bind_addr), sizeof(bind_addr)) < 0) { ALOGW("%s unable to bind socket (errno %d)", __PRETTY_FUNCTION__, errno); return NULL; } sender->mRetryReceiver = new RetryReceiver(sender.get()); if (sender->mRetryReceiver == NULL) { return NULL; } sender->mLooper->setName("AAH_TXSender"); sender->mLooper->registerHandler(sender->mReflector); sender->mLooper->start(false, false, PRIORITY_AUDIO); if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO) != OK) { ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__); return NULL; } sInstance = sender; } return sender; } AAH_TXSender::~AAH_TXSender() { mLooper->stop(); mLooper->unregisterHandler(mReflector->id()); if (mRetryReceiver != NULL) { mRetryReceiver->requestExit(); mRetryReceiver->mWakeupEvent.setEvent(); if (mRetryReceiver->requestExitAndWait() != OK) { ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__); } mRetryReceiver->mSender = NULL; mRetryReceiver.clear(); } if (mSocket != -1) { close(mSocket); } } // Return the next epoch number usable for a newly instantiated endpoint. uint32_t AAH_TXSender::getNextEpoch() { Mutex::Autolock autoLock(sLock); if (sNextEpochValid) { sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask; } else { sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask; sNextEpochValid = true; } return sNextEpoch; } // Notify the sender that a player has started sending to this endpoint. // Returns a program ID for use by the calling player. uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) { Mutex::Autolock lock(mEndpointLock); EndpointState* eps = mEndpointMap.valueFor(endpoint); if (eps) { eps->playerRefCount++; } else { eps = new EndpointState(getNextEpoch()); mEndpointMap.add(endpoint, eps); } // if this is the first registered endpoint, then send a message to start // trimming retry buffers and a message to start sending heartbeats. if (mEndpointMap.size() == 1) { sp trimMessage = new AMessage(kWhatTrimRetryBuffers, handlerID()); trimMessage->post(kRetryTrimIntervalUs); sp heartbeatMessage = new AMessage(kWhatSendHeartbeats, handlerID()); heartbeatMessage->post(kHeartbeatIntervalUs); } eps->nextProgramID++; return eps->nextProgramID; } // Notify the sender that a player has ceased sending to this endpoint. // An endpoint's state can not be deleted until all of the endpoint's // registered players have called unregisterEndpoint. void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) { Mutex::Autolock lock(mEndpointLock); EndpointState* eps = mEndpointMap.valueFor(endpoint); if (eps) { eps->playerRefCount--; CHECK(eps->playerRefCount >= 0); } } void AAH_TXSender::onMessageReceived(const sp& msg) { switch (msg->what()) { case kWhatSendPacket: onSendPacket(msg); break; case kWhatTrimRetryBuffers: trimRetryBuffers(); break; case kWhatSendHeartbeats: sendHeartbeats(); break; default: TRESPASS(); break; } } void AAH_TXSender::onSendPacket(const sp& msg) { sp obj; CHECK(msg->findObject(kSendPacketTRTPPacket, &obj)); sp packet = static_cast(obj.get()); uint32_t ipAddr; CHECK(msg->findInt32(kSendPacketIPAddr, reinterpret_cast(&ipAddr))); int32_t port32; CHECK(msg->findInt32(kSendPacketPort, &port32)); uint16_t port = port32; Mutex::Autolock lock(mEndpointLock); doSendPacket_l(packet, Endpoint(ipAddr, port)); mLastSentPacketTime = systemTime(); } void AAH_TXSender::doSendPacket_l(const sp& packet, const Endpoint& endpoint) { EndpointState* eps = mEndpointMap.valueFor(endpoint); if (!eps) { // the endpoint state has disappeared, so the player that sent this // packet must be dead. return; } // assign the packet's sequence number packet->setEpoch(eps->epoch); packet->setSeqNumber(eps->trtpSeqNumber++); // add the packet to the retry buffer RetryBuffer& retry = eps->retry; retry.push_back(packet); // send the packet struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = endpoint.addr; addr.sin_port = endpoint.port; ssize_t result = sendto(mSocket, packet->getPacket(), packet->getPacketLen(), 0, (const struct sockaddr *) &addr, sizeof(addr)); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } } void AAH_TXSender::trimRetryBuffers() { Mutex::Autolock lock(mEndpointLock); nsecs_t localTimeNow = systemTime(); Vector endpointsToRemove; for (size_t i = 0; i < mEndpointMap.size(); i++) { EndpointState* eps = mEndpointMap.editValueAt(i); RetryBuffer& retry = eps->retry; while (!retry.isEmpty()) { if (retry[0]->getExpireTime() < localTimeNow) { retry.pop_front(); } else { break; } } if (retry.isEmpty() && eps->playerRefCount == 0) { endpointsToRemove.add(mEndpointMap.keyAt(i)); } } // remove the state for any endpoints that are no longer in use for (size_t i = 0; i < endpointsToRemove.size(); i++) { Endpoint& e = endpointsToRemove.editItemAt(i); ALOGD("*** %s removing endpoint addr=%08x", __PRETTY_FUNCTION__, e.addr); size_t index = mEndpointMap.indexOfKey(e); delete mEndpointMap.valueAt(index); mEndpointMap.removeItemsAt(index); } // schedule the next trim if (mEndpointMap.size()) { sp trimMessage = new AMessage(kWhatTrimRetryBuffers, handlerID()); trimMessage->post(kRetryTrimIntervalUs); } } void AAH_TXSender::sendHeartbeats() { Mutex::Autolock lock(mEndpointLock); if (shouldSendHeartbeats_l()) { for (size_t i = 0; i < mEndpointMap.size(); i++) { EndpointState* eps = mEndpointMap.editValueAt(i); const Endpoint& ep = mEndpointMap.keyAt(i); sp packet = new TRTPControlPacket(); packet->setCommandID(TRTPControlPacket::kCommandNop); packet->setExpireTime(systemTime() + AAH_TXPlayer::kAAHRetryKeepAroundTimeNs); packet->pack(); doSendPacket_l(packet, ep); } } // schedule the next heartbeat if (mEndpointMap.size()) { sp heartbeatMessage = new AMessage(kWhatSendHeartbeats, handlerID()); heartbeatMessage->post(kHeartbeatIntervalUs); } } bool AAH_TXSender::shouldSendHeartbeats_l() { // assert(holding endpoint lock) return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout)); } // Receiver // initial 4-byte ID of a retry request packet const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq'; // initial 4-byte ID of a retry NAK packet const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak'; // initial 4-byte ID of a fast start request packet const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst'; AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender) : Thread(false), mSender(sender) {} AAH_TXSender::RetryReceiver::~RetryReceiver() { mWakeupEvent.clearPendingEvents(); } // Returns true if val is within the interval bounded inclusively by // start and end. Also handles the case where there is a rollover of the // range between start and end. template static inline bool withinIntervalWithRollover(T val, T start, T end) { return ((start <= end && val >= start && val <= end) || (start > end && (val >= start || val <= end))); } bool AAH_TXSender::RetryReceiver::threadLoop() { struct pollfd pollFds[2]; pollFds[0].fd = mSender->mSocket; pollFds[0].events = POLLIN; pollFds[0].revents = 0; pollFds[1].fd = mWakeupEvent.getWakeupHandle(); pollFds[1].events = POLLIN; pollFds[1].revents = 0; int pollResult = poll(pollFds, NELEM(pollFds), -1); if (pollResult == -1) { ALOGE("%s poll failed", __PRETTY_FUNCTION__); return false; } if (exitPending()) { ALOGI("*** %s exiting", __PRETTY_FUNCTION__); return false; } if (pollFds[0].revents) { handleRetryRequest(); } return true; } void AAH_TXSender::RetryReceiver::handleRetryRequest() { ALOGV("*** RX %s start", __PRETTY_FUNCTION__); RetryPacket request; struct sockaddr requestSrcAddr; socklen_t requestSrcAddrLen = sizeof(requestSrcAddr); ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0, &requestSrcAddr, &requestSrcAddrLen); if (result == -1) { ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno); return; } if (static_cast(result) < sizeof(RetryPacket)) { ALOGW("%s short packet received", __PRETTY_FUNCTION__); return; } uint32_t host_request_id = ntohl(request.id); if ((host_request_id != kRetryRequestID) && (host_request_id != kFastStartRequestID)) { ALOGW("%s received retry request with bogus ID (%08x)", __PRETTY_FUNCTION__, host_request_id); return; } Endpoint endpoint(request.endpointIP, request.endpointPort); Mutex::Autolock lock(mSender->mEndpointLock); EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint); if (eps == NULL || eps->retry.isEmpty()) { // we have no retry buffer or an empty retry buffer for this endpoint, // so NAK the entire request RetryPacket nak = request; nak.id = htonl(kRetryNakID); result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, &requestSrcAddr, requestSrcAddrLen); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } return; } RetryBuffer& retry = eps->retry; uint16_t startSeq = ntohs(request.seqStart); uint16_t endSeq = ntohs(request.seqEnd); uint16_t retryFirstSeq = retry[0]->getSeqNumber(); uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber(); // If this is a fast start, then force the start of the retry to match the // start of the retransmit ring buffer (unless the end of the retransmit // ring buffer is already past the point of fast start) if ((host_request_id == kFastStartRequestID) && !((startSeq - retryFirstSeq) & 0x8000)) { startSeq = retryFirstSeq; } int startIndex; if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) { startIndex = static_cast(startSeq - retryFirstSeq); } else { startIndex = -1; } int endIndex; if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) { endIndex = static_cast(endSeq - retryFirstSeq); } else { endIndex = -1; } if (startIndex == -1 && endIndex == -1) { // no part of the request range is found in the retry buffer RetryPacket nak = request; nak.id = htonl(kRetryNakID); result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, &requestSrcAddr, requestSrcAddrLen); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } return; } if (startIndex == -1) { // NAK a subrange at the front of the request range RetryPacket nak = request; nak.id = htonl(kRetryNakID); nak.seqEnd = htons(retryFirstSeq - 1); result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, &requestSrcAddr, requestSrcAddrLen); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } startIndex = 0; } else if (endIndex == -1) { // NAK a subrange at the back of the request range RetryPacket nak = request; nak.id = htonl(kRetryNakID); nak.seqStart = htons(retryLastSeq + 1); result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, &requestSrcAddr, requestSrcAddrLen); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } endIndex = retry.size() - 1; } // send the retry packets for (int i = startIndex; i <= endIndex; i++) { const sp& replyPacket = retry[i]; result = sendto(mSender->mSocket, replyPacket->getPacket(), replyPacket->getPacketLen(), 0, &requestSrcAddr, requestSrcAddrLen); if (result == -1) { ALOGW("%s sendto failed", __PRETTY_FUNCTION__); } } } // Endpoint AAH_TXSender::Endpoint::Endpoint() : addr(0) , port(0) { } AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p) : addr(a) , port(p) {} bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const { return ((addr < other.addr) || (addr == other.addr && port < other.port)); } // EndpointState AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch) : retry(kRetryBufferCapacity) , playerRefCount(1) , trtpSeqNumber(0) , nextProgramID(0) , epoch(_epoch) { } // CircularBuffer template CircularBuffer::CircularBuffer(size_t capacity) : mCapacity(capacity) , mHead(0) , mTail(0) , mFillCount(0) { mBuffer = new T[capacity]; } template CircularBuffer::~CircularBuffer() { delete [] mBuffer; } template void CircularBuffer::push_back(const T& item) { if (this->isFull()) { this->pop_front(); } mBuffer[mHead] = item; mHead = (mHead + 1) % mCapacity; mFillCount++; } template void CircularBuffer::pop_front() { CHECK(!isEmpty()); mBuffer[mTail] = T(); mTail = (mTail + 1) % mCapacity; mFillCount--; } template size_t CircularBuffer::size() const { return mFillCount; } template bool CircularBuffer::isFull() const { return (mFillCount == mCapacity); } template bool CircularBuffer::isEmpty() const { return (mFillCount == 0); } template const T& CircularBuffer::itemAt(size_t index) const { CHECK(index < mFillCount); return mBuffer[(mTail + index) % mCapacity]; } template const T& CircularBuffer::operator[](size_t index) const { return itemAt(index); } } // namespace android