From 761defc341c5ce9019a42919c441f035f665ec0d Mon Sep 17 00:00:00 2001 From: John Grossman Date: Thu, 9 Feb 2012 15:09:05 -0800 Subject: Upintegreate AAH TX and RX players from ICS_AAH Upintegrate the android at home TX and RX players developed in the ICS_AAH branch. Change-Id: I8247d3702e30d8b0e215b31a92675d8ab28dccbb Signed-off-by: John Grossman --- include/media/MediaPlayerInterface.h | 3 + media/libaah_rtp/Android.mk | 40 + media/libaah_rtp/aah_decoder_pump.cpp | 520 +++++++++ media/libaah_rtp/aah_decoder_pump.h | 107 ++ media/libaah_rtp/aah_rx_player.cpp | 288 +++++ media/libaah_rtp/aah_rx_player.h | 313 ++++++ media/libaah_rtp/aah_rx_player_core.cpp | 807 ++++++++++++++ media/libaah_rtp/aah_rx_player_ring_buffer.cpp | 366 +++++++ media/libaah_rtp/aah_rx_player_substream.cpp | 498 +++++++++ media/libaah_rtp/aah_tx_packet.cpp | 331 ++++++ media/libaah_rtp/aah_tx_packet.h | 191 ++++ media/libaah_rtp/aah_tx_player.cpp | 1139 ++++++++++++++++++++ media/libaah_rtp/aah_tx_player.h | 179 +++ media/libaah_rtp/aah_tx_sender.cpp | 602 +++++++++++ media/libaah_rtp/aah_tx_sender.h | 162 +++ media/libaah_rtp/pipe_event.cpp | 86 ++ media/libaah_rtp/pipe_event.h | 51 + media/libmediaplayerservice/Android.mk | 3 +- media/libmediaplayerservice/MediaPlayerService.cpp | 39 +- 19 files changed, 5721 insertions(+), 4 deletions(-) create mode 100644 media/libaah_rtp/Android.mk create mode 100644 media/libaah_rtp/aah_decoder_pump.cpp create mode 100644 media/libaah_rtp/aah_decoder_pump.h create mode 100644 media/libaah_rtp/aah_rx_player.cpp create mode 100644 media/libaah_rtp/aah_rx_player.h create mode 100644 media/libaah_rtp/aah_rx_player_core.cpp create mode 100644 media/libaah_rtp/aah_rx_player_ring_buffer.cpp create mode 100644 media/libaah_rtp/aah_rx_player_substream.cpp create mode 100644 media/libaah_rtp/aah_tx_packet.cpp create mode 100644 media/libaah_rtp/aah_tx_packet.h create mode 100644 media/libaah_rtp/aah_tx_player.cpp create mode 100644 media/libaah_rtp/aah_tx_player.h create mode 100644 media/libaah_rtp/aah_tx_sender.cpp create mode 100644 media/libaah_rtp/aah_tx_sender.h create mode 100644 media/libaah_rtp/pipe_event.cpp create mode 100644 media/libaah_rtp/pipe_event.h diff --git a/include/media/MediaPlayerInterface.h b/include/media/MediaPlayerInterface.h index 77c82b2..23a3e49 100644 --- a/include/media/MediaPlayerInterface.h +++ b/include/media/MediaPlayerInterface.h @@ -46,6 +46,9 @@ enum player_type { // The shared library with the test player is passed passed as an // argument to the 'test:' url in the setDataSource call. TEST_PLAYER = 5, + + AAH_RX_PLAYER = 100, + AAH_TX_PLAYER = 101, }; diff --git a/media/libaah_rtp/Android.mk b/media/libaah_rtp/Android.mk new file mode 100644 index 0000000..54fd9ec --- /dev/null +++ b/media/libaah_rtp/Android.mk @@ -0,0 +1,40 @@ +LOCAL_PATH:= $(call my-dir) +# +# libaah_rtp +# + +include $(CLEAR_VARS) + +LOCAL_MODULE := libaah_rtp +LOCAL_MODULE_TAGS := optional + +LOCAL_SRC_FILES := \ + aah_decoder_pump.cpp \ + aah_rx_player.cpp \ + aah_rx_player_core.cpp \ + aah_rx_player_ring_buffer.cpp \ + aah_rx_player_substream.cpp \ + aah_tx_packet.cpp \ + aah_tx_player.cpp \ + aah_tx_sender.cpp \ + pipe_event.cpp + +LOCAL_C_INCLUDES := \ + frameworks/base/include \ + frameworks/base/include/media/stagefright/openmax \ + frameworks/base/media \ + frameworks/base/media/libstagefright + +LOCAL_SHARED_LIBRARIES := \ + libcommon_time_client \ + libbinder \ + libmedia \ + libstagefright \ + libstagefright_foundation \ + libutils + +LOCAL_LDLIBS := \ + -lpthread + +include $(BUILD_SHARED_LIBRARY) + diff --git a/media/libaah_rtp/aah_decoder_pump.cpp b/media/libaah_rtp/aah_decoder_pump.cpp new file mode 100644 index 0000000..72fe43b --- /dev/null +++ b/media/libaah_rtp/aah_decoder_pump.cpp @@ -0,0 +1,520 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +//#define LOG_NDEBUG 0 +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "aah_decoder_pump.h" + +namespace android { + +static const long long kLongDecodeErrorThreshold = 1000000ll; +static const uint32_t kMaxLongErrorsBeforeFatal = 3; +static const uint32_t kMaxErrorsBeforeFatal = 60; + +AAH_DecoderPump::AAH_DecoderPump(OMXClient& omx) + : omx_(omx) + , thread_status_(OK) + , renderer_(NULL) + , last_queued_pts_valid_(false) + , last_queued_pts_(0) + , last_ts_transform_valid_(false) + , last_volume_(0xFF) { + thread_ = new ThreadWrapper(this); +} + +AAH_DecoderPump::~AAH_DecoderPump() { + shutdown(); +} + +status_t AAH_DecoderPump::initCheck() { + if (thread_ == NULL) { + ALOGE("Failed to allocate thread"); + return NO_MEMORY; + } + + return OK; +} + +status_t AAH_DecoderPump::queueForDecode(MediaBuffer* buf) { + if (NULL == buf) { + return BAD_VALUE; + } + + if (OK != thread_status_) { + return thread_status_; + } + + { // Explicit scope for AutoMutex pattern. + AutoMutex lock(&thread_lock_); + in_queue_.push_back(buf); + } + + thread_cond_.signal(); + + return OK; +} + +void AAH_DecoderPump::queueToRenderer(MediaBuffer* decoded_sample) { + Mutex::Autolock lock(&render_lock_); + sp meta; + int64_t ts; + status_t res; + + // Fetch the metadata and make sure the sample has a timestamp. We + // cannot render samples which are missing PTSs. + meta = decoded_sample->meta_data(); + if ((meta == NULL) || (!meta->findInt64(kKeyTime, &ts))) { + ALOGV("Decoded sample missing timestamp, cannot render."); + CHECK(false); + } else { + // If we currently are not holding on to a renderer, go ahead and + // make one now. + if (NULL == renderer_) { + renderer_ = new TimedAudioTrack(); + if (NULL != renderer_) { + int frameCount; + AudioTrack::getMinFrameCount(&frameCount, + AUDIO_STREAM_DEFAULT, + static_cast(format_sample_rate_)); + int ch_format = (format_channels_ == 1) + ? AUDIO_CHANNEL_OUT_MONO + : AUDIO_CHANNEL_OUT_STEREO; + + res = renderer_->set(AUDIO_STREAM_DEFAULT, + format_sample_rate_, + AUDIO_FORMAT_PCM_16_BIT, + ch_format, + frameCount); + if (res != OK) { + ALOGE("Failed to setup audio renderer. (res = %d)", res); + delete renderer_; + renderer_ = NULL; + } else { + CHECK(last_ts_transform_valid_); + + res = renderer_->setMediaTimeTransform( + last_ts_transform_, TimedAudioTrack::COMMON_TIME); + if (res != NO_ERROR) { + ALOGE("Failed to set media time transform on AudioTrack" + " (res = %d)", res); + delete renderer_; + renderer_ = NULL; + } else { + float volume = static_cast(last_volume_) + / 255.0f; + if (renderer_->setVolume(volume, volume) != OK) { + ALOGW("%s: setVolume failed", __FUNCTION__); + } + + renderer_->start(); + } + } + } else { + ALOGE("Failed to allocate AudioTrack to use as a renderer."); + } + } + + if (NULL != renderer_) { + uint8_t* decoded_data = + reinterpret_cast(decoded_sample->data()); + uint32_t decoded_amt = decoded_sample->range_length(); + decoded_data += decoded_sample->range_offset(); + + sp pcm_payload; + res = renderer_->allocateTimedBuffer(decoded_amt, &pcm_payload); + if (res != OK) { + ALOGE("Failed to allocate %d byte audio track buffer." + " (res = %d)", decoded_amt, res); + } else { + memcpy(pcm_payload->pointer(), decoded_data, decoded_amt); + + res = renderer_->queueTimedBuffer(pcm_payload, ts); + if (res != OK) { + ALOGE("Failed to queue %d byte audio track buffer with media" + " PTS %lld. (res = %d)", decoded_amt, ts, res); + } else { + last_queued_pts_valid_ = true; + last_queued_pts_ = ts; + } + } + + } else { + ALOGE("No renderer, dropping audio payload."); + } + } +} + +void AAH_DecoderPump::stopAndCleanupRenderer() { + if (NULL == renderer_) { + return; + } + + renderer_->stop(); + delete renderer_; + renderer_ = NULL; +} + +void AAH_DecoderPump::setRenderTSTransform(const LinearTransform& trans) { + Mutex::Autolock lock(&render_lock_); + + if (last_ts_transform_valid_ && !memcmp(&trans, + &last_ts_transform_, + sizeof(trans))) { + return; + } + + last_ts_transform_ = trans; + last_ts_transform_valid_ = true; + + if (NULL != renderer_) { + status_t res = renderer_->setMediaTimeTransform( + last_ts_transform_, TimedAudioTrack::COMMON_TIME); + if (res != NO_ERROR) { + ALOGE("Failed to set media time transform on AudioTrack" + " (res = %d)", res); + } + } +} + +void AAH_DecoderPump::setRenderVolume(uint8_t volume) { + Mutex::Autolock lock(&render_lock_); + + if (volume == last_volume_) { + return; + } + + last_volume_ = volume; + if (renderer_ != NULL) { + float volume = static_cast(last_volume_) / 255.0f; + if (renderer_->setVolume(volume, volume) != OK) { + ALOGW("%s: setVolume failed", __FUNCTION__); + } + } +} + +// isAboutToUnderflow is something of a hack used to figure out when it might be +// time to give up on trying to fill in a gap in the RTP sequence and simply +// move on with a discontinuity. If we had perfect knowledge of when we were +// going to underflow, it would not be a hack, but unfortunately we do not. +// Right now, we just take the PTS of the last sample queued, and check to see +// if its presentation time is within kAboutToUnderflowThreshold from now. If +// it is, then we say that we are about to underflow. This decision is based on +// two (possibly invalid) assumptions. +// +// 1) The transmitter is leading the clock by more than +// kAboutToUnderflowThreshold. +// 2) The delta between the PTS of the last sample queued and the next sample +// is less than the transmitter's clock lead amount. +// +// Right now, the default transmitter lead time is 1 second, which is a pretty +// large number and greater than the 50mSec that kAboutToUnderflowThreshold is +// currently set to. This should satisfy assumption #1 for now, but changes to +// the transmitter clock lead time could effect this. +// +// For non-sparse streams with a homogeneous sample rate (the vast majority of +// streams in the world), the delta between any two adjacent PTSs will always be +// the homogeneous sample period. It is very uncommon to see a sample period +// greater than the 1 second clock lead we are currently using, and you +// certainly will not see it in an MP3 file which should satisfy assumption #2. +// Sparse audio streams (where no audio is transmitted for long periods of +// silence) and extremely low framerate video stream (like an MPEG-2 slideshow +// or the video stream for a pay TV audio channel) are examples of streams which +// might violate assumption #2. +bool AAH_DecoderPump::isAboutToUnderflow(int64_t threshold) { + Mutex::Autolock lock(&render_lock_); + + // If we have never queued anything to the decoder, we really don't know if + // we are going to underflow or not. + if (!last_queued_pts_valid_ || !last_ts_transform_valid_) { + return false; + } + + // Don't have access to Common Time? If so, then things are Very Bad + // elsewhere in the system; it pretty much does not matter what we do here. + // Since we cannot really tell if we are about to underflow or not, its + // probably best to assume that we are not and proceed accordingly. + int64_t tt_now; + if (OK != cc_helper_.getCommonTime(&tt_now)) { + return false; + } + + // Transform from media time to common time. + int64_t last_queued_pts_tt; + if (!last_ts_transform_.doForwardTransform(last_queued_pts_, + &last_queued_pts_tt)) { + return false; + } + + // Check to see if we are underflowing. + return ((tt_now + threshold - last_queued_pts_tt) > 0); +} + +void* AAH_DecoderPump::workThread() { + // No need to lock when accessing decoder_ from the thread. The + // implementation of init and shutdown ensure that other threads never touch + // decoder_ while the work thread is running. + CHECK(decoder_ != NULL); + CHECK(format_ != NULL); + + // Start the decoder and note its result code. If something goes horribly + // wrong, callers of queueForDecode and getOutput will be able to detect + // that the thread encountered a fatal error and shut down by examining + // thread_status_. + thread_status_ = decoder_->start(format_.get()); + if (OK != thread_status_) { + ALOGE("AAH_DecoderPump's work thread failed to start decoder (res = %d)", + thread_status_); + return NULL; + } + + DurationTimer decode_timer; + uint32_t consecutive_long_errors = 0; + uint32_t consecutive_errors = 0; + + while (!thread_->exitPending()) { + status_t res; + MediaBuffer* bufOut = NULL; + + decode_timer.start(); + res = decoder_->read(&bufOut); + decode_timer.stop(); + + if (res == INFO_FORMAT_CHANGED) { + // Format has changed. Destroy our current renderer so that a new + // one can be created during queueToRenderer with the proper format. + // + // TODO : In order to transition seamlessly, we should change this + // to put the old renderer in a queue to play out completely before + // we destroy it. We can still create a new renderer, the timed + // nature of the renderer should ensure a seamless splice. + stopAndCleanupRenderer(); + res = OK; + } + + // Try to be a little nuanced in our handling of actual decode errors. + // Errors could happen because of minor stream corruption or because of + // transient resource limitations. In these cases, we would rather drop + // a little bit of output and ride out the unpleasantness then throw up + // our hands and abort everything. + // + // OTOH - When things are really bad (like we have a non-transient + // resource or bookkeeping issue, or the stream being fed to us is just + // complete and total garbage) we really want to terminate playback and + // raise an error condition all the way up to the application level so + // they can deal with it. + // + // Unfortunately, the error codes returned by the decoder can be a + // little non-specific. For example, if an OMXCodec times out + // attempting to obtain an output buffer, the error we get back is a + // generic -1. Try to distinguish between this resource timeout error + // and ES corruption error by timing how long the decode operation + // takes. Maintain accounting for both errors and "long errors". If we + // get more than a certain number consecutive errors of either type, + // consider it fatal and shutdown (which will cause the error to + // propagate all of the way up to the application level). The threshold + // for "long errors" is deliberately much lower than that of normal + // decode errors, both because of how long they take to happen and + // because they generally indicate resource limitation errors which are + // unlikely to go away in pathologically bad cases (in contrast to + // stream corruption errors which might happen 20 times in a row and + // then be suddenly OK again) + if (res != OK) { + consecutive_errors++; + if (decode_timer.durationUsecs() >= kLongDecodeErrorThreshold) + consecutive_long_errors++; + + CHECK(NULL == bufOut); + + ALOGW("%s: Failed to decode data (res = %d)", + __PRETTY_FUNCTION__, res); + + if ((consecutive_errors >= kMaxErrorsBeforeFatal) || + (consecutive_long_errors >= kMaxLongErrorsBeforeFatal)) { + ALOGE("%s: Maximum decode error threshold has been reached." + " There have been %d consecutive decode errors, and %d" + " consecutive decode operations which resulted in errors" + " and took more than %lld uSec to process. The last" + " decode operation took %lld uSec.", + __PRETTY_FUNCTION__, + consecutive_errors, consecutive_long_errors, + kLongDecodeErrorThreshold, decode_timer.durationUsecs()); + thread_status_ = res; + break; + } + + continue; + } + + if (NULL == bufOut) { + ALOGW("%s: Successful decode, but no buffer produced", + __PRETTY_FUNCTION__); + continue; + } + + // Successful decode (with actual output produced). Clear the error + // counters. + consecutive_errors = 0; + consecutive_long_errors = 0; + + queueToRenderer(bufOut); + bufOut->release(); + } + + decoder_->stop(); + stopAndCleanupRenderer(); + + return NULL; +} + +status_t AAH_DecoderPump::init(const sp& params) { + Mutex::Autolock lock(&init_lock_); + + if (decoder_ != NULL) { + // already inited + return OK; + } + + if (params == NULL) { + return BAD_VALUE; + } + + if (!params->findInt32(kKeyChannelCount, &format_channels_)) { + return BAD_VALUE; + } + + if (!params->findInt32(kKeySampleRate, &format_sample_rate_)) { + return BAD_VALUE; + } + + CHECK(OK == thread_status_); + CHECK(decoder_ == NULL); + + status_t ret_val = UNKNOWN_ERROR; + + // Cache the format and attempt to create the decoder. + format_ = params; + decoder_ = OMXCodec::Create( + omx_.interface(), // IOMX Handle + format_, // Metadata for substream (indicates codec) + false, // Make a decoder, not an encoder + sp(this)); // We will be the source for this codec. + + if (decoder_ == NULL) { + ALOGE("Failed to allocate decoder in %s", __PRETTY_FUNCTION__); + goto bailout; + } + + // Fire up the pump thread. It will take care of starting and stopping the + // decoder. + ret_val = thread_->run("aah_decode_pump", ANDROID_PRIORITY_AUDIO); + if (OK != ret_val) { + ALOGE("Failed to start work thread in %s (res = %d)", + __PRETTY_FUNCTION__, ret_val); + goto bailout; + } + +bailout: + if (OK != ret_val) { + decoder_ = NULL; + format_ = NULL; + } + + return OK; +} + +status_t AAH_DecoderPump::shutdown() { + Mutex::Autolock lock(&init_lock_); + return shutdown_l(); +} + +status_t AAH_DecoderPump::shutdown_l() { + thread_->requestExit(); + thread_cond_.signal(); + thread_->requestExitAndWait(); + + for (MBQueue::iterator iter = in_queue_.begin(); + iter != in_queue_.end(); + ++iter) { + (*iter)->release(); + } + in_queue_.clear(); + + last_queued_pts_valid_ = false; + last_ts_transform_valid_ = false; + last_volume_ = 0xFF; + thread_status_ = OK; + + decoder_ = NULL; + format_ = NULL; + + return OK; +} + +status_t AAH_DecoderPump::read(MediaBuffer **buffer, + const ReadOptions *options) { + if (!buffer) { + return BAD_VALUE; + } + + *buffer = NULL; + + // While its not time to shut down, and we have no data to process, wait. + AutoMutex lock(&thread_lock_); + while (!thread_->exitPending() && in_queue_.empty()) + thread_cond_.wait(thread_lock_); + + // At this point, if its not time to shutdown then we must have something to + // process. Go ahead and pop the front of the queue for processing. + if (!thread_->exitPending()) { + CHECK(!in_queue_.empty()); + + *buffer = *(in_queue_.begin()); + in_queue_.erase(in_queue_.begin()); + } + + // If we managed to get a buffer, then everything must be OK. If not, then + // we must be shutting down. + return (NULL == *buffer) ? INVALID_OPERATION : OK; +} + +AAH_DecoderPump::ThreadWrapper::ThreadWrapper(AAH_DecoderPump* owner) + : Thread(false /* canCallJava*/ ) + , owner_(owner) { +} + +bool AAH_DecoderPump::ThreadWrapper::threadLoop() { + CHECK(NULL != owner_); + owner_->workThread(); + return false; +} + +} // namespace android diff --git a/media/libaah_rtp/aah_decoder_pump.h b/media/libaah_rtp/aah_decoder_pump.h new file mode 100644 index 0000000..f5a6529 --- /dev/null +++ b/media/libaah_rtp/aah_decoder_pump.h @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __DECODER_PUMP_H__ +#define __DECODER_PUMP_H__ + +#include + +#include +#include +#include +#include +#include + +namespace android { + +class MetaData; +class OMXClient; +class TimedAudioTrack; + +class AAH_DecoderPump : public MediaSource { + public: + explicit AAH_DecoderPump(OMXClient& omx); + status_t initCheck(); + + status_t queueForDecode(MediaBuffer* buf); + + status_t init(const sp& params); + status_t shutdown(); + + void setRenderTSTransform(const LinearTransform& trans); + void setRenderVolume(uint8_t volume); + bool isAboutToUnderflow(int64_t threshold); + bool getStatus() const { return thread_status_; } + + // MediaSource methods + virtual status_t start(MetaData *params) { return OK; } + virtual sp getFormat() { return format_; } + virtual status_t stop() { return OK; } + virtual status_t read(MediaBuffer **buffer, + const ReadOptions *options); + + protected: + virtual ~AAH_DecoderPump(); + + private: + class ThreadWrapper : public Thread { + public: + friend class AAH_DecoderPump; + explicit ThreadWrapper(AAH_DecoderPump* owner); + + private: + virtual bool threadLoop(); + AAH_DecoderPump* owner_; + + DISALLOW_EVIL_CONSTRUCTORS(ThreadWrapper); + }; + + void* workThread(); + virtual status_t shutdown_l(); + void queueToRenderer(MediaBuffer* decoded_sample); + void stopAndCleanupRenderer(); + + sp format_; + int32_t format_channels_; + int32_t format_sample_rate_; + + sp decoder_; + OMXClient& omx_; + Mutex init_lock_; + + sp thread_; + Condition thread_cond_; + Mutex thread_lock_; + status_t thread_status_; + + Mutex render_lock_; + TimedAudioTrack* renderer_; + bool last_queued_pts_valid_; + int64_t last_queued_pts_; + bool last_ts_transform_valid_; + LinearTransform last_ts_transform_; + uint8_t last_volume_; + CCHelper cc_helper_; + + // protected by the thread_lock_ + typedef List MBQueue; + MBQueue in_queue_; + + DISALLOW_EVIL_CONSTRUCTORS(AAH_DecoderPump); +}; + +} // namespace android +#endif // __DECODER_PUMP_H__ diff --git a/media/libaah_rtp/aah_rx_player.cpp b/media/libaah_rtp/aah_rx_player.cpp new file mode 100644 index 0000000..9dd79fd --- /dev/null +++ b/media/libaah_rtp/aah_rx_player.cpp @@ -0,0 +1,288 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +//#define LOG_NDEBUG 0 + +#include +#include +#include + +#include "aah_rx_player.h" + +namespace android { + +const uint32_t AAH_RXPlayer::kRTPRingBufferSize = 1 << 10; + +sp createAAH_RXPlayer() { + sp ret = new AAH_RXPlayer(); + return ret; +} + +AAH_RXPlayer::AAH_RXPlayer() + : ring_buffer_(kRTPRingBufferSize) + , substreams_(NULL) { + thread_wrapper_ = new ThreadWrapper(*this); + + is_playing_ = false; + multicast_joined_ = false; + transmitter_known_ = false; + current_epoch_known_ = false; + data_source_set_ = false; + sock_fd_ = -1; + + substreams_.setCapacity(4); + + memset(&listen_addr_, 0, sizeof(listen_addr_)); + memset(&transmitter_addr_, 0, sizeof(transmitter_addr_)); + + fetchAudioFlinger(); +} + +AAH_RXPlayer::~AAH_RXPlayer() { + reset_l(); + CHECK(substreams_.size() == 0); + omx_.disconnect(); +} + +status_t AAH_RXPlayer::initCheck() { + if (thread_wrapper_ == NULL) { + ALOGE("Failed to allocate thread wrapper!"); + return NO_MEMORY; + } + + if (!ring_buffer_.initCheck()) { + ALOGE("Failed to allocate reassembly ring buffer!"); + return NO_MEMORY; + } + + // Check for the presense of the common time service by attempting to query + // for CommonTime's frequency. If we get an error back, we cannot talk to + // the service at all and should abort now. + status_t res; + uint64_t freq; + res = cc_helper_.getCommonFreq(&freq); + if (OK != res) { + ALOGE("Failed to connect to common time service!"); + return res; + } + + return omx_.connect(); +} + +status_t AAH_RXPlayer::setDataSource( + const char *url, + const KeyedVector *headers) { + AutoMutex api_lock(&api_lock_); + uint32_t a, b, c, d; + uint16_t port; + + if (data_source_set_) { + return INVALID_OPERATION; + } + + if (NULL == url) { + return BAD_VALUE; + } + + if (5 != sscanf(url, "%*[^:/]://%u.%u.%u.%u:%hu", &a, &b, &c, &d, &port)) { + ALOGE("Failed to parse URL \"%s\"", url); + return BAD_VALUE; + } + + if ((a > 255) || (b > 255) || (c > 255) || (d > 255) || (port == 0)) { + ALOGE("Bad multicast address \"%s\"", url); + return BAD_VALUE; + } + + ALOGI("setDataSource :: %u.%u.%u.%u:%hu", a, b, c, d, port); + + a = (a << 24) | (b << 16) | (c << 8) | d; + + memset(&listen_addr_, 0, sizeof(listen_addr_)); + listen_addr_.sin_family = AF_INET; + listen_addr_.sin_port = htons(port); + listen_addr_.sin_addr.s_addr = htonl(a); + data_source_set_ = true; + + return OK; +} + +status_t AAH_RXPlayer::setDataSource(int fd, int64_t offset, int64_t length) { + return INVALID_OPERATION; +} + +status_t AAH_RXPlayer::setVideoSurface(const sp& surface) { + return OK; +} + +status_t AAH_RXPlayer::setVideoSurfaceTexture( + const sp& surfaceTexture) { + return OK; +} + +status_t AAH_RXPlayer::prepare() { + return OK; +} + +status_t AAH_RXPlayer::prepareAsync() { + sendEvent(MEDIA_PREPARED); + return OK; +} + +status_t AAH_RXPlayer::start() { + AutoMutex api_lock(&api_lock_); + + if (is_playing_) { + return OK; + } + + status_t res = startWorkThread(); + is_playing_ = (res == OK); + return res; +} + +status_t AAH_RXPlayer::stop() { + return pause(); +} + +status_t AAH_RXPlayer::pause() { + AutoMutex api_lock(&api_lock_); + stopWorkThread(); + CHECK(sock_fd_ < 0); + is_playing_ = false; + return OK; +} + +bool AAH_RXPlayer::isPlaying() { + AutoMutex api_lock(&api_lock_); + return is_playing_; +} + +status_t AAH_RXPlayer::seekTo(int msec) { + sendEvent(MEDIA_SEEK_COMPLETE); + return OK; +} + +status_t AAH_RXPlayer::getCurrentPosition(int *msec) { + if (NULL != msec) { + *msec = 0; + } + return OK; +} + +status_t AAH_RXPlayer::getDuration(int *msec) { + if (NULL != msec) { + *msec = 1; + } + return OK; +} + +status_t AAH_RXPlayer::reset() { + AutoMutex api_lock(&api_lock_); + reset_l(); + return OK; +} + +void AAH_RXPlayer::reset_l() { + stopWorkThread(); + CHECK(sock_fd_ < 0); + CHECK(!multicast_joined_); + is_playing_ = false; + data_source_set_ = false; + transmitter_known_ = false; + memset(&listen_addr_, 0, sizeof(listen_addr_)); +} + +status_t AAH_RXPlayer::setLooping(int loop) { + return OK; +} + +player_type AAH_RXPlayer::playerType() { + return AAH_RX_PLAYER; +} + +status_t AAH_RXPlayer::setParameter(int key, const Parcel &request) { + return ERROR_UNSUPPORTED; +} + +status_t AAH_RXPlayer::getParameter(int key, Parcel *reply) { + return ERROR_UNSUPPORTED; +} + +status_t AAH_RXPlayer::invoke(const Parcel& request, Parcel *reply) { + if (!reply) { + return BAD_VALUE; + } + + int32_t magic; + status_t err = request.readInt32(&magic); + if (err != OK) { + reply->writeInt32(err); + return OK; + } + + if (magic != 0x12345) { + reply->writeInt32(BAD_VALUE); + return OK; + } + + int32_t methodID; + err = request.readInt32(&methodID); + if (err != OK) { + reply->writeInt32(err); + return OK; + } + + switch (methodID) { + // Get Volume + case INVOKE_GET_MASTER_VOLUME: { + if (audio_flinger_ != NULL) { + reply->writeInt32(OK); + reply->writeFloat(audio_flinger_->masterVolume()); + } else { + reply->writeInt32(UNKNOWN_ERROR); + } + } break; + + // Set Volume + case INVOKE_SET_MASTER_VOLUME: { + float targetVol = request.readFloat(); + reply->writeInt32(audio_flinger_->setMasterVolume(targetVol)); + } break; + + default: return BAD_VALUE; + } + + return OK; +} + +void AAH_RXPlayer::fetchAudioFlinger() { + if (audio_flinger_ == NULL) { + sp sm = defaultServiceManager(); + sp binder; + binder = sm->getService(String16("media.audio_flinger")); + + if (binder == NULL) { + ALOGW("AAH_RXPlayer failed to fetch handle to audio flinger." + " Master volume control will not be possible."); + } + + audio_flinger_ = interface_cast(binder); + } +} + +} // namespace android diff --git a/media/libaah_rtp/aah_rx_player.h b/media/libaah_rtp/aah_rx_player.h new file mode 100644 index 0000000..7a1b6e3 --- /dev/null +++ b/media/libaah_rtp/aah_rx_player.h @@ -0,0 +1,313 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __AAH_RX_PLAYER_H__ +#define __AAH_RX_PLAYER_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "aah_decoder_pump.h" +#include "pipe_event.h" + +namespace android { + +class AAH_RXPlayer : public MediaPlayerInterface { + public: + AAH_RXPlayer(); + + virtual status_t initCheck(); + virtual status_t setDataSource(const char *url, + const KeyedVector* + headers); + virtual status_t setDataSource(int fd, int64_t offset, int64_t length); + virtual status_t setVideoSurface(const sp& surface); + virtual status_t setVideoSurfaceTexture(const sp& + surfaceTexture); + virtual status_t prepare(); + virtual status_t prepareAsync(); + virtual status_t start(); + virtual status_t stop(); + virtual status_t pause(); + virtual bool isPlaying(); + virtual status_t seekTo(int msec); + virtual status_t getCurrentPosition(int *msec); + virtual status_t getDuration(int *msec); + virtual status_t reset(); + virtual status_t setLooping(int loop); + virtual player_type playerType(); + virtual status_t setParameter(int key, const Parcel &request); + virtual status_t getParameter(int key, Parcel *reply); + virtual status_t invoke(const Parcel& request, Parcel *reply); + + protected: + virtual ~AAH_RXPlayer(); + + private: + class ThreadWrapper : public Thread { + public: + friend class AAH_RXPlayer; + explicit ThreadWrapper(AAH_RXPlayer& player) + : Thread(false /* canCallJava */ ) + , player_(player) { } + + virtual bool threadLoop() { return player_.threadLoop(); } + + private: + AAH_RXPlayer& player_; + + DISALLOW_EVIL_CONSTRUCTORS(ThreadWrapper); + }; + +#pragma pack(push, 1) + // PacketBuffers are structures used by the RX ring buffer. The ring buffer + // is a ring of pointers to PacketBuffer structures which act as variable + // length byte arrays and hold the contents of received UDP packets. Rather + // than make this a structure which hold a length and a pointer to another + // allocated structure (which would require two allocations), this struct + // uses a structure overlay pattern where allocation for the byte array + // consists of allocating (arrayLen + sizeof(ssize_t)) bytes of data from + // whatever pool/heap the packet buffer pulls from, and then overlaying the + // packed PacketBuffer structure on top of the allocation. The one-byte + // array at the end of the structure serves as an offset to the the data + // portion of the allocation; packet buffers are never allocated on the + // stack or using the new operator. Instead, the static allocate-byte-array + // and destroy methods handle the allocate and overlay pattern. They also + // allow for a potential future optimization where instead of just + // allocating blocks from the process global heap and overlaying, the + // allocator is replaced with a different implementation (private heap, + // free-list, circular buffer, etc) which reduces potential heap + // fragmentation issues which might arise from the frequent allocation and + // destruction of the received UDP traffic. + struct PacketBuffer { + ssize_t length_; + uint8_t data_[1]; + + // TODO : consider changing this to be some form of ring buffer or free + // pool system instead of just using the heap in order to avoid heap + // fragmentation. + static PacketBuffer* allocate(ssize_t length); + static void destroy(PacketBuffer* pb); + + private: + // Force people to use allocate/destroy instead of new/delete. + PacketBuffer() { } + ~PacketBuffer() { } + }; + + struct RetransRequest { + uint32_t magic_; + uint32_t mcast_ip_; + uint16_t mcast_port_; + uint16_t start_seq_; + uint16_t end_seq_; + }; +#pragma pack(pop) + + enum GapStatus { + kGS_NoGap = 0, + kGS_NormalGap, + kGS_FastStartGap, + }; + + struct SeqNoGap { + uint16_t start_seq_; + uint16_t end_seq_; + }; + + class RXRingBuffer { + public: + explicit RXRingBuffer(uint32_t capacity); + ~RXRingBuffer(); + + bool initCheck() const { return (ring_ != NULL); } + void reset(); + + // Push a packet buffer with a given sequence number into the ring + // buffer. pushBuffer will always consume the buffer pushed to it, + // either destroying it because it was a duplicate or overflow, or + // holding on to it in the ring. Callers should not hold any references + // to PacketBuffers after they have been pushed to the ring. Returns + // false in the case of a serious error (such as ring overflow). + // Callers should consider resetting the pipeline entirely in the event + // of a serious error. + bool pushBuffer(PacketBuffer* buf, uint16_t seq); + + // Fetch the next buffer in the RTP sequence. Returns NULL if there is + // no buffer to fetch. If a non-NULL PacketBuffer is returned, + // is_discon will be set to indicate whether or not this PacketBuffer is + // discontiuous with any previously returned packet buffers. Packet + // buffers returned by fetchBuffer are the caller's responsibility; they + // must be certain to destroy the buffers when they are done. + PacketBuffer* fetchBuffer(bool* is_discon); + + // Returns true and fills out the gap structure if the read pointer of + // the ring buffer is currently pointing to a gap which would stall a + // fetchBuffer operation. Returns false if the read pointer is not + // pointing to a gap in the sequence currently. + GapStatus fetchCurrentGap(SeqNoGap* gap); + + // Causes the read pointer to skip over any portion of a gap indicated + // by nak. If nak is NULL, any gap currently blocking the read pointer + // will be completely skipped. If any portion of a gap is skipped, the + // next successful read from fetch buffer will indicate a discontinuity. + void processNAK(const SeqNoGap* nak = NULL); + + // Compute the number of milliseconds until the inactivity timer for + // this RTP stream. Returns -1 if there is no active timeout, or 0 if + // the system has already timed out. + int computeInactivityTimeout(); + + private: + Mutex lock_; + PacketBuffer** ring_; + uint32_t capacity_; + uint32_t rd_; + uint32_t wr_; + + uint16_t rd_seq_; + bool rd_seq_known_; + bool waiting_for_fast_start_; + bool fetched_first_packet_; + + uint64_t rtp_activity_timeout_; + bool rtp_activity_timeout_valid_; + + DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer); + }; + + class Substream : public virtual RefBase { + public: + Substream(uint32_t ssrc, OMXClient& omx); + + void cleanupBufferInProgress(); + void shutdown(); + void processPayloadStart(uint8_t* buf, + uint32_t amt, + int32_t ts_lower); + void processPayloadCont (uint8_t* buf, + uint32_t amt); + void processTSTransform(const LinearTransform& trans); + + bool isAboutToUnderflow(); + uint32_t getSSRC() const { return ssrc_; } + uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } + status_t getStatus() const { return status_; } + + protected: + virtual ~Substream() { + shutdown(); + } + + private: + void cleanupDecoder(); + bool shouldAbort(const char* log_tag); + void processCompletedBuffer(); + bool setupSubstreamType(uint8_t substream_type, + uint8_t codec_type); + + uint32_t ssrc_; + bool waiting_for_rap_; + status_t status_; + + bool substream_details_known_; + uint8_t substream_type_; + uint8_t codec_type_; + sp substream_meta_; + + MediaBuffer* buffer_in_progress_; + uint32_t expected_buffer_size_; + uint32_t buffer_filled_; + + sp decoder_; + + static int64_t kAboutToUnderflowThreshold; + + DISALLOW_EVIL_CONSTRUCTORS(Substream); + }; + + typedef DefaultKeyedVector< uint32_t, sp > SubstreamVec; + + status_t startWorkThread(); + void stopWorkThread(); + virtual bool threadLoop(); + bool setupSocket(); + void cleanupSocket(); + void resetPipeline(); + void reset_l(); + bool processRX(PacketBuffer* pb); + void processRingBuffer(); + void processCommandPacket(PacketBuffer* pb); + bool processGaps(); + int computeNextGapRetransmitTimeout(); + void fetchAudioFlinger(); + + PipeEvent wakeup_work_thread_evt_; + sp thread_wrapper_; + Mutex api_lock_; + bool is_playing_; + bool data_source_set_; + + struct sockaddr_in listen_addr_; + int sock_fd_; + bool multicast_joined_; + + struct sockaddr_in transmitter_addr_; + bool transmitter_known_; + + uint32_t current_epoch_; + bool current_epoch_known_; + + SeqNoGap current_gap_; + GapStatus current_gap_status_; + uint64_t next_retrans_req_time_; + + RXRingBuffer ring_buffer_; + SubstreamVec substreams_; + OMXClient omx_; + CCHelper cc_helper_; + + // Connection to audio flinger used to hack a path to setMasterVolume. + sp audio_flinger_; + + static const uint32_t kRTPRingBufferSize; + static const uint32_t kRetransRequestMagic; + static const uint32_t kFastStartRequestMagic; + static const uint32_t kRetransNAKMagic; + static const uint32_t kGapRerequestTimeoutUSec; + static const uint32_t kFastStartTimeoutUSec; + static const uint32_t kRTPActivityTimeoutUSec; + + static const uint32_t INVOKE_GET_MASTER_VOLUME = 3; + static const uint32_t INVOKE_SET_MASTER_VOLUME = 4; + + static uint64_t monotonicUSecNow(); + + DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer); +}; + +} // namespace android + +#endif // __AAH_RX_PLAYER_H__ diff --git a/media/libaah_rtp/aah_rx_player_core.cpp b/media/libaah_rtp/aah_rx_player_core.cpp new file mode 100644 index 0000000..d2b3386 --- /dev/null +++ b/media/libaah_rtp/aah_rx_player_core.cpp @@ -0,0 +1,807 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +//#define LOG_NDEBUG 0 +#include + +#include +#include +#include +#include +#include + +#include + +#include "aah_rx_player.h" +#include "aah_tx_packet.h" + +namespace android { + +const uint32_t AAH_RXPlayer::kRetransRequestMagic = + FOURCC('T','r','e','q'); +const uint32_t AAH_RXPlayer::kRetransNAKMagic = + FOURCC('T','n','a','k'); +const uint32_t AAH_RXPlayer::kFastStartRequestMagic = + FOURCC('T','f','s','t'); +const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; +const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; +const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; + +static inline int16_t fetchInt16(uint8_t* data) { + return static_cast(U16_AT(data)); +} + +static inline int32_t fetchInt32(uint8_t* data) { + return static_cast(U32_AT(data)); +} + +static inline int64_t fetchInt64(uint8_t* data) { + return static_cast(U64_AT(data)); +} + +uint64_t AAH_RXPlayer::monotonicUSecNow() { + struct timespec now; + int res = clock_gettime(CLOCK_MONOTONIC, &now); + CHECK(res >= 0); + + uint64_t ret = static_cast(now.tv_sec) * 1000000; + ret += now.tv_nsec / 1000; + + return ret; +} + +status_t AAH_RXPlayer::startWorkThread() { + status_t res; + stopWorkThread(); + res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO); + + if (res != OK) { + ALOGE("Failed to start work thread (res = %d)", res); + } + + return res; +} + +void AAH_RXPlayer::stopWorkThread() { + thread_wrapper_->requestExit(); // set the exit pending flag + wakeup_work_thread_evt_.setEvent(); + + status_t res; + res = thread_wrapper_->requestExitAndWait(); // block until thread exit. + if (res != OK) { + ALOGE("Failed to stop work thread (res = %d)", res); + } + + wakeup_work_thread_evt_.clearPendingEvents(); +} + +void AAH_RXPlayer::cleanupSocket() { + if (sock_fd_ >= 0) { + if (multicast_joined_) { + int res; + struct ip_mreq mreq; + mreq.imr_multiaddr = listen_addr_.sin_addr; + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + res = setsockopt(sock_fd_, + IPPROTO_IP, + IP_DROP_MEMBERSHIP, + &mreq, sizeof(mreq)); + if (res < 0) { + ALOGW("Failed to leave multicast group. (%d, %d)", res, errno); + } + multicast_joined_ = false; + } + + close(sock_fd_); + sock_fd_ = -1; + } + + resetPipeline(); +} + +void AAH_RXPlayer::resetPipeline() { + ring_buffer_.reset(); + + // Explicitly shudown all of the active substreams, then call clear out the + // collection. Failure to clear out a substream can result in its decoder + // holding a reference to itself and therefor not going away when the + // collection is cleared. + for (size_t i = 0; i < substreams_.size(); ++i) + substreams_.valueAt(i)->shutdown(); + + substreams_.clear(); + + current_gap_status_ = kGS_NoGap; +} + +bool AAH_RXPlayer::setupSocket() { + long flags; + int res, buf_size; + socklen_t opt_size; + + cleanupSocket(); + CHECK(sock_fd_ < 0); + + // Make the socket + sock_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (sock_fd_ < 0) { + ALOGE("Failed to create listen socket (errno %d)", errno); + goto bailout; + } + + // Set non-blocking operation + flags = fcntl(sock_fd_, F_GETFL); + res = fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK); + if (res < 0) { + ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)", + sock_fd_, errno); + goto bailout; + } + + // Bind to our port + struct sockaddr_in bind_addr; + memset(&bind_addr, 0, sizeof(bind_addr)); + bind_addr.sin_family = AF_INET; + bind_addr.sin_addr.s_addr = INADDR_ANY; + bind_addr.sin_port = listen_addr_.sin_port; + res = bind(sock_fd_, + reinterpret_cast(&bind_addr), + sizeof(bind_addr)); + if (res < 0) { + uint32_t a = ntohl(bind_addr.sin_addr.s_addr); + uint16_t p = ntohs(bind_addr.sin_port); + ALOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hd. (errno %d)", + sock_fd_, + (a >> 24) & 0xFF, + (a >> 16) & 0xFF, + (a >> 8) & 0xFF, + (a ) & 0xFF, + p, + errno); + + goto bailout; + } + + buf_size = 1 << 16; // 64k + res = setsockopt(sock_fd_, + SOL_SOCKET, SO_RCVBUF, + &buf_size, sizeof(buf_size)); + if (res < 0) { + ALOGW("Failed to increase socket buffer size to %d. (errno %d)", + buf_size, errno); + } + + buf_size = 0; + opt_size = sizeof(buf_size); + res = getsockopt(sock_fd_, + SOL_SOCKET, SO_RCVBUF, + &buf_size, &opt_size); + if (res < 0) { + ALOGW("Failed to fetch socket buffer size. (errno %d)", errno); + } else { + ALOGI("RX socket buffer size is now %d bytes", buf_size); + } + + if (listen_addr_.sin_addr.s_addr) { + // Join the multicast group and we should be good to go. + struct ip_mreq mreq; + mreq.imr_multiaddr = listen_addr_.sin_addr; + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + res = setsockopt(sock_fd_, + IPPROTO_IP, + IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq)); + if (res < 0) { + ALOGE("Failed to join multicast group. (errno %d)", errno); + goto bailout; + } + multicast_joined_ = true; + } + + return true; + +bailout: + cleanupSocket(); + return false; +} + +bool AAH_RXPlayer::threadLoop() { + struct pollfd poll_fds[2]; + bool process_more_right_now = false; + + if (!setupSocket()) { + sendEvent(MEDIA_ERROR); + goto bailout; + } + + while (!thread_wrapper_->exitPending()) { + // Step 1: Wait until there is something to do. + int gap_timeout = computeNextGapRetransmitTimeout(); + int ring_timeout = ring_buffer_.computeInactivityTimeout(); + int timeout = -1; + + if (!ring_timeout) { + ALOGW("RTP inactivity timeout reached, resetting pipeline."); + resetPipeline(); + timeout = gap_timeout; + } else { + if (gap_timeout < 0) { + timeout = ring_timeout; + } else if (ring_timeout < 0) { + timeout = gap_timeout; + } else { + timeout = (gap_timeout < ring_timeout) ? gap_timeout + : ring_timeout; + } + } + + if ((0 != timeout) && (!process_more_right_now)) { + // Set up the events to wait on. Start with the wakeup pipe. + memset(&poll_fds, 0, sizeof(poll_fds)); + poll_fds[0].fd = wakeup_work_thread_evt_.getWakeupHandle(); + poll_fds[0].events = POLLIN; + + // Add the RX socket. + poll_fds[1].fd = sock_fd_; + poll_fds[1].events = POLLIN; + + // Wait for something interesing to happen. + int poll_res = poll(poll_fds, NELEM(poll_fds), timeout); + if (poll_res < 0) { + ALOGE("Fatal error (%d,%d) while waiting on events", + poll_res, errno); + sendEvent(MEDIA_ERROR); + goto bailout; + } + } + + if (thread_wrapper_->exitPending()) { + break; + } + + wakeup_work_thread_evt_.clearPendingEvents(); + process_more_right_now = false; + + // Step 2: Do we have data waiting in the socket? If so, drain the + // socket moving valid RTP information into the ring buffer to be + // processed. + if (poll_fds[1].revents) { + struct sockaddr_in from; + socklen_t from_len; + + ssize_t res = 0; + while (!thread_wrapper_->exitPending()) { + // Check the size of any pending packet. + res = recv(sock_fd_, NULL, 0, MSG_PEEK | MSG_TRUNC); + + // Error? + if (res < 0) { + // If the error is anything other than would block, + // something has gone very wrong. + if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { + ALOGE("Fatal socket error during recvfrom (%d, %d)", + (int)res, errno); + goto bailout; + } + + // Socket is out of data, just break out of processing and + // wait for more. + break; + } + + // Allocate a payload. + PacketBuffer* pb = PacketBuffer::allocate(res); + if (NULL == pb) { + ALOGE("Fatal error, failed to allocate packet buffer of" + " length %u", static_cast(res)); + goto bailout; + } + + // Fetch the data. + from_len = sizeof(from); + res = recvfrom(sock_fd_, pb->data_, pb->length_, 0, + reinterpret_cast(&from), + &from_len); + if (res != pb->length_) { + ALOGE("Fatal error, fetched packet length (%d) does not" + " match peeked packet length (%u). This should never" + " happen. (errno = %d)", + static_cast(res), + static_cast(pb->length_), + errno); + } + + bool drop_packet = false; + if (transmitter_known_) { + if (from.sin_addr.s_addr != + transmitter_addr_.sin_addr.s_addr) { + uint32_t a = ntohl(from.sin_addr.s_addr); + uint16_t p = ntohs(from.sin_port); + ALOGV("Dropping packet from unknown transmitter" + " %u.%u.%u.%u:%hu", + ((a >> 24) & 0xFF), + ((a >> 16) & 0xFF), + ((a >> 8) & 0xFF), + ( a & 0xFF), + p); + + drop_packet = true; + } else { + transmitter_addr_.sin_port = from.sin_port; + } + } else { + memcpy(&transmitter_addr_, &from, sizeof(from)); + transmitter_known_ = true; + } + + if (!drop_packet) { + bool serious_error = !processRX(pb); + + if (serious_error) { + // Something went "seriously wrong". Currently, the + // only trigger for this should be a ring buffer + // overflow. The current failsafe behavior for when + // something goes seriously wrong is to just reset the + // pipeline. The system should behave as if this + // AAH_RXPlayer was just set up for the first time. + ALOGE("Something just went seriously wrong with the" + " pipeline. Resetting."); + resetPipeline(); + } + } else { + PacketBuffer::destroy(pb); + } + } + } + + // Step 3: Process any data we mave have accumulated in the ring buffer + // so far. + if (!thread_wrapper_->exitPending()) { + processRingBuffer(); + } + + // Step 4: At this point in time, the ring buffer should either be + // empty, or stalled in front of a gap caused by some dropped packets. + // Check on the current gap situation and deal with it in an appropriate + // fashion. If processGaps returns true, it means that it has given up + // on a gap and that we should try to process some more data + // immediately. + if (!thread_wrapper_->exitPending()) { + process_more_right_now = processGaps(); + } + + // Step 5: Check for fatal errors. If any of our substreams has + // encountered a fatal, unrecoverable, error, then propagate the error + // up to user level and shut down. + for (size_t i = 0; i < substreams_.size(); ++i) { + status_t status; + CHECK(substreams_.valueAt(i) != NULL); + + status = substreams_.valueAt(i)->getStatus(); + if (OK != status) { + ALOGE("Substream index %d has encountered an unrecoverable" + " error (%d). Signalling application level and shutting" + " down.", i, status); + sendEvent(MEDIA_ERROR); + goto bailout; + } + } + } + +bailout: + cleanupSocket(); + return false; +} + +bool AAH_RXPlayer::processRX(PacketBuffer* pb) { + CHECK(NULL != pb); + + uint8_t* data = pb->data_; + ssize_t amt = pb->length_; + uint32_t nak_magic; + uint16_t seq_no; + uint32_t epoch; + + // Every packet either starts with an RTP header which is at least 12 bytes + // long or is a retry NAK which is 14 bytes long. If there are fewer than + // 12 bytes here, this cannot be a proper RTP packet. + if (amt < 12) { + ALOGV("Dropping packet, too short to contain RTP header (%u bytes)", + static_cast(amt)); + goto drop_packet; + } + + // Check to see if this is the special case of a NAK packet. + nak_magic = ntohl(*(reinterpret_cast(data))); + if (nak_magic == kRetransNAKMagic) { + // Looks like a NAK packet; make sure its long enough. + + if (amt < static_cast(sizeof(RetransRequest))) { + ALOGV("Dropping packet, too short to contain NAK payload (%u bytes)", + static_cast(amt)); + goto drop_packet; + } + + SeqNoGap gap; + RetransRequest* rtr = reinterpret_cast(data); + gap.start_seq_ = ntohs(rtr->start_seq_); + gap.end_seq_ = ntohs(rtr->end_seq_); + + ALOGV("Process NAK for gap at [%hu, %hu]", gap.start_seq_, gap.end_seq_); + ring_buffer_.processNAK(&gap); + + return true; + } + + // According to the TRTP spec, version should be 2, padding should be 0, + // extension should be 0 and CSRCCnt should be 0. If any of these tests + // fail, we chuck the packet. + if (data[0] != 0x80) { + ALOGV("Dropping packet, bad V/P/X/CSRCCnt field (0x%02x)", + data[0]); + goto drop_packet; + } + + // Check the payload type. For TRTP, it should always be 100. + if ((data[1] & 0x7F) != 100) { + ALOGV("Dropping packet, bad payload type. (%u)", + data[1] & 0x7F); + goto drop_packet; + } + + // Check whether the transmitter has begun a new epoch. + epoch = (U32_AT(data + 8) >> 10) & 0x3FFFFF; + if (current_epoch_known_) { + if (epoch != current_epoch_) { + ALOGV("%s: new epoch %u", __PRETTY_FUNCTION__, epoch); + current_epoch_ = epoch; + resetPipeline(); + } + } else { + current_epoch_ = epoch; + current_epoch_known_ = true; + } + + // Extract the sequence number and hand the packet off to the ring buffer + // for dropped packet detection and later processing. + seq_no = U16_AT(data + 2); + return ring_buffer_.pushBuffer(pb, seq_no); + +drop_packet: + PacketBuffer::destroy(pb); + return true; +} + +void AAH_RXPlayer::processRingBuffer() { + PacketBuffer* pb; + bool is_discon; + sp substream; + LinearTransform trans; + bool foundTrans = false; + + while (NULL != (pb = ring_buffer_.fetchBuffer(&is_discon))) { + if (is_discon) { + // Abort all partially assembled payloads. + for (size_t i = 0; i < substreams_.size(); ++i) { + CHECK(substreams_.valueAt(i) != NULL); + substreams_.valueAt(i)->cleanupBufferInProgress(); + } + } + + uint8_t* data = pb->data_; + ssize_t amt = pb->length_; + + // Should not have any non-RTP packets in the ring buffer. RTP packets + // must be at least 12 bytes long. + CHECK(amt >= 12); + + // Extract the marker bit and the SSRC field. + bool marker = (data[1] & 0x80) != 0; + uint32_t ssrc = U32_AT(data + 8); + + // Is this the start of a new TRTP payload? If so, the marker bit + // should be set and there are some things we should be checking for. + if (marker) { + // TRTP headers need to have at least a byte for version, a byte for + // payload type and flags, and 4 bytes for length. + if (amt < 18) { + ALOGV("Dropping packet, too short to contain TRTP header" + " (%u bytes)", static_cast(amt)); + goto process_next_packet; + } + + // Check the TRTP version and extract the payload type/flags. + uint8_t trtp_version = data[12]; + uint8_t payload_type = (data[13] >> 4) & 0xF; + uint8_t trtp_flags = data[13] & 0xF; + + if (1 != trtp_version) { + ALOGV("Dropping packet, bad trtp version %hhu", trtp_version); + goto process_next_packet; + } + + // Is there a timestamp transformation present on this packet? If + // so, extract it and pass it to the appropriate substreams. + if (trtp_flags & 0x02) { + ssize_t offset = 18 + ((trtp_flags & 0x01) ? 4 : 0); + if (amt < (offset + 24)) { + ALOGV("Dropping packet, too short to contain TRTP Timestamp" + " Transformation (%u bytes)", + static_cast(amt)); + goto process_next_packet; + } + + trans.a_zero = fetchInt64(data + offset); + trans.b_zero = fetchInt64(data + offset + 16); + trans.a_to_b_numer = static_cast( + fetchInt32 (data + offset + 8)); + trans.a_to_b_denom = U32_AT(data + offset + 12); + foundTrans = true; + + uint32_t program_id = (ssrc >> 5) & 0x1F; + for (size_t i = 0; i < substreams_.size(); ++i) { + sp iter = substreams_.valueAt(i); + CHECK(iter != NULL); + + if (iter->getProgramID() == program_id) { + iter->processTSTransform(trans); + } + } + } + + // Is this a command packet? If so, its not necessarily associate + // with one particular substream. Just give it to the command + // packet handler and then move on. + if (4 == payload_type) { + processCommandPacket(pb); + goto process_next_packet; + } + } + + // If we got to here, then we are a normal packet. Find (or allocate) + // the substream we belong to and send the packet off to be processed. + substream = substreams_.valueFor(ssrc); + if (substream == NULL) { + substream = new Substream(ssrc, omx_); + if (substream == NULL) { + ALOGE("Failed to allocate substream for SSRC 0x%08x", ssrc); + goto process_next_packet; + } + substreams_.add(ssrc, substream); + + if (foundTrans) { + substream->processTSTransform(trans); + } + } + + CHECK(substream != NULL); + + if (marker) { + // Start of a new TRTP payload for this substream. Extract the + // lower 32 bits of the timestamp and hand the buffer to the + // substream for processing. + uint32_t ts_lower = U32_AT(data + 4); + substream->processPayloadStart(data + 12, amt - 12, ts_lower); + } else { + // Continuation of an existing TRTP payload. Just hand it off to + // the substream for processing. + substream->processPayloadCont(data + 12, amt - 12); + } + +process_next_packet: + PacketBuffer::destroy(pb); + } // end of main processing while loop. +} + +void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { + CHECK(NULL != pb); + + uint8_t* data = pb->data_; + ssize_t amt = pb->length_; + + // verify that this packet meets the minimum length of a command packet + if (amt < 20) { + return; + } + + uint8_t trtp_version = data[12]; + uint8_t trtp_flags = data[13] & 0xF; + + if (1 != trtp_version) { + ALOGV("Dropping packet, bad trtp version %hhu", trtp_version); + return; + } + + // calculate the start of the command payload + ssize_t offset = 18; + if (trtp_flags & 0x01) { + // timestamp is present (4 bytes) + offset += 4; + } + if (trtp_flags & 0x02) { + // transform is present (24 bytes) + offset += 24; + } + + // the packet must contain 2 bytes of command payload beyond the TRTP header + if (amt < offset + 2) { + return; + } + + uint16_t command_id = U16_AT(data + offset); + + switch (command_id) { + case TRTPControlPacket::kCommandNop: + break; + + case TRTPControlPacket::kCommandEOS: + case TRTPControlPacket::kCommandFlush: { + uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; + ALOGI("*** %s flushing program_id=%d", + __PRETTY_FUNCTION__, program_id); + + Vector substreams_to_remove; + for (size_t i = 0; i < substreams_.size(); ++i) { + sp iter = substreams_.valueAt(i); + if (iter->getProgramID() == program_id) { + iter->shutdown(); + substreams_to_remove.add(iter->getSSRC()); + } + } + + for (size_t i = 0; i < substreams_to_remove.size(); ++i) { + substreams_.removeItem(substreams_to_remove[i]); + } + } break; + } +} + +bool AAH_RXPlayer::processGaps() { + // Deal with the current gap situation. Specifically... + // + // 1) If a new gap has shown up, send a retransmit request to the + // transmitter. + // 2) If a gap we were working on has had a packet in the middle or at + // the end filled in, send another retransmit request for the begining + // portion of the gap. TRTP was designed for LANs where packet + // re-ordering is very unlikely; so see the middle or end of a gap + // filled in before the begining is an almost certain indication that + // a retransmission packet was also dropped. + // 3) If we have been working on a gap for a while and it still has not + // been filled in, send another retransmit request. + // 4) If the are no more gaps in the ring, clear the current_gap_status_ + // flag to indicate that all is well again. + + // Start by fetching the active gap status. + SeqNoGap gap; + bool send_retransmit_request = false; + bool ret_val = false; + GapStatus gap_status; + if (kGS_NoGap != (gap_status = ring_buffer_.fetchCurrentGap(&gap))) { + // Note: checking for a change in the end sequence number should cover + // moving on to an entirely new gap for case #1 as well as resending the + // begining of a gap range for case #2. + send_retransmit_request = (kGS_NoGap == current_gap_status_) || + (current_gap_.end_seq_ != gap.end_seq_); + + // If this is the same gap we have been working on, and it has timed + // out, then check to see if our substreams are about to underflow. If + // so, instead of sending another retransmit request, just give up on + // this gap and move on. + if (!send_retransmit_request && + (kGS_NoGap != current_gap_status_) && + (0 == computeNextGapRetransmitTimeout())) { + + // If out current gap is the fast-start gap, don't bother to skip it + // because substreams look like the are about to underflow. + if ((kGS_FastStartGap != gap_status) || + (current_gap_.end_seq_ != gap.end_seq_)) { + for (size_t i = 0; i < substreams_.size(); ++i) { + if (substreams_.valueAt(i)->isAboutToUnderflow()) { + ALOGV("About to underflow, giving up on gap [%hu, %hu]", + gap.start_seq_, gap.end_seq_); + ring_buffer_.processNAK(); + current_gap_status_ = kGS_NoGap; + return true; + } + } + } + + // Looks like no one is about to underflow. Just go ahead and send + // the request. + send_retransmit_request = true; + } + } else { + current_gap_status_ = kGS_NoGap; + } + + if (send_retransmit_request) { + // If we have been working on a fast start, and it is still not filled + // in, even after the extended retransmit time out, give up and skip it. + // The system should fall back into its normal slow-start behavior. + if ((kGS_FastStartGap == current_gap_status_) && + (current_gap_.end_seq_ == gap.end_seq_)) { + ALOGV("Fast start is taking forever; giving up."); + ring_buffer_.processNAK(); + current_gap_status_ = kGS_NoGap; + return true; + } + + // Send the request. + RetransRequest req; + uint32_t magic = (kGS_FastStartGap == gap_status) + ? kFastStartRequestMagic + : kRetransRequestMagic; + req.magic_ = htonl(magic); + req.mcast_ip_ = listen_addr_.sin_addr.s_addr; + req.mcast_port_ = listen_addr_.sin_port; + req.start_seq_ = htons(gap.start_seq_); + req.end_seq_ = htons(gap.end_seq_); + + { + uint32_t a = ntohl(transmitter_addr_.sin_addr.s_addr); + uint16_t p = ntohs(transmitter_addr_.sin_port); + ALOGV("Sending to transmitter %u.%u.%u.%u:%hu", + ((a >> 24) & 0xFF), + ((a >> 16) & 0xFF), + ((a >> 8) & 0xFF), + ( a & 0xFF), + p); + } + + int res = sendto(sock_fd_, &req, sizeof(req), 0, + reinterpret_cast(&transmitter_addr_), + sizeof(transmitter_addr_)); + if (res < 0) { + ALOGE("Error when sending retransmit request (%d)", errno); + } else { + ALOGV("%s request for range [%hu, %hu] sent", + (kGS_FastStartGap == gap_status) ? "Fast Start" : "Retransmit", + gap.start_seq_, gap.end_seq_); + } + + // Update the current gap info. + current_gap_ = gap; + current_gap_status_ = gap_status; + next_retrans_req_time_ = monotonicUSecNow() + + ((kGS_FastStartGap == current_gap_status_) + ? kFastStartTimeoutUSec + : kGapRerequestTimeoutUSec); + } + + return false; +} + +// Compute when its time to send the next gap retransmission in milliseconds. +// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit +// right now. +int AAH_RXPlayer::computeNextGapRetransmitTimeout() { + if (kGS_NoGap == current_gap_status_) { + return -1; + } + + int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); + + timeout_delta /= 1000; + if (timeout_delta <= 0) { + return 0; + } + + return static_cast(timeout_delta); +} + +} // namespace android diff --git a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp new file mode 100644 index 0000000..0d8b31f --- /dev/null +++ b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp @@ -0,0 +1,366 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +//#define LOG_NDEBUG 0 +#include + +#include "aah_rx_player.h" + +namespace android { + +AAH_RXPlayer::RXRingBuffer::RXRingBuffer(uint32_t capacity) { + capacity_ = capacity; + rd_ = wr_ = 0; + ring_ = new PacketBuffer*[capacity]; + memset(ring_, 0, sizeof(PacketBuffer*) * capacity); + reset(); +} + +AAH_RXPlayer::RXRingBuffer::~RXRingBuffer() { + reset(); + delete[] ring_; +} + +void AAH_RXPlayer::RXRingBuffer::reset() { + AutoMutex lock(&lock_); + + if (NULL != ring_) { + while (rd_ != wr_) { + CHECK(rd_ < capacity_); + if (NULL != ring_[rd_]) { + PacketBuffer::destroy(ring_[rd_]); + ring_[rd_] = NULL; + } + rd_ = (rd_ + 1) % capacity_; + } + } + + rd_ = wr_ = 0; + rd_seq_known_ = false; + waiting_for_fast_start_ = true; + fetched_first_packet_ = false; + rtp_activity_timeout_valid_ = false; +} + +bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, + uint16_t seq) { + AutoMutex lock(&lock_); + CHECK(NULL != ring_); + CHECK(NULL != buf); + + rtp_activity_timeout_valid_ = true; + rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec; + + // If the ring buffer is totally reset (we have never received a single + // payload) then we don't know the rd sequence number and this should be + // simple. We just store the payload, advance the wr pointer and record the + // initial sequence number. + if (!rd_seq_known_) { + CHECK(rd_ == wr_); + CHECK(NULL == ring_[wr_]); + CHECK(wr_ < capacity_); + + ring_[wr_] = buf; + wr_ = (wr_ + 1) % capacity_; + rd_seq_ = seq; + rd_seq_known_ = true; + return true; + } + + // Compute the seqence number of this payload and of the write pointer, + // normalized around the read pointer. IOW - transform the payload seq no + // and the wr pointer seq no into a space where the rd pointer seq no is + // zero. This will define 4 cases we can consider... + // + // 1) norm_seq == norm_wr_seq + // This payload is contiguous with the last. All is good. + // + // 2) ((norm_seq < norm_wr_seq) && (norm_seq >= norm_rd_seq) + // aka ((norm_seq < norm_wr_seq) && (norm_seq >= 0) + // This payload is in the past, in the unprocessed region of the ring + // buffer. It is probably a retransmit intended to fill in a dropped + // payload; it may be a duplicate. + // + // 3) ((norm_seq - norm_wr_seq) & 0x8000) != 0 + // This payload is in the past compared to the write pointer (or so very + // far in the future that it has wrapped the seq no space), but not in + // the unprocessed region of the ring buffer. This could be a duplicate + // retransmit; we just drop these payloads unless we are waiting for our + // first fast start packet. If we are waiting for fast start, than this + // packet is probably the first packet of the fast start retransmission. + // If it will fit in the buffer, back up the read pointer to its position + // and clear the fast start flag, otherwise just drop it. + // + // 4) ((norm_seq - norm_wr_seq) & 0x8000) == 0 + // This payload which is ahead of the next write pointer. This indicates + // that we have missed some payloads and need to request a retransmit. + // If norm_seq >= (capacity - 1), then the gap is so large that it would + // overflow the ring buffer and we should probably start to panic. + + uint16_t norm_wr_seq = ((wr_ + capacity_ - rd_) % capacity_); + uint16_t norm_seq = seq - rd_seq_; + + // Check for overflow first. + if ((!(norm_seq & 0x8000)) && (norm_seq >= (capacity_ - 1))) { + ALOGW("Ring buffer overflow; cap = %u, [rd, wr] = [%hu, %hu], seq = %hu", + capacity_, rd_seq_, norm_wr_seq + rd_seq_, seq); + PacketBuffer::destroy(buf); + return false; + } + + // Check for case #1 + if (norm_seq == norm_wr_seq) { + CHECK(wr_ < capacity_); + CHECK(NULL == ring_[wr_]); + + ring_[wr_] = buf; + wr_ = (wr_ + 1) % capacity_; + + CHECK(wr_ != rd_); + return true; + } + + // Check case #2 + uint32_t ring_pos = (rd_ + norm_seq) % capacity_; + if ((norm_seq < norm_wr_seq) && (!(norm_seq & 0x8000))) { + // Do we already have a payload for this slot? If so, then this looks + // like a duplicate retransmit. Just ignore it. + if (NULL != ring_[ring_pos]) { + ALOGD("RXed duplicate retransmit, seq = %hu", seq); + PacketBuffer::destroy(buf); + } else { + // Looks like we were missing this payload. Go ahead and store it. + ring_[ring_pos] = buf; + } + + return true; + } + + // Check case #3 + if ((norm_seq - norm_wr_seq) & 0x8000) { + if (!waiting_for_fast_start_) { + ALOGD("RXed duplicate retransmit from before rd pointer, seq = %hu", + seq); + PacketBuffer::destroy(buf); + } else { + // Looks like a fast start fill-in. Go ahead and store it, assuming + // that we can fit it in the buffer. + uint32_t implied_ring_size = static_cast(norm_wr_seq) + + (rd_seq_ - seq); + + if (implied_ring_size >= (capacity_ - 1)) { + ALOGD("RXed what looks like a fast start packet (seq = %hu)," + " but packet is too far in the past to fit into the ring" + " buffer. Dropping.", seq); + PacketBuffer::destroy(buf); + } else { + ring_pos = (rd_ + capacity_ + seq - rd_seq_) % capacity_; + rd_seq_ = seq; + rd_ = ring_pos; + waiting_for_fast_start_ = false; + + CHECK(ring_pos < capacity_); + CHECK(NULL == ring_[ring_pos]); + ring_[ring_pos] = buf; + } + + } + return true; + } + + // Must be in case #4 with no overflow. This packet fits in the current + // ring buffer, but is discontiuguous. Advance the write pointer leaving a + // gap behind. + uint32_t gap_len = (ring_pos + capacity_ - wr_) % capacity_; + ALOGD("Drop detected; %u packets, seq_range [%hu, %hu]", + gap_len, + rd_seq_ + norm_wr_seq, + rd_seq_ + norm_wr_seq + gap_len - 1); + + CHECK(NULL == ring_[ring_pos]); + ring_[ring_pos] = buf; + wr_ = (ring_pos + 1) % capacity_; + CHECK(wr_ != rd_); + + return true; +} + +AAH_RXPlayer::PacketBuffer* +AAH_RXPlayer::RXRingBuffer::fetchBuffer(bool* is_discon) { + AutoMutex lock(&lock_); + CHECK(NULL != ring_); + CHECK(NULL != is_discon); + + // If the read seqence number is not known, then this ring buffer has not + // received a packet since being reset and there cannot be any packets to + // return. If we are still waiting for the first fast start packet to show + // up, we don't want to let any buffer be consumed yet because we expect to + // see a packet before the initial read sequence number show up shortly. + if (!rd_seq_known_ || waiting_for_fast_start_) { + *is_discon = false; + return NULL; + } + + PacketBuffer* ret = NULL; + *is_discon = !fetched_first_packet_; + + while ((rd_ != wr_) && (NULL == ret)) { + CHECK(rd_ < capacity_); + + // If we hit a gap, stall and do not advance the read pointer. Let the + // higher level code deal with requesting retries and/or deciding to + // skip the current gap. + ret = ring_[rd_]; + if (NULL == ret) { + break; + } + + ring_[rd_] = NULL; + rd_ = (rd_ + 1) % capacity_; + ++rd_seq_; + } + + if (NULL != ret) { + fetched_first_packet_ = true; + } + + return ret; +} + +AAH_RXPlayer::GapStatus +AAH_RXPlayer::RXRingBuffer::fetchCurrentGap(SeqNoGap* gap) { + AutoMutex lock(&lock_); + CHECK(NULL != ring_); + CHECK(NULL != gap); + + // If the read seqence number is not known, then this ring buffer has not + // received a packet since being reset and there cannot be any gaps. + if (!rd_seq_known_) { + return kGS_NoGap; + } + + // If we are waiting for fast start, then the current gap is a fast start + // gap and it includes all packets before the read sequence number. + if (waiting_for_fast_start_) { + gap->start_seq_ = + gap->end_seq_ = rd_seq_ - 1; + return kGS_FastStartGap; + } + + // If rd == wr, then the buffer is empty and there cannot be any gaps. + if (rd_ == wr_) { + return kGS_NoGap; + } + + // If rd_ is currently pointing at an unprocessed packet, then there is no + // current gap. + CHECK(rd_ < capacity_); + if (NULL != ring_[rd_]) { + return kGS_NoGap; + } + + // Looks like there must be a gap here. The start of the gap is the current + // rd sequence number, all we need to do now is determine its length in + // order to compute the end sequence number. + gap->start_seq_ = rd_seq_; + uint16_t end = rd_seq_; + uint32_t tmp = (rd_ + 1) % capacity_; + while ((tmp != wr_) && (NULL == ring_[tmp])) { + ++end; + tmp = (tmp + 1) % capacity_; + } + gap->end_seq_ = end; + + return kGS_NormalGap; +} + +void AAH_RXPlayer::RXRingBuffer::processNAK(const SeqNoGap* nak) { + AutoMutex lock(&lock_); + CHECK(NULL != ring_); + + // If we were waiting for our first fast start fill-in packet, and we + // received a NAK, then apparantly we are not getting our fast start. Just + // clear the waiting flag and go back to normal behavior. + if (waiting_for_fast_start_) { + waiting_for_fast_start_ = false; + } + + // If we have not received a packet since last reset, or there is no data in + // the ring, then there is nothing to skip. + if ((!rd_seq_known_) || (rd_ == wr_)) { + return; + } + + // If rd_ is currently pointing at an unprocessed packet, then there is no + // gap to skip. + CHECK(rd_ < capacity_); + if (NULL != ring_[rd_]) { + return; + } + + // Looks like there must be a gap here. Advance rd until we have passed + // over the portion of it indicated by nak (or all of the gap if nak is + // NULL). Then reset fetched_first_packet_ so that the next read will show + // up as being discontiguous. + uint16_t seq_after_gap = (NULL == nak) ? 0 : nak->end_seq_ + 1; + while ((rd_ != wr_) && + (NULL == ring_[rd_]) && + ((NULL == nak) || (seq_after_gap != rd_seq_))) { + rd_ = (rd_ + 1) % capacity_; + ++rd_seq_; + } + fetched_first_packet_ = false; +} + +int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() { + AutoMutex lock(&lock_); + + if (!rtp_activity_timeout_valid_) { + return -1; + } + + uint64_t now = monotonicUSecNow(); + if (rtp_activity_timeout_ <= now) { + return 0; + } + + return (rtp_activity_timeout_ - now) / 1000; +} + +AAH_RXPlayer::PacketBuffer* +AAH_RXPlayer::PacketBuffer::allocate(ssize_t length) { + if (length <= 0) { + return NULL; + } + + uint32_t alloc_len = sizeof(PacketBuffer) + length; + PacketBuffer* ret = reinterpret_cast( + new uint8_t[alloc_len]); + + if (NULL != ret) { + ret->length_ = length; + } + + return ret; +} + +void AAH_RXPlayer::PacketBuffer::destroy(PacketBuffer* pb) { + uint8_t* kill_me = reinterpret_cast(pb); + delete[] kill_me; +} + +} // namespace android diff --git a/media/libaah_rtp/aah_rx_player_substream.cpp b/media/libaah_rtp/aah_rx_player_substream.cpp new file mode 100644 index 0000000..1e4c784 --- /dev/null +++ b/media/libaah_rtp/aah_rx_player_substream.cpp @@ -0,0 +1,498 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +//#define LOG_NDEBUG 0 + +#include + +#include +#include +#include +#include +#include +#include + +#include "aah_rx_player.h" + +namespace android { + +int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = + 50ull * 1000; + +AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { + ssrc_ = ssrc; + substream_details_known_ = false; + buffer_in_progress_ = NULL; + status_ = OK; + + decoder_ = new AAH_DecoderPump(omx); + if (decoder_ == NULL) { + ALOGE("%s failed to allocate decoder pump!", __PRETTY_FUNCTION__); + } + if (OK != decoder_->initCheck()) { + ALOGE("%s failed to initialize decoder pump!", __PRETTY_FUNCTION__); + } + + // cleanupBufferInProgress will reset most of the internal state variables. + // Just need to make sure that buffer_in_progress_ is NULL before calling. + cleanupBufferInProgress(); +} + + +void AAH_RXPlayer::Substream::shutdown() { + substream_meta_ = NULL; + status_ = OK; + cleanupBufferInProgress(); + cleanupDecoder(); +} + +void AAH_RXPlayer::Substream::cleanupBufferInProgress() { + if (NULL != buffer_in_progress_) { + buffer_in_progress_->release(); + buffer_in_progress_ = NULL; + } + + expected_buffer_size_ = 0; + buffer_filled_ = 0; + waiting_for_rap_ = true; +} + +void AAH_RXPlayer::Substream::cleanupDecoder() { + if (decoder_ != NULL) { + decoder_->shutdown(); + } +} + +bool AAH_RXPlayer::Substream::shouldAbort(const char* log_tag) { + // If we have already encountered a fatal error, do nothing. We are just + // waiting for our owner to shut us down now. + if (OK != status_) { + ALOGV("Skipping %s, substream has encountered fatal error (%d).", + log_tag, status_); + return true; + } + + return false; +} + +void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf, + uint32_t amt, + int32_t ts_lower) { + uint32_t min_length = 6; + + if (shouldAbort(__PRETTY_FUNCTION__)) { + return; + } + + // Do we have a buffer in progress already? If so, abort the buffer. In + // theory, this should never happen. If there were a discontinutity in the + // stream, the discon in the seq_nos at the RTP level should have already + // triggered a cleanup of the buffer in progress. To see a problem at this + // level is an indication either of a bug in the transmitter, or some form + // of terrible corruption/tampering on the wire. + if (NULL != buffer_in_progress_) { + ALOGE("processPayloadStart is aborting payload already in progress."); + cleanupBufferInProgress(); + } + + // Parse enough of the header to know where we stand. Since this is a + // payload start, it should begin with a TRTP header which has to be at + // least 6 bytes long. + if (amt < min_length) { + ALOGV("Discarding payload too short to contain TRTP header (len = %u)", + amt); + return; + } + + // Check the TRTP version number. + if (0x01 != buf[0]) { + ALOGV("Unexpected TRTP version (%u) in header. Expected %u.", + buf[0], 1); + return; + } + + // Extract the substream type field and make sure its one we understand (and + // one that does not conflict with any previously received substream type. + uint8_t header_type = (buf[1] >> 4) & 0xF; + switch (header_type) { + case 0x01: + // Audio, yay! Just break. We understand audio payloads. + break; + case 0x02: + ALOGV("RXed packet with unhandled TRTP header type (Video)."); + return; + case 0x03: + ALOGV("RXed packet with unhandled TRTP header type (Subpicture)."); + return; + case 0x04: + ALOGV("RXed packet with unhandled TRTP header type (Control)."); + return; + default: + ALOGV("RXed packet with unhandled TRTP header type (%u).", + header_type); + return; + } + + if (substream_details_known_ && (header_type != substream_type_)) { + ALOGV("RXed TRTP Payload for SSRC=0x%08x where header type (%u) does not" + " match previously received header type (%u)", + ssrc_, header_type, substream_type_); + return; + } + + // Check the flags to see if there is another 32 bits of timestamp present. + uint32_t trtp_header_len = 6; + bool ts_valid = buf[1] & 0x1; + if (ts_valid) { + min_length += 4; + trtp_header_len += 4; + if (amt < min_length) { + ALOGV("Discarding payload too short to contain TRTP timestamp" + " (len = %u)", amt); + return; + } + } + + // Extract the TRTP length field and sanity check it. + uint32_t trtp_len; + trtp_len = (static_cast(buf[2]) << 24) | + (static_cast(buf[3]) << 16) | + (static_cast(buf[4]) << 8) | + static_cast(buf[5]); + if (trtp_len < min_length) { + ALOGV("TRTP length (%u) is too short to be valid. Must be at least %u" + " bytes.", trtp_len, min_length); + return; + } + + // Extract the rest of the timestamp field if valid. + int64_t ts = 0; + uint32_t parse_offset = 6; + if (ts_valid) { + ts = (static_cast(buf[parse_offset ]) << 56) | + (static_cast(buf[parse_offset + 1]) << 48) | + (static_cast(buf[parse_offset + 2]) << 40) | + (static_cast(buf[parse_offset + 3]) << 32); + ts |= ts_lower; + parse_offset += 4; + } + + // Check the flags to see if there is another 24 bytes of timestamp + // transformation present. + if (buf[1] & 0x2) { + min_length += 24; + parse_offset += 24; + trtp_header_len += 24; + if (amt < min_length) { + ALOGV("Discarding payload too short to contain TRTP timestamp" + " transformation (len = %u)", amt); + return; + } + } + + // TODO : break the parsing into individual parsers for the different + // payload types (audio, video, etc). + // + // At this point in time, we know that this is audio. Go ahead and parse + // the basic header, check the codec type, and find the payload portion of + // the packet. + min_length += 3; + if (trtp_len < min_length) { + ALOGV("TRTP length (%u) is too short to be a valid audio payload. Must" + " be at least %u bytes.", trtp_len, min_length); + return; + } + + if (amt < min_length) { + ALOGV("TRTP porttion of RTP payload (%u bytes) too small to contain" + " entire TRTP header. TRTP does not currently support fragmenting" + " TRTP headers across RTP payloads", amt); + return; + } + + uint8_t codec_type = buf[parse_offset ]; + uint8_t flags = buf[parse_offset + 1]; + uint8_t volume = buf[parse_offset + 2]; + parse_offset += 3; + trtp_header_len += 3; + + if (!setupSubstreamType(header_type, codec_type)) { + return; + } + + if (decoder_ != NULL) { + decoder_->setRenderVolume(volume); + } + + // TODO : move all of the constant flag and offset definitions for TRTP up + // into some sort of common header file. + if (waiting_for_rap_ && !(flags & 0x08)) { + ALOGV("Dropping non-RAP TRTP Audio Payload while waiting for RAP."); + return; + } + + if (flags & 0x10) { + ALOGV("Dropping TRTP Audio Payload with aux codec data present (only" + " handle MP3 right now, and it has no aux data)"); + return; + } + + // OK - everything left is just payload. Compute the payload size, start + // the buffer in progress and pack as much payload as we can into it. If + // the payload is finished once we are done, go ahead and send the payload + // to the decoder. + expected_buffer_size_ = trtp_len - trtp_header_len; + if (!expected_buffer_size_) { + ALOGV("Dropping TRTP Audio Payload with 0 Access Unit length"); + return; + } + + CHECK(amt >= trtp_header_len); + uint32_t todo = amt - trtp_header_len; + if (expected_buffer_size_ < todo) { + ALOGV("Extra data (%u > %u) present in initial TRTP Audio Payload;" + " dropping payload.", todo, expected_buffer_size_); + return; + } + + buffer_filled_ = 0; + buffer_in_progress_ = new MediaBuffer(expected_buffer_size_); + if ((NULL == buffer_in_progress_) || + (NULL == buffer_in_progress_->data())) { + ALOGV("Failed to allocate MediaBuffer of length %u", + expected_buffer_size_); + cleanupBufferInProgress(); + return; + } + + sp meta = buffer_in_progress_->meta_data(); + if (meta == NULL) { + ALOGV("Missing metadata structure in allocated MediaBuffer; dropping" + " payload"); + cleanupBufferInProgress(); + return; + } + + // TODO : set this based on the codec type indicated in the TRTP stream. + // Right now, we only support MP3, so the choice is obvious. + meta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_MPEG); + if (ts_valid) { + meta->setInt64(kKeyTime, ts); + } + + if (amt > 0) { + uint8_t* tgt = + reinterpret_cast(buffer_in_progress_->data()); + memcpy(tgt + buffer_filled_, buf + trtp_header_len, todo); + buffer_filled_ += amt; + } + + if (buffer_filled_ >= expected_buffer_size_) { + processCompletedBuffer(); + } +} + +void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf, + uint32_t amt) { + if (shouldAbort(__PRETTY_FUNCTION__)) { + return; + } + + if (NULL == buffer_in_progress_) { + ALOGV("TRTP Receiver skipping payload continuation; no buffer currently" + " in progress."); + return; + } + + CHECK(buffer_filled_ < expected_buffer_size_); + uint32_t buffer_left = expected_buffer_size_ - buffer_filled_; + if (amt > buffer_left) { + ALOGV("Extra data (%u > %u) present in continued TRTP Audio Payload;" + " dropping payload.", amt, buffer_left); + cleanupBufferInProgress(); + return; + } + + if (amt > 0) { + uint8_t* tgt = + reinterpret_cast(buffer_in_progress_->data()); + memcpy(tgt + buffer_filled_, buf, amt); + buffer_filled_ += amt; + } + + if (buffer_filled_ >= expected_buffer_size_) { + processCompletedBuffer(); + } +} + +void AAH_RXPlayer::Substream::processCompletedBuffer() { + const uint8_t* buffer_data = NULL; + int sample_rate; + int channel_count; + size_t frame_size; + status_t res; + + CHECK(NULL != buffer_in_progress_); + + if (decoder_ == NULL) { + ALOGV("Dropping complete buffer, no decoder pump allocated"); + goto bailout; + } + + buffer_data = reinterpret_cast(buffer_in_progress_->data()); + if (buffer_in_progress_->size() < 4) { + ALOGV("MP3 payload too short to contain header, dropping payload."); + goto bailout; + } + + // Extract the channel count and the sample rate from the MP3 header. The + // stagefright MP3 requires that these be delivered before decoing can + // begin. + if (!GetMPEGAudioFrameSize(U32_AT(buffer_data), + &frame_size, + &sample_rate, + &channel_count, + NULL, + NULL)) { + ALOGV("Failed to parse MP3 header in payload, droping payload."); + goto bailout; + } + + + // Make sure that our substream metadata is set up properly. If there has + // been a format change, be sure to reset the underlying decoder. In + // stagefright, it seems like the only way to do this is to destroy and + // recreate the decoder. + if (substream_meta_ == NULL) { + substream_meta_ = new MetaData(); + + if (substream_meta_ == NULL) { + ALOGE("Failed to allocate MetaData structure for substream"); + goto bailout; + } + + substream_meta_->setCString(kKeyMIMEType, MEDIA_MIMETYPE_AUDIO_MPEG); + substream_meta_->setInt32 (kKeyChannelCount, channel_count); + substream_meta_->setInt32 (kKeySampleRate, sample_rate); + } else { + int32_t prev_sample_rate; + int32_t prev_channel_count; + substream_meta_->findInt32(kKeySampleRate, &prev_sample_rate); + substream_meta_->findInt32(kKeyChannelCount, &prev_channel_count); + + if ((prev_channel_count != channel_count) || + (prev_sample_rate != sample_rate)) { + ALOGW("Format change detected, forcing decoder reset."); + cleanupDecoder(); + + substream_meta_->setInt32(kKeyChannelCount, channel_count); + substream_meta_->setInt32(kKeySampleRate, sample_rate); + } + } + + // If our decoder has not be set up, do so now. + res = decoder_->init(substream_meta_); + if (OK != res) { + ALOGE("Failed to init decoder (res = %d)", res); + cleanupDecoder(); + substream_meta_ = NULL; + goto bailout; + } + + // Queue the payload for decode. + res = decoder_->queueForDecode(buffer_in_progress_); + + if (res != OK) { + ALOGD("Failed to queue payload for decode, resetting decoder pump!" + " (res = %d)", res); + status_ = res; + cleanupDecoder(); + cleanupBufferInProgress(); + } + + // NULL out buffer_in_progress before calling the cleanup helper. + // + // MediaBuffers use something of a hybrid ref-counting pattern which prevent + // the AAH_DecoderPump's input queue from adding their own reference to the + // MediaBuffer. MediaBuffers start life with a reference count of 0, as + // well as an observer which starts as NULL. Before being given an + // observer, the ref count cannot be allowed to become non-zero as it will + // cause calls to release() to assert. Basically, before a MediaBuffer has + // an observer, they behave like non-ref counted obects where release() + // serves the roll of delete. After a MediaBuffer has an observer, they + // become more like ref counted objects where add ref and release can be + // used, and when the ref count hits zero, the MediaBuffer is handed off to + // the observer. + // + // Given all of this, when we give the buffer to the decoder pump to wait in + // the to-be-processed queue, the decoder cannot add a ref to the buffer as + // it would in a traditional ref counting system. Instead it needs to + // "steal" the non-existent ref. In the case of queue failure, we need to + // make certain to release this non-existent reference so that the buffer is + // cleaned up during the cleanupBufferInProgress helper. In the case of a + // successful queue operation, we need to make certain that the + // cleanupBufferInProgress helper does not release the buffer since it needs + // to remain alive in the queue. We acomplish this by NULLing out the + // buffer pointer before calling the cleanup helper. + buffer_in_progress_ = NULL; + +bailout: + cleanupBufferInProgress(); +} + + +void AAH_RXPlayer::Substream::processTSTransform(const LinearTransform& trans) { + if (decoder_ != NULL) { + decoder_->setRenderTSTransform(trans); + } +} + +bool AAH_RXPlayer::Substream::isAboutToUnderflow() { + if (decoder_ == NULL) { + return false; + } + + return decoder_->isAboutToUnderflow(kAboutToUnderflowThreshold); +} + +bool AAH_RXPlayer::Substream::setupSubstreamType(uint8_t substream_type, + uint8_t codec_type) { + // Sanity check the codec type. Right now we only support MP3. Also check + // for conflicts with previously delivered codec types. + if (substream_details_known_ && (codec_type != codec_type_)) { + ALOGV("RXed TRTP Payload for SSRC=0x%08x where codec type (%u) does not" + " match previously received codec type (%u)", + ssrc_, codec_type, codec_type_); + return false; + } + + if (codec_type != 0x03) { + ALOGV("RXed TRTP Audio Payload for SSRC=0x%08x with unsupported codec" + " type (%u)", ssrc_, codec_type); + return false; + } + + if (!substream_details_known_) { + substream_type_ = substream_type; + codec_type_ = codec_type; + substream_details_known_ = true; + } + + return true; +} + +} // namespace android diff --git a/media/libaah_rtp/aah_tx_packet.cpp b/media/libaah_rtp/aah_tx_packet.cpp new file mode 100644 index 0000000..3f6e0e9 --- /dev/null +++ b/media/libaah_rtp/aah_tx_packet.cpp @@ -0,0 +1,331 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +#include + +#include +#include + +#include + +#include "aah_tx_packet.h" + +namespace android { + +const int TRTPPacket::kRTPHeaderLen; +const uint32_t TRTPPacket::kTRTPEpochMask; + +TRTPPacket::~TRTPPacket() { + delete mPacket; +} + +/*** TRTP packet properties ***/ + +void TRTPPacket::setSeqNumber(uint16_t val) { + mSeqNumber = val; + + if (mIsPacked) { + const int kTRTPSeqNumberOffset = 2; + uint16_t* buf = reinterpret_cast( + mPacket + kTRTPSeqNumberOffset); + *buf = htons(mSeqNumber); + } +} + +uint16_t TRTPPacket::getSeqNumber() const { + return mSeqNumber; +} + +void TRTPPacket::setPTS(int64_t val) { + CHECK(!mIsPacked); + mPTS = val; + mPTSValid = true; +} + +int64_t TRTPPacket::getPTS() const { + return mPTS; +} + +void TRTPPacket::setEpoch(uint32_t val) { + mEpoch = val; + + if (mIsPacked) { + const int kTRTPEpochOffset = 8; + uint32_t* buf = reinterpret_cast( + mPacket + kTRTPEpochOffset); + uint32_t val = ntohl(*buf); + val &= ~(kTRTPEpochMask << kTRTPEpochShift); + val |= (mEpoch & kTRTPEpochMask) << kTRTPEpochShift; + *buf = htonl(val); + } +} + +void TRTPPacket::setProgramID(uint16_t val) { + CHECK(!mIsPacked); + mProgramID = val; +} + +void TRTPPacket::setSubstreamID(uint16_t val) { + CHECK(!mIsPacked); + mSubstreamID = val; +} + + +void TRTPPacket::setClockTransform(const LinearTransform& trans) { + CHECK(!mIsPacked); + mClockTranform = trans; + mClockTranformValid = true; +} + +uint8_t* TRTPPacket::getPacket() const { + CHECK(mIsPacked); + return mPacket; +} + +int TRTPPacket::getPacketLen() const { + CHECK(mIsPacked); + return mPacketLen; +} + +void TRTPPacket::setExpireTime(nsecs_t val) { + CHECK(!mIsPacked); + mExpireTime = val; +} + +nsecs_t TRTPPacket::getExpireTime() const { + return mExpireTime; +} + +/*** TRTP audio packet properties ***/ + +void TRTPAudioPacket::setCodecType(TRTPAudioCodecType val) { + CHECK(!mIsPacked); + mCodecType = val; +} + +void TRTPAudioPacket::setRandomAccessPoint(bool val) { + CHECK(!mIsPacked); + mRandomAccessPoint = val; +} + +void TRTPAudioPacket::setDropable(bool val) { + CHECK(!mIsPacked); + mDropable = val; +} + +void TRTPAudioPacket::setDiscontinuity(bool val) { + CHECK(!mIsPacked); + mDiscontinuity = val; +} + +void TRTPAudioPacket::setEndOfStream(bool val) { + CHECK(!mIsPacked); + mEndOfStream = val; +} + +void TRTPAudioPacket::setVolume(uint8_t val) { + CHECK(!mIsPacked); + mVolume = val; +} + +void TRTPAudioPacket::setAccessUnitData(void* data, int len) { + CHECK(!mIsPacked); + mAccessUnitData = data; + mAccessUnitLen = len; +} + +/*** TRTP control packet properties ***/ + +void TRTPControlPacket::setCommandID(TRTPCommandID val) { + CHECK(!mIsPacked); + mCommandID = val; +} + +/*** TRTP packet serializers ***/ + +void TRTPPacket::writeU8(uint8_t*& buf, uint8_t val) { + *buf = val; + buf++; +} + +void TRTPPacket::writeU16(uint8_t*& buf, uint16_t val) { + *reinterpret_cast(buf) = htons(val); + buf += 2; +} + +void TRTPPacket::writeU32(uint8_t*& buf, uint32_t val) { + *reinterpret_cast(buf) = htonl(val); + buf += 4; +} + +void TRTPPacket::writeU64(uint8_t*& buf, uint64_t val) { + buf[0] = static_cast(val >> 56); + buf[1] = static_cast(val >> 48); + buf[2] = static_cast(val >> 40); + buf[3] = static_cast(val >> 32); + buf[4] = static_cast(val >> 24); + buf[5] = static_cast(val >> 16); + buf[6] = static_cast(val >> 8); + buf[7] = static_cast(val); + buf += 8; +} + +void TRTPPacket::writeTRTPHeader(uint8_t*& buf, + bool isFirstFragment, + int totalPacketLen) { + // RTP header + writeU8(buf, + ((mVersion & 0x03) << 6) | + (static_cast(mPadding) << 5) | + (static_cast(mExtension) << 4) | + (mCsrcCount & 0x0F)); + writeU8(buf, + (static_cast(isFirstFragment) << 7) | + (mPayloadType & 0x7F)); + writeU16(buf, mSeqNumber); + if (isFirstFragment && mPTSValid) { + writeU32(buf, mPTS & 0xFFFFFFFF); + } else { + writeU32(buf, 0); + } + writeU32(buf, + ((mEpoch & kTRTPEpochMask) << kTRTPEpochShift) | + ((mProgramID & 0x1F) << 5) | + (mSubstreamID & 0x1F)); + + // TRTP header + writeU8(buf, mTRTPVersion); + writeU8(buf, + ((mTRTPHeaderType & 0x0F) << 4) | + (mClockTranformValid ? 0x02 : 0x00) | + (mPTSValid ? 0x01 : 0x00)); + writeU32(buf, totalPacketLen - kRTPHeaderLen); + if (mPTSValid) { + writeU32(buf, mPTS >> 32); + } + + if (mClockTranformValid) { + writeU64(buf, mClockTranform.a_zero); + writeU32(buf, mClockTranform.a_to_b_numer); + writeU32(buf, mClockTranform.a_to_b_denom); + writeU64(buf, mClockTranform.b_zero); + } +} + +bool TRTPAudioPacket::pack() { + if (mIsPacked) { + return false; + } + + int packetLen = kRTPHeaderLen + + mAccessUnitLen + + TRTPHeaderLen(); + + // TODO : support multiple fragments + const int kMaxUDPPayloadLen = 65507; + if (packetLen > kMaxUDPPayloadLen) { + return false; + } + + mPacket = new uint8_t[packetLen]; + if (!mPacket) { + return false; + } + + mPacketLen = packetLen; + + uint8_t* cur = mPacket; + + writeTRTPHeader(cur, true, packetLen); + writeU8(cur, mCodecType); + writeU8(cur, + (static_cast(mRandomAccessPoint) << 3) | + (static_cast(mDropable) << 2) | + (static_cast(mDiscontinuity) << 1) | + (static_cast(mEndOfStream))); + writeU8(cur, mVolume); + + memcpy(cur, mAccessUnitData, mAccessUnitLen); + + mIsPacked = true; + return true; +} + +int TRTPPacket::TRTPHeaderLen() const { + // 6 bytes for version, payload type, flags and length. An additional 4 if + // there are upper timestamp bits present and another 24 if there is a clock + // transformation present. + return 6 + + (mClockTranformValid ? 24 : 0) + + (mPTSValid ? 4 : 0); +} + +int TRTPAudioPacket::TRTPHeaderLen() const { + // TRTPPacket::TRTPHeaderLen() for the base TRTPHeader. 3 bytes for audio's + // codec type, flags and volume field. Another 5 bytes if the codec type is + // PCM and we are sending sample rate/channel count. as well as however long + // the aux data (if present) is. + + int pcmParamLength; + switch(mCodecType) { + case kCodecPCMBigEndian: + case kCodecPCMLittleEndian: + pcmParamLength = 5; + break; + + default: + pcmParamLength = 0; + break; + } + + + // TODO : properly compute aux data length. Currently, nothing + // uses aux data, so its length is always 0. + int auxDataLength = 0; + return TRTPPacket::TRTPHeaderLen() + + 3 + + auxDataLength + + pcmParamLength; +} + +bool TRTPControlPacket::pack() { + if (mIsPacked) { + return false; + } + + // command packets contain a 2-byte command ID + int packetLen = kRTPHeaderLen + + TRTPHeaderLen() + + 2; + + mPacket = new uint8_t[packetLen]; + if (!mPacket) { + return false; + } + + mPacketLen = packetLen; + + uint8_t* cur = mPacket; + + writeTRTPHeader(cur, true, packetLen); + writeU16(cur, mCommandID); + + mIsPacked = true; + return true; +} + +} // namespace android diff --git a/media/libaah_rtp/aah_tx_packet.h b/media/libaah_rtp/aah_tx_packet.h new file mode 100644 index 0000000..833803e --- /dev/null +++ b/media/libaah_rtp/aah_tx_packet.h @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __AAH_TX_PACKET_H__ +#define __AAH_TX_PACKET_H__ + +#include +#include +#include +#include + +namespace android { + +class TRTPPacket : public RefBase { + protected: + enum TRTPHeaderType { + kHeaderTypeAudio = 1, + kHeaderTypeVideo = 2, + kHeaderTypeSubpicture = 3, + kHeaderTypeControl = 4, + }; + + TRTPPacket(TRTPHeaderType headerType) + : mIsPacked(false) + , mVersion(2) + , mPadding(false) + , mExtension(false) + , mCsrcCount(0) + , mPayloadType(100) + , mSeqNumber(0) + , mPTSValid(false) + , mPTS(0) + , mEpoch(0) + , mProgramID(0) + , mSubstreamID(0) + , mClockTranformValid(false) + , mTRTPVersion(1) + , mTRTPLength(0) + , mTRTPHeaderType(headerType) + , mPacket(NULL) + , mPacketLen(0) { } + + public: + virtual ~TRTPPacket(); + + void setSeqNumber(uint16_t val); + uint16_t getSeqNumber() const; + + void setPTS(int64_t val); + int64_t getPTS() const; + + void setEpoch(uint32_t val); + void setProgramID(uint16_t val); + void setSubstreamID(uint16_t val); + void setClockTransform(const LinearTransform& trans); + + uint8_t* getPacket() const; + int getPacketLen() const; + + void setExpireTime(nsecs_t val); + nsecs_t getExpireTime() const; + + virtual bool pack() = 0; + + // mask for the number of bits in a TRTP epoch + static const uint32_t kTRTPEpochMask = (1 << 22) - 1; + static const int kTRTPEpochShift = 10; + + protected: + static const int kRTPHeaderLen = 12; + virtual int TRTPHeaderLen() const; + + void writeTRTPHeader(uint8_t*& buf, + bool isFirstFragment, + int totalPacketLen); + + void writeU8(uint8_t*& buf, uint8_t val); + void writeU16(uint8_t*& buf, uint16_t val); + void writeU32(uint8_t*& buf, uint32_t val); + void writeU64(uint8_t*& buf, uint64_t val); + + bool mIsPacked; + + uint8_t mVersion; + bool mPadding; + bool mExtension; + uint8_t mCsrcCount; + uint8_t mPayloadType; + uint16_t mSeqNumber; + bool mPTSValid; + int64_t mPTS; + uint32_t mEpoch; + uint16_t mProgramID; + uint16_t mSubstreamID; + LinearTransform mClockTranform; + bool mClockTranformValid; + uint8_t mTRTPVersion; + uint32_t mTRTPLength; + TRTPHeaderType mTRTPHeaderType; + + uint8_t* mPacket; + int mPacketLen; + + nsecs_t mExpireTime; + + DISALLOW_EVIL_CONSTRUCTORS(TRTPPacket); +}; + +class TRTPAudioPacket : public TRTPPacket { + public: + TRTPAudioPacket() + : TRTPPacket(kHeaderTypeAudio) + , mCodecType(kCodecInvalid) + , mRandomAccessPoint(false) + , mDropable(false) + , mDiscontinuity(false) + , mEndOfStream(false) + , mVolume(0) + , mAccessUnitData(NULL) { } + + enum TRTPAudioCodecType { + kCodecInvalid = 0, + kCodecPCMBigEndian = 1, + kCodecPCMLittleEndian = 2, + kCodecMPEG1Audio = 3, + }; + + void setCodecType(TRTPAudioCodecType val); + void setRandomAccessPoint(bool val); + void setDropable(bool val); + void setDiscontinuity(bool val); + void setEndOfStream(bool val); + void setVolume(uint8_t val); + void setAccessUnitData(void* data, int len); + + virtual bool pack(); + + protected: + virtual int TRTPHeaderLen() const; + + private: + TRTPAudioCodecType mCodecType; + bool mRandomAccessPoint; + bool mDropable; + bool mDiscontinuity; + bool mEndOfStream; + uint8_t mVolume; + void* mAccessUnitData; + int mAccessUnitLen; + + DISALLOW_EVIL_CONSTRUCTORS(TRTPAudioPacket); +}; + +class TRTPControlPacket : public TRTPPacket { + public: + TRTPControlPacket() + : TRTPPacket(kHeaderTypeControl) + , mCommandID(kCommandNop) {} + + enum TRTPCommandID { + kCommandNop = 1, + kCommandFlush = 2, + kCommandEOS = 3, + }; + + void setCommandID(TRTPCommandID val); + + virtual bool pack(); + + private: + TRTPCommandID mCommandID; + + DISALLOW_EVIL_CONSTRUCTORS(TRTPControlPacket); +}; + +} // namespace android + +#endif // __AAH_TX_PLAYER_H__ diff --git a/media/libaah_rtp/aah_tx_player.cpp b/media/libaah_rtp/aah_tx_player.cpp new file mode 100644 index 0000000..a79a989 --- /dev/null +++ b/media/libaah_rtp/aah_tx_player.cpp @@ -0,0 +1,1139 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +#include + +#define __STDC_FORMAT_MACROS +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "aah_tx_packet.h" +#include "aah_tx_player.h" + +namespace android { + +static int64_t kLowWaterMarkUs = 2000000ll; // 2secs +static int64_t kHighWaterMarkUs = 10000000ll; // 10secs +static const size_t kLowWaterMarkBytes = 40000; +static const size_t kHighWaterMarkBytes = 200000; + +// When we start up, how much lead time should we put on the first access unit? +static const int64_t kAAHStartupLeadTimeUs = 300000LL; + +// How much time do we attempt to lead the clock by in steady state? +static const int64_t kAAHBufferTimeUs = 1000000LL; + +// how long do we keep data in our retransmit buffer after sending it. +const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs = + kAAHBufferTimeUs * 1100; + +sp createAAH_TXPlayer() { + sp ret = new AAH_TXPlayer(); + return ret; +} + +template static T clamp(T val, T min, T max) { + if (val < min) { + return min; + } else if (val > max) { + return max; + } else { + return val; + } +} + +struct AAH_TXEvent : public TimedEventQueue::Event { + AAH_TXEvent(AAH_TXPlayer *player, + void (AAH_TXPlayer::*method)()) : mPlayer(player) + , mMethod(method) {} + + protected: + virtual ~AAH_TXEvent() {} + + virtual void fire(TimedEventQueue *queue, int64_t /* now_us */) { + (mPlayer->*mMethod)(); + } + + private: + AAH_TXPlayer *mPlayer; + void (AAH_TXPlayer::*mMethod)(); + + AAH_TXEvent(const AAH_TXEvent &); + AAH_TXEvent& operator=(const AAH_TXEvent &); +}; + +AAH_TXPlayer::AAH_TXPlayer() + : mQueueStarted(false) + , mFlags(0) + , mExtractorFlags(0) { + DataSource::RegisterDefaultSniffers(); + + mBufferingEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onBufferingUpdate); + mBufferingEventPending = false; + + mPumpAudioEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onPumpAudio); + mPumpAudioEventPending = false; + + reset_l(); +} + +AAH_TXPlayer::~AAH_TXPlayer() { + if (mQueueStarted) { + mQueue.stop(); + } + + reset_l(); +} + +void AAH_TXPlayer::cancelPlayerEvents(bool keepBufferingGoing) { + if (!keepBufferingGoing) { + mQueue.cancelEvent(mBufferingEvent->eventID()); + mBufferingEventPending = false; + + mQueue.cancelEvent(mPumpAudioEvent->eventID()); + mPumpAudioEventPending = false; + } +} + +status_t AAH_TXPlayer::initCheck() { + // Check for the presense of the common time service by attempting to query + // for CommonTime's frequency. If we get an error back, we cannot talk to + // the service at all and should abort now. + status_t res; + uint64_t freq; + res = mCCHelper.getCommonFreq(&freq); + if (OK != res) { + ALOGE("Failed to connect to common time service! (res %d)", res); + return res; + } + + return OK; +} + +status_t AAH_TXPlayer::setDataSource( + const char *url, + const KeyedVector *headers) { + Mutex::Autolock autoLock(mLock); + return setDataSource_l(url, headers); +} + +status_t AAH_TXPlayer::setDataSource_l( + const char *url, + const KeyedVector *headers) { + reset_l(); + + // the URL must consist of "aahTX://" followed by the real URL of + // the data source + const char *kAAHPrefix = "aahTX://"; + if (strncasecmp(url, kAAHPrefix, strlen(kAAHPrefix))) { + return INVALID_OPERATION; + } + + mUri.setTo(url + strlen(kAAHPrefix)); + + if (headers) { + mUriHeaders = *headers; + + ssize_t index = mUriHeaders.indexOfKey(String8("x-hide-urls-from-log")); + if (index >= 0) { + // Browser is in "incognito" mode, suppress logging URLs. + + // This isn't something that should be passed to the server. + mUriHeaders.removeItemsAt(index); + + mFlags |= INCOGNITO; + } + } + + // The URL may optionally contain a "#" character followed by a Skyjam + // cookie. Ideally the cookie header should just be passed in the headers + // argument, but the Java API for supplying headers is apparently not yet + // exposed in the SDK used by application developers. + const char kSkyjamCookieDelimiter = '#'; + char* skyjamCookie = strrchr(mUri.string(), kSkyjamCookieDelimiter); + if (skyjamCookie) { + skyjamCookie++; + mUriHeaders.add(String8("Cookie"), String8(skyjamCookie)); + mUri = String8(mUri.string(), skyjamCookie - mUri.string()); + } + + return OK; +} + +status_t AAH_TXPlayer::setDataSource(int fd, int64_t offset, int64_t length) { + Mutex::Autolock autoLock(mLock); + + reset_l(); + + sp dataSource = new FileSource(dup(fd), offset, length); + + status_t err = dataSource->initCheck(); + + if (err != OK) { + return err; + } + + mFileSource = dataSource; + + sp extractor = MediaExtractor::Create(dataSource); + + if (extractor == NULL) { + return UNKNOWN_ERROR; + } + + return setDataSource_l(extractor); +} + +status_t AAH_TXPlayer::setVideoSurface(const sp& surface) { + return OK; +} + +status_t AAH_TXPlayer::setVideoSurfaceTexture( + const sp& surfaceTexture) { + return OK; +} + +status_t AAH_TXPlayer::prepare() { + return INVALID_OPERATION; +} + +status_t AAH_TXPlayer::prepareAsync() { + Mutex::Autolock autoLock(mLock); + + return prepareAsync_l(); +} + +status_t AAH_TXPlayer::prepareAsync_l() { + if (mFlags & PREPARING) { + return UNKNOWN_ERROR; // async prepare already pending + } + + mAAH_Sender = AAH_TXSender::GetInstance(); + if (mAAH_Sender == NULL) { + return NO_MEMORY; + } + + if (!mQueueStarted) { + mQueue.start(); + mQueueStarted = true; + } + + mFlags |= PREPARING; + mAsyncPrepareEvent = new AAH_TXEvent( + this, &AAH_TXPlayer::onPrepareAsyncEvent); + + mQueue.postEvent(mAsyncPrepareEvent); + + return OK; +} + +status_t AAH_TXPlayer::finishSetDataSource_l() { + sp dataSource; + + if (!strncasecmp("http://", mUri.string(), 7) || + !strncasecmp("https://", mUri.string(), 8)) { + + mConnectingDataSource = HTTPBase::Create( + (mFlags & INCOGNITO) + ? HTTPBase::kFlagIncognito + : 0); + + mLock.unlock(); + status_t err = mConnectingDataSource->connect(mUri, &mUriHeaders); + mLock.lock(); + + if (err != OK) { + mConnectingDataSource.clear(); + + ALOGI("mConnectingDataSource->connect() returned %d", err); + return err; + } + + mCachedSource = new NuCachedSource2(mConnectingDataSource); + mConnectingDataSource.clear(); + + dataSource = mCachedSource; + + // We're going to prefill the cache before trying to instantiate + // the extractor below, as the latter is an operation that otherwise + // could block on the datasource for a significant amount of time. + // During that time we'd be unable to abort the preparation phase + // without this prefill. + + mLock.unlock(); + + for (;;) { + status_t finalStatus; + size_t cachedDataRemaining = + mCachedSource->approxDataRemaining(&finalStatus); + + if (finalStatus != OK || + cachedDataRemaining >= kHighWaterMarkBytes || + (mFlags & PREPARE_CANCELLED)) { + break; + } + + usleep(200000); + } + + mLock.lock(); + + if (mFlags & PREPARE_CANCELLED) { + ALOGI("Prepare cancelled while waiting for initial cache fill."); + return UNKNOWN_ERROR; + } + } else { + dataSource = DataSource::CreateFromURI(mUri.string(), &mUriHeaders); + } + + if (dataSource == NULL) { + return UNKNOWN_ERROR; + } + + sp extractor = MediaExtractor::Create(dataSource); + + if (extractor == NULL) { + return UNKNOWN_ERROR; + } + + return setDataSource_l(extractor); +} + +status_t AAH_TXPlayer::setDataSource_l(const sp &extractor) { + // Attempt to approximate overall stream bitrate by summing all + // tracks' individual bitrates, if not all of them advertise bitrate, + // we have to fail. + + int64_t totalBitRate = 0; + + for (size_t i = 0; i < extractor->countTracks(); ++i) { + sp meta = extractor->getTrackMetaData(i); + + int32_t bitrate; + if (!meta->findInt32(kKeyBitRate, &bitrate)) { + totalBitRate = -1; + break; + } + + totalBitRate += bitrate; + } + + mBitrate = totalBitRate; + + ALOGV("mBitrate = %lld bits/sec", mBitrate); + + bool haveAudio = false; + for (size_t i = 0; i < extractor->countTracks(); ++i) { + sp meta = extractor->getTrackMetaData(i); + + const char *mime; + CHECK(meta->findCString(kKeyMIMEType, &mime)); + + if (!strncasecmp(mime, "audio/", 6)) { + mAudioSource = extractor->getTrack(i); + CHECK(mAudioSource != NULL); + haveAudio = true; + break; + } + } + + if (!haveAudio) { + return UNKNOWN_ERROR; + } + + mExtractorFlags = extractor->flags(); + + return OK; +} + +void AAH_TXPlayer::abortPrepare(status_t err) { + CHECK(err != OK); + + notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, err); + + mPrepareResult = err; + mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED); + mPreparedCondition.broadcast(); +} + +void AAH_TXPlayer::onPrepareAsyncEvent() { + Mutex::Autolock autoLock(mLock); + + if (mFlags & PREPARE_CANCELLED) { + ALOGI("prepare was cancelled before doing anything"); + abortPrepare(UNKNOWN_ERROR); + return; + } + + if (mUri.size() > 0) { + status_t err = finishSetDataSource_l(); + + if (err != OK) { + abortPrepare(err); + return; + } + } + + mAudioSource->getFormat()->findInt64(kKeyDuration, &mDurationUs); + + status_t err = mAudioSource->start(); + if (err != OK) { + ALOGI("failed to start audio source, err=%d", err); + abortPrepare(err); + return; + } + + mFlags |= PREPARING_CONNECTED; + + if (mCachedSource != NULL) { + postBufferingEvent_l(); + } else { + finishAsyncPrepare_l(); + } +} + +void AAH_TXPlayer::finishAsyncPrepare_l() { + notifyListener_l(MEDIA_PREPARED); + + mPrepareResult = OK; + mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED); + mFlags |= PREPARED; + mPreparedCondition.broadcast(); +} + +status_t AAH_TXPlayer::start() { + Mutex::Autolock autoLock(mLock); + + mFlags &= ~CACHE_UNDERRUN; + + return play_l(); +} + +status_t AAH_TXPlayer::play_l() { + if (mFlags & PLAYING) { + return OK; + } + + if (!(mFlags & PREPARED)) { + return INVALID_OPERATION; + } + + { + Mutex::Autolock lock(mEndpointLock); + if (!mEndpointValid) { + return INVALID_OPERATION; + } + if (!mEndpointRegistered) { + mProgramID = mAAH_Sender->registerEndpoint(mEndpoint); + mEndpointRegistered = true; + } + } + + mFlags |= PLAYING; + + updateClockTransform_l(false); + + postPumpAudioEvent_l(-1); + + return OK; +} + +status_t AAH_TXPlayer::stop() { + status_t ret = pause(); + sendEOS_l(); + return ret; +} + +status_t AAH_TXPlayer::pause() { + Mutex::Autolock autoLock(mLock); + + mFlags &= ~CACHE_UNDERRUN; + + return pause_l(); +} + +status_t AAH_TXPlayer::pause_l(bool doClockUpdate) { + if (!(mFlags & PLAYING)) { + return OK; + } + + cancelPlayerEvents(true /* keepBufferingGoing */); + + mFlags &= ~PLAYING; + + if (doClockUpdate) { + updateClockTransform_l(true); + } + + return OK; +} + +void AAH_TXPlayer::updateClockTransform_l(bool pause) { + // record the new pause status so that onPumpAudio knows what rate to apply + // when it initializes the transform + mPlayRateIsPaused = pause; + + // if we haven't yet established a valid clock transform, then we can't + // do anything here + if (!mCurrentClockTransformValid) { + return; + } + + // sample the current common time + int64_t commonTimeNow; + if (OK != mCCHelper.getCommonTime(&commonTimeNow)) { + ALOGE("updateClockTransform_l get common time failed"); + mCurrentClockTransformValid = false; + return; + } + + // convert the current common time to media time using the old + // transform + int64_t mediaTimeNow; + if (!mCurrentClockTransform.doReverseTransform( + commonTimeNow, &mediaTimeNow)) { + ALOGE("updateClockTransform_l reverse transform failed"); + mCurrentClockTransformValid = false; + return; + } + + // calculate a new transform that preserves the old transform's + // result for the current time + mCurrentClockTransform.a_zero = mediaTimeNow; + mCurrentClockTransform.b_zero = commonTimeNow; + mCurrentClockTransform.a_to_b_numer = 1; + mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1; + + // send a packet announcing the new transform + sp packet = new TRTPControlPacket(); + packet->setClockTransform(mCurrentClockTransform); + packet->setCommandID(TRTPControlPacket::kCommandNop); + queuePacketToSender_l(packet); +} + +void AAH_TXPlayer::sendEOS_l() { + sp packet = new TRTPControlPacket(); + packet->setCommandID(TRTPControlPacket::kCommandEOS); + queuePacketToSender_l(packet); +} + +bool AAH_TXPlayer::isPlaying() { + return (mFlags & PLAYING) || (mFlags & CACHE_UNDERRUN); +} + +status_t AAH_TXPlayer::seekTo(int msec) { + if (mExtractorFlags & MediaExtractor::CAN_SEEK) { + Mutex::Autolock autoLock(mLock); + return seekTo_l(static_cast(msec) * 1000); + } + + notifyListener_l(MEDIA_SEEK_COMPLETE); + return OK; +} + +status_t AAH_TXPlayer::seekTo_l(int64_t timeUs) { + mIsSeeking = true; + mSeekTimeUs = timeUs; + + mCurrentClockTransformValid = false; + mLastQueuedMediaTimePTSValid = false; + + // send a flush command packet + sp packet = new TRTPControlPacket(); + packet->setCommandID(TRTPControlPacket::kCommandFlush); + queuePacketToSender_l(packet); + + return OK; +} + +status_t AAH_TXPlayer::getCurrentPosition(int *msec) { + if (!msec) { + return BAD_VALUE; + } + + Mutex::Autolock lock(mLock); + + int position; + + if (mIsSeeking) { + position = mSeekTimeUs / 1000; + } else if (mCurrentClockTransformValid) { + // sample the current common time + int64_t commonTimeNow; + if (OK != mCCHelper.getCommonTime(&commonTimeNow)) { + ALOGE("getCurrentPosition get common time failed"); + return INVALID_OPERATION; + } + + int64_t mediaTimeNow; + if (!mCurrentClockTransform.doReverseTransform(commonTimeNow, + &mediaTimeNow)) { + ALOGE("getCurrentPosition reverse transform failed"); + return INVALID_OPERATION; + } + + position = static_cast(mediaTimeNow / 1000); + } else { + position = 0; + } + + int duration; + if (getDuration_l(&duration) == OK) { + *msec = clamp(position, 0, duration); + } else { + *msec = (position >= 0) ? position : 0; + } + + return OK; +} + +status_t AAH_TXPlayer::getDuration(int* msec) { + if (!msec) { + return BAD_VALUE; + } + + Mutex::Autolock lock(mLock); + + return getDuration_l(msec); +} + +status_t AAH_TXPlayer::getDuration_l(int* msec) { + if (mDurationUs < 0) { + return UNKNOWN_ERROR; + } + + *msec = (mDurationUs + 500) / 1000; + + return OK; +} + +status_t AAH_TXPlayer::reset() { + Mutex::Autolock autoLock(mLock); + reset_l(); + return OK; +} + +void AAH_TXPlayer::reset_l() { + if (mFlags & PREPARING) { + mFlags |= PREPARE_CANCELLED; + if (mConnectingDataSource != NULL) { + ALOGI("interrupting the connection process"); + mConnectingDataSource->disconnect(); + } + + if (mFlags & PREPARING_CONNECTED) { + // We are basically done preparing, we're just buffering + // enough data to start playback, we can safely interrupt that. + finishAsyncPrepare_l(); + } + } + + while (mFlags & PREPARING) { + mPreparedCondition.wait(mLock); + } + + cancelPlayerEvents(); + + sendEOS_l(); + + mCachedSource.clear(); + + if (mAudioSource != NULL) { + mAudioSource->stop(); + } + mAudioSource.clear(); + + mFlags = 0; + mExtractorFlags = 0; + + mDurationUs = -1; + mIsSeeking = false; + mSeekTimeUs = 0; + + mUri.setTo(""); + mUriHeaders.clear(); + + mFileSource.clear(); + + mBitrate = -1; + + { + Mutex::Autolock lock(mEndpointLock); + if (mAAH_Sender != NULL && mEndpointRegistered) { + mAAH_Sender->unregisterEndpoint(mEndpoint); + } + mEndpointRegistered = false; + mEndpointValid = false; + } + + mProgramID = 0; + + mAAH_Sender.clear(); + mLastQueuedMediaTimePTSValid = false; + mCurrentClockTransformValid = false; + mPlayRateIsPaused = false; + + mTRTPVolume = 255; +} + +status_t AAH_TXPlayer::setLooping(int loop) { + return OK; +} + +player_type AAH_TXPlayer::playerType() { + return AAH_TX_PLAYER; +} + +status_t AAH_TXPlayer::setParameter(int key, const Parcel &request) { + return ERROR_UNSUPPORTED; +} + +status_t AAH_TXPlayer::getParameter(int key, Parcel *reply) { + return ERROR_UNSUPPORTED; +} + +status_t AAH_TXPlayer::invoke(const Parcel& request, Parcel *reply) { + if (!reply) { + return BAD_VALUE; + } + + int32_t methodID; + status_t err = request.readInt32(&methodID); + if (err != android::OK) { + return err; + } + + switch (methodID) { + case kInvokeSetAAHDstIPPort: + case kInvokeSetAAHConfigBlob: { + if (mEndpointValid) { + return INVALID_OPERATION; + } + + String8 addr; + uint16_t port; + + if (methodID == kInvokeSetAAHDstIPPort) { + addr = String8(request.readString16()); + + int32_t port32; + err = request.readInt32(&port32); + if (err != android::OK) { + return err; + } + port = static_cast(port32); + } else { + String8 blob(request.readString16()); + + char addr_buf[101]; + if (sscanf(blob.string(), "V1:%100s %" SCNu16, + addr_buf, &port) != 2) { + return BAD_VALUE; + } + if (addr.setTo(addr_buf) != OK) { + return NO_MEMORY; + } + } + + struct hostent* ent = gethostbyname(addr.string()); + if (ent == NULL) { + return ERROR_UNKNOWN_HOST; + } + if (!(ent->h_addrtype == AF_INET && ent->h_length == 4)) { + return BAD_VALUE; + } + + Mutex::Autolock lock(mEndpointLock); + mEndpoint = AAH_TXSender::Endpoint( + reinterpret_cast(ent->h_addr)->s_addr, + port); + mEndpointValid = true; + return OK; + }; + + default: + return INVALID_OPERATION; + } +} + +status_t AAH_TXPlayer::getMetadata(const media::Metadata::Filter& ids, + Parcel* records) { + using media::Metadata; + + Metadata metadata(records); + + metadata.appendBool(Metadata::kPauseAvailable, true); + metadata.appendBool(Metadata::kSeekBackwardAvailable, false); + metadata.appendBool(Metadata::kSeekForwardAvailable, false); + metadata.appendBool(Metadata::kSeekAvailable, false); + + return OK; +} + +status_t AAH_TXPlayer::setVolume(float leftVolume, float rightVolume) { + if (leftVolume != rightVolume) { + ALOGE("%s does not support per channel volume: %f, %f", + __PRETTY_FUNCTION__, leftVolume, rightVolume); + } + + float volume = clamp(leftVolume, 0.0f, 1.0f); + + Mutex::Autolock lock(mLock); + mTRTPVolume = static_cast((leftVolume * 255.0) + 0.5); + + return OK; +} + +status_t AAH_TXPlayer::setAudioStreamType(audio_stream_type_t streamType) { + return OK; +} + +void AAH_TXPlayer::notifyListener_l(int msg, int ext1, int ext2) { + sendEvent(msg, ext1, ext2); +} + +bool AAH_TXPlayer::getBitrate_l(int64_t *bitrate) { + off64_t size; + if (mDurationUs >= 0 && + mCachedSource != NULL && + mCachedSource->getSize(&size) == OK) { + *bitrate = size * 8000000ll / mDurationUs; // in bits/sec + return true; + } + + if (mBitrate >= 0) { + *bitrate = mBitrate; + return true; + } + + *bitrate = 0; + + return false; +} + +// Returns true iff cached duration is available/applicable. +bool AAH_TXPlayer::getCachedDuration_l(int64_t *durationUs, bool *eos) { + int64_t bitrate; + + if (mCachedSource != NULL && getBitrate_l(&bitrate)) { + status_t finalStatus; + size_t cachedDataRemaining = mCachedSource->approxDataRemaining( + &finalStatus); + *durationUs = cachedDataRemaining * 8000000ll / bitrate; + *eos = (finalStatus != OK); + return true; + } + + return false; +} + +void AAH_TXPlayer::ensureCacheIsFetching_l() { + if (mCachedSource != NULL) { + mCachedSource->resumeFetchingIfNecessary(); + } +} + +void AAH_TXPlayer::postBufferingEvent_l() { + if (mBufferingEventPending) { + return; + } + mBufferingEventPending = true; + mQueue.postEventWithDelay(mBufferingEvent, 1000000ll); +} + +void AAH_TXPlayer::postPumpAudioEvent_l(int64_t delayUs) { + if (mPumpAudioEventPending) { + return; + } + mPumpAudioEventPending = true; + mQueue.postEventWithDelay(mPumpAudioEvent, delayUs < 0 ? 10000 : delayUs); +} + +void AAH_TXPlayer::onBufferingUpdate() { + Mutex::Autolock autoLock(mLock); + if (!mBufferingEventPending) { + return; + } + mBufferingEventPending = false; + + if (mCachedSource != NULL) { + status_t finalStatus; + size_t cachedDataRemaining = mCachedSource->approxDataRemaining( + &finalStatus); + bool eos = (finalStatus != OK); + + if (eos) { + if (finalStatus == ERROR_END_OF_STREAM) { + notifyListener_l(MEDIA_BUFFERING_UPDATE, 100); + } + if (mFlags & PREPARING) { + ALOGV("cache has reached EOS, prepare is done."); + finishAsyncPrepare_l(); + } + } else { + int64_t bitrate; + if (getBitrate_l(&bitrate)) { + size_t cachedSize = mCachedSource->cachedSize(); + int64_t cachedDurationUs = cachedSize * 8000000ll / bitrate; + + int percentage = (100.0 * (double) cachedDurationUs) + / mDurationUs; + if (percentage > 100) { + percentage = 100; + } + + notifyListener_l(MEDIA_BUFFERING_UPDATE, percentage); + } else { + // We don't know the bitrate of the stream, use absolute size + // limits to maintain the cache. + + if ((mFlags & PLAYING) && + !eos && + (cachedDataRemaining < kLowWaterMarkBytes)) { + ALOGI("cache is running low (< %d) , pausing.", + kLowWaterMarkBytes); + mFlags |= CACHE_UNDERRUN; + pause_l(); + ensureCacheIsFetching_l(); + notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START); + } else if (eos || cachedDataRemaining > kHighWaterMarkBytes) { + if (mFlags & CACHE_UNDERRUN) { + ALOGI("cache has filled up (> %d), resuming.", + kHighWaterMarkBytes); + mFlags &= ~CACHE_UNDERRUN; + play_l(); + notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END); + } else if (mFlags & PREPARING) { + ALOGV("cache has filled up (> %d), prepare is done", + kHighWaterMarkBytes); + finishAsyncPrepare_l(); + } + } + } + } + } + + int64_t cachedDurationUs; + bool eos; + if (getCachedDuration_l(&cachedDurationUs, &eos)) { + ALOGV("cachedDurationUs = %.2f secs, eos=%d", + cachedDurationUs / 1E6, eos); + + if ((mFlags & PLAYING) && + !eos && + (cachedDurationUs < kLowWaterMarkUs)) { + ALOGI("cache is running low (%.2f secs) , pausing.", + cachedDurationUs / 1E6); + mFlags |= CACHE_UNDERRUN; + pause_l(); + ensureCacheIsFetching_l(); + notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START); + } else if (eos || cachedDurationUs > kHighWaterMarkUs) { + if (mFlags & CACHE_UNDERRUN) { + ALOGI("cache has filled up (%.2f secs), resuming.", + cachedDurationUs / 1E6); + mFlags &= ~CACHE_UNDERRUN; + play_l(); + notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END); + } else if (mFlags & PREPARING) { + ALOGV("cache has filled up (%.2f secs), prepare is done", + cachedDurationUs / 1E6); + finishAsyncPrepare_l(); + } + } + } + + postBufferingEvent_l(); +} + +void AAH_TXPlayer::onPumpAudio() { + while (true) { + Mutex::Autolock autoLock(mLock); + // If this flag is clear, its because someone has externally canceled + // this pump operation (probably because we a resetting/shutting down). + // Get out immediately, do not reschedule ourselves. + if (!mPumpAudioEventPending) { + return; + } + + // Start by checking if there is still work to be doing. If we have + // never queued a payload (so we don't know what the last queued PTS is) + // or we have never established a MediaTime->CommonTime transformation, + // then we have work to do (one time through this loop should establish + // both). Otherwise, we want to keep a fixed amt of presentation time + // worth of data buffered. If we cannot get common time (service is + // unavailable, or common time is undefined)) then we don't have a lot + // of good options here. For now, signal an error up to the app level + // and shut down the transmission pump. + int64_t commonTimeNow; + if (OK != mCCHelper.getCommonTime(&commonTimeNow)) { + // Failed to get common time; either the service is down or common + // time is not synced. Raise an error and shutdown the player. + ALOGE("*** Cannot pump audio, unable to fetch common time." + " Shutting down."); + notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, UNKNOWN_ERROR); + mPumpAudioEventPending = false; + break; + } + + if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) { + int64_t mediaTimeNow; + bool conversionResult = mCurrentClockTransform.doReverseTransform( + commonTimeNow, + &mediaTimeNow); + CHECK(conversionResult); + + if ((mediaTimeNow + + kAAHBufferTimeUs - + mLastQueuedMediaTimePTS) <= 0) { + break; + } + } + + MediaSource::ReadOptions options; + if (mIsSeeking) { + options.setSeekTo(mSeekTimeUs); + } + + MediaBuffer* mediaBuffer; + status_t err = mAudioSource->read(&mediaBuffer, &options); + if (err != NO_ERROR) { + if (err == ERROR_END_OF_STREAM) { + ALOGI("*** %s reached end of stream", __PRETTY_FUNCTION__); + notifyListener_l(MEDIA_BUFFERING_UPDATE, 100); + notifyListener_l(MEDIA_PLAYBACK_COMPLETE); + pause_l(false); + sendEOS_l(); + } else { + ALOGE("*** %s read failed err=%d", __PRETTY_FUNCTION__, err); + } + return; + } + + if (mIsSeeking) { + mIsSeeking = false; + notifyListener_l(MEDIA_SEEK_COMPLETE); + } + + uint8_t* data = (static_cast(mediaBuffer->data()) + + mediaBuffer->range_offset()); + ALOGV("*** %s got media buffer data=[%02hhx %02hhx %02hhx %02hhx]" + " offset=%d length=%d", __PRETTY_FUNCTION__, + data[0], data[1], data[2], data[3], + mediaBuffer->range_offset(), mediaBuffer->range_length()); + + int64_t mediaTimeUs; + CHECK(mediaBuffer->meta_data()->findInt64(kKeyTime, &mediaTimeUs)); + ALOGV("*** timeUs=%lld", mediaTimeUs); + + if (!mCurrentClockTransformValid) { + if (OK == mCCHelper.getCommonTime(&commonTimeNow)) { + mCurrentClockTransform.a_zero = mediaTimeUs; + mCurrentClockTransform.b_zero = commonTimeNow + + kAAHStartupLeadTimeUs; + mCurrentClockTransform.a_to_b_numer = 1; + mCurrentClockTransform.a_to_b_denom = mPlayRateIsPaused ? 0 : 1; + mCurrentClockTransformValid = true; + } else { + // Failed to get common time; either the service is down or + // common time is not synced. Raise an error and shutdown the + // player. + ALOGE("*** Cannot begin transmission, unable to fetch common" + " time. Dropping sample with pts=%lld", mediaTimeUs); + notifyListener_l(MEDIA_ERROR, + MEDIA_ERROR_UNKNOWN, + UNKNOWN_ERROR); + mPumpAudioEventPending = false; + break; + } + } + + ALOGV("*** transmitting packet with pts=%lld", mediaTimeUs); + + sp packet = new TRTPAudioPacket(); + packet->setPTS(mediaTimeUs); + packet->setSubstreamID(1); + + packet->setCodecType(TRTPAudioPacket::kCodecMPEG1Audio); + packet->setVolume(mTRTPVolume); + // TODO : introduce a throttle for this so we can control the + // frequency with which transforms get sent. + packet->setClockTransform(mCurrentClockTransform); + packet->setAccessUnitData(data, mediaBuffer->range_length()); + packet->setRandomAccessPoint(true); + + queuePacketToSender_l(packet); + mediaBuffer->release(); + + mLastQueuedMediaTimePTSValid = true; + mLastQueuedMediaTimePTS = mediaTimeUs; + } + + { // Explicit scope for the autolock pattern. + Mutex::Autolock autoLock(mLock); + + // If someone externally has cleared this flag, its because we should be + // shutting down. Do not reschedule ourselves. + if (!mPumpAudioEventPending) { + return; + } + + // Looks like no one canceled us explicitly. Clear our flag and post a + // new event to ourselves. + mPumpAudioEventPending = false; + postPumpAudioEvent_l(10000); + } +} + +void AAH_TXPlayer::queuePacketToSender_l(const sp& packet) { + if (mAAH_Sender == NULL) { + return; + } + + sp message = new AMessage(AAH_TXSender::kWhatSendPacket, + mAAH_Sender->handlerID()); + + { + Mutex::Autolock lock(mEndpointLock); + if (!mEndpointValid) { + return; + } + + message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr); + message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port); + } + + packet->setProgramID(mProgramID); + packet->setExpireTime(systemTime() + kAAHRetryKeepAroundTimeNs); + packet->pack(); + + message->setObject(AAH_TXSender::kSendPacketTRTPPacket, packet); + + message->post(); +} + +} // namespace android diff --git a/media/libaah_rtp/aah_tx_player.h b/media/libaah_rtp/aah_tx_player.h new file mode 100644 index 0000000..64cf5dc --- /dev/null +++ b/media/libaah_rtp/aah_tx_player.h @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __AAH_TX_PLAYER_H__ +#define __AAH_TX_PLAYER_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "aah_tx_sender.h" + +namespace android { + +class AAH_TXPlayer : public MediaPlayerHWInterface { + public: + AAH_TXPlayer(); + + virtual status_t initCheck(); + virtual status_t setDataSource(const char *url, + const KeyedVector* + headers); + virtual status_t setDataSource(int fd, int64_t offset, int64_t length); + virtual status_t setVideoSurface(const sp& surface); + virtual status_t setVideoSurfaceTexture(const sp& + surfaceTexture); + virtual status_t prepare(); + virtual status_t prepareAsync(); + virtual status_t start(); + virtual status_t stop(); + virtual status_t pause(); + virtual bool isPlaying(); + virtual status_t seekTo(int msec); + virtual status_t getCurrentPosition(int *msec); + virtual status_t getDuration(int *msec); + virtual status_t reset(); + virtual status_t setLooping(int loop); + virtual player_type playerType(); + virtual status_t setParameter(int key, const Parcel &request); + virtual status_t getParameter(int key, Parcel *reply); + virtual status_t invoke(const Parcel& request, Parcel *reply); + virtual status_t getMetadata(const media::Metadata::Filter& ids, + Parcel* records); + virtual status_t setVolume(float leftVolume, float rightVolume); + virtual status_t setAudioStreamType(audio_stream_type_t streamType); + + // invoke method IDs + enum { + // set the IP address and port of the A@H receiver + kInvokeSetAAHDstIPPort = 1, + + // set the destination IP address and port (and perhaps any additional + // parameters added in the future) packaged in one string + kInvokeSetAAHConfigBlob, + }; + + static const int64_t kAAHRetryKeepAroundTimeNs; + + protected: + virtual ~AAH_TXPlayer(); + + private: + friend struct AwesomeEvent; + + enum { + PLAYING = 1, + PREPARING = 8, + PREPARED = 16, + PREPARE_CANCELLED = 64, + CACHE_UNDERRUN = 128, + + // We are basically done preparing but are currently buffering + // sufficient data to begin playback and finish the preparation + // phase for good. + PREPARING_CONNECTED = 2048, + + INCOGNITO = 32768, + }; + + status_t setDataSource_l(const char *url, + const KeyedVector *headers); + status_t setDataSource_l(const sp& extractor); + status_t finishSetDataSource_l(); + status_t prepareAsync_l(); + void onPrepareAsyncEvent(); + void finishAsyncPrepare_l(); + void abortPrepare(status_t err); + status_t play_l(); + status_t pause_l(bool doClockUpdate = true); + status_t seekTo_l(int64_t timeUs); + void updateClockTransform_l(bool pause); + void sendEOS_l(); + void cancelPlayerEvents(bool keepBufferingGoing = false); + void reset_l(); + void notifyListener_l(int msg, int ext1 = 0, int ext2 = 0); + bool getBitrate_l(int64_t* bitrate); + status_t getDuration_l(int* msec); + bool getCachedDuration_l(int64_t* durationUs, bool* eos); + void ensureCacheIsFetching_l(); + void postBufferingEvent_l(); + void postPumpAudioEvent_l(int64_t delayUs); + void onBufferingUpdate(); + void onPumpAudio(); + void queuePacketToSender_l(const sp& packet); + + Mutex mLock; + + TimedEventQueue mQueue; + bool mQueueStarted; + + sp mBufferingEvent; + bool mBufferingEventPending; + + uint32_t mFlags; + uint32_t mExtractorFlags; + + String8 mUri; + KeyedVector mUriHeaders; + + sp mFileSource; + + sp mAsyncPrepareEvent; + Condition mPreparedCondition; + status_t mPrepareResult; + + bool mIsSeeking; + int64_t mSeekTimeUs; + + sp mPumpAudioEvent; + bool mPumpAudioEventPending; + + sp mConnectingDataSource; + sp mCachedSource; + + sp mAudioSource; + int64_t mDurationUs; + int64_t mBitrate; + + sp mAAH_Sender; + LinearTransform mCurrentClockTransform; + bool mCurrentClockTransformValid; + int64_t mLastQueuedMediaTimePTS; + bool mLastQueuedMediaTimePTSValid; + bool mPlayRateIsPaused; + CCHelper mCCHelper; + + Mutex mEndpointLock; + AAH_TXSender::Endpoint mEndpoint; + bool mEndpointValid; + bool mEndpointRegistered; + uint16_t mProgramID; + uint8_t mTRTPVolume; + + DISALLOW_EVIL_CONSTRUCTORS(AAH_TXPlayer); +}; + +} // namespace android + +#endif // __AAH_TX_PLAYER_H__ diff --git a/media/libaah_rtp/aah_tx_sender.cpp b/media/libaah_rtp/aah_tx_sender.cpp new file mode 100644 index 0000000..d991ea7 --- /dev/null +++ b/media/libaah_rtp/aah_tx_sender.cpp @@ -0,0 +1,602 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "aah_tx_player.h" +#include "aah_tx_sender.h" + +namespace android { + +const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr"; +const char* AAH_TXSender::kSendPacketPort = "port"; +const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp"; + +const int AAH_TXSender::kRetryTrimIntervalUs = 100000; +const int AAH_TXSender::kHeartbeatIntervalUs = 1000000; +const int AAH_TXSender::kRetryBufferCapacity = 100; +const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull; + +Mutex AAH_TXSender::sLock; +wp AAH_TXSender::sInstance; +uint32_t AAH_TXSender::sNextEpoch; +bool AAH_TXSender::sNextEpochValid = false; + +AAH_TXSender::AAH_TXSender() : mSocket(-1) { + mLastSentPacketTime = systemTime(); +} + +sp AAH_TXSender::GetInstance() { + Mutex::Autolock autoLock(sLock); + + sp sender = sInstance.promote(); + + if (sender == NULL) { + sender = new AAH_TXSender(); + if (sender == NULL) { + return NULL; + } + + sender->mLooper = new ALooper(); + if (sender->mLooper == NULL) { + return NULL; + } + + sender->mReflector = new AHandlerReflector(sender.get()); + if (sender->mReflector == NULL) { + return NULL; + } + + sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (sender->mSocket == -1) { + ALOGW("%s unable to create socket", __PRETTY_FUNCTION__); + return NULL; + } + + struct sockaddr_in bind_addr; + memset(&bind_addr, 0, sizeof(bind_addr)); + bind_addr.sin_family = AF_INET; + if (bind(sender->mSocket, + reinterpret_cast(&bind_addr), + sizeof(bind_addr)) < 0) { + ALOGW("%s unable to bind socket (errno %d)", + __PRETTY_FUNCTION__, errno); + return NULL; + } + + sender->mRetryReceiver = new RetryReceiver(sender.get()); + if (sender->mRetryReceiver == NULL) { + return NULL; + } + + sender->mLooper->setName("AAH_TXSender"); + sender->mLooper->registerHandler(sender->mReflector); + sender->mLooper->start(false, false, PRIORITY_AUDIO); + + if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO) + != OK) { + ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__); + return NULL; + } + + sInstance = sender; + } + + return sender; +} + +AAH_TXSender::~AAH_TXSender() { + mLooper->stop(); + mLooper->unregisterHandler(mReflector->id()); + + if (mRetryReceiver != NULL) { + mRetryReceiver->requestExit(); + mRetryReceiver->mWakeupEvent.setEvent(); + if (mRetryReceiver->requestExitAndWait() != OK) { + ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__); + } + mRetryReceiver->mSender = NULL; + mRetryReceiver.clear(); + } + + if (mSocket != -1) { + close(mSocket); + } +} + +// Return the next epoch number usable for a newly instantiated endpoint. +uint32_t AAH_TXSender::getNextEpoch() { + Mutex::Autolock autoLock(sLock); + + if (sNextEpochValid) { + sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask; + } else { + sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask; + sNextEpochValid = true; + } + + return sNextEpoch; +} + +// Notify the sender that a player has started sending to this endpoint. +// Returns a program ID for use by the calling player. +uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) { + Mutex::Autolock lock(mEndpointLock); + + EndpointState* eps = mEndpointMap.valueFor(endpoint); + if (eps) { + eps->playerRefCount++; + } else { + eps = new EndpointState(getNextEpoch()); + mEndpointMap.add(endpoint, eps); + } + + // if this is the first registered endpoint, then send a message to start + // trimming retry buffers and a message to start sending heartbeats. + if (mEndpointMap.size() == 1) { + sp trimMessage = new AMessage(kWhatTrimRetryBuffers, + handlerID()); + trimMessage->post(kRetryTrimIntervalUs); + + sp heartbeatMessage = new AMessage(kWhatSendHeartbeats, + handlerID()); + heartbeatMessage->post(kHeartbeatIntervalUs); + } + + eps->nextProgramID++; + return eps->nextProgramID; +} + +// Notify the sender that a player has ceased sending to this endpoint. +// An endpoint's state can not be deleted until all of the endpoint's +// registered players have called unregisterEndpoint. +void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) { + Mutex::Autolock lock(mEndpointLock); + + EndpointState* eps = mEndpointMap.valueFor(endpoint); + if (eps) { + eps->playerRefCount--; + CHECK(eps->playerRefCount >= 0); + } +} + +void AAH_TXSender::onMessageReceived(const sp& msg) { + switch (msg->what()) { + case kWhatSendPacket: + onSendPacket(msg); + break; + + case kWhatTrimRetryBuffers: + trimRetryBuffers(); + break; + + case kWhatSendHeartbeats: + sendHeartbeats(); + break; + + default: + TRESPASS(); + break; + } +} + +void AAH_TXSender::onSendPacket(const sp& msg) { + sp obj; + CHECK(msg->findObject(kSendPacketTRTPPacket, &obj)); + sp packet = static_cast(obj.get()); + + uint32_t ipAddr; + CHECK(msg->findInt32(kSendPacketIPAddr, + reinterpret_cast(&ipAddr))); + + int32_t port32; + CHECK(msg->findInt32(kSendPacketPort, &port32)); + uint16_t port = port32; + + Mutex::Autolock lock(mEndpointLock); + doSendPacket_l(packet, Endpoint(ipAddr, port)); + mLastSentPacketTime = systemTime(); +} + +void AAH_TXSender::doSendPacket_l(const sp& packet, + const Endpoint& endpoint) { + EndpointState* eps = mEndpointMap.valueFor(endpoint); + if (!eps) { + // the endpoint state has disappeared, so the player that sent this + // packet must be dead. + return; + } + + // assign the packet's sequence number + packet->setEpoch(eps->epoch); + packet->setSeqNumber(eps->trtpSeqNumber++); + + // add the packet to the retry buffer + RetryBuffer& retry = eps->retry; + retry.push_back(packet); + + // send the packet + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = endpoint.addr; + addr.sin_port = htons(endpoint.port); + + ssize_t result = sendto(mSocket, + packet->getPacket(), + packet->getPacketLen(), + 0, + (const struct sockaddr *) &addr, + sizeof(addr)); + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } +} + +void AAH_TXSender::trimRetryBuffers() { + Mutex::Autolock lock(mEndpointLock); + + nsecs_t localTimeNow = systemTime(); + + Vector endpointsToRemove; + + for (size_t i = 0; i < mEndpointMap.size(); i++) { + EndpointState* eps = mEndpointMap.editValueAt(i); + RetryBuffer& retry = eps->retry; + + while (!retry.isEmpty()) { + if (retry[0]->getExpireTime() < localTimeNow) { + retry.pop_front(); + } else { + break; + } + } + + if (retry.isEmpty() && eps->playerRefCount == 0) { + endpointsToRemove.add(mEndpointMap.keyAt(i)); + } + } + + // remove the state for any endpoints that are no longer in use + for (size_t i = 0; i < endpointsToRemove.size(); i++) { + Endpoint& e = endpointsToRemove.editItemAt(i); + ALOGD("*** %s removing endpoint addr=%08x", __PRETTY_FUNCTION__, e.addr); + size_t index = mEndpointMap.indexOfKey(e); + delete mEndpointMap.valueAt(index); + mEndpointMap.removeItemsAt(index); + } + + // schedule the next trim + if (mEndpointMap.size()) { + sp trimMessage = new AMessage(kWhatTrimRetryBuffers, + handlerID()); + trimMessage->post(kRetryTrimIntervalUs); + } +} + +void AAH_TXSender::sendHeartbeats() { + Mutex::Autolock lock(mEndpointLock); + + if (shouldSendHeartbeats_l()) { + for (size_t i = 0; i < mEndpointMap.size(); i++) { + EndpointState* eps = mEndpointMap.editValueAt(i); + const Endpoint& ep = mEndpointMap.keyAt(i); + + sp packet = new TRTPControlPacket(); + packet->setCommandID(TRTPControlPacket::kCommandNop); + + packet->setExpireTime(systemTime() + + AAH_TXPlayer::kAAHRetryKeepAroundTimeNs); + packet->pack(); + + doSendPacket_l(packet, ep); + } + } + + // schedule the next heartbeat + if (mEndpointMap.size()) { + sp heartbeatMessage = new AMessage(kWhatSendHeartbeats, + handlerID()); + heartbeatMessage->post(kHeartbeatIntervalUs); + } +} + +bool AAH_TXSender::shouldSendHeartbeats_l() { + // assert(holding endpoint lock) + return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout)); +} + +// Receiver + +// initial 4-byte ID of a retry request packet +const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq'; + +// initial 4-byte ID of a retry NAK packet +const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak'; + +// initial 4-byte ID of a fast start request packet +const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst'; + +AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender) + : Thread(false), + mSender(sender) {} + + AAH_TXSender::RetryReceiver::~RetryReceiver() { + mWakeupEvent.clearPendingEvents(); + } + +// Returns true if val is within the interval bounded inclusively by +// start and end. Also handles the case where there is a rollover of the +// range between start and end. +template +static inline bool withinIntervalWithRollover(T val, T start, T end) { + return ((start <= end && val >= start && val <= end) || + (start > end && (val >= start || val <= end))); +} + +bool AAH_TXSender::RetryReceiver::threadLoop() { + struct pollfd pollFds[2]; + pollFds[0].fd = mSender->mSocket; + pollFds[0].events = POLLIN; + pollFds[0].revents = 0; + pollFds[1].fd = mWakeupEvent.getWakeupHandle(); + pollFds[1].events = POLLIN; + pollFds[1].revents = 0; + + int pollResult = poll(pollFds, NELEM(pollFds), -1); + if (pollResult == -1) { + ALOGE("%s poll failed", __PRETTY_FUNCTION__); + return false; + } + + if (exitPending()) { + ALOGI("*** %s exiting", __PRETTY_FUNCTION__); + return false; + } + + if (pollFds[0].revents) { + handleRetryRequest(); + } + + return true; +} + +void AAH_TXSender::RetryReceiver::handleRetryRequest() { + ALOGV("*** RX %s start", __PRETTY_FUNCTION__); + + RetryPacket request; + struct sockaddr requestSrcAddr; + socklen_t requestSrcAddrLen = sizeof(requestSrcAddr); + + ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0, + &requestSrcAddr, &requestSrcAddrLen); + if (result == -1) { + ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno); + return; + } + + if (static_cast(result) < sizeof(RetryPacket)) { + ALOGW("%s short packet received", __PRETTY_FUNCTION__); + return; + } + + uint32_t host_request_id = ntohl(request.id); + if ((host_request_id != kRetryRequestID) && + (host_request_id != kFastStartRequestID)) { + ALOGW("%s received retry request with bogus ID (%08x)", + __PRETTY_FUNCTION__, host_request_id); + return; + } + + Endpoint endpoint(request.endpointIP, ntohs(request.endpointPort)); + + Mutex::Autolock lock(mSender->mEndpointLock); + + EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint); + + if (eps == NULL || eps->retry.isEmpty()) { + // we have no retry buffer or an empty retry buffer for this endpoint, + // so NAK the entire request + RetryPacket nak = request; + nak.id = htonl(kRetryNakID); + result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, + &requestSrcAddr, requestSrcAddrLen); + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } + return; + } + + RetryBuffer& retry = eps->retry; + + uint16_t startSeq = ntohs(request.seqStart); + uint16_t endSeq = ntohs(request.seqEnd); + + uint16_t retryFirstSeq = retry[0]->getSeqNumber(); + uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber(); + + // If this is a fast start, then force the start of the retry to match the + // start of the retransmit ring buffer (unless the end of the retransmit + // ring buffer is already past the point of fast start) + if ((host_request_id == kFastStartRequestID) && + !((startSeq - retryFirstSeq) & 0x8000)) { + startSeq = retryFirstSeq; + } + + int startIndex; + if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) { + startIndex = static_cast(startSeq - retryFirstSeq); + } else { + startIndex = -1; + } + + int endIndex; + if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) { + endIndex = static_cast(endSeq - retryFirstSeq); + } else { + endIndex = -1; + } + + if (startIndex == -1 && endIndex == -1) { + // no part of the request range is found in the retry buffer + RetryPacket nak = request; + nak.id = htonl(kRetryNakID); + result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, + &requestSrcAddr, requestSrcAddrLen); + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } + return; + } + + if (startIndex == -1) { + // NAK a subrange at the front of the request range + RetryPacket nak = request; + nak.id = htonl(kRetryNakID); + nak.seqEnd = htons(retryFirstSeq - 1); + result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, + &requestSrcAddr, requestSrcAddrLen); + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } + + startIndex = 0; + } else if (endIndex == -1) { + // NAK a subrange at the back of the request range + RetryPacket nak = request; + nak.id = htonl(kRetryNakID); + nak.seqStart = htons(retryLastSeq + 1); + result = sendto(mSender->mSocket, &nak, sizeof(nak), 0, + &requestSrcAddr, requestSrcAddrLen); + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } + + endIndex = retry.size() - 1; + } + + // send the retry packets + for (int i = startIndex; i <= endIndex; i++) { + const sp& replyPacket = retry[i]; + + result = sendto(mSender->mSocket, + replyPacket->getPacket(), + replyPacket->getPacketLen(), + 0, + &requestSrcAddr, + requestSrcAddrLen); + + if (result == -1) { + ALOGW("%s sendto failed", __PRETTY_FUNCTION__); + } + } +} + +// Endpoint + +AAH_TXSender::Endpoint::Endpoint() + : addr(0) + , port(0) { } + +AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p) + : addr(a) + , port(p) {} + +bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const { + return ((addr < other.addr) || + (addr == other.addr && port < other.port)); +} + +// EndpointState + +AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch) + : retry(kRetryBufferCapacity) + , playerRefCount(1) + , trtpSeqNumber(0) + , nextProgramID(0) + , epoch(_epoch) { } + +// CircularBuffer + +template +CircularBuffer::CircularBuffer(size_t capacity) + : mCapacity(capacity) + , mHead(0) + , mTail(0) + , mFillCount(0) { + mBuffer = new T[capacity]; +} + +template +CircularBuffer::~CircularBuffer() { + delete [] mBuffer; +} + +template +void CircularBuffer::push_back(const T& item) { + if (this->isFull()) { + this->pop_front(); + } + mBuffer[mHead] = item; + mHead = (mHead + 1) % mCapacity; + mFillCount++; +} + +template +void CircularBuffer::pop_front() { + CHECK(!isEmpty()); + mBuffer[mTail] = T(); + mTail = (mTail + 1) % mCapacity; + mFillCount--; +} + +template +size_t CircularBuffer::size() const { + return mFillCount; +} + +template +bool CircularBuffer::isFull() const { + return (mFillCount == mCapacity); +} + +template +bool CircularBuffer::isEmpty() const { + return (mFillCount == 0); +} + +template +const T& CircularBuffer::itemAt(size_t index) const { + CHECK(index < mFillCount); + return mBuffer[(mTail + index) % mCapacity]; +} + +template +const T& CircularBuffer::operator[](size_t index) const { + return itemAt(index); +} + +} // namespace android diff --git a/media/libaah_rtp/aah_tx_sender.h b/media/libaah_rtp/aah_tx_sender.h new file mode 100644 index 0000000..74206c4 --- /dev/null +++ b/media/libaah_rtp/aah_tx_sender.h @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __AAH_TX_SENDER_H__ +#define __AAH_TX_SENDER_H__ + +#include +#include +#include +#include + +#include "aah_tx_packet.h" +#include "pipe_event.h" + +namespace android { + +template class CircularBuffer { + public: + CircularBuffer(size_t capacity); + ~CircularBuffer(); + void push_back(const T& item);; + void pop_front(); + size_t size() const; + bool isFull() const; + bool isEmpty() const; + const T& itemAt(size_t index) const; + const T& operator[](size_t index) const; + + private: + T* mBuffer; + size_t mCapacity; + size_t mHead; + size_t mTail; + size_t mFillCount; + + DISALLOW_EVIL_CONSTRUCTORS(CircularBuffer); +}; + +class AAH_TXSender : public virtual RefBase { + public: + ~AAH_TXSender(); + + static sp GetInstance(); + + ALooper::handler_id handlerID() { return mReflector->id(); } + + // an IP address and port + struct Endpoint { + Endpoint(); + Endpoint(uint32_t a, uint16_t p); + bool operator<(const Endpoint& other) const; + + uint32_t addr; + uint16_t port; + }; + + uint16_t registerEndpoint(const Endpoint& endpoint); + void unregisterEndpoint(const Endpoint& endpoint); + + enum { + kWhatSendPacket, + kWhatTrimRetryBuffers, + kWhatSendHeartbeats, + }; + + // fields for SendPacket messages + static const char* kSendPacketIPAddr; + static const char* kSendPacketPort; + static const char* kSendPacketTRTPPacket; + + private: + AAH_TXSender(); + + static Mutex sLock; + static wp sInstance; + static uint32_t sNextEpoch; + static bool sNextEpochValid; + + static uint32_t getNextEpoch(); + + typedef CircularBuffer > RetryBuffer; + + // state maintained on a per-endpoint basis + struct EndpointState { + EndpointState(uint32_t epoch); + RetryBuffer retry; + int playerRefCount; + uint16_t trtpSeqNumber; + uint16_t nextProgramID; + uint32_t epoch; + }; + + friend class AHandlerReflector; + void onMessageReceived(const sp& msg); + void onSendPacket(const sp& msg); + void doSendPacket_l(const sp& packet, + const Endpoint& endpoint); + void trimRetryBuffers(); + void sendHeartbeats(); + bool shouldSendHeartbeats_l(); + + sp mLooper; + sp > mReflector; + + int mSocket; + nsecs_t mLastSentPacketTime; + + DefaultKeyedVector mEndpointMap; + Mutex mEndpointLock; + + static const int kRetryTrimIntervalUs; + static const int kHeartbeatIntervalUs; + static const int kRetryBufferCapacity; + static const nsecs_t kHeartbeatTimeout; + + class RetryReceiver : public Thread { + private: + friend class AAH_TXSender; + + RetryReceiver(AAH_TXSender* sender); + virtual ~RetryReceiver(); + virtual bool threadLoop(); + void handleRetryRequest(); + + static const int kMaxReceiverPacketLen; + static const uint32_t kRetryRequestID; + static const uint32_t kFastStartRequestID; + static const uint32_t kRetryNakID; + + AAH_TXSender* mSender; + PipeEvent mWakeupEvent; + }; + + sp mRetryReceiver; + + DISALLOW_EVIL_CONSTRUCTORS(AAH_TXSender); +}; + +struct RetryPacket { + uint32_t id; + uint32_t endpointIP; + uint16_t endpointPort; + uint16_t seqStart; + uint16_t seqEnd; +} __attribute__((packed)); + +} // namespace android + +#endif // __AAH_TX_SENDER_H__ diff --git a/media/libaah_rtp/pipe_event.cpp b/media/libaah_rtp/pipe_event.cpp new file mode 100644 index 0000000..b8e6960 --- /dev/null +++ b/media/libaah_rtp/pipe_event.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "LibAAH_RTP" +#include + +#include +#include +#include +#include + +#include "pipe_event.h" + +namespace android { + +PipeEvent::PipeEvent() { + pipe_[0] = -1; + pipe_[1] = -1; + + // Create the pipe. + if (pipe(pipe_) >= 0) { + // Set non-blocking mode on the read side of the pipe so we can + // easily drain it whenever we wakeup. + fcntl(pipe_[0], F_SETFL, O_NONBLOCK); + } else { + ALOGE("Failed to create pipe event %d %d %d", + pipe_[0], pipe_[1], errno); + pipe_[0] = -1; + pipe_[1] = -1; + } +} + +PipeEvent::~PipeEvent() { + if (pipe_[0] >= 0) { + close(pipe_[0]); + } + + if (pipe_[1] >= 0) { + close(pipe_[1]); + } +} + +void PipeEvent::clearPendingEvents() { + char drain_buffer[16]; + while (read(pipe_[0], drain_buffer, sizeof(drain_buffer)) > 0) { + // No body. + } +} + +bool PipeEvent::wait(int timeout) { + struct pollfd wait_fd; + + wait_fd.fd = getWakeupHandle(); + wait_fd.events = POLLIN; + wait_fd.revents = 0; + + int res = poll(&wait_fd, 1, timeout); + + if (res < 0) { + ALOGE("Wait error in PipeEvent; sleeping to prevent overload!"); + usleep(1000); + } + + return (res > 0); +} + +void PipeEvent::setEvent() { + char foo = 'q'; + write(pipe_[1], &foo, 1); +} + +} // namespace android + diff --git a/media/libaah_rtp/pipe_event.h b/media/libaah_rtp/pipe_event.h new file mode 100644 index 0000000..e53b0fd --- /dev/null +++ b/media/libaah_rtp/pipe_event.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PIPE_EVENT_H__ +#define __PIPE_EVENT_H__ + +#include + +namespace android { + +class PipeEvent { + public: + PipeEvent(); + ~PipeEvent(); + + bool initCheck() const { + return ((pipe_[0] >= 0) && (pipe_[1] >= 0)); + } + + int getWakeupHandle() const { return pipe_[0]; } + + // block until the event fires; returns true if the event fired and false if + // the wait timed out. Timeout is expressed in milliseconds; negative + // values mean wait forever. + bool wait(int timeout = -1); + + void clearPendingEvents(); + void setEvent(); + + private: + int pipe_[2]; + + DISALLOW_EVIL_CONSTRUCTORS(PipeEvent); +}; + +} // namespace android + +#endif // __PIPE_EVENT_H__ diff --git a/media/libmediaplayerservice/Android.mk b/media/libmediaplayerservice/Android.mk index a3e2517..e521648 100644 --- a/media/libmediaplayerservice/Android.mk +++ b/media/libmediaplayerservice/Android.mk @@ -29,7 +29,8 @@ LOCAL_SHARED_LIBRARIES := \ libstagefright_omx \ libstagefright_foundation \ libgui \ - libdl + libdl \ + libaah_rtp LOCAL_STATIC_LIBRARIES := \ libstagefright_nuplayer \ diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp index 4df7f3d..764eddc 100644 --- a/media/libmediaplayerservice/MediaPlayerService.cpp +++ b/media/libmediaplayerservice/MediaPlayerService.cpp @@ -70,6 +70,11 @@ #include +namespace android { +sp createAAH_TXPlayer(); +sp createAAH_RXPlayer(); +} + namespace { using android::media::Metadata; using android::status_t; @@ -593,6 +598,14 @@ player_type getPlayerType(const char* url) return NU_PLAYER; } + if (!strncasecmp("aahRX://", url, 8)) { + return AAH_RX_PLAYER; + } + + if (!strncasecmp("aahTX://", url, 8)) { + return AAH_TX_PLAYER; + } + // use MidiFile for MIDI extensions int lenURL = strlen(url); for (int i = 0; i < NELEM(FILE_EXTS); ++i) { @@ -629,6 +642,14 @@ static sp createPlayer(player_type playerType, void* cookie, ALOGV("Create Test Player stub"); p = new TestPlayerStub(); break; + case AAH_RX_PLAYER: + ALOGV(" create A@H RX Player"); + p = createAAH_RXPlayer(); + break; + case AAH_TX_PLAYER: + ALOGV(" create A@H TX Player"); + p = createAAH_TXPlayer(); + break; default: ALOGE("Unknown player type: %d", playerType); return NULL; @@ -1031,9 +1052,21 @@ status_t MediaPlayerService::Client::setLooping(int loop) status_t MediaPlayerService::Client::setVolume(float leftVolume, float rightVolume) { ALOGV("[%d] setVolume(%f, %f)", mConnId, leftVolume, rightVolume); - // TODO: for hardware output, call player instead - Mutex::Autolock l(mLock); - if (mAudioOutput != 0) mAudioOutput->setVolume(leftVolume, rightVolume); + + // for hardware output, call player instead + sp p = getPlayer(); + { + Mutex::Autolock l(mLock); + if (p != 0 && p->hardwareOutput()) { + MediaPlayerHWInterface* hwp = + reinterpret_cast(p.get()); + return hwp->setVolume(leftVolume, rightVolume); + } else { + if (mAudioOutput != 0) mAudioOutput->setVolume(leftVolume, rightVolume); + return NO_ERROR; + } + } + return NO_ERROR; } -- cgit v1.1