path: root/media
diff options
authorJohn Grossman <>2012-02-09 15:09:05 -0800
committerJohn Grossman <>2012-02-16 13:45:12 -0800
commitd72031cee315438c4728fd3f0cce1971a9f00717 (patch)
tree1b7e0fb053ae129e69040f09066b3db1812984d6 /media
parent4fbe95ede28e9de22404fc38645667fd3a401b14 (diff)
Upintegreate AAH TX and RX players from ICS_AAH
Upintegrate the android at home TX and RX players developed in the ICS_AAH branch. Change-Id: I8247d3702e30d8b0e215b31a92675d8ab28dccbb Signed-off-by: John Grossman <>
Diffstat (limited to 'media')
18 files changed, 5718 insertions, 4 deletions
diff --git a/media/libaah_rtp/ b/media/libaah_rtp/
new file mode 100644
index 0000000..54fd9ec
--- /dev/null
+++ b/media/libaah_rtp/
@@ -0,0 +1,40 @@
+LOCAL_PATH:= $(call my-dir)
+# libaah_rtp
+include $(CLEAR_VARS)
+LOCAL_MODULE := libaah_rtp
+LOCAL_MODULE_TAGS := optional
+ aah_decoder_pump.cpp \
+ aah_rx_player.cpp \
+ aah_rx_player_core.cpp \
+ aah_rx_player_ring_buffer.cpp \
+ aah_rx_player_substream.cpp \
+ aah_tx_packet.cpp \
+ aah_tx_player.cpp \
+ aah_tx_sender.cpp \
+ pipe_event.cpp
+ frameworks/base/include \
+ frameworks/base/include/media/stagefright/openmax \
+ frameworks/base/media \
+ frameworks/base/media/libstagefright
+ libcommon_time_client \
+ libbinder \
+ libmedia \
+ libstagefright \
+ libstagefright_foundation \
+ libutils
+ -lpthread
diff --git a/media/libaah_rtp/aah_decoder_pump.cpp b/media/libaah_rtp/aah_decoder_pump.cpp
new file mode 100644
index 0000000..72fe43b
--- /dev/null
+++ b/media/libaah_rtp/aah_decoder_pump.cpp
@@ -0,0 +1,520 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+//#define LOG_NDEBUG 0
+#include <utils/Log.h>
+#include <poll.h>
+#include <pthread.h>
+#include <common_time/cc_helper.h>
+#include <media/AudioSystem.h>
+#include <media/AudioTrack.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/MetaData.h>
+#include <media/stagefright/OMXClient.h>
+#include <media/stagefright/OMXCodec.h>
+#include <media/stagefright/Utils.h>
+#include <utils/Timers.h>
+#include <utils/threads.h>
+#include "aah_decoder_pump.h"
+namespace android {
+static const long long kLongDecodeErrorThreshold = 1000000ll;
+static const uint32_t kMaxLongErrorsBeforeFatal = 3;
+static const uint32_t kMaxErrorsBeforeFatal = 60;
+AAH_DecoderPump::AAH_DecoderPump(OMXClient& omx)
+ : omx_(omx)
+ , thread_status_(OK)
+ , renderer_(NULL)
+ , last_queued_pts_valid_(false)
+ , last_queued_pts_(0)
+ , last_ts_transform_valid_(false)
+ , last_volume_(0xFF) {
+ thread_ = new ThreadWrapper(this);
+AAH_DecoderPump::~AAH_DecoderPump() {
+ shutdown();
+status_t AAH_DecoderPump::initCheck() {
+ if (thread_ == NULL) {
+ ALOGE("Failed to allocate thread");
+ return NO_MEMORY;
+ }
+ return OK;
+status_t AAH_DecoderPump::queueForDecode(MediaBuffer* buf) {
+ if (NULL == buf) {
+ return BAD_VALUE;
+ }
+ if (OK != thread_status_) {
+ return thread_status_;
+ }
+ { // Explicit scope for AutoMutex pattern.
+ AutoMutex lock(&thread_lock_);
+ in_queue_.push_back(buf);
+ }
+ thread_cond_.signal();
+ return OK;
+void AAH_DecoderPump::queueToRenderer(MediaBuffer* decoded_sample) {
+ Mutex::Autolock lock(&render_lock_);
+ sp<MetaData> meta;
+ int64_t ts;
+ status_t res;
+ // Fetch the metadata and make sure the sample has a timestamp. We
+ // cannot render samples which are missing PTSs.
+ meta = decoded_sample->meta_data();
+ if ((meta == NULL) || (!meta->findInt64(kKeyTime, &ts))) {
+ ALOGV("Decoded sample missing timestamp, cannot render.");
+ CHECK(false);
+ } else {
+ // If we currently are not holding on to a renderer, go ahead and
+ // make one now.
+ if (NULL == renderer_) {
+ renderer_ = new TimedAudioTrack();
+ if (NULL != renderer_) {
+ int frameCount;
+ AudioTrack::getMinFrameCount(&frameCount,
+ static_cast<int>(format_sample_rate_));
+ int ch_format = (format_channels_ == 1)
+ res = renderer_->set(AUDIO_STREAM_DEFAULT,
+ format_sample_rate_,
+ ch_format,
+ frameCount);
+ if (res != OK) {
+ ALOGE("Failed to setup audio renderer. (res = %d)", res);
+ delete renderer_;
+ renderer_ = NULL;
+ } else {
+ CHECK(last_ts_transform_valid_);
+ res = renderer_->setMediaTimeTransform(
+ last_ts_transform_, TimedAudioTrack::COMMON_TIME);
+ if (res != NO_ERROR) {
+ ALOGE("Failed to set media time transform on AudioTrack"
+ " (res = %d)", res);
+ delete renderer_;
+ renderer_ = NULL;
+ } else {
+ float volume = static_cast<float>(last_volume_)
+ / 255.0f;
+ if (renderer_->setVolume(volume, volume) != OK) {
+ ALOGW("%s: setVolume failed", __FUNCTION__);
+ }
+ renderer_->start();
+ }
+ }
+ } else {
+ ALOGE("Failed to allocate AudioTrack to use as a renderer.");
+ }
+ }
+ if (NULL != renderer_) {
+ uint8_t* decoded_data =
+ reinterpret_cast<uint8_t*>(decoded_sample->data());
+ uint32_t decoded_amt = decoded_sample->range_length();
+ decoded_data += decoded_sample->range_offset();
+ sp<IMemory> pcm_payload;
+ res = renderer_->allocateTimedBuffer(decoded_amt, &pcm_payload);
+ if (res != OK) {
+ ALOGE("Failed to allocate %d byte audio track buffer."
+ " (res = %d)", decoded_amt, res);
+ } else {
+ memcpy(pcm_payload->pointer(), decoded_data, decoded_amt);
+ res = renderer_->queueTimedBuffer(pcm_payload, ts);
+ if (res != OK) {
+ ALOGE("Failed to queue %d byte audio track buffer with media"
+ " PTS %lld. (res = %d)", decoded_amt, ts, res);
+ } else {
+ last_queued_pts_valid_ = true;
+ last_queued_pts_ = ts;
+ }
+ }
+ } else {
+ ALOGE("No renderer, dropping audio payload.");
+ }
+ }
+void AAH_DecoderPump::stopAndCleanupRenderer() {
+ if (NULL == renderer_) {
+ return;
+ }
+ renderer_->stop();
+ delete renderer_;
+ renderer_ = NULL;
+void AAH_DecoderPump::setRenderTSTransform(const LinearTransform& trans) {
+ Mutex::Autolock lock(&render_lock_);
+ if (last_ts_transform_valid_ && !memcmp(&trans,
+ &last_ts_transform_,
+ sizeof(trans))) {
+ return;
+ }
+ last_ts_transform_ = trans;
+ last_ts_transform_valid_ = true;
+ if (NULL != renderer_) {
+ status_t res = renderer_->setMediaTimeTransform(
+ last_ts_transform_, TimedAudioTrack::COMMON_TIME);
+ if (res != NO_ERROR) {
+ ALOGE("Failed to set media time transform on AudioTrack"
+ " (res = %d)", res);
+ }
+ }
+void AAH_DecoderPump::setRenderVolume(uint8_t volume) {
+ Mutex::Autolock lock(&render_lock_);
+ if (volume == last_volume_) {
+ return;
+ }
+ last_volume_ = volume;
+ if (renderer_ != NULL) {
+ float volume = static_cast<float>(last_volume_) / 255.0f;
+ if (renderer_->setVolume(volume, volume) != OK) {
+ ALOGW("%s: setVolume failed", __FUNCTION__);
+ }
+ }
+// isAboutToUnderflow is something of a hack used to figure out when it might be
+// time to give up on trying to fill in a gap in the RTP sequence and simply
+// move on with a discontinuity. If we had perfect knowledge of when we were
+// going to underflow, it would not be a hack, but unfortunately we do not.
+// Right now, we just take the PTS of the last sample queued, and check to see
+// if its presentation time is within kAboutToUnderflowThreshold from now. If
+// it is, then we say that we are about to underflow. This decision is based on
+// two (possibly invalid) assumptions.
+// 1) The transmitter is leading the clock by more than
+// kAboutToUnderflowThreshold.
+// 2) The delta between the PTS of the last sample queued and the next sample
+// is less than the transmitter's clock lead amount.
+// Right now, the default transmitter lead time is 1 second, which is a pretty
+// large number and greater than the 50mSec that kAboutToUnderflowThreshold is
+// currently set to. This should satisfy assumption #1 for now, but changes to
+// the transmitter clock lead time could effect this.
+// For non-sparse streams with a homogeneous sample rate (the vast majority of
+// streams in the world), the delta between any two adjacent PTSs will always be
+// the homogeneous sample period. It is very uncommon to see a sample period
+// greater than the 1 second clock lead we are currently using, and you
+// certainly will not see it in an MP3 file which should satisfy assumption #2.
+// Sparse audio streams (where no audio is transmitted for long periods of
+// silence) and extremely low framerate video stream (like an MPEG-2 slideshow
+// or the video stream for a pay TV audio channel) are examples of streams which
+// might violate assumption #2.
+bool AAH_DecoderPump::isAboutToUnderflow(int64_t threshold) {
+ Mutex::Autolock lock(&render_lock_);
+ // If we have never queued anything to the decoder, we really don't know if
+ // we are going to underflow or not.
+ if (!last_queued_pts_valid_ || !last_ts_transform_valid_) {
+ return false;
+ }
+ // Don't have access to Common Time? If so, then things are Very Bad
+ // elsewhere in the system; it pretty much does not matter what we do here.
+ // Since we cannot really tell if we are about to underflow or not, its
+ // probably best to assume that we are not and proceed accordingly.
+ int64_t tt_now;
+ if (OK != cc_helper_.getCommonTime(&tt_now)) {
+ return false;
+ }
+ // Transform from media time to common time.
+ int64_t last_queued_pts_tt;
+ if (!last_ts_transform_.doForwardTransform(last_queued_pts_,
+ &last_queued_pts_tt)) {
+ return false;
+ }
+ // Check to see if we are underflowing.
+ return ((tt_now + threshold - last_queued_pts_tt) > 0);
+void* AAH_DecoderPump::workThread() {
+ // No need to lock when accessing decoder_ from the thread. The
+ // implementation of init and shutdown ensure that other threads never touch
+ // decoder_ while the work thread is running.
+ CHECK(decoder_ != NULL);
+ CHECK(format_ != NULL);
+ // Start the decoder and note its result code. If something goes horribly
+ // wrong, callers of queueForDecode and getOutput will be able to detect
+ // that the thread encountered a fatal error and shut down by examining
+ // thread_status_.
+ thread_status_ = decoder_->start(format_.get());
+ if (OK != thread_status_) {
+ ALOGE("AAH_DecoderPump's work thread failed to start decoder (res = %d)",
+ thread_status_);
+ return NULL;
+ }
+ DurationTimer decode_timer;
+ uint32_t consecutive_long_errors = 0;
+ uint32_t consecutive_errors = 0;
+ while (!thread_->exitPending()) {
+ status_t res;
+ MediaBuffer* bufOut = NULL;
+ decode_timer.start();
+ res = decoder_->read(&bufOut);
+ decode_timer.stop();
+ if (res == INFO_FORMAT_CHANGED) {
+ // Format has changed. Destroy our current renderer so that a new
+ // one can be created during queueToRenderer with the proper format.
+ //
+ // TODO : In order to transition seamlessly, we should change this
+ // to put the old renderer in a queue to play out completely before
+ // we destroy it. We can still create a new renderer, the timed
+ // nature of the renderer should ensure a seamless splice.
+ stopAndCleanupRenderer();
+ res = OK;
+ }
+ // Try to be a little nuanced in our handling of actual decode errors.
+ // Errors could happen because of minor stream corruption or because of
+ // transient resource limitations. In these cases, we would rather drop
+ // a little bit of output and ride out the unpleasantness then throw up
+ // our hands and abort everything.
+ //
+ // OTOH - When things are really bad (like we have a non-transient
+ // resource or bookkeeping issue, or the stream being fed to us is just
+ // complete and total garbage) we really want to terminate playback and
+ // raise an error condition all the way up to the application level so
+ // they can deal with it.
+ //
+ // Unfortunately, the error codes returned by the decoder can be a
+ // little non-specific. For example, if an OMXCodec times out
+ // attempting to obtain an output buffer, the error we get back is a
+ // generic -1. Try to distinguish between this resource timeout error
+ // and ES corruption error by timing how long the decode operation
+ // takes. Maintain accounting for both errors and "long errors". If we
+ // get more than a certain number consecutive errors of either type,
+ // consider it fatal and shutdown (which will cause the error to
+ // propagate all of the way up to the application level). The threshold
+ // for "long errors" is deliberately much lower than that of normal
+ // decode errors, both because of how long they take to happen and
+ // because they generally indicate resource limitation errors which are
+ // unlikely to go away in pathologically bad cases (in contrast to
+ // stream corruption errors which might happen 20 times in a row and
+ // then be suddenly OK again)
+ if (res != OK) {
+ consecutive_errors++;
+ if (decode_timer.durationUsecs() >= kLongDecodeErrorThreshold)
+ consecutive_long_errors++;
+ CHECK(NULL == bufOut);
+ ALOGW("%s: Failed to decode data (res = %d)",
+ __PRETTY_FUNCTION__, res);
+ if ((consecutive_errors >= kMaxErrorsBeforeFatal) ||
+ (consecutive_long_errors >= kMaxLongErrorsBeforeFatal)) {
+ ALOGE("%s: Maximum decode error threshold has been reached."
+ " There have been %d consecutive decode errors, and %d"
+ " consecutive decode operations which resulted in errors"
+ " and took more than %lld uSec to process. The last"
+ " decode operation took %lld uSec.",
+ consecutive_errors, consecutive_long_errors,
+ kLongDecodeErrorThreshold, decode_timer.durationUsecs());
+ thread_status_ = res;
+ break;
+ }
+ continue;
+ }
+ if (NULL == bufOut) {
+ ALOGW("%s: Successful decode, but no buffer produced",
+ continue;
+ }
+ // Successful decode (with actual output produced). Clear the error
+ // counters.
+ consecutive_errors = 0;
+ consecutive_long_errors = 0;
+ queueToRenderer(bufOut);
+ bufOut->release();
+ }
+ decoder_->stop();
+ stopAndCleanupRenderer();
+ return NULL;
+status_t AAH_DecoderPump::init(const sp<MetaData>& params) {
+ Mutex::Autolock lock(&init_lock_);
+ if (decoder_ != NULL) {
+ // already inited
+ return OK;
+ }
+ if (params == NULL) {
+ return BAD_VALUE;
+ }
+ if (!params->findInt32(kKeyChannelCount, &format_channels_)) {
+ return BAD_VALUE;
+ }
+ if (!params->findInt32(kKeySampleRate, &format_sample_rate_)) {
+ return BAD_VALUE;
+ }
+ CHECK(OK == thread_status_);
+ CHECK(decoder_ == NULL);
+ status_t ret_val = UNKNOWN_ERROR;
+ // Cache the format and attempt to create the decoder.
+ format_ = params;
+ decoder_ = OMXCodec::Create(
+ omx_.interface(), // IOMX Handle
+ format_, // Metadata for substream (indicates codec)
+ false, // Make a decoder, not an encoder
+ sp<MediaSource>(this)); // We will be the source for this codec.
+ if (decoder_ == NULL) {
+ ALOGE("Failed to allocate decoder in %s", __PRETTY_FUNCTION__);
+ goto bailout;
+ }
+ // Fire up the pump thread. It will take care of starting and stopping the
+ // decoder.
+ ret_val = thread_->run("aah_decode_pump", ANDROID_PRIORITY_AUDIO);
+ if (OK != ret_val) {
+ ALOGE("Failed to start work thread in %s (res = %d)",
+ __PRETTY_FUNCTION__, ret_val);
+ goto bailout;
+ }
+ if (OK != ret_val) {
+ decoder_ = NULL;
+ format_ = NULL;
+ }
+ return OK;
+status_t AAH_DecoderPump::shutdown() {
+ Mutex::Autolock lock(&init_lock_);
+ return shutdown_l();
+status_t AAH_DecoderPump::shutdown_l() {
+ thread_->requestExit();
+ thread_cond_.signal();
+ thread_->requestExitAndWait();
+ for (MBQueue::iterator iter = in_queue_.begin();
+ iter != in_queue_.end();
+ ++iter) {
+ (*iter)->release();
+ }
+ in_queue_.clear();
+ last_queued_pts_valid_ = false;
+ last_ts_transform_valid_ = false;
+ last_volume_ = 0xFF;
+ thread_status_ = OK;
+ decoder_ = NULL;
+ format_ = NULL;
+ return OK;
+status_t AAH_DecoderPump::read(MediaBuffer **buffer,
+ const ReadOptions *options) {
+ if (!buffer) {
+ return BAD_VALUE;
+ }
+ *buffer = NULL;
+ // While its not time to shut down, and we have no data to process, wait.
+ AutoMutex lock(&thread_lock_);
+ while (!thread_->exitPending() && in_queue_.empty())
+ thread_cond_.wait(thread_lock_);
+ // At this point, if its not time to shutdown then we must have something to
+ // process. Go ahead and pop the front of the queue for processing.
+ if (!thread_->exitPending()) {
+ CHECK(!in_queue_.empty());
+ *buffer = *(in_queue_.begin());
+ in_queue_.erase(in_queue_.begin());
+ }
+ // If we managed to get a buffer, then everything must be OK. If not, then
+ // we must be shutting down.
+ return (NULL == *buffer) ? INVALID_OPERATION : OK;
+AAH_DecoderPump::ThreadWrapper::ThreadWrapper(AAH_DecoderPump* owner)
+ : Thread(false /* canCallJava*/ )
+ , owner_(owner) {
+bool AAH_DecoderPump::ThreadWrapper::threadLoop() {
+ CHECK(NULL != owner_);
+ owner_->workThread();
+ return false;
+} // namespace android
diff --git a/media/libaah_rtp/aah_decoder_pump.h b/media/libaah_rtp/aah_decoder_pump.h
new file mode 100644
index 0000000..f5a6529
--- /dev/null
+++ b/media/libaah_rtp/aah_decoder_pump.h
@@ -0,0 +1,107 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __DECODER_PUMP_H__
+#define __DECODER_PUMP_H__
+#include <pthread.h>
+#include <common_time/cc_helper.h>
+#include <media/stagefright/MediaSource.h>
+#include <utils/LinearTransform.h>
+#include <utils/List.h>
+#include <utils/threads.h>
+namespace android {
+class MetaData;
+class OMXClient;
+class TimedAudioTrack;
+class AAH_DecoderPump : public MediaSource {
+ public:
+ explicit AAH_DecoderPump(OMXClient& omx);
+ status_t initCheck();
+ status_t queueForDecode(MediaBuffer* buf);
+ status_t init(const sp<MetaData>& params);
+ status_t shutdown();
+ void setRenderTSTransform(const LinearTransform& trans);
+ void setRenderVolume(uint8_t volume);
+ bool isAboutToUnderflow(int64_t threshold);
+ bool getStatus() const { return thread_status_; }
+ // MediaSource methods
+ virtual status_t start(MetaData *params) { return OK; }
+ virtual sp<MetaData> getFormat() { return format_; }
+ virtual status_t stop() { return OK; }
+ virtual status_t read(MediaBuffer **buffer,
+ const ReadOptions *options);
+ protected:
+ virtual ~AAH_DecoderPump();
+ private:
+ class ThreadWrapper : public Thread {
+ public:
+ friend class AAH_DecoderPump;
+ explicit ThreadWrapper(AAH_DecoderPump* owner);
+ private:
+ virtual bool threadLoop();
+ AAH_DecoderPump* owner_;
+ };
+ void* workThread();
+ virtual status_t shutdown_l();
+ void queueToRenderer(MediaBuffer* decoded_sample);
+ void stopAndCleanupRenderer();
+ sp<MetaData> format_;
+ int32_t format_channels_;
+ int32_t format_sample_rate_;
+ sp<MediaSource> decoder_;
+ OMXClient& omx_;
+ Mutex init_lock_;
+ sp<ThreadWrapper> thread_;
+ Condition thread_cond_;
+ Mutex thread_lock_;
+ status_t thread_status_;
+ Mutex render_lock_;
+ TimedAudioTrack* renderer_;
+ bool last_queued_pts_valid_;
+ int64_t last_queued_pts_;
+ bool last_ts_transform_valid_;
+ LinearTransform last_ts_transform_;
+ uint8_t last_volume_;
+ CCHelper cc_helper_;
+ // protected by the thread_lock_
+ typedef List<MediaBuffer*> MBQueue;
+ MBQueue in_queue_;
+} // namespace android
+#endif // __DECODER_PUMP_H__
diff --git a/media/libaah_rtp/aah_rx_player.cpp b/media/libaah_rtp/aah_rx_player.cpp
new file mode 100644
index 0000000..9dd79fd
--- /dev/null
+++ b/media/libaah_rtp/aah_rx_player.cpp
@@ -0,0 +1,288 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+//#define LOG_NDEBUG 0
+#include <binder/IServiceManager.h>
+#include <media/MediaPlayerInterface.h>
+#include <utils/Log.h>
+#include "aah_rx_player.h"
+namespace android {
+const uint32_t AAH_RXPlayer::kRTPRingBufferSize = 1 << 10;
+sp<MediaPlayerBase> createAAH_RXPlayer() {
+ sp<MediaPlayerBase> ret = new AAH_RXPlayer();
+ return ret;
+ : ring_buffer_(kRTPRingBufferSize)
+ , substreams_(NULL) {
+ thread_wrapper_ = new ThreadWrapper(*this);
+ is_playing_ = false;
+ multicast_joined_ = false;
+ transmitter_known_ = false;
+ current_epoch_known_ = false;
+ data_source_set_ = false;
+ sock_fd_ = -1;
+ substreams_.setCapacity(4);
+ memset(&listen_addr_, 0, sizeof(listen_addr_));
+ memset(&transmitter_addr_, 0, sizeof(transmitter_addr_));
+ fetchAudioFlinger();
+AAH_RXPlayer::~AAH_RXPlayer() {
+ reset_l();
+ CHECK(substreams_.size() == 0);
+ omx_.disconnect();
+status_t AAH_RXPlayer::initCheck() {
+ if (thread_wrapper_ == NULL) {
+ ALOGE("Failed to allocate thread wrapper!");
+ return NO_MEMORY;
+ }
+ if (!ring_buffer_.initCheck()) {
+ ALOGE("Failed to allocate reassembly ring buffer!");
+ return NO_MEMORY;
+ }
+ // Check for the presense of the common time service by attempting to query
+ // for CommonTime's frequency. If we get an error back, we cannot talk to
+ // the service at all and should abort now.
+ status_t res;
+ uint64_t freq;
+ res = cc_helper_.getCommonFreq(&freq);
+ if (OK != res) {
+ ALOGE("Failed to connect to common time service!");
+ return res;
+ }
+ return omx_.connect();
+status_t AAH_RXPlayer::setDataSource(
+ const char *url,
+ const KeyedVector<String8, String8> *headers) {
+ AutoMutex api_lock(&api_lock_);
+ uint32_t a, b, c, d;
+ uint16_t port;
+ if (data_source_set_) {
+ }
+ if (NULL == url) {
+ return BAD_VALUE;
+ }
+ if (5 != sscanf(url, "%*[^:/]://%u.%u.%u.%u:%hu", &a, &b, &c, &d, &port)) {
+ ALOGE("Failed to parse URL \"%s\"", url);
+ return BAD_VALUE;
+ }
+ if ((a > 255) || (b > 255) || (c > 255) || (d > 255) || (port == 0)) {
+ ALOGE("Bad multicast address \"%s\"", url);
+ return BAD_VALUE;
+ }
+ ALOGI("setDataSource :: %u.%u.%u.%u:%hu", a, b, c, d, port);
+ a = (a << 24) | (b << 16) | (c << 8) | d;
+ memset(&listen_addr_, 0, sizeof(listen_addr_));
+ listen_addr_.sin_family = AF_INET;
+ listen_addr_.sin_port = htons(port);
+ listen_addr_.sin_addr.s_addr = htonl(a);
+ data_source_set_ = true;
+ return OK;
+status_t AAH_RXPlayer::setDataSource(int fd, int64_t offset, int64_t length) {
+status_t AAH_RXPlayer::setVideoSurface(const sp<Surface>& surface) {
+ return OK;
+status_t AAH_RXPlayer::setVideoSurfaceTexture(
+ const sp<ISurfaceTexture>& surfaceTexture) {
+ return OK;
+status_t AAH_RXPlayer::prepare() {
+ return OK;
+status_t AAH_RXPlayer::prepareAsync() {
+ sendEvent(MEDIA_PREPARED);
+ return OK;
+status_t AAH_RXPlayer::start() {
+ AutoMutex api_lock(&api_lock_);
+ if (is_playing_) {
+ return OK;
+ }
+ status_t res = startWorkThread();
+ is_playing_ = (res == OK);
+ return res;
+status_t AAH_RXPlayer::stop() {
+ return pause();
+status_t AAH_RXPlayer::pause() {
+ AutoMutex api_lock(&api_lock_);
+ stopWorkThread();
+ CHECK(sock_fd_ < 0);
+ is_playing_ = false;
+ return OK;
+bool AAH_RXPlayer::isPlaying() {
+ AutoMutex api_lock(&api_lock_);
+ return is_playing_;
+status_t AAH_RXPlayer::seekTo(int msec) {
+ return OK;
+status_t AAH_RXPlayer::getCurrentPosition(int *msec) {
+ if (NULL != msec) {
+ *msec = 0;
+ }
+ return OK;
+status_t AAH_RXPlayer::getDuration(int *msec) {
+ if (NULL != msec) {
+ *msec = 1;
+ }
+ return OK;
+status_t AAH_RXPlayer::reset() {
+ AutoMutex api_lock(&api_lock_);
+ reset_l();
+ return OK;
+void AAH_RXPlayer::reset_l() {
+ stopWorkThread();
+ CHECK(sock_fd_ < 0);
+ CHECK(!multicast_joined_);
+ is_playing_ = false;
+ data_source_set_ = false;
+ transmitter_known_ = false;
+ memset(&listen_addr_, 0, sizeof(listen_addr_));
+status_t AAH_RXPlayer::setLooping(int loop) {
+ return OK;
+player_type AAH_RXPlayer::playerType() {
+ return AAH_RX_PLAYER;
+status_t AAH_RXPlayer::setParameter(int key, const Parcel &request) {
+status_t AAH_RXPlayer::getParameter(int key, Parcel *reply) {
+status_t AAH_RXPlayer::invoke(const Parcel& request, Parcel *reply) {
+ if (!reply) {
+ return BAD_VALUE;
+ }
+ int32_t magic;
+ status_t err = request.readInt32(&magic);
+ if (err != OK) {
+ reply->writeInt32(err);
+ return OK;
+ }
+ if (magic != 0x12345) {
+ reply->writeInt32(BAD_VALUE);
+ return OK;
+ }
+ int32_t methodID;
+ err = request.readInt32(&methodID);
+ if (err != OK) {
+ reply->writeInt32(err);
+ return OK;
+ }
+ switch (methodID) {
+ // Get Volume
+ if (audio_flinger_ != NULL) {
+ reply->writeInt32(OK);
+ reply->writeFloat(audio_flinger_->masterVolume());
+ } else {
+ reply->writeInt32(UNKNOWN_ERROR);
+ }
+ } break;
+ // Set Volume
+ float targetVol = request.readFloat();
+ reply->writeInt32(audio_flinger_->setMasterVolume(targetVol));
+ } break;
+ default: return BAD_VALUE;
+ }
+ return OK;
+void AAH_RXPlayer::fetchAudioFlinger() {
+ if (audio_flinger_ == NULL) {
+ sp<IServiceManager> sm = defaultServiceManager();
+ sp<IBinder> binder;
+ binder = sm->getService(String16("media.audio_flinger"));
+ if (binder == NULL) {
+ ALOGW("AAH_RXPlayer failed to fetch handle to audio flinger."
+ " Master volume control will not be possible.");
+ }
+ audio_flinger_ = interface_cast<IAudioFlinger>(binder);
+ }
+} // namespace android
diff --git a/media/libaah_rtp/aah_rx_player.h b/media/libaah_rtp/aah_rx_player.h
new file mode 100644
index 0000000..7a1b6e3
--- /dev/null
+++ b/media/libaah_rtp/aah_rx_player.h
@@ -0,0 +1,313 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __AAH_RX_PLAYER_H__
+#define __AAH_RX_PLAYER_H__
+#include <common_time/cc_helper.h>
+#include <media/MediaPlayerInterface.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/MediaBuffer.h>
+#include <media/stagefright/MediaSource.h>
+#include <media/stagefright/MetaData.h>
+#include <media/stagefright/OMXClient.h>
+#include <netinet/in.h>
+#include <utils/KeyedVector.h>
+#include <utils/LinearTransform.h>
+#include <utils/threads.h>
+#include "aah_decoder_pump.h"
+#include "pipe_event.h"
+namespace android {
+class AAH_RXPlayer : public MediaPlayerInterface {
+ public:
+ AAH_RXPlayer();
+ virtual status_t initCheck();
+ virtual status_t setDataSource(const char *url,
+ const KeyedVector<String8, String8>*
+ headers);
+ virtual status_t setDataSource(int fd, int64_t offset, int64_t length);
+ virtual status_t setVideoSurface(const sp<Surface>& surface);
+ virtual status_t setVideoSurfaceTexture(const sp<ISurfaceTexture>&
+ surfaceTexture);
+ virtual status_t prepare();
+ virtual status_t prepareAsync();
+ virtual status_t start();
+ virtual status_t stop();
+ virtual status_t pause();
+ virtual bool isPlaying();
+ virtual status_t seekTo(int msec);
+ virtual status_t getCurrentPosition(int *msec);
+ virtual status_t getDuration(int *msec);
+ virtual status_t reset();
+ virtual status_t setLooping(int loop);
+ virtual player_type playerType();
+ virtual status_t setParameter(int key, const Parcel &request);
+ virtual status_t getParameter(int key, Parcel *reply);
+ virtual status_t invoke(const Parcel& request, Parcel *reply);
+ protected:
+ virtual ~AAH_RXPlayer();
+ private:
+ class ThreadWrapper : public Thread {
+ public:
+ friend class AAH_RXPlayer;
+ explicit ThreadWrapper(AAH_RXPlayer& player)
+ : Thread(false /* canCallJava */ )
+ , player_(player) { }
+ virtual bool threadLoop() { return player_.threadLoop(); }
+ private:
+ AAH_RXPlayer& player_;
+ };
+#pragma pack(push, 1)
+ // PacketBuffers are structures used by the RX ring buffer. The ring buffer
+ // is a ring of pointers to PacketBuffer structures which act as variable
+ // length byte arrays and hold the contents of received UDP packets. Rather
+ // than make this a structure which hold a length and a pointer to another
+ // allocated structure (which would require two allocations), this struct
+ // uses a structure overlay pattern where allocation for the byte array
+ // consists of allocating (arrayLen + sizeof(ssize_t)) bytes of data from
+ // whatever pool/heap the packet buffer pulls from, and then overlaying the
+ // packed PacketBuffer structure on top of the allocation. The one-byte
+ // array at the end of the structure serves as an offset to the the data
+ // portion of the allocation; packet buffers are never allocated on the
+ // stack or using the new operator. Instead, the static allocate-byte-array
+ // and destroy methods handle the allocate and overlay pattern. They also
+ // allow for a potential future optimization where instead of just
+ // allocating blocks from the process global heap and overlaying, the
+ // allocator is replaced with a different implementation (private heap,
+ // free-list, circular buffer, etc) which reduces potential heap
+ // fragmentation issues which might arise from the frequent allocation and
+ // destruction of the received UDP traffic.
+ struct PacketBuffer {
+ ssize_t length_;
+ uint8_t data_[1];
+ // TODO : consider changing this to be some form of ring buffer or free
+ // pool system instead of just using the heap in order to avoid heap
+ // fragmentation.
+ static PacketBuffer* allocate(ssize_t length);
+ static void destroy(PacketBuffer* pb);
+ private:
+ // Force people to use allocate/destroy instead of new/delete.
+ PacketBuffer() { }
+ ~PacketBuffer() { }
+ };
+ struct RetransRequest {
+ uint32_t magic_;
+ uint32_t mcast_ip_;
+ uint16_t mcast_port_;
+ uint16_t start_seq_;
+ uint16_t end_seq_;
+ };
+#pragma pack(pop)
+ enum GapStatus {
+ kGS_NoGap = 0,
+ kGS_NormalGap,
+ kGS_FastStartGap,
+ };
+ struct SeqNoGap {
+ uint16_t start_seq_;
+ uint16_t end_seq_;
+ };
+ class RXRingBuffer {
+ public:
+ explicit RXRingBuffer(uint32_t capacity);
+ ~RXRingBuffer();
+ bool initCheck() const { return (ring_ != NULL); }
+ void reset();
+ // Push a packet buffer with a given sequence number into the ring
+ // buffer. pushBuffer will always consume the buffer pushed to it,
+ // either destroying it because it was a duplicate or overflow, or
+ // holding on to it in the ring. Callers should not hold any references
+ // to PacketBuffers after they have been pushed to the ring. Returns
+ // false in the case of a serious error (such as ring overflow).
+ // Callers should consider resetting the pipeline entirely in the event
+ // of a serious error.
+ bool pushBuffer(PacketBuffer* buf, uint16_t seq);
+ // Fetch the next buffer in the RTP sequence. Returns NULL if there is
+ // no buffer to fetch. If a non-NULL PacketBuffer is returned,
+ // is_discon will be set to indicate whether or not this PacketBuffer is
+ // discontiuous with any previously returned packet buffers. Packet
+ // buffers returned by fetchBuffer are the caller's responsibility; they
+ // must be certain to destroy the buffers when they are done.
+ PacketBuffer* fetchBuffer(bool* is_discon);
+ // Returns true and fills out the gap structure if the read pointer of
+ // the ring buffer is currently pointing to a gap which would stall a
+ // fetchBuffer operation. Returns false if the read pointer is not
+ // pointing to a gap in the sequence currently.
+ GapStatus fetchCurrentGap(SeqNoGap* gap);
+ // Causes the read pointer to skip over any portion of a gap indicated
+ // by nak. If nak is NULL, any gap currently blocking the read pointer
+ // will be completely skipped. If any portion of a gap is skipped, the
+ // next successful read from fetch buffer will indicate a discontinuity.
+ void processNAK(const SeqNoGap* nak = NULL);
+ // Compute the number of milliseconds until the inactivity timer for
+ // this RTP stream. Returns -1 if there is no active timeout, or 0 if
+ // the system has already timed out.
+ int computeInactivityTimeout();
+ private:
+ Mutex lock_;
+ PacketBuffer** ring_;
+ uint32_t capacity_;
+ uint32_t rd_;
+ uint32_t wr_;
+ uint16_t rd_seq_;
+ bool rd_seq_known_;
+ bool waiting_for_fast_start_;
+ bool fetched_first_packet_;
+ uint64_t rtp_activity_timeout_;
+ bool rtp_activity_timeout_valid_;
+ };
+ class Substream : public virtual RefBase {
+ public:
+ Substream(uint32_t ssrc, OMXClient& omx);
+ void cleanupBufferInProgress();
+ void shutdown();
+ void processPayloadStart(uint8_t* buf,
+ uint32_t amt,
+ int32_t ts_lower);
+ void processPayloadCont (uint8_t* buf,
+ uint32_t amt);
+ void processTSTransform(const LinearTransform& trans);
+ bool isAboutToUnderflow();
+ uint32_t getSSRC() const { return ssrc_; }
+ uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; }
+ status_t getStatus() const { return status_; }
+ protected:
+ virtual ~Substream() {
+ shutdown();
+ }
+ private:
+ void cleanupDecoder();
+ bool shouldAbort(const char* log_tag);
+ void processCompletedBuffer();
+ bool setupSubstreamType(uint8_t substream_type,
+ uint8_t codec_type);
+ uint32_t ssrc_;
+ bool waiting_for_rap_;
+ status_t status_;
+ bool substream_details_known_;
+ uint8_t substream_type_;
+ uint8_t codec_type_;
+ sp<MetaData> substream_meta_;
+ MediaBuffer* buffer_in_progress_;
+ uint32_t expected_buffer_size_;
+ uint32_t buffer_filled_;
+ sp<AAH_DecoderPump> decoder_;
+ static int64_t kAboutToUnderflowThreshold;
+ };
+ typedef DefaultKeyedVector< uint32_t, sp<Substream> > SubstreamVec;
+ status_t startWorkThread();
+ void stopWorkThread();
+ virtual bool threadLoop();
+ bool setupSocket();
+ void cleanupSocket();
+ void resetPipeline();
+ void reset_l();
+ bool processRX(PacketBuffer* pb);
+ void processRingBuffer();
+ void processCommandPacket(PacketBuffer* pb);
+ bool processGaps();
+ int computeNextGapRetransmitTimeout();
+ void fetchAudioFlinger();
+ PipeEvent wakeup_work_thread_evt_;
+ sp<ThreadWrapper> thread_wrapper_;
+ Mutex api_lock_;
+ bool is_playing_;
+ bool data_source_set_;
+ struct sockaddr_in listen_addr_;
+ int sock_fd_;
+ bool multicast_joined_;
+ struct sockaddr_in transmitter_addr_;
+ bool transmitter_known_;
+ uint32_t current_epoch_;
+ bool current_epoch_known_;
+ SeqNoGap current_gap_;
+ GapStatus current_gap_status_;
+ uint64_t next_retrans_req_time_;
+ RXRingBuffer ring_buffer_;
+ SubstreamVec substreams_;
+ OMXClient omx_;
+ CCHelper cc_helper_;
+ // Connection to audio flinger used to hack a path to setMasterVolume.
+ sp<IAudioFlinger> audio_flinger_;
+ static const uint32_t kRTPRingBufferSize;
+ static const uint32_t kRetransRequestMagic;
+ static const uint32_t kFastStartRequestMagic;
+ static const uint32_t kRetransNAKMagic;
+ static const uint32_t kGapRerequestTimeoutUSec;
+ static const uint32_t kFastStartTimeoutUSec;
+ static const uint32_t kRTPActivityTimeoutUSec;
+ static const uint32_t INVOKE_GET_MASTER_VOLUME = 3;
+ static const uint32_t INVOKE_SET_MASTER_VOLUME = 4;
+ static uint64_t monotonicUSecNow();
+} // namespace android
+#endif // __AAH_RX_PLAYER_H__
diff --git a/media/libaah_rtp/aah_rx_player_core.cpp b/media/libaah_rtp/aah_rx_player_core.cpp
new file mode 100644
index 0000000..d2b3386
--- /dev/null
+++ b/media/libaah_rtp/aah_rx_player_core.cpp
@@ -0,0 +1,807 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+//#define LOG_NDEBUG 0
+#include <utils/Log.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <time.h>
+#include <utils/misc.h>
+#include <media/stagefright/Utils.h>
+#include "aah_rx_player.h"
+#include "aah_tx_packet.h"
+namespace android {
+const uint32_t AAH_RXPlayer::kRetransRequestMagic =
+ FOURCC('T','r','e','q');
+const uint32_t AAH_RXPlayer::kRetransNAKMagic =
+ FOURCC('T','n','a','k');
+const uint32_t AAH_RXPlayer::kFastStartRequestMagic =
+ FOURCC('T','f','s','t');
+const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000;
+const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000;
+const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000;
+static inline int16_t fetchInt16(uint8_t* data) {
+ return static_cast<int16_t>(U16_AT(data));
+static inline int32_t fetchInt32(uint8_t* data) {
+ return static_cast<int32_t>(U32_AT(data));
+static inline int64_t fetchInt64(uint8_t* data) {
+ return static_cast<int64_t>(U64_AT(data));
+uint64_t AAH_RXPlayer::monotonicUSecNow() {
+ struct timespec now;
+ int res = clock_gettime(CLOCK_MONOTONIC, &now);
+ CHECK(res >= 0);
+ uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000;
+ ret += now.tv_nsec / 1000;
+ return ret;
+status_t AAH_RXPlayer::startWorkThread() {
+ status_t res;
+ stopWorkThread();
+ res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO);
+ if (res != OK) {
+ ALOGE("Failed to start work thread (res = %d)", res);
+ }
+ return res;
+void AAH_RXPlayer::stopWorkThread() {
+ thread_wrapper_->requestExit(); // set the exit pending flag
+ wakeup_work_thread_evt_.setEvent();
+ status_t res;
+ res = thread_wrapper_->requestExitAndWait(); // block until thread exit.
+ if (res != OK) {
+ ALOGE("Failed to stop work thread (res = %d)", res);
+ }
+ wakeup_work_thread_evt_.clearPendingEvents();
+void AAH_RXPlayer::cleanupSocket() {
+ if (sock_fd_ >= 0) {
+ if (multicast_joined_) {
+ int res;
+ struct ip_mreq mreq;
+ mreq.imr_multiaddr = listen_addr_.sin_addr;
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+ res = setsockopt(sock_fd_,
+ &mreq, sizeof(mreq));
+ if (res < 0) {
+ ALOGW("Failed to leave multicast group. (%d, %d)", res, errno);
+ }
+ multicast_joined_ = false;
+ }
+ close(sock_fd_);
+ sock_fd_ = -1;
+ }
+ resetPipeline();
+void AAH_RXPlayer::resetPipeline() {
+ ring_buffer_.reset();
+ // Explicitly shudown all of the active substreams, then call clear out the
+ // collection. Failure to clear out a substream can result in its decoder
+ // holding a reference to itself and therefor not going away when the
+ // collection is cleared.
+ for (size_t i = 0; i < substreams_.size(); ++i)
+ substreams_.valueAt(i)->shutdown();
+ substreams_.clear();
+ current_gap_status_ = kGS_NoGap;
+bool AAH_RXPlayer::setupSocket() {
+ long flags;
+ int res, buf_size;
+ socklen_t opt_size;
+ cleanupSocket();
+ CHECK(sock_fd_ < 0);
+ // Make the socket
+ sock_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (sock_fd_ < 0) {
+ ALOGE("Failed to create listen socket (errno %d)", errno);
+ goto bailout;
+ }
+ // Set non-blocking operation
+ flags = fcntl(sock_fd_, F_GETFL);
+ res = fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK);
+ if (res < 0) {
+ ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)",
+ sock_fd_, errno);
+ goto bailout;
+ }
+ // Bind to our port
+ struct sockaddr_in bind_addr;
+ memset(&bind_addr, 0, sizeof(bind_addr));
+ bind_addr.sin_family = AF_INET;
+ bind_addr.sin_addr.s_addr = INADDR_ANY;
+ bind_addr.sin_port = listen_addr_.sin_port;
+ res = bind(sock_fd_,
+ reinterpret_cast<const sockaddr*>(&bind_addr),
+ sizeof(bind_addr));
+ if (res < 0) {
+ uint32_t a = ntohl(bind_addr.sin_addr.s_addr);
+ uint16_t p = ntohs(bind_addr.sin_port);
+ ALOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hd. (errno %d)",
+ sock_fd_,
+ (a >> 24) & 0xFF,
+ (a >> 16) & 0xFF,
+ (a >> 8) & 0xFF,
+ (a ) & 0xFF,
+ p,
+ errno);
+ goto bailout;
+ }
+ buf_size = 1 << 16; // 64k
+ res = setsockopt(sock_fd_,
+ &buf_size, sizeof(buf_size));
+ if (res < 0) {
+ ALOGW("Failed to increase socket buffer size to %d. (errno %d)",
+ buf_size, errno);
+ }
+ buf_size = 0;
+ opt_size = sizeof(buf_size);
+ res = getsockopt(sock_fd_,
+ &buf_size, &opt_size);
+ if (res < 0) {
+ ALOGW("Failed to fetch socket buffer size. (errno %d)", errno);
+ } else {
+ ALOGI("RX socket buffer size is now %d bytes", buf_size);
+ }
+ if (listen_addr_.sin_addr.s_addr) {
+ // Join the multicast group and we should be good to go.
+ struct ip_mreq mreq;
+ mreq.imr_multiaddr = listen_addr_.sin_addr;
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+ res = setsockopt(sock_fd_,
+ &mreq, sizeof(mreq));
+ if (res < 0) {
+ ALOGE("Failed to join multicast group. (errno %d)", errno);
+ goto bailout;
+ }
+ multicast_joined_ = true;
+ }
+ return true;
+ cleanupSocket();
+ return false;
+bool AAH_RXPlayer::threadLoop() {
+ struct pollfd poll_fds[2];
+ bool process_more_right_now = false;
+ if (!setupSocket()) {
+ sendEvent(MEDIA_ERROR);
+ goto bailout;
+ }
+ while (!thread_wrapper_->exitPending()) {
+ // Step 1: Wait until there is something to do.
+ int gap_timeout = computeNextGapRetransmitTimeout();
+ int ring_timeout = ring_buffer_.computeInactivityTimeout();
+ int timeout = -1;
+ if (!ring_timeout) {
+ ALOGW("RTP inactivity timeout reached, resetting pipeline.");
+ resetPipeline();
+ timeout = gap_timeout;
+ } else {
+ if (gap_timeout < 0) {
+ timeout = ring_timeout;
+ } else if (ring_timeout < 0) {
+ timeout = gap_timeout;
+ } else {
+ timeout = (gap_timeout < ring_timeout) ? gap_timeout
+ : ring_timeout;
+ }
+ }
+ if ((0 != timeout) && (!process_more_right_now)) {
+ // Set up the events to wait on. Start with the wakeup pipe.
+ memset(&poll_fds, 0, sizeof(poll_fds));
+ poll_fds[0].fd = wakeup_work_thread_evt_.getWakeupHandle();
+ poll_fds[0].events = POLLIN;
+ // Add the RX socket.
+ poll_fds[1].fd = sock_fd_;
+ poll_fds[1].events = POLLIN;
+ // Wait for something interesing to happen.
+ int poll_res = poll(poll_fds, NELEM(poll_fds), timeout);
+ if (poll_res < 0) {
+ ALOGE("Fatal error (%d,%d) while waiting on events",
+ poll_res, errno);
+ sendEvent(MEDIA_ERROR);
+ goto bailout;
+ }
+ }
+ if (thread_wrapper_->exitPending()) {
+ break;
+ }
+ wakeup_work_thread_evt_.clearPendingEvents();
+ process_more_right_now = false;
+ // Step 2: Do we have data waiting in the socket? If so, drain the
+ // socket moving valid RTP information into the ring buffer to be
+ // processed.
+ if (poll_fds[1].revents) {
+ struct sockaddr_in from;
+ socklen_t from_len;
+ ssize_t res = 0;
+ while (!thread_wrapper_->exitPending()) {
+ // Check the size of any pending packet.
+ res = recv(sock_fd_, NULL, 0, MSG_PEEK | MSG_TRUNC);
+ // Error?
+ if (res < 0) {
+ // If the error is anything other than would block,
+ // something has gone very wrong.
+ if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
+ ALOGE("Fatal socket error during recvfrom (%d, %d)",
+ (int)res, errno);
+ goto bailout;
+ }
+ // Socket is out of data, just break out of processing and
+ // wait for more.
+ break;
+ }
+ // Allocate a payload.
+ PacketBuffer* pb = PacketBuffer::allocate(res);
+ if (NULL == pb) {
+ ALOGE("Fatal error, failed to allocate packet buffer of"
+ " length %u", static_cast<uint32_t>(res));
+ goto bailout;
+ }
+ // Fetch the data.
+ from_len = sizeof(from);
+ res = recvfrom(sock_fd_, pb->data_, pb->length_, 0,
+ reinterpret_cast<struct sockaddr*>(&from),
+ &from_len);
+ if (res != pb->length_) {
+ ALOGE("Fatal error, fetched packet length (%d) does not"
+ " match peeked packet length (%u). This should never"
+ " happen. (errno = %d)",
+ static_cast<int>(res),
+ static_cast<uint32_t>(pb->length_),
+ errno);
+ }
+ bool drop_packet = false;
+ if (transmitter_known_) {
+ if (from.sin_addr.s_addr !=
+ transmitter_addr_.sin_addr.s_addr) {
+ uint32_t a = ntohl(from.sin_addr.s_addr);
+ uint16_t p = ntohs(from.sin_port);
+ ALOGV("Dropping packet from unknown transmitter"
+ " %u.%u.%u.%u:%hu",
+ ((a >> 24) & 0xFF),
+ ((a >> 16) & 0xFF),
+ ((a >> 8) & 0xFF),
+ ( a & 0xFF),
+ p);
+ drop_packet = true;
+ } else {
+ transmitter_addr_.sin_port = from.sin_port;
+ }
+ } else {
+ memcpy(&transmitter_addr_, &from, sizeof(from));
+ transmitter_known_ = true;
+ }
+ if (!drop_packet) {
+ bool serious_error = !processRX(pb);
+ if (serious_error) {
+ // Something went "seriously wrong". Currently, the
+ // only trigger for this should be a ring buffer
+ // overflow. The current failsafe behavior for when
+ // something goes seriously wrong is to just reset the
+ // pipeline. The system should behave as if this
+ // AAH_RXPlayer was just set up for the first time.
+ ALOGE("Something just went seriously wrong with the"
+ " pipeline. Resetting.");
+ resetPipeline();
+ }
+ } else {
+ PacketBuffer::destroy(pb);
+ }
+ }
+ }
+ // Step 3: Process any data we mave have accumulated in the ring buffer
+ // so far.
+ if (!thread_wrapper_->exitPending()) {
+ processRingBuffer();
+ }
+ // Step 4: At this point in time, the ring buffer should either be
+ // empty, or stalled in front of a gap caused by some dropped packets.
+ // Check on the current gap situation and deal with it in an appropriate
+ // fashion. If processGaps returns true, it means that it has given up
+ // on a gap and that we should try to process some more data
+ // immediately.
+ if (!thread_wrapper_->exitPending()) {
+ process_more_right_now = processGaps();
+ }
+ // Step 5: Check for fatal errors. If any of our substreams has
+ // encountered a fatal, unrecoverable, error, then propagate the error
+ // up to user level and shut down.
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ status_t status;
+ CHECK(substreams_.valueAt(i) != NULL);
+ status = substreams_.valueAt(i)->getStatus();
+ if (OK != status) {
+ ALOGE("Substream index %d has encountered an unrecoverable"
+ " error (%d). Signalling application level and shutting"
+ " down.", i, status);
+ sendEvent(MEDIA_ERROR);
+ goto bailout;
+ }
+ }
+ }
+ cleanupSocket();
+ return false;
+bool AAH_RXPlayer::processRX(PacketBuffer* pb) {
+ CHECK(NULL != pb);
+ uint8_t* data = pb->data_;
+ ssize_t amt = pb->length_;
+ uint32_t nak_magic;
+ uint16_t seq_no;
+ uint32_t epoch;
+ // Every packet either starts with an RTP header which is at least 12 bytes
+ // long or is a retry NAK which is 14 bytes long. If there are fewer than
+ // 12 bytes here, this cannot be a proper RTP packet.
+ if (amt < 12) {
+ ALOGV("Dropping packet, too short to contain RTP header (%u bytes)",
+ static_cast<uint32_t>(amt));
+ goto drop_packet;
+ }
+ // Check to see if this is the special case of a NAK packet.
+ nak_magic = ntohl(*(reinterpret_cast<uint32_t*>(data)));
+ if (nak_magic == kRetransNAKMagic) {
+ // Looks like a NAK packet; make sure its long enough.
+ if (amt < static_cast<ssize_t>(sizeof(RetransRequest))) {
+ ALOGV("Dropping packet, too short to contain NAK payload (%u bytes)",
+ static_cast<uint32_t>(amt));
+ goto drop_packet;
+ }
+ SeqNoGap gap;
+ RetransRequest* rtr = reinterpret_cast<RetransRequest*>(data);
+ gap.start_seq_ = ntohs(rtr->start_seq_);
+ gap.end_seq_ = ntohs(rtr->end_seq_);
+ ALOGV("Process NAK for gap at [%hu, %hu]", gap.start_seq_, gap.end_seq_);
+ ring_buffer_.processNAK(&gap);
+ return true;
+ }
+ // According to the TRTP spec, version should be 2, padding should be 0,
+ // extension should be 0 and CSRCCnt should be 0. If any of these tests
+ // fail, we chuck the packet.
+ if (data[0] != 0x80) {
+ ALOGV("Dropping packet, bad V/P/X/CSRCCnt field (0x%02x)",
+ data[0]);
+ goto drop_packet;
+ }
+ // Check the payload type. For TRTP, it should always be 100.
+ if ((data[1] & 0x7F) != 100) {
+ ALOGV("Dropping packet, bad payload type. (%u)",
+ data[1] & 0x7F);
+ goto drop_packet;
+ }
+ // Check whether the transmitter has begun a new epoch.
+ epoch = (U32_AT(data + 8) >> 10) & 0x3FFFFF;
+ if (current_epoch_known_) {
+ if (epoch != current_epoch_) {
+ ALOGV("%s: new epoch %u", __PRETTY_FUNCTION__, epoch);
+ current_epoch_ = epoch;
+ resetPipeline();
+ }
+ } else {
+ current_epoch_ = epoch;
+ current_epoch_known_ = true;
+ }
+ // Extract the sequence number and hand the packet off to the ring buffer
+ // for dropped packet detection and later processing.
+ seq_no = U16_AT(data + 2);
+ return ring_buffer_.pushBuffer(pb, seq_no);
+ PacketBuffer::destroy(pb);
+ return true;
+void AAH_RXPlayer::processRingBuffer() {
+ PacketBuffer* pb;
+ bool is_discon;
+ sp<Substream> substream;
+ LinearTransform trans;
+ bool foundTrans = false;
+ while (NULL != (pb = ring_buffer_.fetchBuffer(&is_discon))) {
+ if (is_discon) {
+ // Abort all partially assembled payloads.
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ CHECK(substreams_.valueAt(i) != NULL);
+ substreams_.valueAt(i)->cleanupBufferInProgress();
+ }
+ }
+ uint8_t* data = pb->data_;
+ ssize_t amt = pb->length_;
+ // Should not have any non-RTP packets in the ring buffer. RTP packets
+ // must be at least 12 bytes long.
+ CHECK(amt >= 12);
+ // Extract the marker bit and the SSRC field.
+ bool marker = (data[1] & 0x80) != 0;
+ uint32_t ssrc = U32_AT(data + 8);
+ // Is this the start of a new TRTP payload? If so, the marker bit
+ // should be set and there are some things we should be checking for.
+ if (marker) {
+ // TRTP headers need to have at least a byte for version, a byte for
+ // payload type and flags, and 4 bytes for length.
+ if (amt < 18) {
+ ALOGV("Dropping packet, too short to contain TRTP header"
+ " (%u bytes)", static_cast<uint32_t>(amt));
+ goto process_next_packet;
+ }
+ // Check the TRTP version and extract the payload type/flags.
+ uint8_t trtp_version = data[12];
+ uint8_t payload_type = (data[13] >> 4) & 0xF;
+ uint8_t trtp_flags = data[13] & 0xF;
+ if (1 != trtp_version) {
+ ALOGV("Dropping packet, bad trtp version %hhu", trtp_version);
+ goto process_next_packet;
+ }
+ // Is there a timestamp transformation present on this packet? If
+ // so, extract it and pass it to the appropriate substreams.
+ if (trtp_flags & 0x02) {
+ ssize_t offset = 18 + ((trtp_flags & 0x01) ? 4 : 0);
+ if (amt < (offset + 24)) {
+ ALOGV("Dropping packet, too short to contain TRTP Timestamp"
+ " Transformation (%u bytes)",
+ static_cast<uint32_t>(amt));
+ goto process_next_packet;
+ }
+ trans.a_zero = fetchInt64(data + offset);
+ trans.b_zero = fetchInt64(data + offset + 16);
+ trans.a_to_b_numer = static_cast<int32_t>(
+ fetchInt32 (data + offset + 8));
+ trans.a_to_b_denom = U32_AT(data + offset + 12);
+ foundTrans = true;
+ uint32_t program_id = (ssrc >> 5) & 0x1F;
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ sp<Substream> iter = substreams_.valueAt(i);
+ CHECK(iter != NULL);
+ if (iter->getProgramID() == program_id) {
+ iter->processTSTransform(trans);
+ }
+ }
+ }
+ // Is this a command packet? If so, its not necessarily associate
+ // with one particular substream. Just give it to the command
+ // packet handler and then move on.
+ if (4 == payload_type) {
+ processCommandPacket(pb);
+ goto process_next_packet;
+ }
+ }
+ // If we got to here, then we are a normal packet. Find (or allocate)
+ // the substream we belong to and send the packet off to be processed.
+ substream = substreams_.valueFor(ssrc);
+ if (substream == NULL) {
+ substream = new Substream(ssrc, omx_);
+ if (substream == NULL) {
+ ALOGE("Failed to allocate substream for SSRC 0x%08x", ssrc);
+ goto process_next_packet;
+ }
+ substreams_.add(ssrc, substream);
+ if (foundTrans) {
+ substream->processTSTransform(trans);
+ }
+ }
+ CHECK(substream != NULL);
+ if (marker) {
+ // Start of a new TRTP payload for this substream. Extract the
+ // lower 32 bits of the timestamp and hand the buffer to the
+ // substream for processing.
+ uint32_t ts_lower = U32_AT(data + 4);
+ substream->processPayloadStart(data + 12, amt - 12, ts_lower);
+ } else {
+ // Continuation of an existing TRTP payload. Just hand it off to
+ // the substream for processing.
+ substream->processPayloadCont(data + 12, amt - 12);
+ }
+ PacketBuffer::destroy(pb);
+ } // end of main processing while loop.
+void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {
+ CHECK(NULL != pb);
+ uint8_t* data = pb->data_;
+ ssize_t amt = pb->length_;
+ // verify that this packet meets the minimum length of a command packet
+ if (amt < 20) {
+ return;
+ }
+ uint8_t trtp_version = data[12];
+ uint8_t trtp_flags = data[13] & 0xF;
+ if (1 != trtp_version) {
+ ALOGV("Dropping packet, bad trtp version %hhu", trtp_version);
+ return;
+ }
+ // calculate the start of the command payload
+ ssize_t offset = 18;
+ if (trtp_flags & 0x01) {
+ // timestamp is present (4 bytes)
+ offset += 4;
+ }
+ if (trtp_flags & 0x02) {
+ // transform is present (24 bytes)
+ offset += 24;
+ }
+ // the packet must contain 2 bytes of command payload beyond the TRTP header
+ if (amt < offset + 2) {
+ return;
+ }
+ uint16_t command_id = U16_AT(data + offset);
+ switch (command_id) {
+ case TRTPControlPacket::kCommandNop:
+ break;
+ case TRTPControlPacket::kCommandEOS:
+ case TRTPControlPacket::kCommandFlush: {
+ uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
+ ALOGI("*** %s flushing program_id=%d",
+ __PRETTY_FUNCTION__, program_id);
+ Vector<uint32_t> substreams_to_remove;
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ sp<Substream> iter = substreams_.valueAt(i);
+ if (iter->getProgramID() == program_id) {
+ iter->shutdown();
+ substreams_to_remove.add(iter->getSSRC());
+ }
+ }
+ for (size_t i = 0; i < substreams_to_remove.size(); ++i) {
+ substreams_.removeItem(substreams_to_remove[i]);
+ }
+ } break;
+ }
+bool AAH_RXPlayer::processGaps() {
+ // Deal with the current gap situation. Specifically...
+ //
+ // 1) If a new gap has shown up, send a retransmit request to the
+ // transmitter.
+ // 2) If a gap we were working on has had a packet in the middle or at
+ // the end filled in, send another retransmit request for the begining
+ // portion of the gap. TRTP was designed for LANs where packet
+ // re-ordering is very unlikely; so see the middle or end of a gap
+ // filled in before the begining is an almost certain indication that
+ // a retransmission packet was also dropped.
+ // 3) If we have been working on a gap for a while and it still has not
+ // been filled in, send another retransmit request.
+ // 4) If the are no more gaps in the ring, clear the current_gap_status_
+ // flag to indicate that all is well again.
+ // Start by fetching the active gap status.
+ SeqNoGap gap;
+ bool send_retransmit_request = false;
+ bool ret_val = false;
+ GapStatus gap_status;
+ if (kGS_NoGap != (gap_status = ring_buffer_.fetchCurrentGap(&gap))) {
+ // Note: checking for a change in the end sequence number should cover
+ // moving on to an entirely new gap for case #1 as well as resending the
+ // begining of a gap range for case #2.
+ send_retransmit_request = (kGS_NoGap == current_gap_status_) ||
+ (current_gap_.end_seq_ != gap.end_seq_);
+ // If this is the same gap we have been working on, and it has timed
+ // out, then check to see if our substreams are about to underflow. If
+ // so, instead of sending another retransmit request, just give up on
+ // this gap and move on.
+ if (!send_retransmit_request &&
+ (kGS_NoGap != current_gap_status_) &&
+ (0 == computeNextGapRetransmitTimeout())) {
+ // If out current gap is the fast-start gap, don't bother to skip it
+ // because substreams look like the are about to underflow.
+ if ((kGS_FastStartGap != gap_status) ||
+ (current_gap_.end_seq_ != gap.end_seq_)) {
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ if (substreams_.valueAt(i)->isAboutToUnderflow()) {
+ ALOGV("About to underflow, giving up on gap [%hu, %hu]",
+ gap.start_seq_, gap.end_seq_);
+ ring_buffer_.processNAK();
+ current_gap_status_ = kGS_NoGap;
+ return true;
+ }
+ }
+ }
+ // Looks like no one is about to underflow. Just go ahead and send
+ // the request.
+ send_retransmit_request = true;
+ }
+ } else {
+ current_gap_status_ = kGS_NoGap;
+ }
+ if (send_retransmit_request) {
+ // If we have been working on a fast start, and it is still not filled
+ // in, even after the extended retransmit time out, give up and skip it.
+ // The system should fall back into its normal slow-start behavior.
+ if ((kGS_FastStartGap == current_gap_status_) &&
+ (current_gap_.end_seq_ == gap.end_seq_)) {
+ ALOGV("Fast start is taking forever; giving up.");
+ ring_buffer_.processNAK();
+ current_gap_status_ = kGS_NoGap;
+ return true;
+ }
+ // Send the request.
+ RetransRequest req;
+ uint32_t magic = (kGS_FastStartGap == gap_status)
+ ? kFastStartRequestMagic
+ : kRetransRequestMagic;
+ req.magic_ = htonl(magic);
+ req.mcast_ip_ = listen_addr_.sin_addr.s_addr;
+ req.mcast_port_ = listen_addr_.sin_port;
+ req.start_seq_ = htons(gap.start_seq_);
+ req.end_seq_ = htons(gap.end_seq_);
+ {
+ uint32_t a = ntohl(transmitter_addr_.sin_addr.s_addr);
+ uint16_t p = ntohs(transmitter_addr_.sin_port);
+ ALOGV("Sending to transmitter %u.%u.%u.%u:%hu",
+ ((a >> 24) & 0xFF),
+ ((a >> 16) & 0xFF),
+ ((a >> 8) & 0xFF),
+ ( a & 0xFF),
+ p);
+ }
+ int res = sendto(sock_fd_, &req, sizeof(req), 0,
+ reinterpret_cast<struct sockaddr*>(&transmitter_addr_),
+ sizeof(transmitter_addr_));
+ if (res < 0) {
+ ALOGE("Error when sending retransmit request (%d)", errno);
+ } else {
+ ALOGV("%s request for range [%hu, %hu] sent",
+ (kGS_FastStartGap == gap_status) ? "Fast Start" : "Retransmit",
+ gap.start_seq_, gap.end_seq_);
+ }
+ // Update the current gap info.
+ current_gap_ = gap;
+ current_gap_status_ = gap_status;
+ next_retrans_req_time_ = monotonicUSecNow() +
+ ((kGS_FastStartGap == current_gap_status_)
+ ? kFastStartTimeoutUSec
+ : kGapRerequestTimeoutUSec);
+ }
+ return false;
+// Compute when its time to send the next gap retransmission in milliseconds.
+// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit
+// right now.
+int AAH_RXPlayer::computeNextGapRetransmitTimeout() {
+ if (kGS_NoGap == current_gap_status_) {
+ return -1;
+ }
+ int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow();
+ timeout_delta /= 1000;
+ if (timeout_delta <= 0) {
+ return 0;
+ }
+ return static_cast<uint32_t>(timeout_delta);
+} // namespace android
diff --git a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp
new file mode 100644
index 0000000..0d8b31f
--- /dev/null
+++ b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp
@@ -0,0 +1,366 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+//#define LOG_NDEBUG 0
+#include <utils/Log.h>
+#include "aah_rx_player.h"
+namespace android {
+AAH_RXPlayer::RXRingBuffer::RXRingBuffer(uint32_t capacity) {
+ capacity_ = capacity;
+ rd_ = wr_ = 0;
+ ring_ = new PacketBuffer*[capacity];
+ memset(ring_, 0, sizeof(PacketBuffer*) * capacity);
+ reset();
+AAH_RXPlayer::RXRingBuffer::~RXRingBuffer() {
+ reset();
+ delete[] ring_;
+void AAH_RXPlayer::RXRingBuffer::reset() {
+ AutoMutex lock(&lock_);
+ if (NULL != ring_) {
+ while (rd_ != wr_) {
+ CHECK(rd_ < capacity_);
+ if (NULL != ring_[rd_]) {
+ PacketBuffer::destroy(ring_[rd_]);
+ ring_[rd_] = NULL;
+ }
+ rd_ = (rd_ + 1) % capacity_;
+ }
+ }
+ rd_ = wr_ = 0;
+ rd_seq_known_ = false;
+ waiting_for_fast_start_ = true;
+ fetched_first_packet_ = false;
+ rtp_activity_timeout_valid_ = false;
+bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
+ uint16_t seq) {
+ AutoMutex lock(&lock_);
+ CHECK(NULL != ring_);
+ CHECK(NULL != buf);
+ rtp_activity_timeout_valid_ = true;
+ rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec;
+ // If the ring buffer is totally reset (we have never received a single
+ // payload) then we don't know the rd sequence number and this should be
+ // simple. We just store the payload, advance the wr pointer and record the
+ // initial sequence number.
+ if (!rd_seq_known_) {
+ CHECK(rd_ == wr_);
+ CHECK(NULL == ring_[wr_]);
+ CHECK(wr_ < capacity_);
+ ring_[wr_] = buf;
+ wr_ = (wr_ + 1) % capacity_;
+ rd_seq_ = seq;
+ rd_seq_known_ = true;
+ return true;
+ }
+ // Compute the seqence number of this payload and of the write pointer,
+ // normalized around the read pointer. IOW - transform the payload seq no
+ // and the wr pointer seq no into a space where the rd pointer seq no is
+ // zero. This will define 4 cases we can consider...
+ //
+ // 1) norm_seq == norm_wr_seq
+ // This payload is contiguous with the last. All is good.
+ //
+ // 2) ((norm_seq < norm_wr_seq) && (norm_seq >= norm_rd_seq)
+ // aka ((norm_seq < norm_wr_seq) && (norm_seq >= 0)
+ // This payload is in the past, in the unprocessed region of the ring
+ // buffer. It is probably a retransmit intended to fill in a dropped
+ // payload; it may be a duplicate.
+ //
+ // 3) ((norm_seq - norm_wr_seq) & 0x8000) != 0
+ // This payload is in the past compared to the write pointer (or so very
+ // far in the future that it has wrapped the seq no space), but not in
+ // the unprocessed region of the ring buffer. This could be a duplicate
+ // retransmit; we just drop these payloads unless we are waiting for our
+ // first fast start packet. If we are waiting for fast start, than this
+ // packet is probably the first packet of the fast start retransmission.
+ // If it will fit in the buffer, back up the read pointer to its position
+ // and clear the fast start flag, otherwise just drop it.
+ //
+ // 4) ((norm_seq - norm_wr_seq) & 0x8000) == 0
+ // This payload which is ahead of the next write pointer. This indicates
+ // that we have missed some payloads and need to request a retransmit.
+ // If norm_seq >= (capacity - 1), then the gap is so large that it would
+ // overflow the ring buffer and we should probably start to panic.
+ uint16_t norm_wr_seq = ((wr_ + capacity_ - rd_) % capacity_);
+ uint16_t norm_seq = seq - rd_seq_;
+ // Check for overflow first.
+ if ((!(norm_seq & 0x8000)) && (norm_seq >= (capacity_ - 1))) {
+ ALOGW("Ring buffer overflow; cap = %u, [rd, wr] = [%hu, %hu], seq = %hu",
+ capacity_, rd_seq_, norm_wr_seq + rd_seq_, seq);
+ PacketBuffer::destroy(buf);
+ return false;
+ }
+ // Check for case #1
+ if (norm_seq == norm_wr_seq) {
+ CHECK(wr_ < capacity_);
+ CHECK(NULL == ring_[wr_]);
+ ring_[wr_] = buf;
+ wr_ = (wr_ + 1) % capacity_;
+ CHECK(wr_ != rd_);
+ return true;
+ }
+ // Check case #2
+ uint32_t ring_pos = (rd_ + norm_seq) % capacity_;
+ if ((norm_seq < norm_wr_seq) && (!(norm_seq & 0x8000))) {
+ // Do we already have a payload for this slot? If so, then this looks
+ // like a duplicate retransmit. Just ignore it.
+ if (NULL != ring_[ring_pos]) {
+ ALOGD("RXed duplicate retransmit, seq = %hu", seq);
+ PacketBuffer::destroy(buf);
+ } else {
+ // Looks like we were missing this payload. Go ahead and store it.
+ ring_[ring_pos] = buf;
+ }
+ return true;
+ }
+ // Check case #3
+ if ((norm_seq - norm_wr_seq) & 0x8000) {
+ if (!waiting_for_fast_start_) {
+ ALOGD("RXed duplicate retransmit from before rd pointer, seq = %hu",
+ seq);
+ PacketBuffer::destroy(buf);
+ } else {
+ // Looks like a fast start fill-in. Go ahead and store it, assuming
+ // that we can fit it in the buffer.
+ uint32_t implied_ring_size = static_cast<uint32_t>(norm_wr_seq)
+ + (rd_seq_ - seq);
+ if (implied_ring_size >= (capacity_ - 1)) {
+ ALOGD("RXed what looks like a fast start packet (seq = %hu),"
+ " but packet is too far in the past to fit into the ring"
+ " buffer. Dropping.", seq);
+ PacketBuffer::destroy(buf);
+ } else {
+ ring_pos = (rd_ + capacity_ + seq - rd_seq_) % capacity_;
+ rd_seq_ = seq;
+ rd_ = ring_pos;
+ waiting_for_fast_start_ = false;
+ CHECK(ring_pos < capacity_);
+ CHECK(NULL == ring_[ring_pos]);
+ ring_[ring_pos] = buf;
+ }
+ }
+ return true;
+ }
+ // Must be in case #4 with no overflow. This packet fits in the current
+ // ring buffer, but is discontiuguous. Advance the write pointer leaving a
+ // gap behind.
+ uint32_t gap_len = (ring_pos + capacity_ - wr_) % capacity_;
+ ALOGD("Drop detected; %u packets, seq_range [%hu, %hu]",
+ gap_len,
+ rd_seq_ + norm_wr_seq,
+ rd_seq_ + norm_wr_seq + gap_len - 1);
+ CHECK(NULL == ring_[ring_pos]);
+ ring_[ring_pos] = buf;
+ wr_ = (ring_pos + 1) % capacity_;
+ CHECK(wr_ != rd_);
+ return true;
+AAH_RXPlayer::RXRingBuffer::fetchBuffer(bool* is_discon) {
+ AutoMutex lock(&lock_);
+ CHECK(NULL != ring_);
+ CHECK(NULL != is_discon);
+ // If the read seqence number is not known, then this ring buffer has not
+ // received a packet since being reset and there cannot be any packets to
+ // return. If we are still waiting for the first fast start packet to show
+ // up, we don't want to let any buffer be consumed yet because we expect to
+ // see a packet before the initial read sequence number show up shortly.
+ if (!rd_seq_known_ || waiting_for_fast_start_) {
+ *is_discon = false;
+ return NULL;
+ }
+ PacketBuffer* ret = NULL;
+ *is_discon = !fetched_first_packet_;
+ while ((rd_ != wr_) && (NULL == ret)) {
+ CHECK(rd_ < capacity_);
+ // If we hit a gap, stall and do not advance the read pointer. Let the
+ // higher level code deal with requesting retries and/or deciding to
+ // skip the current gap.
+ ret = ring_[rd_];
+ if (NULL == ret) {
+ break;
+ }
+ ring_[rd_] = NULL;
+ rd_ = (rd_ + 1) % capacity_;
+ ++rd_seq_;
+ }
+ if (NULL != ret) {
+ fetched_first_packet_ = true;
+ }
+ return ret;
+AAH_RXPlayer::RXRingBuffer::fetchCurrentGap(SeqNoGap* gap) {
+ AutoMutex lock(&lock_);
+ CHECK(NULL != ring_);
+ CHECK(NULL != gap);
+ // If the read seqence number is not known, then this ring buffer has not
+ // received a packet since being reset and there cannot be any gaps.
+ if (!rd_seq_known_) {
+ return kGS_NoGap;
+ }
+ // If we are waiting for fast start, then the current gap is a fast start
+ // gap and it includes all packets before the read sequence number.
+ if (waiting_for_fast_start_) {
+ gap->start_seq_ =
+ gap->end_seq_ = rd_seq_ - 1;
+ return kGS_FastStartGap;
+ }
+ // If rd == wr, then the buffer is empty and there cannot be any gaps.
+ if (rd_ == wr_) {
+ return kGS_NoGap;
+ }
+ // If rd_ is currently pointing at an unprocessed packet, then there is no
+ // current gap.
+ CHECK(rd_ < capacity_);
+ if (NULL != ring_[rd_]) {
+ return kGS_NoGap;
+ }
+ // Looks like there must be a gap here. The start of the gap is the current
+ // rd sequence number, all we need to do now is determine its length in
+ // order to compute the end sequence number.
+ gap->start_seq_ = rd_seq_;
+ uint16_t end = rd_seq_;
+ uint32_t tmp = (rd_ + 1) % capacity_;
+ while ((tmp != wr_) && (NULL == ring_[tmp])) {
+ ++end;
+ tmp = (tmp + 1) % capacity_;
+ }
+ gap->end_seq_ = end;
+ return kGS_NormalGap;
+void AAH_RXPlayer::RXRingBuffer::processNAK(const SeqNoGap* nak) {
+ AutoMutex lock(&lock_);
+ CHECK(NULL != ring_);
+ // If we were waiting for our first fast start fill-in packet, and we
+ // received a NAK, then apparantly we are not getting our fast start. Just
+ // clear the waiting flag and go back to normal behavior.
+ if (waiting_for_fast_start_) {
+ waiting_for_fast_start_ = false;
+ }
+ // If we have not received a packet since last reset, or there is no data in
+ // the ring, then there is nothing to skip.
+ if ((!rd_seq_known_) || (rd_ == wr_)) {
+ return;
+ }
+ // If rd_ is currently pointing at an unprocessed packet, then there is no
+ // gap to skip.
+ CHECK(rd_ < capacity_);
+ if (NULL != ring_[rd_]) {
+ return;
+ }
+ // Looks like there must be a gap here. Advance rd until we have passed
+ // over the portion of it indicated by nak (or all of the gap if nak is
+ // NULL). Then reset fetched_first_packet_ so that the next read will show
+ // up as being discontiguous.
+ uint16_t seq_after_gap = (NULL == nak) ? 0 : nak->end_seq_ + 1;
+ while ((rd_ != wr_) &&
+ (NULL == ring_[rd_]) &&
+ ((NULL == nak) || (seq_after_gap != rd_seq_))) {
+ rd_ = (rd_ + 1) % capacity_;
+ ++rd_seq_;
+ }
+ fetched_first_packet_ = false;
+int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() {
+ AutoMutex lock(&lock_);
+ if (!rtp_activity_timeout_valid_) {
+ return -1;
+ }
+ uint64_t now = monotonicUSecNow();
+ if (rtp_activity_timeout_ <= now) {
+ return 0;
+ }
+ return (rtp_activity_timeout_ - now) / 1000;
+AAH_RXPlayer::PacketBuffer::allocate(ssize_t length) {
+ if (length <= 0) {
+ return NULL;
+ }
+ uint32_t alloc_len = sizeof(PacketBuffer) + length;
+ PacketBuffer* ret = reinterpret_cast<PacketBuffer*>(
+ new uint8_t[alloc_len]);
+ if (NULL != ret) {
+ ret->length_ = length;
+ }
+ return ret;
+void AAH_RXPlayer::PacketBuffer::destroy(PacketBuffer* pb) {
+ uint8_t* kill_me = reinterpret_cast<uint8_t*>(pb);
+ delete[] kill_me;
+} // namespace android
diff --git a/media/libaah_rtp/aah_rx_player_substream.cpp b/media/libaah_rtp/aah_rx_player_substream.cpp
new file mode 100644
index 0000000..1e4c784
--- /dev/null
+++ b/media/libaah_rtp/aah_rx_player_substream.cpp
@@ -0,0 +1,498 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+//#define LOG_NDEBUG 0
+#include <utils/Log.h>
+#include <include/avc_utils.h>
+#include <media/stagefright/MediaBuffer.h>
+#include <media/stagefright/MediaDefs.h>
+#include <media/stagefright/MetaData.h>
+#include <media/stagefright/OMXCodec.h>
+#include <media/stagefright/Utils.h>
+#include "aah_rx_player.h"
+namespace android {
+int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =
+ 50ull * 1000;
+AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {
+ ssrc_ = ssrc;
+ substream_details_known_ = false;
+ buffer_in_progress_ = NULL;
+ status_ = OK;
+ decoder_ = new AAH_DecoderPump(omx);
+ if (decoder_ == NULL) {
+ ALOGE("%s failed to allocate decoder pump!", __PRETTY_FUNCTION__);
+ }
+ if (OK != decoder_->initCheck()) {
+ ALOGE("%s failed to initialize decoder pump!", __PRETTY_FUNCTION__);
+ }
+ // cleanupBufferInProgress will reset most of the internal state variables.
+ // Just need to make sure that buffer_in_progress_ is NULL before calling.
+ cleanupBufferInProgress();
+void AAH_RXPlayer::Substream::shutdown() {
+ substream_meta_ = NULL;
+ status_ = OK;
+ cleanupBufferInProgress();
+ cleanupDecoder();
+void AAH_RXPlayer::Substream::cleanupBufferInProgress() {
+ if (NULL != buffer_in_progress_) {
+ buffer_in_progress_->release();
+ buffer_in_progress_ = NULL;
+ }
+ expected_buffer_size_ = 0;
+ buffer_filled_ = 0;
+ waiting_for_rap_ = true;
+void AAH_RXPlayer::Substream::cleanupDecoder() {
+ if (decoder_ != NULL) {
+ decoder_->shutdown();
+ }
+bool AAH_RXPlayer::Substream::shouldAbort(const char* log_tag) {
+ // If we have already encountered a fatal error, do nothing. We are just
+ // waiting for our owner to shut us down now.
+ if (OK != status_) {
+ ALOGV("Skipping %s, substream has encountered fatal error (%d).",
+ log_tag, status_);
+ return true;
+ }
+ return false;
+void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf,
+ uint32_t amt,
+ int32_t ts_lower) {
+ uint32_t min_length = 6;
+ if (shouldAbort(__PRETTY_FUNCTION__)) {
+ return;
+ }
+ // Do we have a buffer in progress already? If so, abort the buffer. In
+ // theory, this should never happen. If there were a discontinutity in the
+ // stream, the discon in the seq_nos at the RTP level should have already
+ // triggered a cleanup of the buffer in progress. To see a problem at this
+ // level is an indication either of a bug in the transmitter, or some form
+ // of terrible corruption/tampering on the wire.
+ if (NULL != buffer_in_progress_) {
+ ALOGE("processPayloadStart is aborting payload already in progress.");
+ cleanupBufferInProgress();
+ }
+ // Parse enough of the header to know where we stand. Since this is a
+ // payload start, it should begin with a TRTP header which has to be at
+ // least 6 bytes long.
+ if (amt < min_length) {
+ ALOGV("Discarding payload too short to contain TRTP header (len = %u)",
+ amt);
+ return;
+ }
+ // Check the TRTP version number.
+ if (0x01 != buf[0]) {
+ ALOGV("Unexpected TRTP version (%u) in header. Expected %u.",
+ buf[0], 1);
+ return;
+ }
+ // Extract the substream type field and make sure its one we understand (and
+ // one that does not conflict with any previously received substream type.
+ uint8_t header_type = (buf[1] >> 4) & 0xF;
+ switch (header_type) {
+ case 0x01:
+ // Audio, yay! Just break. We understand audio payloads.
+ break;
+ case 0x02:
+ ALOGV("RXed packet with unhandled TRTP header type (Video).");
+ return;
+ case 0x03:
+ ALOGV("RXed packet with unhandled TRTP header type (Subpicture).");
+ return;
+ case 0x04:
+ ALOGV("RXed packet with unhandled TRTP header type (Control).");
+ return;
+ default:
+ ALOGV("RXed packet with unhandled TRTP header type (%u).",
+ header_type);
+ return;
+ }
+ if (substream_details_known_ && (header_type != substream_type_)) {
+ ALOGV("RXed TRTP Payload for SSRC=0x%08x where header type (%u) does not"
+ " match previously received header type (%u)",
+ ssrc_, header_type, substream_type_);
+ return;
+ }
+ // Check the flags to see if there is another 32 bits of timestamp present.
+ uint32_t trtp_header_len = 6;
+ bool ts_valid = buf[1] & 0x1;
+ if (ts_valid) {
+ min_length += 4;
+ trtp_header_len += 4;
+ if (amt < min_length) {
+ ALOGV("Discarding payload too short to contain TRTP timestamp"
+ " (len = %u)", amt);
+ return;
+ }
+ }
+ // Extract the TRTP length field and sanity check it.
+ uint32_t trtp_len;
+ trtp_len = (static_cast<uint32_t>(buf[2]) << 24) |
+ (static_cast<uint32_t>(buf[3]) << 16) |
+ (static_cast<uint32_t>(buf[4]) << 8) |
+ static_cast<uint32_t>(buf[5]);
+ if (trtp_len < min_length) {
+ ALOGV("TRTP length (%u) is too short to be valid. Must be at least %u"
+ " bytes.", trtp_len, min_length);
+ return;
+ }
+ // Extract the rest of the timestamp field if valid.
+ int64_t ts = 0;
+ uint32_t parse_offset = 6;
+ if (ts_valid) {
+ ts = (static_cast<int64_t>(buf[parse_offset ]) << 56) |
+ (static_cast<int64_t>(buf[parse_offset + 1]) << 48) |
+ (static_cast<int64_t>(buf[parse_offset + 2]) << 40) |
+ (static_cast<int64_t>(buf[parse_offset + 3]) << 32);
+ ts |= ts_lower;
+ parse_offset += 4;
+ }
+ // Check the flags to see if there is another 24 bytes of timestamp
+ // transformation present.
+ if (buf[1] & 0x2) {
+ min_length += 24;
+ parse_offset += 24;
+ trtp_header_len += 24;
+ if (amt < min_length) {
+ ALOGV("Discarding payload too short to contain TRTP timestamp"
+ " transformation (len = %u)", amt);
+ return;
+ }
+ }
+ // TODO : break the parsing into individual parsers for the different
+ // payload types (audio, video, etc).
+ //
+ // At this point in time, we know that this is audio. Go ahead and parse
+ // the basic header, check the codec type, and find the payload portion of
+ // the packet.
+ min_length += 3;
+ if (trtp_len < min_length) {
+ ALOGV("TRTP length (%u) is too short to be a valid audio payload. Must"
+ " be at least %u bytes.", trtp_len, min_length);
+ return;
+ }
+ if (amt < min_length) {
+ ALOGV("TRTP porttion of RTP payload (%u bytes) too small to contain"
+ " entire TRTP header. TRTP does not currently support fragmenting"
+ " TRTP headers across RTP payloads", amt);
+ return;
+ }
+ uint8_t codec_type = buf[parse_offset ];
+ uint8_t flags = buf[parse_offset + 1];
+ uint8_t volume = buf[parse_offset + 2];
+ parse_offset += 3;
+ trtp_header_len += 3;
+ if (!setupSubstreamType(header_type, codec_type)) {
+ return;
+ }
+ if (decoder_ != NULL) {
+ decoder_->setRenderVolume(volume);
+ }
+ // TODO : move all of the constant flag and offset definitions for TRTP up
+ // into some sort of common header file.
+ if (waiting_for_rap_ && !(flags & 0x08)) {
+ ALOGV("Dropping non-RAP TRTP Audio Payload while waiting for RAP.");
+ return;
+ }
+ if (flags & 0x10) {
+ ALOGV("Dropping TRTP Audio Payload with aux codec data present (only"
+ " handle MP3 right now, and it has no aux data)");
+ return;
+ }
+ // OK - everything left is just payload. Compute the payload size, start
+ // the buffer in progress and pack as much payload as we can into it. If
+ // the payload is finished once we are done, go ahead and send the payload
+ // to the decoder.
+ expected_buffer_size_ = trtp_len - trtp_header_len;
+ if (!expected_buffer_size_) {
+ ALOGV("Dropping TRTP Audio Payload with 0 Access Unit length");
+ return;
+ }
+ CHECK(amt >= trtp_header_len);
+ uint32_t todo = amt - trtp_header_len;
+ if (expected_buffer_size_ < todo) {
+ ALOGV("Extra data (%u > %u) present in initial TRTP Audio Payload;"
+ " dropping payload.", todo, expected_buffer_size_);
+ return;
+ }
+ buffer_filled_ = 0;
+ buffer_in_progress_ = new MediaBuffer(expected_buffer_size_);
+ if ((NULL == buffer_in_progress_) ||
+ (NULL == buffer_in_progress_->data())) {
+ ALOGV("Failed to allocate MediaBuffer of length %u",
+ expected_buffer_size_);
+ cleanupBufferInProgress();
+ return;
+ }
+ sp<MetaData> meta = buffer_in_progress_->meta_data();
+ if (meta == NULL) {
+ ALOGV("Missing metadata structure in allocated MediaBuffer; dropping"
+ " payload");
+ cleanupBufferInProgress();
+ return;
+ }
+ // TODO : set this based on the codec type indicated in the TRTP stream.
+ // Right now, we only support MP3, so the choice is obvious.
+ meta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_MPEG);
+ if (ts_valid) {
+ meta->setInt64(kKeyTime, ts);
+ }
+ if (amt > 0) {
+ uint8_t* tgt =
+ reinterpret_cast<uint8_t*>(buffer_in_progress_->data());
+ memcpy(tgt + buffer_filled_, buf + trtp_header_len, todo);
+ buffer_filled_ += amt;
+ }
+ if (buffer_filled_ >= expected_buffer_size_) {
+ processCompletedBuffer();
+ }
+void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf,
+ uint32_t amt) {
+ if (shouldAbort(__PRETTY_FUNCTION__)) {
+ return;
+ }
+ if (NULL == buffer_in_progress_) {
+ ALOGV("TRTP Receiver skipping payload continuation; no buffer currently"
+ " in progress.");
+ return;
+ }
+ CHECK(buffer_filled_ < expected_buffer_size_);
+ uint32_t buffer_left = expected_buffer_size_ - buffer_filled_;
+ if (amt > buffer_left) {
+ ALOGV("Extra data (%u > %u) present in continued TRTP Audio Payload;"
+ " dropping payload.", amt, buffer_left);
+ cleanupBufferInProgress();
+ return;
+ }
+ if (amt > 0) {
+ uint8_t* tgt =
+ reinterpret_cast<uint8_t*>(buffer_in_progress_->data());
+ memcpy(tgt + buffer_filled_, buf, amt);
+ buffer_filled_ += amt;
+ }
+ if (buffer_filled_ >= expected_buffer_size_) {
+ processCompletedBuffer();
+ }
+void AAH_RXPlayer::Substream::processCompletedBuffer() {
+ const uint8_t* buffer_data = NULL;
+ int sample_rate;
+ int channel_count;
+ size_t frame_size;
+ status_t res;
+ CHECK(NULL != buffer_in_progress_);
+ if (decoder_ == NULL) {
+ ALOGV("Dropping complete buffer, no decoder pump allocated");
+ goto bailout;
+ }
+ buffer_data = reinterpret_cast<const uint8_t*>(buffer_in_progress_->data());
+ if (buffer_in_progress_->size() < 4) {
+ ALOGV("MP3 payload too short to contain header, dropping payload.");
+ goto bailout;
+ }
+ // Extract the channel count and the sample rate from the MP3 header. The
+ // stagefright MP3 requires that these be delivered before decoing can
+ // begin.
+ if (!GetMPEGAudioFrameSize(U32_AT(buffer_data),
+ &frame_size,
+ &sample_rate,
+ &channel_count,
+ NULL)) {
+ ALOGV("Failed to parse MP3 header in payload, droping payload.");
+ goto bailout;
+ }
+ // Make sure that our substream metadata is set up properly. If there has
+ // been a format change, be sure to reset the underlying decoder. In
+ // stagefright, it seems like the only way to do this is to destroy and
+ // recreate the decoder.
+ if (substream_meta_ == NULL) {
+ substream_meta_ = new MetaData();
+ if (substream_meta_ == NULL) {
+ ALOGE("Failed to allocate MetaData structure for substream");
+ goto bailout;
+ }
+ substream_meta_->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_MPEG);
+ substream_meta_->setInt32 (kKeyChannelCount, channel_count);
+ substream_meta_->setInt32 (kKeySampleRate, sample_rate);
+ } else {
+ int32_t prev_sample_rate;
+ int32_t prev_channel_count;
+ substream_meta_->findInt32(kKeySampleRate, &prev_sample_rate);
+ substream_meta_->findInt32(kKeyChannelCount, &prev_channel_count);
+ if ((prev_channel_count != channel_count) ||
+ (prev_sample_rate != sample_rate)) {
+ ALOGW("Format change detected, forcing decoder reset.");
+ cleanupDecoder();
+ substream_meta_->setInt32(kKeyChannelCount, channel_count);
+ substream_meta_->setInt32(kKeySampleRate, sample_rate);
+ }
+ }
+ // If our decoder has not be set up, do so now.
+ res = decoder_->init(substream_meta_);
+ if (OK != res) {
+ ALOGE("Failed to init decoder (res = %d)", res);
+ cleanupDecoder();
+ substream_meta_ = NULL;
+ goto bailout;
+ }
+ // Queue the payload for decode.
+ res = decoder_->queueForDecode(buffer_in_progress_);
+ if (res != OK) {
+ ALOGD("Failed to queue payload for decode, resetting decoder pump!"
+ " (res = %d)", res);
+ status_ = res;
+ cleanupDecoder();
+ cleanupBufferInProgress();
+ }
+ // NULL out buffer_in_progress before calling the cleanup helper.
+ //
+ // MediaBuffers use something of a hybrid ref-counting pattern which prevent
+ // the AAH_DecoderPump's input queue from adding their own reference to the
+ // MediaBuffer. MediaBuffers start life with a reference count of 0, as
+ // well as an observer which starts as NULL. Before being given an
+ // observer, the ref count cannot be allowed to become non-zero as it will
+ // cause calls to release() to assert. Basically, before a MediaBuffer has
+ // an observer, they behave like non-ref counted obects where release()
+ // serves the roll of delete. After a MediaBuffer has an observer, they
+ // become more like ref counted objects where add ref and release can be
+ // used, and when the ref count hits zero, the MediaBuffer is handed off to
+ // the observer.
+ //
+ // Given all of this, when we give the buffer to the decoder pump to wait in
+ // the to-be-processed queue, the decoder cannot add a ref to the buffer as
+ // it would in a traditional ref counting system. Instead it needs to
+ // "steal" the non-existent ref. In the case of queue failure, we need to
+ // make certain to release this non-existent reference so that the buffer is
+ // cleaned up during the cleanupBufferInProgress helper. In the case of a
+ // successful queue operation, we need to make certain that the
+ // cleanupBufferInProgress helper does not release the buffer since it needs
+ // to remain alive in the queue. We acomplish this by NULLing out the
+ // buffer pointer before calling the cleanup helper.
+ buffer_in_progress_ = NULL;
+ cleanupBufferInProgress();
+void AAH_RXPlayer::Substream::processTSTransform(const LinearTransform& trans) {
+ if (decoder_ != NULL) {
+ decoder_->setRenderTSTransform(trans);
+ }
+bool AAH_RXPlayer::Substream::isAboutToUnderflow() {
+ if (decoder_ == NULL) {
+ return false;
+ }
+ return decoder_->isAboutToUnderflow(kAboutToUnderflowThreshold);
+bool AAH_RXPlayer::Substream::setupSubstreamType(uint8_t substream_type,
+ uint8_t codec_type) {
+ // Sanity check the codec type. Right now we only support MP3. Also check
+ // for conflicts with previously delivered codec types.
+ if (substream_details_known_ && (codec_type != codec_type_)) {
+ ALOGV("RXed TRTP Payload for SSRC=0x%08x where codec type (%u) does not"
+ " match previously received codec type (%u)",
+ ssrc_, codec_type, codec_type_);
+ return false;
+ }
+ if (codec_type != 0x03) {
+ ALOGV("RXed TRTP Audio Payload for SSRC=0x%08x with unsupported codec"
+ " type (%u)", ssrc_, codec_type);
+ return false;
+ }
+ if (!substream_details_known_) {
+ substream_type_ = substream_type;
+ codec_type_ = codec_type;
+ substream_details_known_ = true;
+ }
+ return true;
+} // namespace android
diff --git a/media/libaah_rtp/aah_tx_packet.cpp b/media/libaah_rtp/aah_tx_packet.cpp
new file mode 100644
index 0000000..3f6e0e9
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_packet.cpp
@@ -0,0 +1,331 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+#include <utils/Log.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include "aah_tx_packet.h"
+namespace android {
+const int TRTPPacket::kRTPHeaderLen;
+const uint32_t TRTPPacket::kTRTPEpochMask;
+TRTPPacket::~TRTPPacket() {
+ delete mPacket;
+/*** TRTP packet properties ***/
+void TRTPPacket::setSeqNumber(uint16_t val) {
+ mSeqNumber = val;
+ if (mIsPacked) {
+ const int kTRTPSeqNumberOffset = 2;
+ uint16_t* buf = reinterpret_cast<uint16_t*>(
+ mPacket + kTRTPSeqNumberOffset);
+ *buf = htons(mSeqNumber);
+ }
+uint16_t TRTPPacket::getSeqNumber() const {
+ return mSeqNumber;
+void TRTPPacket::setPTS(int64_t val) {
+ CHECK(!mIsPacked);
+ mPTS = val;
+ mPTSValid = true;
+int64_t TRTPPacket::getPTS() const {
+ return mPTS;
+void TRTPPacket::setEpoch(uint32_t val) {
+ mEpoch = val;
+ if (mIsPacked) {
+ const int kTRTPEpochOffset = 8;
+ uint32_t* buf = reinterpret_cast<uint32_t*>(
+ mPacket + kTRTPEpochOffset);
+ uint32_t val = ntohl(*buf);
+ val &= ~(kTRTPEpochMask << kTRTPEpochShift);
+ val |= (mEpoch & kTRTPEpochMask) << kTRTPEpochShift;
+ *buf = htonl(val);
+ }
+void TRTPPacket::setProgramID(uint16_t val) {
+ CHECK(!mIsPacked);
+ mProgramID = val;
+void TRTPPacket::setSubstreamID(uint16_t val) {
+ CHECK(!mIsPacked);
+ mSubstreamID = val;
+void TRTPPacket::setClockTransform(const LinearTransform& trans) {
+ CHECK(!mIsPacked);
+ mClockTranform = trans;
+ mClockTranformValid = true;
+uint8_t* TRTPPacket::getPacket() const {
+ CHECK(mIsPacked);
+ return mPacket;
+int TRTPPacket::getPacketLen() const {
+ CHECK(mIsPacked);
+ return mPacketLen;
+void TRTPPacket::setExpireTime(nsecs_t val) {
+ CHECK(!mIsPacked);
+ mExpireTime = val;
+nsecs_t TRTPPacket::getExpireTime() const {
+ return mExpireTime;
+/*** TRTP audio packet properties ***/
+void TRTPAudioPacket::setCodecType(TRTPAudioCodecType val) {
+ CHECK(!mIsPacked);
+ mCodecType = val;
+void TRTPAudioPacket::setRandomAccessPoint(bool val) {
+ CHECK(!mIsPacked);
+ mRandomAccessPoint = val;
+void TRTPAudioPacket::setDropable(bool val) {
+ CHECK(!mIsPacked);
+ mDropable = val;
+void TRTPAudioPacket::setDiscontinuity(bool val) {
+ CHECK(!mIsPacked);
+ mDiscontinuity = val;
+void TRTPAudioPacket::setEndOfStream(bool val) {
+ CHECK(!mIsPacked);
+ mEndOfStream = val;
+void TRTPAudioPacket::setVolume(uint8_t val) {
+ CHECK(!mIsPacked);
+ mVolume = val;
+void TRTPAudioPacket::setAccessUnitData(void* data, int len) {
+ CHECK(!mIsPacked);
+ mAccessUnitData = data;
+ mAccessUnitLen = len;
+/*** TRTP control packet properties ***/
+void TRTPControlPacket::setCommandID(TRTPCommandID val) {
+ CHECK(!mIsPacked);
+ mCommandID = val;
+/*** TRTP packet serializers ***/
+void TRTPPacket::writeU8(uint8_t*& buf, uint8_t val) {
+ *buf = val;
+ buf++;
+void TRTPPacket::writeU16(uint8_t*& buf, uint16_t val) {
+ *reinterpret_cast<uint16_t*>(buf) = htons(val);
+ buf += 2;
+void TRTPPacket::writeU32(uint8_t*& buf, uint32_t val) {
+ *reinterpret_cast<uint32_t*>(buf) = htonl(val);
+ buf += 4;
+void TRTPPacket::writeU64(uint8_t*& buf, uint64_t val) {
+ buf[0] = static_cast<uint8_t>(val >> 56);
+ buf[1] = static_cast<uint8_t>(val >> 48);
+ buf[2] = static_cast<uint8_t>(val >> 40);
+ buf[3] = static_cast<uint8_t>(val >> 32);
+ buf[4] = static_cast<uint8_t>(val >> 24);
+ buf[5] = static_cast<uint8_t>(val >> 16);
+ buf[6] = static_cast<uint8_t>(val >> 8);
+ buf[7] = static_cast<uint8_t>(val);
+ buf += 8;
+void TRTPPacket::writeTRTPHeader(uint8_t*& buf,
+ bool isFirstFragment,
+ int totalPacketLen) {
+ // RTP header
+ writeU8(buf,
+ ((mVersion & 0x03) << 6) |
+ (static_cast<int>(mPadding) << 5) |
+ (static_cast<int>(mExtension) << 4) |
+ (mCsrcCount & 0x0F));
+ writeU8(buf,
+ (static_cast<int>(isFirstFragment) << 7) |
+ (mPayloadType & 0x7F));
+ writeU16(buf, mSeqNumber);
+ if (isFirstFragment && mPTSValid) {
+ writeU32(buf, mPTS & 0xFFFFFFFF);
+ } else {
+ writeU32(buf, 0);
+ }
+ writeU32(buf,
+ ((mEpoch & kTRTPEpochMask) << kTRTPEpochShift) |
+ ((mProgramID & 0x1F) << 5) |
+ (mSubstreamID & 0x1F));
+ // TRTP header
+ writeU8(buf, mTRTPVersion);
+ writeU8(buf,
+ ((mTRTPHeaderType & 0x0F) << 4) |
+ (mClockTranformValid ? 0x02 : 0x00) |
+ (mPTSValid ? 0x01 : 0x00));
+ writeU32(buf, totalPacketLen - kRTPHeaderLen);
+ if (mPTSValid) {
+ writeU32(buf, mPTS >> 32);
+ }
+ if (mClockTranformValid) {
+ writeU64(buf, mClockTranform.a_zero);
+ writeU32(buf, mClockTranform.a_to_b_numer);
+ writeU32(buf, mClockTranform.a_to_b_denom);
+ writeU64(buf, mClockTranform.b_zero);
+ }
+bool TRTPAudioPacket::pack() {
+ if (mIsPacked) {
+ return false;
+ }
+ int packetLen = kRTPHeaderLen +
+ mAccessUnitLen +
+ TRTPHeaderLen();
+ // TODO : support multiple fragments
+ const int kMaxUDPPayloadLen = 65507;
+ if (packetLen > kMaxUDPPayloadLen) {
+ return false;
+ }
+ mPacket = new uint8_t[packetLen];
+ if (!mPacket) {
+ return false;
+ }
+ mPacketLen = packetLen;
+ uint8_t* cur = mPacket;
+ writeTRTPHeader(cur, true, packetLen);
+ writeU8(cur, mCodecType);
+ writeU8(cur,
+ (static_cast<int>(mRandomAccessPoint) << 3) |
+ (static_cast<int>(mDropable) << 2) |
+ (static_cast<int>(mDiscontinuity) << 1) |
+ (static_cast<int>(mEndOfStream)));
+ writeU8(cur, mVolume);
+ memcpy(cur, mAccessUnitData, mAccessUnitLen);
+ mIsPacked = true;
+ return true;
+int TRTPPacket::TRTPHeaderLen() const {
+ // 6 bytes for version, payload type, flags and length. An additional 4 if
+ // there are upper timestamp bits present and another 24 if there is a clock
+ // transformation present.
+ return 6 +
+ (mClockTranformValid ? 24 : 0) +
+ (mPTSValid ? 4 : 0);
+int TRTPAudioPacket::TRTPHeaderLen() const {
+ // TRTPPacket::TRTPHeaderLen() for the base TRTPHeader. 3 bytes for audio's
+ // codec type, flags and volume field. Another 5 bytes if the codec type is
+ // PCM and we are sending sample rate/channel count. as well as however long
+ // the aux data (if present) is.
+ int pcmParamLength;
+ switch(mCodecType) {
+ case kCodecPCMBigEndian:
+ case kCodecPCMLittleEndian:
+ pcmParamLength = 5;
+ break;
+ default:
+ pcmParamLength = 0;
+ break;
+ }
+ // TODO : properly compute aux data length. Currently, nothing
+ // uses aux data, so its length is always 0.
+ int auxDataLength = 0;
+ return TRTPPacket::TRTPHeaderLen() +
+ 3 +
+ auxDataLength +
+ pcmParamLength;
+bool TRTPControlPacket::pack() {
+ if (mIsPacked) {
+ return false;
+ }
+ // command packets contain a 2-byte command ID
+ int packetLen = kRTPHeaderLen +
+ TRTPHeaderLen() +
+ 2;
+ mPacket = new uint8_t[packetLen];
+ if (!mPacket) {
+ return false;
+ }
+ mPacketLen = packetLen;
+ uint8_t* cur = mPacket;
+ writeTRTPHeader(cur, true, packetLen);
+ writeU16(cur, mCommandID);
+ mIsPacked = true;
+ return true;
+} // namespace android
diff --git a/media/libaah_rtp/aah_tx_packet.h b/media/libaah_rtp/aah_tx_packet.h
new file mode 100644
index 0000000..833803e
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_packet.h
@@ -0,0 +1,191 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __AAH_TX_PACKET_H__
+#define __AAH_TX_PACKET_H__
+#include <media/stagefright/foundation/ABase.h>
+#include <utils/LinearTransform.h>
+#include <utils/RefBase.h>
+#include <utils/Timers.h>
+namespace android {
+class TRTPPacket : public RefBase {
+ protected:
+ enum TRTPHeaderType {
+ kHeaderTypeAudio = 1,
+ kHeaderTypeVideo = 2,
+ kHeaderTypeSubpicture = 3,
+ kHeaderTypeControl = 4,
+ };
+ TRTPPacket(TRTPHeaderType headerType)
+ : mIsPacked(false)
+ , mVersion(2)
+ , mPadding(false)
+ , mExtension(false)
+ , mCsrcCount(0)
+ , mPayloadType(100)
+ , mSeqNumber(0)
+ , mPTSValid(false)
+ , mPTS(0)
+ , mEpoch(0)
+ , mProgramID(0)
+ , mSubstreamID(0)
+ , mClockTranformValid(false)
+ , mTRTPVersion(1)
+ , mTRTPLength(0)
+ , mTRTPHeaderType(headerType)
+ , mPacket(NULL)
+ , mPacketLen(0) { }
+ public:
+ virtual ~TRTPPacket();
+ void setSeqNumber(uint16_t val);
+ uint16_t getSeqNumber() const;
+ void setPTS(int64_t val);
+ int64_t getPTS() const;
+ void setEpoch(uint32_t val);
+ void setProgramID(uint16_t val);
+ void setSubstreamID(uint16_t val);
+ void setClockTransform(const LinearTransform& trans);
+ uint8_t* getPacket() const;
+ int getPacketLen() const;
+ void setExpireTime(nsecs_t val);
+ nsecs_t getExpireTime() const;
+ virtual bool pack() = 0;
+ // mask for the number of bits in a TRTP epoch
+ static const uint32_t kTRTPEpochMask = (1 << 22) - 1;
+ static const int kTRTPEpochShift = 10;
+ protected:
+ static const int kRTPHeaderLen = 12;
+ virtual int TRTPHeaderLen() const;
+ void writeTRTPHeader(uint8_t*& buf,
+ bool isFirstFragment,
+ int totalPacketLen);
+ void writeU8(uint8_t*& buf, uint8_t val);
+ void writeU16(uint8_t*& buf, uint16_t val);
+ void writeU32(uint8_t*& buf, uint32_t val);
+ void writeU64(uint8_t*& buf, uint64_t val);
+ bool mIsPacked;
+ uint8_t mVersion;
+ bool mPadding;
+ bool mExtension;
+ uint8_t mCsrcCount;
+ uint8_t mPayloadType;
+ uint16_t mSeqNumber;
+ bool mPTSValid;
+ int64_t mPTS;
+ uint32_t mEpoch;
+ uint16_t mProgramID;
+ uint16_t mSubstreamID;
+ LinearTransform mClockTranform;
+ bool mClockTranformValid;
+ uint8_t mTRTPVersion;
+ uint32_t mTRTPLength;
+ TRTPHeaderType mTRTPHeaderType;
+ uint8_t* mPacket;
+ int mPacketLen;
+ nsecs_t mExpireTime;
+class TRTPAudioPacket : public TRTPPacket {
+ public:
+ TRTPAudioPacket()
+ : TRTPPacket(kHeaderTypeAudio)
+ , mCodecType(kCodecInvalid)
+ , mRandomAccessPoint(false)
+ , mDropable(false)
+ , mDiscontinuity(false)
+ , mEndOfStream(false)
+ , mVolume(0)
+ , mAccessUnitData(NULL) { }
+ enum TRTPAudioCodecType {
+ kCodecInvalid = 0,
+ kCodecPCMBigEndian = 1,
+ kCodecPCMLittleEndian = 2,
+ kCodecMPEG1Audio = 3,
+ };
+ void setCodecType(TRTPAudioCodecType val);
+ void setRandomAccessPoint(bool val);
+ void setDropable(bool val);
+ void setDiscontinuity(bool val);
+ void setEndOfStream(bool val);
+ void setVolume(uint8_t val);
+ void setAccessUnitData(void* data, int len);
+ virtual bool pack();
+ protected:
+ virtual int TRTPHeaderLen() const;
+ private:
+ TRTPAudioCodecType mCodecType;
+ bool mRandomAccessPoint;
+ bool mDropable;
+ bool mDiscontinuity;
+ bool mEndOfStream;
+ uint8_t mVolume;
+ void* mAccessUnitData;
+ int mAccessUnitLen;
+class TRTPControlPacket : public TRTPPacket {
+ public:
+ TRTPControlPacket()
+ : TRTPPacket(kHeaderTypeControl)
+ , mCommandID(kCommandNop) {}
+ enum TRTPCommandID {
+ kCommandNop = 1,
+ kCommandFlush = 2,
+ kCommandEOS = 3,
+ };
+ void setCommandID(TRTPCommandID val);
+ virtual bool pack();
+ private:
+ TRTPCommandID mCommandID;
+} // namespace android
+#endif // __AAH_TX_PLAYER_H__
diff --git a/media/libaah_rtp/aah_tx_player.cpp b/media/libaah_rtp/aah_tx_player.cpp
new file mode 100644
index 0000000..a79a989
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_player.cpp
@@ -0,0 +1,1139 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+#include <utils/Log.h>
+#include <inttypes.h>
+#include <netdb.h>
+#include <netinet/ip.h>
+#include <common_time/cc_helper.h>
+#include <media/IMediaPlayer.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/FileSource.h>
+#include <media/stagefright/MediaBuffer.h>
+#include <media/stagefright/MetaData.h>
+#include <utils/Timers.h>
+#include "aah_tx_packet.h"
+#include "aah_tx_player.h"
+namespace android {
+static int64_t kLowWaterMarkUs = 2000000ll; // 2secs
+static int64_t kHighWaterMarkUs = 10000000ll; // 10secs
+static const size_t kLowWaterMarkBytes = 40000;
+static const size_t kHighWaterMarkBytes = 200000;
+// When we start up, how much lead time should we put on the first access unit?
+static const int64_t kAAHStartupLeadTimeUs = 300000LL;
+// How much time do we attempt to lead the clock by in steady state?
+static const int64_t kAAHBufferTimeUs = 1000000LL;
+// how long do we keep data in our retransmit buffer after sending it.
+const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs =
+ kAAHBufferTimeUs * 1100;
+sp<MediaPlayerBase> createAAH_TXPlayer() {
+ sp<MediaPlayerBase> ret = new AAH_TXPlayer();
+ return ret;
+template <typename T> static T clamp(T val, T min, T max) {
+ if (val < min) {
+ return min;
+ } else if (val > max) {
+ return max;
+ } else {
+ return val;
+ }
+struct AAH_TXEvent : public TimedEventQueue::Event {
+ AAH_TXEvent(AAH_TXPlayer *player,
+ void (AAH_TXPlayer::*method)()) : mPlayer(player)
+ , mMethod(method) {}
+ protected:
+ virtual ~AAH_TXEvent() {}
+ virtual void fire(TimedEventQueue *queue, int64_t /* now_us */) {
+ (mPlayer->*mMethod)();
+ }
+ private:
+ AAH_TXPlayer *mPlayer;
+ void (AAH_TXPlayer::*mMethod)();
+ AAH_TXEvent(const AAH_TXEvent &);
+ AAH_TXEvent& operator=(const AAH_TXEvent &);
+ : mQueueStarted(false)
+ , mFlags(0)
+ , mExtractorFlags(0) {
+ DataSource::RegisterDefaultSniffers();
+ mBufferingEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onBufferingUpdate);
+ mBufferingEventPending = false;
+ mPumpAudioEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onPumpAudio);
+ mPumpAudioEventPending = false;
+ reset_l();
+AAH_TXPlayer::~AAH_TXPlayer() {
+ if (mQueueStarted) {
+ mQueue.stop();
+ }
+ reset_l();
+void AAH_TXPlayer::cancelPlayerEvents(bool keepBufferingGoing) {
+ if (!keepBufferingGoing) {
+ mQueue.cancelEvent(mBufferingEvent->eventID());
+ mBufferingEventPending = false;
+ mQueue.cancelEvent(mPumpAudioEvent->eventID());
+ mPumpAudioEventPending = false;
+ }
+status_t AAH_TXPlayer::initCheck() {
+ // Check for the presense of the common time service by attempting to query
+ // for CommonTime's frequency. If we get an error back, we cannot talk to
+ // the service at all and should abort now.
+ status_t res;
+ uint64_t freq;
+ res = mCCHelper.getCommonFreq(&freq);
+ if (OK != res) {
+ ALOGE("Failed to connect to common time service! (res %d)", res);
+ return res;
+ }
+ return OK;
+status_t AAH_TXPlayer::setDataSource(
+ const char *url,
+ const KeyedVector<String8, String8> *headers) {
+ Mutex::Autolock autoLock(mLock);
+ return setDataSource_l(url, headers);
+status_t AAH_TXPlayer::setDataSource_l(
+ const char *url,
+ const KeyedVector<String8, String8> *headers) {
+ reset_l();
+ // the URL must consist of "aahTX://" followed by the real URL of
+ // the data source
+ const char *kAAHPrefix = "aahTX://";
+ if (strncasecmp(url, kAAHPrefix, strlen(kAAHPrefix))) {
+ }
+ mUri.setTo(url + strlen(kAAHPrefix));
+ if (headers) {
+ mUriHeaders = *headers;
+ ssize_t index = mUriHeaders.indexOfKey(String8("x-hide-urls-from-log"));
+ if (index >= 0) {
+ // Browser is in "incognito" mode, suppress logging URLs.
+ // This isn't something that should be passed to the server.
+ mUriHeaders.removeItemsAt(index);
+ mFlags |= INCOGNITO;
+ }
+ }
+ // The URL may optionally contain a "#" character followed by a Skyjam
+ // cookie. Ideally the cookie header should just be passed in the headers
+ // argument, but the Java API for supplying headers is apparently not yet
+ // exposed in the SDK used by application developers.
+ const char kSkyjamCookieDelimiter = '#';
+ char* skyjamCookie = strrchr(mUri.string(), kSkyjamCookieDelimiter);
+ if (skyjamCookie) {
+ skyjamCookie++;
+ mUriHeaders.add(String8("Cookie"), String8(skyjamCookie));
+ mUri = String8(mUri.string(), skyjamCookie - mUri.string());
+ }
+ return OK;
+status_t AAH_TXPlayer::setDataSource(int fd, int64_t offset, int64_t length) {
+ Mutex::Autolock autoLock(mLock);
+ reset_l();
+ sp<DataSource> dataSource = new FileSource(dup(fd), offset, length);
+ status_t err = dataSource->initCheck();
+ if (err != OK) {
+ return err;
+ }
+ mFileSource = dataSource;
+ sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
+ if (extractor == NULL) {
+ }
+ return setDataSource_l(extractor);
+status_t AAH_TXPlayer::setVideoSurface(const sp<Surface>& surface) {
+ return OK;
+status_t AAH_TXPlayer::setVideoSurfaceTexture(
+ const sp<ISurfaceTexture>& surfaceTexture) {
+ return OK;
+status_t AAH_TXPlayer::prepare() {
+status_t AAH_TXPlayer::prepareAsync() {
+ Mutex::Autolock autoLock(mLock);
+ return prepareAsync_l();
+status_t AAH_TXPlayer::prepareAsync_l() {
+ if (mFlags & PREPARING) {
+ return UNKNOWN_ERROR; // async prepare already pending
+ }
+ mAAH_Sender = AAH_TXSender::GetInstance();
+ if (mAAH_Sender == NULL) {
+ return NO_MEMORY;
+ }
+ if (!mQueueStarted) {
+ mQueue.start();
+ mQueueStarted = true;
+ }
+ mFlags |= PREPARING;
+ mAsyncPrepareEvent = new AAH_TXEvent(
+ this, &AAH_TXPlayer::onPrepareAsyncEvent);
+ mQueue.postEvent(mAsyncPrepareEvent);
+ return OK;
+status_t AAH_TXPlayer::finishSetDataSource_l() {
+ sp<DataSource> dataSource;
+ if (!strncasecmp("http://", mUri.string(), 7) ||
+ !strncasecmp("https://", mUri.string(), 8)) {
+ mConnectingDataSource = HTTPBase::Create(
+ (mFlags & INCOGNITO)
+ ? HTTPBase::kFlagIncognito
+ : 0);
+ mLock.unlock();
+ status_t err = mConnectingDataSource->connect(mUri, &mUriHeaders);
+ mLock.lock();
+ if (err != OK) {
+ mConnectingDataSource.clear();
+ ALOGI("mConnectingDataSource->connect() returned %d", err);
+ return err;
+ }
+ mCachedSource = new NuCachedSource2(mConnectingDataSource);
+ mConnectingDataSource.clear();
+ dataSource = mCachedSource;
+ // We're going to prefill the cache before trying to instantiate
+ // the extractor below, as the latter is an operation that otherwise
+ // could block on the datasource for a significant amount of time.
+ // During that time we'd be unable to abort the preparation phase
+ // without this prefill.
+ mLock.unlock();
+ for (;;) {
+ status_t finalStatus;
+ size_t cachedDataRemaining =
+ mCachedSource->approxDataRemaining(&finalStatus);
+ if (finalStatus != OK ||
+ cachedDataRemaining >= kHighWaterMarkBytes ||
+ break;
+ }
+ usleep(200000);
+ }
+ mLock.lock();
+ if (mFlags & PREPARE_CANCELLED) {
+ ALOGI("Prepare cancelled while waiting for initial cache fill.");
+ }
+ } else {
+ dataSource = DataSource::CreateFromURI(mUri.string(), &mUriHeaders);
+ }
+ if (dataSource == NULL) {
+ }
+ sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
+ if (extractor == NULL) {
+ }
+ return setDataSource_l(extractor);
+status_t AAH_TXPlayer::setDataSource_l(const sp<MediaExtractor> &extractor) {
+ // Attempt to approximate overall stream bitrate by summing all
+ // tracks' individual bitrates, if not all of them advertise bitrate,
+ // we have to fail.
+ int64_t totalBitRate = 0;
+ for (size_t i = 0; i < extractor->countTracks(); ++i) {
+ sp<MetaData> meta = extractor->getTrackMetaData(i);
+ int32_t bitrate;
+ if (!meta->findInt32(kKeyBitRate, &bitrate)) {
+ totalBitRate = -1;
+ break;
+ }
+ totalBitRate += bitrate;
+ }
+ mBitrate = totalBitRate;
+ ALOGV("mBitrate = %lld bits/sec", mBitrate);
+ bool haveAudio = false;
+ for (size_t i = 0; i < extractor->countTracks(); ++i) {
+ sp<MetaData> meta = extractor->getTrackMetaData(i);
+ const char *mime;
+ CHECK(meta->findCString(kKeyMIMEType, &mime));
+ if (!strncasecmp(mime, "audio/", 6)) {
+ mAudioSource = extractor->getTrack(i);
+ CHECK(mAudioSource != NULL);
+ haveAudio = true;
+ break;
+ }
+ }
+ if (!haveAudio) {
+ }
+ mExtractorFlags = extractor->flags();
+ return OK;
+void AAH_TXPlayer::abortPrepare(status_t err) {
+ CHECK(err != OK);
+ notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, err);
+ mPrepareResult = err;
+ mPreparedCondition.broadcast();
+void AAH_TXPlayer::onPrepareAsyncEvent() {
+ Mutex::Autolock autoLock(mLock);
+ if (mFlags & PREPARE_CANCELLED) {
+ ALOGI("prepare was cancelled before doing anything");
+ abortPrepare(UNKNOWN_ERROR);
+ return;
+ }
+ if (mUri.size() > 0) {
+ status_t err = finishSetDataSource_l();
+ if (err != OK) {
+ abortPrepare(err);
+ return;
+ }
+ }
+ mAudioSource->getFormat()->findInt64(kKeyDuration, &mDurationUs);
+ status_t err = mAudioSource->start();
+ if (err != OK) {
+ ALOGI("failed to start audio source, err=%d", err);
+ abortPrepare(err);
+ return;
+ }
+ if (mCachedSource != NULL) {
+ postBufferingEvent_l();
+ } else {
+ finishAsyncPrepare_l();
+ }
+void AAH_TXPlayer::finishAsyncPrepare_l() {
+ notifyListener_l(MEDIA_PREPARED);
+ mPrepareResult = OK;
+ mFlags |= PREPARED;
+ mPreparedCondition.broadcast();
+status_t AAH_TXPlayer::start() {
+ Mutex::Autolock autoLock(mLock);
+ mFlags &= ~CACHE_UNDERRUN;
+ return play_l();
+status_t AAH_TXPlayer::play_l() {
+ if (mFlags & PLAYING) {
+ return OK;
+ }
+ if (!(mFlags & PREPARED)) {
+ }
+ {
+ Mutex::Autolock lock(mEndpointLock);
+ if (!mEndpointValid) {
+ }
+ if (!mEndpointRegistered) {
+ mProgramID = mAAH_Sender->registerEndpoint(mEndpoint);
+ mEndpointRegistered = true;
+ }
+ }
+ mFlags |= PLAYING;
+ updateClockTransform_l(false);
+ postPumpAudioEvent_l(-1);
+ return OK;
+status_t AAH_TXPlayer::stop() {
+ status_t ret = pause();
+ sendEOS_l();
+ return ret;
+status_t AAH_TXPlayer::pause() {
+ Mutex::Autolock autoLock(mLock);
+ mFlags &= ~CACHE_UNDERRUN;
+ return pause_l();
+status_t AAH_TXPlayer::pause_l(bool doClockUpdate) {
+ if (!(mFlags & PLAYING)) {
+ return OK;
+ }
+ cancelPlayerEvents(true /* keepBufferingGoing */);
+ mFlags &= ~PLAYING;
+ if (doClockUpdate) {
+ updateClockTransform_l(true);
+ }
+ return OK;
+void AAH_TXPlayer::updateClockTransform_l(bool pause) {
+ // record the new pause status so that onPumpAudio knows what rate to apply
+ // when it initializes the transform
+ mPlayRateIsPaused = pause;
+ // if we haven't yet established a valid clock transform, then we can't
+ // do anything here
+ if (!mCurrentClockTransformValid) {
+ return;
+ }
+ // sample the current common time
+ int64_t commonTimeNow;
+ if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
+ ALOGE("updateClockTransform_l get common time failed");
+ mCurrentClockTransformValid = false;
+ return;
+ }
+ // convert the current common time to media time using the old
+ // transform
+ int64_t mediaTimeNow;
+ if (!mCurrentClockTransform.doReverseTransform(
+ commonTimeNow, &mediaTimeNow)) {
+ ALOGE("updateClockTransform_l reverse transform failed");
+ mCurrentClockTransformValid = false;
+ return;
+ }
+ // calculate a new transform that preserves the old transform's
+ // result for the current time
+ mCurrentClockTransform.a_zero = mediaTimeNow;
+ mCurrentClockTransform.b_zero = commonTimeNow;
+ mCurrentClockTransform.a_to_b_numer = 1;
+ mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1;
+ // send a packet announcing the new transform
+ sp<TRTPControlPacket> packet = new TRTPControlPacket();
+ packet->setClockTransform(mCurrentClockTransform);
+ packet->setCommandID(TRTPControlPacket::kCommandNop);
+ queuePacketToSender_l(packet);
+void AAH_TXPlayer::sendEOS_l() {
+ sp<TRTPControlPacket> packet = new TRTPControlPacket();
+ packet->setCommandID(TRTPControlPacket::kCommandEOS);
+ queuePacketToSender_l(packet);
+bool AAH_TXPlayer::isPlaying() {
+ return (mFlags & PLAYING) || (mFlags & CACHE_UNDERRUN);
+status_t AAH_TXPlayer::seekTo(int msec) {
+ if (mExtractorFlags & MediaExtractor::CAN_SEEK) {
+ Mutex::Autolock autoLock(mLock);
+ return seekTo_l(static_cast<int64_t>(msec) * 1000);
+ }
+ notifyListener_l(MEDIA_SEEK_COMPLETE);
+ return OK;
+status_t AAH_TXPlayer::seekTo_l(int64_t timeUs) {
+ mIsSeeking = true;
+ mSeekTimeUs = timeUs;
+ mCurrentClockTransformValid = false;
+ mLastQueuedMediaTimePTSValid = false;
+ // send a flush command packet
+ sp<TRTPControlPacket> packet = new TRTPControlPacket();
+ packet->setCommandID(TRTPControlPacket::kCommandFlush);
+ queuePacketToSender_l(packet);
+ return OK;
+status_t AAH_TXPlayer::getCurrentPosition(int *msec) {
+ if (!msec) {
+ return BAD_VALUE;
+ }
+ Mutex::Autolock lock(mLock);
+ int position;
+ if (mIsSeeking) {
+ position = mSeekTimeUs / 1000;
+ } else if (mCurrentClockTransformValid) {
+ // sample the current common time
+ int64_t commonTimeNow;
+ if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
+ ALOGE("getCurrentPosition get common time failed");
+ }
+ int64_t mediaTimeNow;
+ if (!mCurrentClockTransform.doReverseTransform(commonTimeNow,
+ &mediaTimeNow)) {
+ ALOGE("getCurrentPosition reverse transform failed");
+ }
+ position = static_cast<int>(mediaTimeNow / 1000);
+ } else {
+ position = 0;
+ }
+ int duration;
+ if (getDuration_l(&duration) == OK) {
+ *msec = clamp(position, 0, duration);
+ } else {
+ *msec = (position >= 0) ? position : 0;
+ }
+ return OK;
+status_t AAH_TXPlayer::getDuration(int* msec) {
+ if (!msec) {
+ return BAD_VALUE;
+ }
+ Mutex::Autolock lock(mLock);
+ return getDuration_l(msec);
+status_t AAH_TXPlayer::getDuration_l(int* msec) {
+ if (mDurationUs < 0) {
+ }
+ *msec = (mDurationUs + 500) / 1000;
+ return OK;
+status_t AAH_TXPlayer::reset() {
+ Mutex::Autolock autoLock(mLock);
+ reset_l();
+ return OK;
+void AAH_TXPlayer::reset_l() {
+ if (mFlags & PREPARING) {
+ if (mConnectingDataSource != NULL) {
+ ALOGI("interrupting the connection process");
+ mConnectingDataSource->disconnect();
+ }
+ // We are basically done preparing, we're just buffering
+ // enough data to start playback, we can safely interrupt that.
+ finishAsyncPrepare_l();
+ }
+ }
+ while (mFlags & PREPARING) {
+ mPreparedCondition.wait(mLock);
+ }
+ cancelPlayerEvents();
+ sendEOS_l();
+ mCachedSource.clear();
+ if (mAudioSource != NULL) {
+ mAudioSource->stop();
+ }
+ mAudioSource.clear();
+ mFlags = 0;
+ mExtractorFlags = 0;
+ mDurationUs = -1;
+ mIsSeeking = false;
+ mSeekTimeUs = 0;
+ mUri.setTo("");
+ mUriHeaders.clear();
+ mFileSource.clear();
+ mBitrate = -1;
+ {
+ Mutex::Autolock lock(mEndpointLock);
+ if (mAAH_Sender != NULL && mEndpointRegistered) {
+ mAAH_Sender->unregisterEndpoint(mEndpoint);
+ }
+ mEndpointRegistered = false;
+ mEndpointValid = false;
+ }
+ mProgramID = 0;
+ mAAH_Sender.clear();
+ mLastQueuedMediaTimePTSValid = false;
+ mCurrentClockTransformValid = false;
+ mPlayRateIsPaused = false;
+ mTRTPVolume = 255;
+status_t AAH_TXPlayer::setLooping(int loop) {
+ return OK;
+player_type AAH_TXPlayer::playerType() {
+ return AAH_TX_PLAYER;
+status_t AAH_TXPlayer::setParameter(int key, const Parcel &request) {
+status_t AAH_TXPlayer::getParameter(int key, Parcel *reply) {
+status_t AAH_TXPlayer::invoke(const Parcel& request, Parcel *reply) {
+ if (!reply) {
+ return BAD_VALUE;
+ }
+ int32_t methodID;
+ status_t err = request.readInt32(&methodID);
+ if (err != android::OK) {
+ return err;
+ }
+ switch (methodID) {
+ case kInvokeSetAAHDstIPPort:
+ case kInvokeSetAAHConfigBlob: {
+ if (mEndpointValid) {
+ }
+ String8 addr;
+ uint16_t port;
+ if (methodID == kInvokeSetAAHDstIPPort) {
+ addr = String8(request.readString16());
+ int32_t port32;
+ err = request.readInt32(&port32);
+ if (err != android::OK) {
+ return err;
+ }
+ port = static_cast<uint16_t>(port32);
+ } else {
+ String8 blob(request.readString16());
+ char addr_buf[101];
+ if (sscanf(blob.string(), "V1:%100s %" SCNu16,
+ addr_buf, &port) != 2) {
+ return BAD_VALUE;
+ }
+ if (addr.setTo(addr_buf) != OK) {
+ return NO_MEMORY;
+ }
+ }
+ struct hostent* ent = gethostbyname(addr.string());
+ if (ent == NULL) {
+ }
+ if (!(ent->h_addrtype == AF_INET && ent->h_length == 4)) {
+ return BAD_VALUE;
+ }
+ Mutex::Autolock lock(mEndpointLock);
+ mEndpoint = AAH_TXSender::Endpoint(
+ reinterpret_cast<struct in_addr*>(ent->h_addr)->s_addr,
+ port);
+ mEndpointValid = true;
+ return OK;
+ };
+ default:
+ }
+status_t AAH_TXPlayer::getMetadata(const media::Metadata::Filter& ids,
+ Parcel* records) {
+ using media::Metadata;
+ Metadata metadata(records);
+ metadata.appendBool(Metadata::kPauseAvailable, true);
+ metadata.appendBool(Metadata::kSeekBackwardAvailable, false);
+ metadata.appendBool(Metadata::kSeekForwardAvailable, false);
+ metadata.appendBool(Metadata::kSeekAvailable, false);
+ return OK;
+status_t AAH_TXPlayer::setVolume(float leftVolume, float rightVolume) {
+ if (leftVolume != rightVolume) {
+ ALOGE("%s does not support per channel volume: %f, %f",
+ __PRETTY_FUNCTION__, leftVolume, rightVolume);
+ }
+ float volume = clamp(leftVolume, 0.0f, 1.0f);
+ Mutex::Autolock lock(mLock);
+ mTRTPVolume = static_cast<uint8_t>((leftVolume * 255.0) + 0.5);
+ return OK;
+status_t AAH_TXPlayer::setAudioStreamType(audio_stream_type_t streamType) {
+ return OK;
+void AAH_TXPlayer::notifyListener_l(int msg, int ext1, int ext2) {
+ sendEvent(msg, ext1, ext2);
+bool AAH_TXPlayer::getBitrate_l(int64_t *bitrate) {
+ off64_t size;
+ if (mDurationUs >= 0 &&
+ mCachedSource != NULL &&
+ mCachedSource->getSize(&size) == OK) {
+ *bitrate = size * 8000000ll / mDurationUs; // in bits/sec
+ return true;
+ }
+ if (mBitrate >= 0) {
+ *bitrate = mBitrate;
+ return true;
+ }
+ *bitrate = 0;
+ return false;
+// Returns true iff cached duration is available/applicable.
+bool AAH_TXPlayer::getCachedDuration_l(int64_t *durationUs, bool *eos) {
+ int64_t bitrate;
+ if (mCachedSource != NULL && getBitrate_l(&bitrate)) {
+ status_t finalStatus;
+ size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
+ &finalStatus);
+ *durationUs = cachedDataRemaining * 8000000ll / bitrate;
+ *eos = (finalStatus != OK);
+ return true;
+ }
+ return false;
+void AAH_TXPlayer::ensureCacheIsFetching_l() {
+ if (mCachedSource != NULL) {
+ mCachedSource->resumeFetchingIfNecessary();
+ }
+void AAH_TXPlayer::postBufferingEvent_l() {
+ if (mBufferingEventPending) {
+ return;
+ }
+ mBufferingEventPending = true;
+ mQueue.postEventWithDelay(mBufferingEvent, 1000000ll);
+void AAH_TXPlayer::postPumpAudioEvent_l(int64_t delayUs) {
+ if (mPumpAudioEventPending) {
+ return;
+ }
+ mPumpAudioEventPending = true;
+ mQueue.postEventWithDelay(mPumpAudioEvent, delayUs < 0 ? 10000 : delayUs);
+void AAH_TXPlayer::onBufferingUpdate() {
+ Mutex::Autolock autoLock(mLock);
+ if (!mBufferingEventPending) {
+ return;
+ }
+ mBufferingEventPending = false;
+ if (mCachedSource != NULL) {
+ status_t finalStatus;
+ size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
+ &finalStatus);
+ bool eos = (finalStatus != OK);
+ if (eos) {
+ if (finalStatus == ERROR_END_OF_STREAM) {
+ notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
+ }
+ if (mFlags & PREPARING) {
+ ALOGV("cache has reached EOS, prepare is done.");
+ finishAsyncPrepare_l();
+ }
+ } else {
+ int64_t bitrate;
+ if (getBitrate_l(&bitrate)) {
+ size_t cachedSize = mCachedSource->cachedSize();
+ int64_t cachedDurationUs = cachedSize * 8000000ll / bitrate;
+ int percentage = (100.0 * (double) cachedDurationUs)
+ / mDurationUs;
+ if (percentage > 100) {
+ percentage = 100;
+ }
+ notifyListener_l(MEDIA_BUFFERING_UPDATE, percentage);
+ } else {
+ // We don't know the bitrate of the stream, use absolute size
+ // limits to maintain the cache.
+ if ((mFlags & PLAYING) &&
+ !eos &&
+ (cachedDataRemaining < kLowWaterMarkBytes)) {
+ ALOGI("cache is running low (< %d) , pausing.",
+ kLowWaterMarkBytes);
+ pause_l();
+ ensureCacheIsFetching_l();
+ } else if (eos || cachedDataRemaining > kHighWaterMarkBytes) {
+ if (mFlags & CACHE_UNDERRUN) {
+ ALOGI("cache has filled up (> %d), resuming.",
+ kHighWaterMarkBytes);
+ mFlags &= ~CACHE_UNDERRUN;
+ play_l();
+ } else if (mFlags & PREPARING) {
+ ALOGV("cache has filled up (> %d), prepare is done",
+ kHighWaterMarkBytes);
+ finishAsyncPrepare_l();
+ }
+ }
+ }
+ }
+ }
+ int64_t cachedDurationUs;
+ bool eos;
+ if (getCachedDuration_l(&cachedDurationUs, &eos)) {
+ ALOGV("cachedDurationUs = %.2f secs, eos=%d",
+ cachedDurationUs / 1E6, eos);
+ if ((mFlags & PLAYING) &&
+ !eos &&
+ (cachedDurationUs < kLowWaterMarkUs)) {
+ ALOGI("cache is running low (%.2f secs) , pausing.",
+ cachedDurationUs / 1E6);
+ pause_l();
+ ensureCacheIsFetching_l();
+ } else if (eos || cachedDurationUs > kHighWaterMarkUs) {
+ if (mFlags & CACHE_UNDERRUN) {
+ ALOGI("cache has filled up (%.2f secs), resuming.",
+ cachedDurationUs / 1E6);
+ mFlags &= ~CACHE_UNDERRUN;
+ play_l();
+ } else if (mFlags & PREPARING) {
+ ALOGV("cache has filled up (%.2f secs), prepare is done",
+ cachedDurationUs / 1E6);
+ finishAsyncPrepare_l();
+ }
+ }
+ }
+ postBufferingEvent_l();
+void AAH_TXPlayer::onPumpAudio() {
+ while (true) {
+ Mutex::Autolock autoLock(mLock);
+ // If this flag is clear, its because someone has externally canceled
+ // this pump operation (probably because we a resetting/shutting down).
+ // Get out immediately, do not reschedule ourselves.
+ if (!mPumpAudioEventPending) {
+ return;
+ }
+ // Start by checking if there is still work to be doing. If we have
+ // never queued a payload (so we don't know what the last queued PTS is)
+ // or we have never established a MediaTime->CommonTime transformation,
+ // then we have work to do (one time through this loop should establish
+ // both). Otherwise, we want to keep a fixed amt of presentation time
+ // worth of data buffered. If we cannot get common time (service is
+ // unavailable, or common time is undefined)) then we don't have a lot
+ // of good options here. For now, signal an error up to the app level
+ // and shut down the transmission pump.
+ int64_t commonTimeNow;
+ if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
+ // Failed to get common time; either the service is down or common
+ // time is not synced. Raise an error and shutdown the player.
+ ALOGE("*** Cannot pump audio, unable to fetch common time."
+ " Shutting down.");
+ mPumpAudioEventPending = false;
+ break;
+ }
+ if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) {
+ int64_t mediaTimeNow;
+ bool conversionResult = mCurrentClockTransform.doReverseTransform(
+ commonTimeNow,
+ &mediaTimeNow);
+ CHECK(conversionResult);
+ if ((mediaTimeNow +
+ kAAHBufferTimeUs -
+ mLastQueuedMediaTimePTS) <= 0) {
+ break;
+ }
+ }
+ MediaSource::ReadOptions options;
+ if (mIsSeeking) {
+ options.setSeekTo(mSeekTimeUs);
+ }
+ MediaBuffer* mediaBuffer;
+ status_t err = mAudioSource->read(&mediaBuffer, &options);
+ if (err != NO_ERROR) {
+ if (err == ERROR_END_OF_STREAM) {
+ ALOGI("*** %s reached end of stream", __PRETTY_FUNCTION__);
+ notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
+ notifyListener_l(MEDIA_PLAYBACK_COMPLETE);
+ pause_l(false);
+ sendEOS_l();
+ } else {
+ ALOGE("*** %s read failed err=%d", __PRETTY_FUNCTION__, err);
+ }
+ return;
+ }
+ if (mIsSeeking) {
+ mIsSeeking = false;
+ notifyListener_l(MEDIA_SEEK_COMPLETE);
+ }
+ uint8_t* data = (static_cast<uint8_t*>(mediaBuffer->data()) +
+ mediaBuffer->range_offset());
+ ALOGV("*** %s got media buffer data=[%02hhx %02hhx %02hhx %02hhx]"
+ " offset=%d length=%d", __PRETTY_FUNCTION__,
+ data[0], data[1], data[2], data[3],
+ mediaBuffer->range_offset(), mediaBuffer->range_length());
+ int64_t mediaTimeUs;
+ CHECK(mediaBuffer->meta_data()->findInt64(kKeyTime, &mediaTimeUs));
+ ALOGV("*** timeUs=%lld", mediaTimeUs);
+ if (!mCurrentClockTransformValid) {
+ if (OK == mCCHelper.getCommonTime(&commonTimeNow)) {
+ mCurrentClockTransform.a_zero = mediaTimeUs;
+ mCurrentClockTransform.b_zero = commonTimeNow +
+ kAAHStartupLeadTimeUs;
+ mCurrentClockTransform.a_to_b_numer = 1;
+ mCurrentClockTransform.a_to_b_denom = mPlayRateIsPaused ? 0 : 1;
+ mCurrentClockTransformValid = true;
+ } else {
+ // Failed to get common time; either the service is down or
+ // common time is not synced. Raise an error and shutdown the
+ // player.
+ ALOGE("*** Cannot begin transmission, unable to fetch common"
+ " time. Dropping sample with pts=%lld", mediaTimeUs);
+ notifyListener_l(MEDIA_ERROR,
+ mPumpAudioEventPending = false;
+ break;
+ }
+ }
+ ALOGV("*** transmitting packet with pts=%lld", mediaTimeUs);
+ sp<TRTPAudioPacket> packet = new TRTPAudioPacket();
+ packet->setPTS(mediaTimeUs);
+ packet->setSubstreamID(1);
+ packet->setCodecType(TRTPAudioPacket::kCodecMPEG1Audio);
+ packet->setVolume(mTRTPVolume);
+ // TODO : introduce a throttle for this so we can control the
+ // frequency with which transforms get sent.
+ packet->setClockTransform(mCurrentClockTransform);
+ packet->setAccessUnitData(data, mediaBuffer->range_length());
+ packet->setRandomAccessPoint(true);
+ queuePacketToSender_l(packet);
+ mediaBuffer->release();
+ mLastQueuedMediaTimePTSValid = true;
+ mLastQueuedMediaTimePTS = mediaTimeUs;
+ }
+ { // Explicit scope for the autolock pattern.
+ Mutex::Autolock autoLock(mLock);
+ // If someone externally has cleared this flag, its because we should be
+ // shutting down. Do not reschedule ourselves.
+ if (!mPumpAudioEventPending) {
+ return;
+ }
+ // Looks like no one canceled us explicitly. Clear our flag and post a
+ // new event to ourselves.
+ mPumpAudioEventPending = false;
+ postPumpAudioEvent_l(10000);
+ }
+void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
+ if (mAAH_Sender == NULL) {
+ return;
+ }
+ sp<AMessage> message = new AMessage(AAH_TXSender::kWhatSendPacket,
+ mAAH_Sender->handlerID());
+ {
+ Mutex::Autolock lock(mEndpointLock);
+ if (!mEndpointValid) {
+ return;
+ }
+ message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
+ message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
+ }
+ packet->setProgramID(mProgramID);
+ packet->setExpireTime(systemTime() + kAAHRetryKeepAroundTimeNs);
+ packet->pack();
+ message->setObject(AAH_TXSender::kSendPacketTRTPPacket, packet);
+ message->post();
+} // namespace android
diff --git a/media/libaah_rtp/aah_tx_player.h b/media/libaah_rtp/aah_tx_player.h
new file mode 100644
index 0000000..64cf5dc
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_player.h
@@ -0,0 +1,179 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __AAH_TX_PLAYER_H__
+#define __AAH_TX_PLAYER_H__
+#include <common_time/cc_helper.h>
+#include <libstagefright/include/HTTPBase.h>
+#include <libstagefright/include/NuCachedSource2.h>
+#include <libstagefright/include/TimedEventQueue.h>
+#include <media/MediaPlayerInterface.h>
+#include <media/stagefright/MediaExtractor.h>
+#include <media/stagefright/MediaSource.h>
+#include <utils/LinearTransform.h>
+#include <utils/String8.h>
+#include <utils/threads.h>
+#include "aah_tx_sender.h"
+namespace android {
+class AAH_TXPlayer : public MediaPlayerHWInterface {
+ public:
+ AAH_TXPlayer();
+ virtual status_t initCheck();
+ virtual status_t setDataSource(const char *url,
+ const KeyedVector<String8, String8>*
+ headers);
+ virtual status_t setDataSource(int fd, int64_t offset, int64_t length);
+ virtual status_t setVideoSurface(const sp<Surface>& surface);
+ virtual status_t setVideoSurfaceTexture(const sp<ISurfaceTexture>&
+ surfaceTexture);
+ virtual status_t prepare();
+ virtual status_t prepareAsync();
+ virtual status_t start();
+ virtual status_t stop();
+ virtual status_t pause();
+ virtual bool isPlaying();
+ virtual status_t seekTo(int msec);
+ virtual status_t getCurrentPosition(int *msec);
+ virtual status_t getDuration(int *msec);
+ virtual status_t reset();
+ virtual status_t setLooping(int loop);
+ virtual player_type playerType();
+ virtual status_t setParameter(int key, const Parcel &request);
+ virtual status_t getParameter(int key, Parcel *reply);
+ virtual status_t invoke(const Parcel& request, Parcel *reply);
+ virtual status_t getMetadata(const media::Metadata::Filter& ids,
+ Parcel* records);
+ virtual status_t setVolume(float leftVolume, float rightVolume);
+ virtual status_t setAudioStreamType(audio_stream_type_t streamType);
+ // invoke method IDs
+ enum {
+ // set the IP address and port of the A@H receiver
+ kInvokeSetAAHDstIPPort = 1,
+ // set the destination IP address and port (and perhaps any additional
+ // parameters added in the future) packaged in one string
+ kInvokeSetAAHConfigBlob,
+ };
+ static const int64_t kAAHRetryKeepAroundTimeNs;
+ protected:
+ virtual ~AAH_TXPlayer();
+ private:
+ friend struct AwesomeEvent;
+ enum {
+ PLAYING = 1,
+ PREPARED = 16,
+ // We are basically done preparing but are currently buffering
+ // sufficient data to begin playback and finish the preparation
+ // phase for good.
+ INCOGNITO = 32768,
+ };
+ status_t setDataSource_l(const char *url,
+ const KeyedVector<String8, String8> *headers);
+ status_t setDataSource_l(const sp<MediaExtractor>& extractor);
+ status_t finishSetDataSource_l();
+ status_t prepareAsync_l();
+ void onPrepareAsyncEvent();
+ void finishAsyncPrepare_l();
+ void abortPrepare(status_t err);
+ status_t play_l();
+ status_t pause_l(bool doClockUpdate = true);
+ status_t seekTo_l(int64_t timeUs);
+ void updateClockTransform_l(bool pause);
+ void sendEOS_l();
+ void cancelPlayerEvents(bool keepBufferingGoing = false);
+ void reset_l();
+ void notifyListener_l(int msg, int ext1 = 0, int ext2 = 0);
+ bool getBitrate_l(int64_t* bitrate);
+ status_t getDuration_l(int* msec);
+ bool getCachedDuration_l(int64_t* durationUs, bool* eos);
+ void ensureCacheIsFetching_l();
+ void postBufferingEvent_l();
+ void postPumpAudioEvent_l(int64_t delayUs);
+ void onBufferingUpdate();
+ void onPumpAudio();
+ void queuePacketToSender_l(const sp<TRTPPacket>& packet);
+ Mutex mLock;
+ TimedEventQueue mQueue;
+ bool mQueueStarted;
+ sp<TimedEventQueue::Event> mBufferingEvent;
+ bool mBufferingEventPending;
+ uint32_t mFlags;
+ uint32_t mExtractorFlags;
+ String8 mUri;
+ KeyedVector<String8, String8> mUriHeaders;
+ sp<DataSource> mFileSource;
+ sp<TimedEventQueue::Event> mAsyncPrepareEvent;
+ Condition mPreparedCondition;
+ status_t mPrepareResult;
+ bool mIsSeeking;
+ int64_t mSeekTimeUs;
+ sp<TimedEventQueue::Event> mPumpAudioEvent;
+ bool mPumpAudioEventPending;
+ sp<HTTPBase> mConnectingDataSource;
+ sp<NuCachedSource2> mCachedSource;
+ sp<MediaSource> mAudioSource;
+ int64_t mDurationUs;
+ int64_t mBitrate;
+ sp<AAH_TXSender> mAAH_Sender;
+ LinearTransform mCurrentClockTransform;
+ bool mCurrentClockTransformValid;
+ int64_t mLastQueuedMediaTimePTS;
+ bool mLastQueuedMediaTimePTSValid;
+ bool mPlayRateIsPaused;
+ CCHelper mCCHelper;
+ Mutex mEndpointLock;
+ AAH_TXSender::Endpoint mEndpoint;
+ bool mEndpointValid;
+ bool mEndpointRegistered;
+ uint16_t mProgramID;
+ uint8_t mTRTPVolume;
+} // namespace android
+#endif // __AAH_TX_PLAYER_H__
diff --git a/media/libaah_rtp/aah_tx_sender.cpp b/media/libaah_rtp/aah_tx_sender.cpp
new file mode 100644
index 0000000..d991ea7
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_sender.cpp
@@ -0,0 +1,602 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+#include <media/stagefright/foundation/ADebug.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <media/stagefright/foundation/AMessage.h>
+#include <utils/misc.h>
+#include "aah_tx_player.h"
+#include "aah_tx_sender.h"
+namespace android {
+const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
+const char* AAH_TXSender::kSendPacketPort = "port";
+const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";
+const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
+const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
+const int AAH_TXSender::kRetryBufferCapacity = 100;
+const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;
+Mutex AAH_TXSender::sLock;
+wp<AAH_TXSender> AAH_TXSender::sInstance;
+uint32_t AAH_TXSender::sNextEpoch;
+bool AAH_TXSender::sNextEpochValid = false;
+AAH_TXSender::AAH_TXSender() : mSocket(-1) {
+ mLastSentPacketTime = systemTime();
+sp<AAH_TXSender> AAH_TXSender::GetInstance() {
+ Mutex::Autolock autoLock(sLock);
+ sp<AAH_TXSender> sender = sInstance.promote();
+ if (sender == NULL) {
+ sender = new AAH_TXSender();
+ if (sender == NULL) {
+ return NULL;
+ }
+ sender->mLooper = new ALooper();
+ if (sender->mLooper == NULL) {
+ return NULL;
+ }
+ sender->mReflector = new AHandlerReflector<AAH_TXSender>(sender.get());
+ if (sender->mReflector == NULL) {
+ return NULL;
+ }
+ sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (sender->mSocket == -1) {
+ ALOGW("%s unable to create socket", __PRETTY_FUNCTION__);
+ return NULL;
+ }
+ struct sockaddr_in bind_addr;
+ memset(&bind_addr, 0, sizeof(bind_addr));
+ bind_addr.sin_family = AF_INET;
+ if (bind(sender->mSocket,
+ reinterpret_cast<const sockaddr*>(&bind_addr),
+ sizeof(bind_addr)) < 0) {
+ ALOGW("%s unable to bind socket (errno %d)",
+ __PRETTY_FUNCTION__, errno);
+ return NULL;
+ }
+ sender->mRetryReceiver = new RetryReceiver(sender.get());
+ if (sender->mRetryReceiver == NULL) {
+ return NULL;
+ }
+ sender->mLooper->setName("AAH_TXSender");
+ sender->mLooper->registerHandler(sender->mReflector);
+ sender->mLooper->start(false, false, PRIORITY_AUDIO);
+ if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
+ != OK) {
+ ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
+ return NULL;
+ }
+ sInstance = sender;
+ }
+ return sender;
+AAH_TXSender::~AAH_TXSender() {
+ mLooper->stop();
+ mLooper->unregisterHandler(mReflector->id());
+ if (mRetryReceiver != NULL) {
+ mRetryReceiver->requestExit();
+ mRetryReceiver->mWakeupEvent.setEvent();
+ if (mRetryReceiver->requestExitAndWait() != OK) {
+ ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__);
+ }
+ mRetryReceiver->mSender = NULL;
+ mRetryReceiver.clear();
+ }
+ if (mSocket != -1) {
+ close(mSocket);
+ }
+// Return the next epoch number usable for a newly instantiated endpoint.
+uint32_t AAH_TXSender::getNextEpoch() {
+ Mutex::Autolock autoLock(sLock);
+ if (sNextEpochValid) {
+ sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask;
+ } else {
+ sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask;
+ sNextEpochValid = true;
+ }
+ return sNextEpoch;
+// Notify the sender that a player has started sending to this endpoint.
+// Returns a program ID for use by the calling player.
+uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
+ Mutex::Autolock lock(mEndpointLock);
+ EndpointState* eps = mEndpointMap.valueFor(endpoint);
+ if (eps) {
+ eps->playerRefCount++;
+ } else {
+ eps = new EndpointState(getNextEpoch());
+ mEndpointMap.add(endpoint, eps);
+ }
+ // if this is the first registered endpoint, then send a message to start
+ // trimming retry buffers and a message to start sending heartbeats.
+ if (mEndpointMap.size() == 1) {
+ sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
+ handlerID());
+ trimMessage->post(kRetryTrimIntervalUs);
+ sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
+ handlerID());
+ heartbeatMessage->post(kHeartbeatIntervalUs);
+ }
+ eps->nextProgramID++;
+ return eps->nextProgramID;
+// Notify the sender that a player has ceased sending to this endpoint.
+// An endpoint's state can not be deleted until all of the endpoint's
+// registered players have called unregisterEndpoint.
+void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
+ Mutex::Autolock lock(mEndpointLock);
+ EndpointState* eps = mEndpointMap.valueFor(endpoint);
+ if (eps) {
+ eps->playerRefCount--;
+ CHECK(eps->playerRefCount >= 0);
+ }
+void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
+ switch (msg->what()) {
+ case kWhatSendPacket:
+ onSendPacket(msg);
+ break;
+ case kWhatTrimRetryBuffers:
+ trimRetryBuffers();
+ break;
+ case kWhatSendHeartbeats:
+ sendHeartbeats();
+ break;
+ default:
+ break;
+ }
+void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
+ sp<RefBase> obj;
+ CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
+ sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
+ uint32_t ipAddr;
+ CHECK(msg->findInt32(kSendPacketIPAddr,
+ reinterpret_cast<int32_t*>(&ipAddr)));
+ int32_t port32;
+ CHECK(msg->findInt32(kSendPacketPort, &port32));
+ uint16_t port = port32;
+ Mutex::Autolock lock(mEndpointLock);
+ doSendPacket_l(packet, Endpoint(ipAddr, port));
+ mLastSentPacketTime = systemTime();
+void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
+ const Endpoint& endpoint) {
+ EndpointState* eps = mEndpointMap.valueFor(endpoint);
+ if (!eps) {
+ // the endpoint state has disappeared, so the player that sent this
+ // packet must be dead.
+ return;
+ }
+ // assign the packet's sequence number
+ packet->setEpoch(eps->epoch);
+ packet->setSeqNumber(eps->trtpSeqNumber++);
+ // add the packet to the retry buffer
+ RetryBuffer& retry = eps->retry;
+ retry.push_back(packet);
+ // send the packet
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = endpoint.addr;
+ addr.sin_port = htons(endpoint.port);
+ ssize_t result = sendto(mSocket,
+ packet->getPacket(),
+ packet->getPacketLen(),
+ 0,
+ (const struct sockaddr *) &addr,
+ sizeof(addr));
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+void AAH_TXSender::trimRetryBuffers() {
+ Mutex::Autolock lock(mEndpointLock);
+ nsecs_t localTimeNow = systemTime();
+ Vector<Endpoint> endpointsToRemove;
+ for (size_t i = 0; i < mEndpointMap.size(); i++) {
+ EndpointState* eps = mEndpointMap.editValueAt(i);
+ RetryBuffer& retry = eps->retry;
+ while (!retry.isEmpty()) {
+ if (retry[0]->getExpireTime() < localTimeNow) {
+ retry.pop_front();
+ } else {
+ break;
+ }
+ }
+ if (retry.isEmpty() && eps->playerRefCount == 0) {
+ endpointsToRemove.add(mEndpointMap.keyAt(i));
+ }
+ }
+ // remove the state for any endpoints that are no longer in use
+ for (size_t i = 0; i < endpointsToRemove.size(); i++) {
+ Endpoint& e = endpointsToRemove.editItemAt(i);
+ ALOGD("*** %s removing endpoint addr=%08x", __PRETTY_FUNCTION__, e.addr);
+ size_t index = mEndpointMap.indexOfKey(e);
+ delete mEndpointMap.valueAt(index);
+ mEndpointMap.removeItemsAt(index);
+ }
+ // schedule the next trim
+ if (mEndpointMap.size()) {
+ sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
+ handlerID());
+ trimMessage->post(kRetryTrimIntervalUs);
+ }
+void AAH_TXSender::sendHeartbeats() {
+ Mutex::Autolock lock(mEndpointLock);
+ if (shouldSendHeartbeats_l()) {
+ for (size_t i = 0; i < mEndpointMap.size(); i++) {
+ EndpointState* eps = mEndpointMap.editValueAt(i);
+ const Endpoint& ep = mEndpointMap.keyAt(i);
+ sp<TRTPControlPacket> packet = new TRTPControlPacket();
+ packet->setCommandID(TRTPControlPacket::kCommandNop);
+ packet->setExpireTime(systemTime() +
+ AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
+ packet->pack();
+ doSendPacket_l(packet, ep);
+ }
+ }
+ // schedule the next heartbeat
+ if (mEndpointMap.size()) {
+ sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
+ handlerID());
+ heartbeatMessage->post(kHeartbeatIntervalUs);
+ }
+bool AAH_TXSender::shouldSendHeartbeats_l() {
+ // assert(holding endpoint lock)
+ return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout));
+// Receiver
+// initial 4-byte ID of a retry request packet
+const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';
+// initial 4-byte ID of a retry NAK packet
+const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';
+// initial 4-byte ID of a fast start request packet
+const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';
+AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
+ : Thread(false),
+ mSender(sender) {}
+ AAH_TXSender::RetryReceiver::~RetryReceiver() {
+ mWakeupEvent.clearPendingEvents();
+ }
+// Returns true if val is within the interval bounded inclusively by
+// start and end. Also handles the case where there is a rollover of the
+// range between start and end.
+template <typename T>
+static inline bool withinIntervalWithRollover(T val, T start, T end) {
+ return ((start <= end && val >= start && val <= end) ||
+ (start > end && (val >= start || val <= end)));
+bool AAH_TXSender::RetryReceiver::threadLoop() {
+ struct pollfd pollFds[2];
+ pollFds[0].fd = mSender->mSocket;
+ pollFds[0].events = POLLIN;
+ pollFds[0].revents = 0;
+ pollFds[1].fd = mWakeupEvent.getWakeupHandle();
+ pollFds[1].events = POLLIN;
+ pollFds[1].revents = 0;
+ int pollResult = poll(pollFds, NELEM(pollFds), -1);
+ if (pollResult == -1) {
+ ALOGE("%s poll failed", __PRETTY_FUNCTION__);
+ return false;
+ }
+ if (exitPending()) {
+ ALOGI("*** %s exiting", __PRETTY_FUNCTION__);
+ return false;
+ }
+ if (pollFds[0].revents) {
+ handleRetryRequest();
+ }
+ return true;
+void AAH_TXSender::RetryReceiver::handleRetryRequest() {
+ ALOGV("*** RX %s start", __PRETTY_FUNCTION__);
+ RetryPacket request;
+ struct sockaddr requestSrcAddr;
+ socklen_t requestSrcAddrLen = sizeof(requestSrcAddr);
+ ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0,
+ &requestSrcAddr, &requestSrcAddrLen);
+ if (result == -1) {
+ ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno);
+ return;
+ }
+ if (static_cast<size_t>(result) < sizeof(RetryPacket)) {
+ ALOGW("%s short packet received", __PRETTY_FUNCTION__);
+ return;
+ }
+ uint32_t host_request_id = ntohl(;
+ if ((host_request_id != kRetryRequestID) &&
+ (host_request_id != kFastStartRequestID)) {
+ ALOGW("%s received retry request with bogus ID (%08x)",
+ __PRETTY_FUNCTION__, host_request_id);
+ return;
+ }
+ Endpoint endpoint(request.endpointIP, ntohs(request.endpointPort));
+ Mutex::Autolock lock(mSender->mEndpointLock);
+ EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint);
+ if (eps == NULL || eps->retry.isEmpty()) {
+ // we have no retry buffer or an empty retry buffer for this endpoint,
+ // so NAK the entire request
+ RetryPacket nak = request;
+ = htonl(kRetryNakID);
+ result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
+ &requestSrcAddr, requestSrcAddrLen);
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+ return;
+ }
+ RetryBuffer& retry = eps->retry;
+ uint16_t startSeq = ntohs(request.seqStart);
+ uint16_t endSeq = ntohs(request.seqEnd);
+ uint16_t retryFirstSeq = retry[0]->getSeqNumber();
+ uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber();
+ // If this is a fast start, then force the start of the retry to match the
+ // start of the retransmit ring buffer (unless the end of the retransmit
+ // ring buffer is already past the point of fast start)
+ if ((host_request_id == kFastStartRequestID) &&
+ !((startSeq - retryFirstSeq) & 0x8000)) {
+ startSeq = retryFirstSeq;
+ }
+ int startIndex;
+ if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) {
+ startIndex = static_cast<uint16_t>(startSeq - retryFirstSeq);
+ } else {
+ startIndex = -1;
+ }
+ int endIndex;
+ if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) {
+ endIndex = static_cast<uint16_t>(endSeq - retryFirstSeq);
+ } else {
+ endIndex = -1;
+ }
+ if (startIndex == -1 && endIndex == -1) {
+ // no part of the request range is found in the retry buffer
+ RetryPacket nak = request;
+ = htonl(kRetryNakID);
+ result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
+ &requestSrcAddr, requestSrcAddrLen);
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+ return;
+ }
+ if (startIndex == -1) {
+ // NAK a subrange at the front of the request range
+ RetryPacket nak = request;
+ = htonl(kRetryNakID);
+ nak.seqEnd = htons(retryFirstSeq - 1);
+ result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
+ &requestSrcAddr, requestSrcAddrLen);
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+ startIndex = 0;
+ } else if (endIndex == -1) {
+ // NAK a subrange at the back of the request range
+ RetryPacket nak = request;
+ = htonl(kRetryNakID);
+ nak.seqStart = htons(retryLastSeq + 1);
+ result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
+ &requestSrcAddr, requestSrcAddrLen);
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+ endIndex = retry.size() - 1;
+ }
+ // send the retry packets
+ for (int i = startIndex; i <= endIndex; i++) {
+ const sp<TRTPPacket>& replyPacket = retry[i];
+ result = sendto(mSender->mSocket,
+ replyPacket->getPacket(),
+ replyPacket->getPacketLen(),
+ 0,
+ &requestSrcAddr,
+ requestSrcAddrLen);
+ if (result == -1) {
+ ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
+ }
+ }
+// Endpoint
+ : addr(0)
+ , port(0) { }
+AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
+ : addr(a)
+ , port(p) {}
+bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
+ return ((addr < other.addr) ||
+ (addr == other.addr && port < other.port));
+// EndpointState
+AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
+ : retry(kRetryBufferCapacity)
+ , playerRefCount(1)
+ , trtpSeqNumber(0)
+ , nextProgramID(0)
+ , epoch(_epoch) { }
+// CircularBuffer
+template <typename T>
+CircularBuffer<T>::CircularBuffer(size_t capacity)
+ : mCapacity(capacity)
+ , mHead(0)
+ , mTail(0)
+ , mFillCount(0) {
+ mBuffer = new T[capacity];
+template <typename T>
+CircularBuffer<T>::~CircularBuffer() {
+ delete [] mBuffer;
+template <typename T>
+void CircularBuffer<T>::push_back(const T& item) {
+ if (this->isFull()) {
+ this->pop_front();
+ }
+ mBuffer[mHead] = item;
+ mHead = (mHead + 1) % mCapacity;
+ mFillCount++;
+template <typename T>
+void CircularBuffer<T>::pop_front() {
+ CHECK(!isEmpty());
+ mBuffer[mTail] = T();
+ mTail = (mTail + 1) % mCapacity;
+ mFillCount--;
+template <typename T>
+size_t CircularBuffer<T>::size() const {
+ return mFillCount;
+template <typename T>
+bool CircularBuffer<T>::isFull() const {
+ return (mFillCount == mCapacity);
+template <typename T>
+bool CircularBuffer<T>::isEmpty() const {
+ return (mFillCount == 0);
+template <typename T>
+const T& CircularBuffer<T>::itemAt(size_t index) const {
+ CHECK(index < mFillCount);
+ return mBuffer[(mTail + index) % mCapacity];
+template <typename T>
+const T& CircularBuffer<T>::operator[](size_t index) const {
+ return itemAt(index);
+} // namespace android
diff --git a/media/libaah_rtp/aah_tx_sender.h b/media/libaah_rtp/aah_tx_sender.h
new file mode 100644
index 0000000..74206c4
--- /dev/null
+++ b/media/libaah_rtp/aah_tx_sender.h
@@ -0,0 +1,162 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __AAH_TX_SENDER_H__
+#define __AAH_TX_SENDER_H__
+#include <media/stagefright/foundation/ALooper.h>
+#include <media/stagefright/foundation/AHandlerReflector.h>
+#include <utils/RefBase.h>
+#include <utils/threads.h>
+#include "aah_tx_packet.h"
+#include "pipe_event.h"
+namespace android {
+template <typename T> class CircularBuffer {
+ public:
+ CircularBuffer(size_t capacity);
+ ~CircularBuffer();
+ void push_back(const T& item);;
+ void pop_front();
+ size_t size() const;
+ bool isFull() const;
+ bool isEmpty() const;
+ const T& itemAt(size_t index) const;
+ const T& operator[](size_t index) const;
+ private:
+ T* mBuffer;
+ size_t mCapacity;
+ size_t mHead;
+ size_t mTail;
+ size_t mFillCount;
+class AAH_TXSender : public virtual RefBase {
+ public:
+ ~AAH_TXSender();
+ static sp<AAH_TXSender> GetInstance();
+ ALooper::handler_id handlerID() { return mReflector->id(); }
+ // an IP address and port
+ struct Endpoint {
+ Endpoint();
+ Endpoint(uint32_t a, uint16_t p);
+ bool operator<(const Endpoint& other) const;
+ uint32_t addr;
+ uint16_t port;
+ };
+ uint16_t registerEndpoint(const Endpoint& endpoint);
+ void unregisterEndpoint(const Endpoint& endpoint);
+ enum {
+ kWhatSendPacket,
+ kWhatTrimRetryBuffers,
+ kWhatSendHeartbeats,
+ };
+ // fields for SendPacket messages
+ static const char* kSendPacketIPAddr;
+ static const char* kSendPacketPort;
+ static const char* kSendPacketTRTPPacket;
+ private:
+ AAH_TXSender();
+ static Mutex sLock;
+ static wp<AAH_TXSender> sInstance;
+ static uint32_t sNextEpoch;
+ static bool sNextEpochValid;
+ static uint32_t getNextEpoch();
+ typedef CircularBuffer<sp<TRTPPacket> > RetryBuffer;
+ // state maintained on a per-endpoint basis
+ struct EndpointState {
+ EndpointState(uint32_t epoch);
+ RetryBuffer retry;
+ int playerRefCount;
+ uint16_t trtpSeqNumber;
+ uint16_t nextProgramID;
+ uint32_t epoch;
+ };
+ friend class AHandlerReflector<AAH_TXSender>;
+ void onMessageReceived(const sp<AMessage>& msg);
+ void onSendPacket(const sp<AMessage>& msg);
+ void doSendPacket_l(const sp<TRTPPacket>& packet,
+ const Endpoint& endpoint);
+ void trimRetryBuffers();
+ void sendHeartbeats();
+ bool shouldSendHeartbeats_l();
+ sp<ALooper> mLooper;
+ sp<AHandlerReflector<AAH_TXSender> > mReflector;
+ int mSocket;
+ nsecs_t mLastSentPacketTime;
+ DefaultKeyedVector<Endpoint, EndpointState*> mEndpointMap;
+ Mutex mEndpointLock;
+ static const int kRetryTrimIntervalUs;
+ static const int kHeartbeatIntervalUs;
+ static const int kRetryBufferCapacity;
+ static const nsecs_t kHeartbeatTimeout;
+ class RetryReceiver : public Thread {
+ private:
+ friend class AAH_TXSender;
+ RetryReceiver(AAH_TXSender* sender);
+ virtual ~RetryReceiver();
+ virtual bool threadLoop();
+ void handleRetryRequest();
+ static const int kMaxReceiverPacketLen;
+ static const uint32_t kRetryRequestID;
+ static const uint32_t kFastStartRequestID;
+ static const uint32_t kRetryNakID;
+ AAH_TXSender* mSender;
+ PipeEvent mWakeupEvent;
+ };
+ sp<RetryReceiver> mRetryReceiver;
+struct RetryPacket {
+ uint32_t id;
+ uint32_t endpointIP;
+ uint16_t endpointPort;
+ uint16_t seqStart;
+ uint16_t seqEnd;
+} __attribute__((packed));
+} // namespace android
+#endif // __AAH_TX_SENDER_H__
diff --git a/media/libaah_rtp/pipe_event.cpp b/media/libaah_rtp/pipe_event.cpp
new file mode 100644
index 0000000..b8e6960
--- /dev/null
+++ b/media/libaah_rtp/pipe_event.cpp
@@ -0,0 +1,86 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "LibAAH_RTP"
+#include <utils/Log.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <unistd.h>
+#include "pipe_event.h"
+namespace android {
+PipeEvent::PipeEvent() {
+ pipe_[0] = -1;
+ pipe_[1] = -1;
+ // Create the pipe.
+ if (pipe(pipe_) >= 0) {
+ // Set non-blocking mode on the read side of the pipe so we can
+ // easily drain it whenever we wakeup.
+ fcntl(pipe_[0], F_SETFL, O_NONBLOCK);
+ } else {
+ ALOGE("Failed to create pipe event %d %d %d",
+ pipe_[0], pipe_[1], errno);
+ pipe_[0] = -1;
+ pipe_[1] = -1;
+ }
+PipeEvent::~PipeEvent() {
+ if (pipe_[0] >= 0) {
+ close(pipe_[0]);
+ }
+ if (pipe_[1] >= 0) {
+ close(pipe_[1]);
+ }
+void PipeEvent::clearPendingEvents() {
+ char drain_buffer[16];
+ while (read(pipe_[0], drain_buffer, sizeof(drain_buffer)) > 0) {
+ // No body.
+ }
+bool PipeEvent::wait(int timeout) {
+ struct pollfd wait_fd;
+ wait_fd.fd = getWakeupHandle();
+ wait_fd.revents = 0;
+ int res = poll(&wait_fd, 1, timeout);
+ if (res < 0) {
+ ALOGE("Wait error in PipeEvent; sleeping to prevent overload!");
+ usleep(1000);
+ }
+ return (res > 0);
+void PipeEvent::setEvent() {
+ char foo = 'q';
+ write(pipe_[1], &foo, 1);
+} // namespace android
diff --git a/media/libaah_rtp/pipe_event.h b/media/libaah_rtp/pipe_event.h
new file mode 100644
index 0000000..e53b0fd
--- /dev/null
+++ b/media/libaah_rtp/pipe_event.h
@@ -0,0 +1,51 @@
+ * Copyright (C) 2011 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PIPE_EVENT_H__
+#define __PIPE_EVENT_H__
+#include <media/stagefright/foundation/ABase.h>
+namespace android {
+class PipeEvent {
+ public:
+ PipeEvent();
+ ~PipeEvent();
+ bool initCheck() const {
+ return ((pipe_[0] >= 0) && (pipe_[1] >= 0));
+ }
+ int getWakeupHandle() const { return pipe_[0]; }
+ // block until the event fires; returns true if the event fired and false if
+ // the wait timed out. Timeout is expressed in milliseconds; negative
+ // values mean wait forever.
+ bool wait(int timeout = -1);
+ void clearPendingEvents();
+ void setEvent();
+ private:
+ int pipe_[2];
+} // namespace android
+#endif // __PIPE_EVENT_H__
diff --git a/media/libmediaplayerservice/ b/media/libmediaplayerservice/
index a3e2517..e521648 100644
--- a/media/libmediaplayerservice/
+++ b/media/libmediaplayerservice/
@@ -29,7 +29,8 @@ LOCAL_SHARED_LIBRARIES := \
libstagefright_omx \
libstagefright_foundation \
libgui \
- libdl
+ libdl \
+ libaah_rtp
libstagefright_nuplayer \
diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp
index 4df7f3d..764eddc 100644
--- a/media/libmediaplayerservice/MediaPlayerService.cpp
+++ b/media/libmediaplayerservice/MediaPlayerService.cpp
@@ -70,6 +70,11 @@
#include <OMX.h>
+namespace android {
+sp<MediaPlayerBase> createAAH_TXPlayer();
+sp<MediaPlayerBase> createAAH_RXPlayer();
namespace {
using android::media::Metadata;
using android::status_t;
@@ -593,6 +598,14 @@ player_type getPlayerType(const char* url)
return NU_PLAYER;
+ if (!strncasecmp("aahRX://", url, 8)) {
+ return AAH_RX_PLAYER;
+ }
+ if (!strncasecmp("aahTX://", url, 8)) {
+ return AAH_TX_PLAYER;
+ }
// use MidiFile for MIDI extensions
int lenURL = strlen(url);
for (int i = 0; i < NELEM(FILE_EXTS); ++i) {
@@ -629,6 +642,14 @@ static sp<MediaPlayerBase> createPlayer(player_type playerType, void* cookie,
ALOGV("Create Test Player stub");
p = new TestPlayerStub();
+ ALOGV(" create A@H RX Player");
+ p = createAAH_RXPlayer();
+ break;
+ ALOGV(" create A@H TX Player");
+ p = createAAH_TXPlayer();
+ break;
ALOGE("Unknown player type: %d", playerType);
return NULL;
@@ -1031,9 +1052,21 @@ status_t MediaPlayerService::Client::setLooping(int loop)
status_t MediaPlayerService::Client::setVolume(float leftVolume, float rightVolume)
ALOGV("[%d] setVolume(%f, %f)", mConnId, leftVolume, rightVolume);
- // TODO: for hardware output, call player instead
- Mutex::Autolock l(mLock);
- if (mAudioOutput != 0) mAudioOutput->setVolume(leftVolume, rightVolume);
+ // for hardware output, call player instead
+ sp<MediaPlayerBase> p = getPlayer();
+ {
+ Mutex::Autolock l(mLock);
+ if (p != 0 && p->hardwareOutput()) {
+ MediaPlayerHWInterface* hwp =
+ reinterpret_cast<MediaPlayerHWInterface*>(p.get());
+ return hwp->setVolume(leftVolume, rightVolume);
+ } else {
+ if (mAudioOutput != 0) mAudioOutput->setVolume(leftVolume, rightVolume);
+ return NO_ERROR;
+ }
+ }
return NO_ERROR;