/* * Copyright 2014, The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "MediaCodecSource" #define DEBUG_DRIFT_TIME 0 #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace android { static void ReleaseMediaBufferReference(const sp &accessUnit) { void *mbuf; if (accessUnit->meta()->findPointer("mediaBuffer", &mbuf) && mbuf != NULL) { ALOGV("releasing mbuf %p", mbuf); accessUnit->meta()->setPointer("mediaBuffer", NULL); static_cast(mbuf)->release(); mbuf = NULL; } } struct MediaCodecSource::Puller : public AHandler { Puller(const sp &source); status_t start(const sp &meta, const sp ¬ify); void stopAsync(); void pause(); void resume(); protected: virtual void onMessageReceived(const sp &msg); virtual ~Puller(); private: enum { kWhatStart = 'msta', kWhatStop, kWhatPull, kWhatPause, kWhatResume, }; sp mSource; sp mNotify; sp mLooper; int32_t mPullGeneration; bool mIsAudio; bool mPaused; bool mReachedEOS; status_t postSynchronouslyAndReturnError(const sp &msg); void schedulePull(); void handleEOS(); DISALLOW_EVIL_CONSTRUCTORS(Puller); }; MediaCodecSource::Puller::Puller(const sp &source) : mSource(source), mLooper(new ALooper()), mPullGeneration(0), mIsAudio(false), mPaused(false), mReachedEOS(false) { sp meta = source->getFormat(); const char *mime; CHECK(meta->findCString(kKeyMIMEType, &mime)); mIsAudio = !strncasecmp(mime, "audio/", 6); mLooper->setName("pull_looper"); } MediaCodecSource::Puller::~Puller() { mLooper->unregisterHandler(id()); mLooper->stop(); } status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError( const sp &msg) { sp response; status_t err = msg->postAndAwaitResponse(&response); if (err != OK) { return err; } if (!response->findInt32("err", &err)) { err = OK; } return err; } status_t MediaCodecSource::Puller::start(const sp &meta, const sp ¬ify) { ALOGV("puller (%s) start", mIsAudio ? "audio" : "video"); mLooper->start( false /* runOnCallingThread */, false /* canCallJava */, PRIORITY_AUDIO); mLooper->registerHandler(this); mNotify = notify; sp msg = new AMessage(kWhatStart, id()); msg->setObject("meta", meta); return postSynchronouslyAndReturnError(msg); } void MediaCodecSource::Puller::stopAsync() { ALOGV("puller (%s) stopAsync", mIsAudio ? "audio" : "video"); (new AMessage(kWhatStop, id()))->post(); } void MediaCodecSource::Puller::pause() { (new AMessage(kWhatPause, id()))->post(); } void MediaCodecSource::Puller::resume() { (new AMessage(kWhatResume, id()))->post(); } void MediaCodecSource::Puller::schedulePull() { sp msg = new AMessage(kWhatPull, id()); msg->setInt32("generation", mPullGeneration); msg->post(); } void MediaCodecSource::Puller::handleEOS() { if (!mReachedEOS) { ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video"); mReachedEOS = true; sp notify = mNotify->dup(); notify->setPointer("accessUnit", NULL); notify->post(); } } void MediaCodecSource::Puller::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatStart: { sp obj; CHECK(msg->findObject("meta", &obj)); mReachedEOS = false; status_t err = mSource->start(static_cast(obj.get())); if (err == OK) { schedulePull(); } sp response = new AMessage; response->setInt32("err", err); uint32_t replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); break; } case kWhatStop: { ALOGV("source (%s) stopping", mIsAudio ? "audio" : "video"); mSource->stop(); ALOGV("source (%s) stopped", mIsAudio ? "audio" : "video"); ++mPullGeneration; handleEOS(); break; } case kWhatPull: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation != mPullGeneration) { break; } MediaBuffer *mbuf; status_t err = mSource->read(&mbuf); if (mPaused) { if (err == OK) { mbuf->release(); mbuf = NULL; } msg->post(); break; } if (err != OK) { if (err == ERROR_END_OF_STREAM) { ALOGV("stream ended, mbuf %p", mbuf); } else { ALOGE("error %d reading stream.", err); } handleEOS(); } else { sp notify = mNotify->dup(); notify->setPointer("accessUnit", mbuf); notify->post(); msg->post(); } break; } case kWhatPause: { mPaused = true; break; } case kWhatResume: { mPaused = false; break; } default: TRESPASS(); } } // static sp MediaCodecSource::Create( const sp &looper, const sp &format, const sp &source, uint32_t flags) { sp mediaSource = new MediaCodecSource(looper, format, source, flags); if (mediaSource->init() == OK) { return mediaSource; } return NULL; } status_t MediaCodecSource::start(MetaData* params) { sp msg = new AMessage(kWhatStart, mReflector->id()); msg->setObject("meta", params); return postSynchronouslyAndReturnError(msg); } status_t MediaCodecSource::stop() { sp msg = new AMessage(kWhatStop, mReflector->id()); return postSynchronouslyAndReturnError(msg); } status_t MediaCodecSource::pause() { (new AMessage(kWhatPause, mReflector->id()))->post(); return OK; } sp MediaCodecSource::getGraphicBufferProducer() { CHECK(mFlags & FLAG_USE_SURFACE_INPUT); return mGraphicBufferProducer; } status_t MediaCodecSource::read( MediaBuffer** buffer, const ReadOptions* /* options */) { Mutex::Autolock autolock(mOutputBufferLock); *buffer = NULL; while (mOutputBufferQueue.size() == 0 && !mEncodedReachedEOS) { mOutputBufferCond.wait(mOutputBufferLock); } if (!mEncodedReachedEOS) { *buffer = *mOutputBufferQueue.begin(); mOutputBufferQueue.erase(mOutputBufferQueue.begin()); return OK; } return mErrorCode; } void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) { buffer->setObserver(0); buffer->release(); } MediaCodecSource::MediaCodecSource( const sp &looper, const sp &outputFormat, const sp &source, uint32_t flags) : mLooper(looper), mOutputFormat(outputFormat), mMeta(new MetaData), mFlags(flags), mIsVideo(false), mStarted(false), mStopping(false), mDoMoreWorkPending(false), mPullerReachedEOS(false), mFirstSampleTimeUs(-1ll), mEncodedReachedEOS(false), mErrorCode(OK) { CHECK(mLooper != NULL); AString mime; CHECK(mOutputFormat->findString("mime", &mime)); if (!strncasecmp("video/", mime.c_str(), 6)) { mIsVideo = true; } if (!(mFlags & FLAG_USE_SURFACE_INPUT)) { mPuller = new Puller(source); } } MediaCodecSource::~MediaCodecSource() { releaseEncoder(); mCodecLooper->stop(); mLooper->unregisterHandler(mReflector->id()); } status_t MediaCodecSource::init() { status_t err = initEncoder(); if (err != OK) { releaseEncoder(); } return err; } status_t MediaCodecSource::initEncoder() { mReflector = new AHandlerReflector(this); mLooper->registerHandler(mReflector); mCodecLooper = new ALooper; mCodecLooper->setName("codec_looper"); mCodecLooper->start(); if (mFlags & FLAG_USE_METADATA_INPUT) { mOutputFormat->setInt32("store-metadata-in-buffers", 1); } if (mFlags & FLAG_USE_SURFACE_INPUT) { mOutputFormat->setInt32("create-input-buffers-suspended", 1); } AString outputMIME; CHECK(mOutputFormat->findString("mime", &outputMIME)); mEncoder = MediaCodec::CreateByType( mCodecLooper, outputMIME.c_str(), true /* encoder */); if (mEncoder == NULL) { return NO_INIT; } ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str()); status_t err = mEncoder->configure( mOutputFormat, NULL /* nativeWindow */, NULL /* crypto */, MediaCodec::CONFIGURE_FLAG_ENCODE); if (err != OK) { return err; } mEncoder->getOutputFormat(&mOutputFormat); convertMessageToMetaData(mOutputFormat, mMeta); if (mFlags & FLAG_USE_SURFACE_INPUT) { CHECK(mIsVideo); err = mEncoder->createInputSurface(&mGraphicBufferProducer); if (err != OK) { return err; } } err = mEncoder->start(); if (err != OK) { return err; } err = mEncoder->getInputBuffers(&mEncoderInputBuffers); if (err != OK) { return err; } err = mEncoder->getOutputBuffers(&mEncoderOutputBuffers); if (err != OK) { return err; } mEncodedReachedEOS = false; mErrorCode = OK; return OK; } void MediaCodecSource::releaseEncoder() { if (mEncoder == NULL) { return; } mEncoder->release(); mEncoder.clear(); while (!mInputBufferQueue.empty()) { MediaBuffer *mbuf = *mInputBufferQueue.begin(); mInputBufferQueue.erase(mInputBufferQueue.begin()); if (mbuf != NULL) { mbuf->release(); } } for (size_t i = 0; i < mEncoderInputBuffers.size(); ++i) { sp accessUnit = mEncoderInputBuffers.itemAt(i); ReleaseMediaBufferReference(accessUnit); } mEncoderInputBuffers.clear(); mEncoderOutputBuffers.clear(); } bool MediaCodecSource::reachedEOS() { return mEncodedReachedEOS && ((mPuller == NULL) || mPullerReachedEOS); } status_t MediaCodecSource::postSynchronouslyAndReturnError( const sp &msg) { sp response; status_t err = msg->postAndAwaitResponse(&response); if (err != OK) { return err; } if (!response->findInt32("err", &err)) { err = OK; } return err; } void MediaCodecSource::signalEOS(status_t err) { if (!mEncodedReachedEOS) { ALOGI("encoder (%s) reached EOS", mIsVideo ? "video" : "audio"); { Mutex::Autolock autoLock(mOutputBufferLock); // release all unread media buffers for (List::iterator it = mOutputBufferQueue.begin(); it != mOutputBufferQueue.end(); it++) { (*it)->release(); } mOutputBufferQueue.clear(); mEncodedReachedEOS = true; mErrorCode = err; mOutputBufferCond.signal(); } releaseEncoder(); } if (mStopping && reachedEOS()) { ALOGI("MediaCodecSource (%s) fully stopped", mIsVideo ? "video" : "audio"); // posting reply to everyone that's waiting List::iterator it; for (it = mStopReplyIDQueue.begin(); it != mStopReplyIDQueue.end(); it++) { (new AMessage)->postReply(*it); } mStopReplyIDQueue.clear(); mStopping = false; } } void MediaCodecSource::suspend() { CHECK(mFlags & FLAG_USE_SURFACE_INPUT); if (mEncoder != NULL) { sp params = new AMessage; params->setInt32("drop-input-frames", true); mEncoder->setParameters(params); } } void MediaCodecSource::resume(int64_t skipFramesBeforeUs) { CHECK(mFlags & FLAG_USE_SURFACE_INPUT); if (mEncoder != NULL) { sp params = new AMessage; params->setInt32("drop-input-frames", false); if (skipFramesBeforeUs > 0) { params->setInt64("skip-frames-before", skipFramesBeforeUs); } mEncoder->setParameters(params); } } void MediaCodecSource::scheduleDoMoreWork() { if (mDoMoreWorkPending) { return; } mDoMoreWorkPending = true; if (mEncoderActivityNotify == NULL) { mEncoderActivityNotify = new AMessage( kWhatEncoderActivity, mReflector->id()); } mEncoder->requestActivityNotification(mEncoderActivityNotify); } status_t MediaCodecSource::feedEncoderInputBuffers() { while (!mInputBufferQueue.empty() && !mAvailEncoderInputIndices.empty()) { MediaBuffer* mbuf = *mInputBufferQueue.begin(); mInputBufferQueue.erase(mInputBufferQueue.begin()); size_t bufferIndex = *mAvailEncoderInputIndices.begin(); mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin()); int64_t timeUs = 0ll; uint32_t flags = 0; size_t size = 0; if (mbuf != NULL) { CHECK(mbuf->meta_data()->findInt64(kKeyTime, &timeUs)); // push decoding time for video, or drift time for audio if (mIsVideo) { mDecodingTimeQueue.push_back(timeUs); } else { #if DEBUG_DRIFT_TIME if (mFirstSampleTimeUs < 0ll) { mFirstSampleTimeUs = timeUs; } int64_t driftTimeUs = 0; if (mbuf->meta_data()->findInt64(kKeyDriftTime, &driftTimeUs) && driftTimeUs) { driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs; } mDriftTimeQueue.push_back(driftTimeUs); #endif // DEBUG_DRIFT_TIME } size = mbuf->size(); memcpy(mEncoderInputBuffers.itemAt(bufferIndex)->data(), mbuf->data(), size); if (mIsVideo) { // video encoder will release MediaBuffer when done // with underlying data. mEncoderInputBuffers.itemAt(bufferIndex)->meta() ->setPointer("mediaBuffer", mbuf); } else { mbuf->release(); } } else { flags = MediaCodec::BUFFER_FLAG_EOS; } status_t err = mEncoder->queueInputBuffer( bufferIndex, 0, size, timeUs, flags); if (err != OK) { return err; } } return OK; } status_t MediaCodecSource::doMoreWork() { status_t err; if (!(mFlags & FLAG_USE_SURFACE_INPUT)) { for (;;) { size_t bufferIndex; err = mEncoder->dequeueInputBuffer(&bufferIndex); if (err != OK) { break; } mAvailEncoderInputIndices.push_back(bufferIndex); } feedEncoderInputBuffers(); } for (;;) { size_t bufferIndex; size_t offset; size_t size; int64_t timeUs; uint32_t flags; native_handle_t* handle = NULL; err = mEncoder->dequeueOutputBuffer( &bufferIndex, &offset, &size, &timeUs, &flags); if (err != OK) { if (err == INFO_FORMAT_CHANGED) { continue; } else if (err == INFO_OUTPUT_BUFFERS_CHANGED) { mEncoder->getOutputBuffers(&mEncoderOutputBuffers); continue; } if (err == -EAGAIN) { err = OK; } break; } if (!(flags & MediaCodec::BUFFER_FLAG_EOS)) { sp outbuf = mEncoderOutputBuffers.itemAt(bufferIndex); MediaBuffer *mbuf = new MediaBuffer(outbuf->size()); memcpy(mbuf->data(), outbuf->data(), outbuf->size()); if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) { if (mIsVideo) { int64_t decodingTimeUs; if (mFlags & FLAG_USE_SURFACE_INPUT) { // GraphicBufferSource is supposed to discard samples // queued before start, and offset timeUs by start time CHECK_GE(timeUs, 0ll); // TODO: // Decoding time for surface source is unavailable, // use presentation time for now. May need to move // this logic into MediaCodec. decodingTimeUs = timeUs; } else { CHECK(!mDecodingTimeQueue.empty()); decodingTimeUs = *(mDecodingTimeQueue.begin()); mDecodingTimeQueue.erase(mDecodingTimeQueue.begin()); } mbuf->meta_data()->setInt64(kKeyDecodingTime, decodingTimeUs); ALOGV("[video] time %lld us (%.2f secs), dts/pts diff %lld", timeUs, timeUs / 1E6, decodingTimeUs - timeUs); } else { int64_t driftTimeUs = 0; #if DEBUG_DRIFT_TIME CHECK(!mDriftTimeQueue.empty()); driftTimeUs = *(mDriftTimeQueue.begin()); mDriftTimeQueue.erase(mDriftTimeQueue.begin()); mbuf->meta_data()->setInt64(kKeyDriftTime, driftTimeUs); #endif // DEBUG_DRIFT_TIME ALOGV("[audio] time %lld us (%.2f secs), drift %lld", timeUs, timeUs / 1E6, driftTimeUs); } mbuf->meta_data()->setInt64(kKeyTime, timeUs); } else { mbuf->meta_data()->setInt32(kKeyIsCodecConfig, true); } if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) { mbuf->meta_data()->setInt32(kKeyIsSyncFrame, true); } mbuf->setObserver(this); mbuf->add_ref(); { Mutex::Autolock autoLock(mOutputBufferLock); mOutputBufferQueue.push_back(mbuf); mOutputBufferCond.signal(); } } mEncoder->releaseOutputBuffer(bufferIndex); if (flags & MediaCodec::BUFFER_FLAG_EOS) { err = ERROR_END_OF_STREAM; break; } } return err; } status_t MediaCodecSource::onStart(MetaData *params) { if (mStopping) { ALOGE("Failed to start while we're stopping"); return INVALID_OPERATION; } if (mStarted) { ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio"); if (mFlags & FLAG_USE_SURFACE_INPUT) { resume(); } else { CHECK(mPuller != NULL); mPuller->resume(); } return OK; } ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio"); status_t err = OK; if (mFlags & FLAG_USE_SURFACE_INPUT) { int64_t startTimeUs; if (!params || !params->findInt64(kKeyTime, &startTimeUs)) { startTimeUs = -1ll; } resume(startTimeUs); scheduleDoMoreWork(); } else { CHECK(mPuller != NULL); sp notify = new AMessage( kWhatPullerNotify, mReflector->id()); err = mPuller->start(params, notify); if (err != OK) { mPullerReachedEOS = true; return err; } } ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio"); mStarted = true; return OK; } void MediaCodecSource::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatPullerNotify: { MediaBuffer *mbuf; CHECK(msg->findPointer("accessUnit", (void**)&mbuf)); if (mbuf == NULL) { ALOGI("puller (%s) reached EOS", mIsVideo ? "video" : "audio"); mPullerReachedEOS = true; } if (mEncoder == NULL) { ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str()); if (mbuf != NULL) { mbuf->release(); } else { signalEOS(); } break; } mInputBufferQueue.push_back(mbuf); feedEncoderInputBuffers(); scheduleDoMoreWork(); break; } case kWhatEncoderActivity: { mDoMoreWorkPending = false; if (mEncoder == NULL) { break; } status_t err = doMoreWork(); if (err == OK) { scheduleDoMoreWork(); } else { // reached EOS, or error signalEOS(err); } break; } case kWhatStart: { uint32_t replyID; CHECK(msg->senderAwaitsResponse(&replyID)); sp obj; CHECK(msg->findObject("meta", &obj)); MetaData *params = static_cast(obj.get()); sp response = new AMessage; response->setInt32("err", onStart(params)); response->postReply(replyID); break; } case kWhatStop: { ALOGI("MediaCodecSource (%s) stopping", mIsVideo ? "video" : "audio"); uint32_t replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (reachedEOS()) { // if we already reached EOS, reply and return now ALOGI("MediaCodecSource (%s) already stopped", mIsVideo ? "video" : "audio"); (new AMessage)->postReply(replyID); break; } mStopReplyIDQueue.push_back(replyID); if (mStopping) { // nothing to do if we're already stopping, reply will be posted // to all when we're stopped. break; } mStopping = true; // if using surface, signal source EOS and wait for EOS to come back. // otherwise, release encoder and post EOS if haven't done already if (mFlags & FLAG_USE_SURFACE_INPUT) { mEncoder->signalEndOfInputStream(); } else { CHECK(mPuller != NULL); mPuller->stopAsync(); signalEOS(); } break; } case kWhatPause: { if (mFlags && FLAG_USE_SURFACE_INPUT) { suspend(); } else { CHECK(mPuller != NULL); mPuller->pause(); } break; } default: TRESPASS(); } } } // namespace android