summaryrefslogtreecommitdiffstats
path: root/media/libaah_rtp/aah_tx_sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libaah_rtp/aah_tx_sender.cpp')
-rw-r--r--media/libaah_rtp/aah_tx_sender.cpp603
1 files changed, 0 insertions, 603 deletions
diff --git a/media/libaah_rtp/aah_tx_sender.cpp b/media/libaah_rtp/aah_tx_sender.cpp
deleted file mode 100644
index 08e32d2..0000000
--- a/media/libaah_rtp/aah_tx_sender.cpp
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Copyright (C) 2011 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define LOG_TAG "LibAAH_RTP"
-#include <media/stagefright/foundation/ADebug.h>
-
-#include <netinet/in.h>
-#include <poll.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
-#include <media/stagefright/foundation/AMessage.h>
-#include <utils/misc.h>
-
-#include "aah_tx_player.h"
-#include "aah_tx_sender.h"
-
-namespace android {
-
-const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
-const char* AAH_TXSender::kSendPacketPort = "port";
-const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";
-
-const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
-const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
-const int AAH_TXSender::kRetryBufferCapacity = 100;
-const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;
-
-Mutex AAH_TXSender::sLock;
-wp<AAH_TXSender> AAH_TXSender::sInstance;
-uint32_t AAH_TXSender::sNextEpoch;
-bool AAH_TXSender::sNextEpochValid = false;
-
-AAH_TXSender::AAH_TXSender() : mSocket(-1) {
- mLastSentPacketTime = systemTime();
-}
-
-sp<AAH_TXSender> AAH_TXSender::GetInstance() {
- Mutex::Autolock autoLock(sLock);
-
- sp<AAH_TXSender> sender = sInstance.promote();
-
- if (sender == NULL) {
- sender = new AAH_TXSender();
- if (sender == NULL) {
- return NULL;
- }
-
- sender->mLooper = new ALooper();
- if (sender->mLooper == NULL) {
- return NULL;
- }
-
- sender->mReflector = new AHandlerReflector<AAH_TXSender>(sender.get());
- if (sender->mReflector == NULL) {
- return NULL;
- }
-
- sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (sender->mSocket == -1) {
- ALOGW("%s unable to create socket", __PRETTY_FUNCTION__);
- return NULL;
- }
-
- struct sockaddr_in bind_addr;
- memset(&bind_addr, 0, sizeof(bind_addr));
- bind_addr.sin_family = AF_INET;
- if (bind(sender->mSocket,
- reinterpret_cast<const sockaddr*>(&bind_addr),
- sizeof(bind_addr)) < 0) {
- ALOGW("%s unable to bind socket (errno %d)",
- __PRETTY_FUNCTION__, errno);
- return NULL;
- }
-
- sender->mRetryReceiver = new RetryReceiver(sender.get());
- if (sender->mRetryReceiver == NULL) {
- return NULL;
- }
-
- sender->mLooper->setName("AAH_TXSender");
- sender->mLooper->registerHandler(sender->mReflector);
- sender->mLooper->start(false, false, PRIORITY_AUDIO);
-
- if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
- != OK) {
- ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
- return NULL;
- }
-
- sInstance = sender;
- }
-
- return sender;
-}
-
-AAH_TXSender::~AAH_TXSender() {
- mLooper->stop();
- mLooper->unregisterHandler(mReflector->id());
-
- if (mRetryReceiver != NULL) {
- mRetryReceiver->requestExit();
- mRetryReceiver->mWakeupEvent.setEvent();
- if (mRetryReceiver->requestExitAndWait() != OK) {
- ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__);
- }
- mRetryReceiver->mSender = NULL;
- mRetryReceiver.clear();
- }
-
- if (mSocket != -1) {
- close(mSocket);
- }
-}
-
-// Return the next epoch number usable for a newly instantiated endpoint.
-uint32_t AAH_TXSender::getNextEpoch() {
- Mutex::Autolock autoLock(sLock);
-
- if (sNextEpochValid) {
- sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask;
- } else {
- sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask;
- sNextEpochValid = true;
- }
-
- return sNextEpoch;
-}
-
-// Notify the sender that a player has started sending to this endpoint.
-// Returns a program ID for use by the calling player.
-uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
- Mutex::Autolock lock(mEndpointLock);
-
- EndpointState* eps = mEndpointMap.valueFor(endpoint);
- if (eps) {
- eps->playerRefCount++;
- } else {
- eps = new EndpointState(getNextEpoch());
- mEndpointMap.add(endpoint, eps);
- }
-
- // if this is the first registered endpoint, then send a message to start
- // trimming retry buffers and a message to start sending heartbeats.
- if (mEndpointMap.size() == 1) {
- sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
- handlerID());
- trimMessage->post(kRetryTrimIntervalUs);
-
- sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
- handlerID());
- heartbeatMessage->post(kHeartbeatIntervalUs);
- }
-
- eps->nextProgramID++;
- return eps->nextProgramID;
-}
-
-// Notify the sender that a player has ceased sending to this endpoint.
-// An endpoint's state can not be deleted until all of the endpoint's
-// registered players have called unregisterEndpoint.
-void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
- Mutex::Autolock lock(mEndpointLock);
-
- EndpointState* eps = mEndpointMap.valueFor(endpoint);
- if (eps) {
- eps->playerRefCount--;
- CHECK(eps->playerRefCount >= 0);
- }
-}
-
-void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
- switch (msg->what()) {
- case kWhatSendPacket:
- onSendPacket(msg);
- break;
-
- case kWhatTrimRetryBuffers:
- trimRetryBuffers();
- break;
-
- case kWhatSendHeartbeats:
- sendHeartbeats();
- break;
-
- default:
- TRESPASS();
- break;
- }
-}
-
-void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
- sp<RefBase> obj;
- CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
- sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
-
- uint32_t ipAddr;
- CHECK(msg->findInt32(kSendPacketIPAddr,
- reinterpret_cast<int32_t*>(&ipAddr)));
-
- int32_t port32;
- CHECK(msg->findInt32(kSendPacketPort, &port32));
- uint16_t port = port32;
-
- Mutex::Autolock lock(mEndpointLock);
- doSendPacket_l(packet, Endpoint(ipAddr, port));
- mLastSentPacketTime = systemTime();
-}
-
-void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
- const Endpoint& endpoint) {
- EndpointState* eps = mEndpointMap.valueFor(endpoint);
- if (!eps) {
- // the endpoint state has disappeared, so the player that sent this
- // packet must be dead.
- return;
- }
-
- // assign the packet's sequence number
- packet->setEpoch(eps->epoch);
- packet->setSeqNumber(eps->trtpSeqNumber++);
-
- // add the packet to the retry buffer
- RetryBuffer& retry = eps->retry;
- retry.push_back(packet);
-
- // send the packet
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = endpoint.addr;
- addr.sin_port = endpoint.port;
-
- ssize_t result = sendto(mSocket,
- packet->getPacket(),
- packet->getPacketLen(),
- 0,
- (const struct sockaddr *) &addr,
- sizeof(addr));
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
-}
-
-void AAH_TXSender::trimRetryBuffers() {
- Mutex::Autolock lock(mEndpointLock);
-
- nsecs_t localTimeNow = systemTime();
-
- Vector<Endpoint> endpointsToRemove;
-
- for (size_t i = 0; i < mEndpointMap.size(); i++) {
- EndpointState* eps = mEndpointMap.editValueAt(i);
- RetryBuffer& retry = eps->retry;
-
- while (!retry.isEmpty()) {
- if (retry[0]->getExpireTime() < localTimeNow) {
- retry.pop_front();
- } else {
- break;
- }
- }
-
- if (retry.isEmpty() && eps->playerRefCount == 0) {
- endpointsToRemove.add(mEndpointMap.keyAt(i));
- }
- }
-
- // remove the state for any endpoints that are no longer in use
- for (size_t i = 0; i < endpointsToRemove.size(); i++) {
- Endpoint& e = endpointsToRemove.editItemAt(i);
- ALOGD("*** %s removing endpoint addr=%08x",
- __PRETTY_FUNCTION__, e.addr);
- size_t index = mEndpointMap.indexOfKey(e);
- delete mEndpointMap.valueAt(index);
- mEndpointMap.removeItemsAt(index);
- }
-
- // schedule the next trim
- if (mEndpointMap.size()) {
- sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
- handlerID());
- trimMessage->post(kRetryTrimIntervalUs);
- }
-}
-
-void AAH_TXSender::sendHeartbeats() {
- Mutex::Autolock lock(mEndpointLock);
-
- if (shouldSendHeartbeats_l()) {
- for (size_t i = 0; i < mEndpointMap.size(); i++) {
- EndpointState* eps = mEndpointMap.editValueAt(i);
- const Endpoint& ep = mEndpointMap.keyAt(i);
-
- sp<TRTPControlPacket> packet = new TRTPControlPacket();
- packet->setCommandID(TRTPControlPacket::kCommandNop);
-
- packet->setExpireTime(systemTime() +
- AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
- packet->pack();
-
- doSendPacket_l(packet, ep);
- }
- }
-
- // schedule the next heartbeat
- if (mEndpointMap.size()) {
- sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
- handlerID());
- heartbeatMessage->post(kHeartbeatIntervalUs);
- }
-}
-
-bool AAH_TXSender::shouldSendHeartbeats_l() {
- // assert(holding endpoint lock)
- return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout));
-}
-
-// Receiver
-
-// initial 4-byte ID of a retry request packet
-const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';
-
-// initial 4-byte ID of a retry NAK packet
-const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';
-
-// initial 4-byte ID of a fast start request packet
-const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';
-
-AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
- : Thread(false),
- mSender(sender) {}
-
- AAH_TXSender::RetryReceiver::~RetryReceiver() {
- mWakeupEvent.clearPendingEvents();
- }
-
-// Returns true if val is within the interval bounded inclusively by
-// start and end. Also handles the case where there is a rollover of the
-// range between start and end.
-template <typename T>
-static inline bool withinIntervalWithRollover(T val, T start, T end) {
- return ((start <= end && val >= start && val <= end) ||
- (start > end && (val >= start || val <= end)));
-}
-
-bool AAH_TXSender::RetryReceiver::threadLoop() {
- struct pollfd pollFds[2];
- pollFds[0].fd = mSender->mSocket;
- pollFds[0].events = POLLIN;
- pollFds[0].revents = 0;
- pollFds[1].fd = mWakeupEvent.getWakeupHandle();
- pollFds[1].events = POLLIN;
- pollFds[1].revents = 0;
-
- int pollResult = poll(pollFds, NELEM(pollFds), -1);
- if (pollResult == -1) {
- ALOGE("%s poll failed", __PRETTY_FUNCTION__);
- return false;
- }
-
- if (exitPending()) {
- ALOGI("*** %s exiting", __PRETTY_FUNCTION__);
- return false;
- }
-
- if (pollFds[0].revents) {
- handleRetryRequest();
- }
-
- return true;
-}
-
-void AAH_TXSender::RetryReceiver::handleRetryRequest() {
- ALOGV("*** RX %s start", __PRETTY_FUNCTION__);
-
- RetryPacket request;
- struct sockaddr requestSrcAddr;
- socklen_t requestSrcAddrLen = sizeof(requestSrcAddr);
-
- ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0,
- &requestSrcAddr, &requestSrcAddrLen);
- if (result == -1) {
- ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno);
- return;
- }
-
- if (static_cast<size_t>(result) < sizeof(RetryPacket)) {
- ALOGW("%s short packet received", __PRETTY_FUNCTION__);
- return;
- }
-
- uint32_t host_request_id = ntohl(request.id);
- if ((host_request_id != kRetryRequestID) &&
- (host_request_id != kFastStartRequestID)) {
- ALOGW("%s received retry request with bogus ID (%08x)",
- __PRETTY_FUNCTION__, host_request_id);
- return;
- }
-
- Endpoint endpoint(request.endpointIP, request.endpointPort);
-
- Mutex::Autolock lock(mSender->mEndpointLock);
-
- EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint);
-
- if (eps == NULL || eps->retry.isEmpty()) {
- // we have no retry buffer or an empty retry buffer for this endpoint,
- // so NAK the entire request
- RetryPacket nak = request;
- nak.id = htonl(kRetryNakID);
- result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
- &requestSrcAddr, requestSrcAddrLen);
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
- return;
- }
-
- RetryBuffer& retry = eps->retry;
-
- uint16_t startSeq = ntohs(request.seqStart);
- uint16_t endSeq = ntohs(request.seqEnd);
-
- uint16_t retryFirstSeq = retry[0]->getSeqNumber();
- uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber();
-
- // If this is a fast start, then force the start of the retry to match the
- // start of the retransmit ring buffer (unless the end of the retransmit
- // ring buffer is already past the point of fast start)
- if ((host_request_id == kFastStartRequestID) &&
- !((startSeq - retryFirstSeq) & 0x8000)) {
- startSeq = retryFirstSeq;
- }
-
- int startIndex;
- if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) {
- startIndex = static_cast<uint16_t>(startSeq - retryFirstSeq);
- } else {
- startIndex = -1;
- }
-
- int endIndex;
- if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) {
- endIndex = static_cast<uint16_t>(endSeq - retryFirstSeq);
- } else {
- endIndex = -1;
- }
-
- if (startIndex == -1 && endIndex == -1) {
- // no part of the request range is found in the retry buffer
- RetryPacket nak = request;
- nak.id = htonl(kRetryNakID);
- result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
- &requestSrcAddr, requestSrcAddrLen);
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
- return;
- }
-
- if (startIndex == -1) {
- // NAK a subrange at the front of the request range
- RetryPacket nak = request;
- nak.id = htonl(kRetryNakID);
- nak.seqEnd = htons(retryFirstSeq - 1);
- result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
- &requestSrcAddr, requestSrcAddrLen);
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
-
- startIndex = 0;
- } else if (endIndex == -1) {
- // NAK a subrange at the back of the request range
- RetryPacket nak = request;
- nak.id = htonl(kRetryNakID);
- nak.seqStart = htons(retryLastSeq + 1);
- result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
- &requestSrcAddr, requestSrcAddrLen);
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
-
- endIndex = retry.size() - 1;
- }
-
- // send the retry packets
- for (int i = startIndex; i <= endIndex; i++) {
- const sp<TRTPPacket>& replyPacket = retry[i];
-
- result = sendto(mSender->mSocket,
- replyPacket->getPacket(),
- replyPacket->getPacketLen(),
- 0,
- &requestSrcAddr,
- requestSrcAddrLen);
-
- if (result == -1) {
- ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
- }
- }
-}
-
-// Endpoint
-
-AAH_TXSender::Endpoint::Endpoint()
- : addr(0)
- , port(0) { }
-
-AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
- : addr(a)
- , port(p) {}
-
-bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
- return ((addr < other.addr) ||
- (addr == other.addr && port < other.port));
-}
-
-// EndpointState
-
-AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
- : retry(kRetryBufferCapacity)
- , playerRefCount(1)
- , trtpSeqNumber(0)
- , nextProgramID(0)
- , epoch(_epoch) { }
-
-// CircularBuffer
-
-template <typename T>
-CircularBuffer<T>::CircularBuffer(size_t capacity)
- : mCapacity(capacity)
- , mHead(0)
- , mTail(0)
- , mFillCount(0) {
- mBuffer = new T[capacity];
-}
-
-template <typename T>
-CircularBuffer<T>::~CircularBuffer() {
- delete [] mBuffer;
-}
-
-template <typename T>
-void CircularBuffer<T>::push_back(const T& item) {
- if (this->isFull()) {
- this->pop_front();
- }
- mBuffer[mHead] = item;
- mHead = (mHead + 1) % mCapacity;
- mFillCount++;
-}
-
-template <typename T>
-void CircularBuffer<T>::pop_front() {
- CHECK(!isEmpty());
- mBuffer[mTail] = T();
- mTail = (mTail + 1) % mCapacity;
- mFillCount--;
-}
-
-template <typename T>
-size_t CircularBuffer<T>::size() const {
- return mFillCount;
-}
-
-template <typename T>
-bool CircularBuffer<T>::isFull() const {
- return (mFillCount == mCapacity);
-}
-
-template <typename T>
-bool CircularBuffer<T>::isEmpty() const {
- return (mFillCount == 0);
-}
-
-template <typename T>
-const T& CircularBuffer<T>::itemAt(size_t index) const {
- CHECK(index < mFillCount);
- return mBuffer[(mTail + index) % mCapacity];
-}
-
-template <typename T>
-const T& CircularBuffer<T>::operator[](size_t index) const {
- return itemAt(index);
-}
-
-} // namespace android