/* * Copyright 2013, The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NEBUG 0 #define LOG_TAG "rtptest" #include #include "rtp/RTPSender.h" #include "rtp/RTPReceiver.h" #include "TimeSyncer.h" #include #include #include #include #include #include #include #include #include #include #include #include #define MEDIA_FILENAME "/sdcard/Frame Counter HD 30FPS_1080p.mp4" namespace android { struct PacketSource : public RefBase { PacketSource() {} virtual sp getNextAccessUnit() = 0; protected: virtual ~PacketSource() {} private: DISALLOW_EVIL_CONSTRUCTORS(PacketSource); }; struct MediaPacketSource : public PacketSource { MediaPacketSource() : mMaxSampleSize(1024 * 1024) { mExtractor = new NuMediaExtractor; CHECK_EQ((status_t)OK, mExtractor->setDataSource(MEDIA_FILENAME)); bool haveVideo = false; for (size_t i = 0; i < mExtractor->countTracks(); ++i) { sp format; CHECK_EQ((status_t)OK, mExtractor->getTrackFormat(i, &format)); AString mime; CHECK(format->findString("mime", &mime)); if (!strcasecmp(MEDIA_MIMETYPE_VIDEO_AVC, mime.c_str())) { mExtractor->selectTrack(i); haveVideo = true; break; } } CHECK(haveVideo); } virtual sp getNextAccessUnit() { int64_t timeUs; status_t err = mExtractor->getSampleTime(&timeUs); if (err != OK) { return NULL; } sp accessUnit = new ABuffer(mMaxSampleSize); CHECK_EQ((status_t)OK, mExtractor->readSampleData(accessUnit)); accessUnit->meta()->setInt64("timeUs", timeUs); CHECK_EQ((status_t)OK, mExtractor->advance()); return accessUnit; } protected: virtual ~MediaPacketSource() { } private: sp mExtractor; size_t mMaxSampleSize; DISALLOW_EVIL_CONSTRUCTORS(MediaPacketSource); }; struct SimplePacketSource : public PacketSource { SimplePacketSource() : mCounter(0) { } virtual sp getNextAccessUnit() { sp buffer = new ABuffer(4); uint8_t *dst = buffer->data(); dst[0] = mCounter >> 24; dst[1] = (mCounter >> 16) & 0xff; dst[2] = (mCounter >> 8) & 0xff; dst[3] = mCounter & 0xff; buffer->meta()->setInt64("timeUs", mCounter * 1000000ll / kFrameRate); ++mCounter; return buffer; } protected: virtual ~SimplePacketSource() { } private: enum { kFrameRate = 30 }; uint32_t mCounter; DISALLOW_EVIL_CONSTRUCTORS(SimplePacketSource); }; struct TestHandler : public AHandler { TestHandler(const sp &netSession); void listen(); void connect(const char *host, int32_t port); protected: virtual ~TestHandler(); virtual void onMessageReceived(const sp &msg); private: enum { kWhatListen, kWhatConnect, kWhatReceiverNotify, kWhatSenderNotify, kWhatSendMore, kWhatStop, kWhatTimeSyncerNotify, }; #if 1 static const RTPBase::TransportMode kRTPMode = RTPBase::TRANSPORT_UDP; static const RTPBase::TransportMode kRTCPMode = RTPBase::TRANSPORT_UDP; #else static const RTPBase::TransportMode kRTPMode = RTPBase::TRANSPORT_TCP; static const RTPBase::TransportMode kRTCPMode = RTPBase::TRANSPORT_NONE; #endif #if 1 static const RTPBase::PacketizationMode kPacketizationMode = RTPBase::PACKETIZATION_H264; #else static const RTPBase::PacketizationMode kPacketizationMode = RTPBase::PACKETIZATION_NONE; #endif sp mNetSession; sp mSource; sp mSender; sp mReceiver; sp mTimeSyncer; bool mTimeSyncerStarted; int64_t mFirstTimeRealUs; int64_t mFirstTimeMediaUs; int64_t mTimeOffsetUs; bool mTimeOffsetValid; status_t readMore(); DISALLOW_EVIL_CONSTRUCTORS(TestHandler); }; TestHandler::TestHandler(const sp &netSession) : mNetSession(netSession), mTimeSyncerStarted(false), mFirstTimeRealUs(-1ll), mFirstTimeMediaUs(-1ll), mTimeOffsetUs(-1ll), mTimeOffsetValid(false) { } TestHandler::~TestHandler() { } void TestHandler::listen() { sp msg = new AMessage(kWhatListen, id()); msg->post(); } void TestHandler::connect(const char *host, int32_t port) { sp msg = new AMessage(kWhatConnect, id()); msg->setString("host", host); msg->setInt32("port", port); msg->post(); } static void dumpDelay(int64_t delayMs) { static const int64_t kMinDelayMs = 0; static const int64_t kMaxDelayMs = 300; const char *kPattern = "########################################"; size_t kPatternSize = strlen(kPattern); int n = (kPatternSize * (delayMs - kMinDelayMs)) / (kMaxDelayMs - kMinDelayMs); if (n < 0) { n = 0; } else if ((size_t)n > kPatternSize) { n = kPatternSize; } ALOGI("(%4lld ms) %s\n", delayMs, kPattern + kPatternSize - n); } void TestHandler::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatListen: { sp notify = new AMessage(kWhatTimeSyncerNotify, id()); mTimeSyncer = new TimeSyncer(mNetSession, notify); looper()->registerHandler(mTimeSyncer); notify = new AMessage(kWhatReceiverNotify, id()); mReceiver = new RTPReceiver( mNetSession, notify, RTPReceiver::FLAG_AUTO_CONNECT); looper()->registerHandler(mReceiver); CHECK_EQ((status_t)OK, mReceiver->registerPacketType(33, kPacketizationMode)); int32_t receiverRTPPort; CHECK_EQ((status_t)OK, mReceiver->initAsync( kRTPMode, kRTCPMode, &receiverRTPPort)); printf("picked receiverRTPPort %d\n", receiverRTPPort); #if 0 CHECK_EQ((status_t)OK, mReceiver->connect( "127.0.0.1", senderRTPPort, senderRTPPort + 1)); #endif break; } case kWhatConnect: { AString host; CHECK(msg->findString("host", &host)); sp notify = new AMessage(kWhatTimeSyncerNotify, id()); mTimeSyncer = new TimeSyncer(mNetSession, notify); looper()->registerHandler(mTimeSyncer); mTimeSyncer->startServer(8123); int32_t receiverRTPPort; CHECK(msg->findInt32("port", &receiverRTPPort)); #if 1 mSource = new MediaPacketSource; #else mSource = new SimplePacketSource; #endif notify = new AMessage(kWhatSenderNotify, id()); mSender = new RTPSender(mNetSession, notify); looper()->registerHandler(mSender); int32_t senderRTPPort; CHECK_EQ((status_t)OK, mSender->initAsync( host.c_str(), receiverRTPPort, kRTPMode, kRTCPMode == RTPBase::TRANSPORT_NONE ? -1 : receiverRTPPort + 1, kRTCPMode, &senderRTPPort)); printf("picked senderRTPPort %d\n", senderRTPPort); break; } case kWhatSenderNotify: { ALOGI("kWhatSenderNotify"); int32_t what; CHECK(msg->findInt32("what", &what)); switch (what) { case RTPSender::kWhatInitDone: { int32_t err; CHECK(msg->findInt32("err", &err)); ALOGI("RTPSender::initAsync completed w/ err %d", err); if (err == OK) { err = readMore(); if (err != OK) { (new AMessage(kWhatStop, id()))->post(); } } break; } case RTPSender::kWhatError: break; } break; } case kWhatReceiverNotify: { ALOGV("kWhatReceiverNotify"); int32_t what; CHECK(msg->findInt32("what", &what)); switch (what) { case RTPReceiver::kWhatInitDone: { int32_t err; CHECK(msg->findInt32("err", &err)); ALOGI("RTPReceiver::initAsync completed w/ err %d", err); break; } case RTPReceiver::kWhatError: break; case RTPReceiver::kWhatAccessUnit: { #if 0 if (!mTimeSyncerStarted) { mTimeSyncer->startClient("172.18.41.216", 8123); mTimeSyncerStarted = true; } sp accessUnit; CHECK(msg->findBuffer("accessUnit", &accessUnit)); int64_t timeUs; CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); if (mTimeOffsetValid) { timeUs -= mTimeOffsetUs; int64_t nowUs = ALooper::GetNowUs(); int64_t delayMs = (nowUs - timeUs) / 1000ll; dumpDelay(delayMs); } #endif break; } case RTPReceiver::kWhatPacketLost: ALOGV("kWhatPacketLost"); break; default: TRESPASS(); } break; } case kWhatSendMore: { sp accessUnit; CHECK(msg->findBuffer("accessUnit", &accessUnit)); CHECK_EQ((status_t)OK, mSender->queueBuffer( accessUnit, 33, kPacketizationMode)); status_t err = readMore(); if (err != OK) { (new AMessage(kWhatStop, id()))->post(); } break; } case kWhatStop: { if (mReceiver != NULL) { looper()->unregisterHandler(mReceiver->id()); mReceiver.clear(); } if (mSender != NULL) { looper()->unregisterHandler(mSender->id()); mSender.clear(); } mSource.clear(); looper()->stop(); break; } case kWhatTimeSyncerNotify: { CHECK(msg->findInt64("offset", &mTimeOffsetUs)); mTimeOffsetValid = true; break; } default: TRESPASS(); } } status_t TestHandler::readMore() { sp accessUnit = mSource->getNextAccessUnit(); if (accessUnit == NULL) { return ERROR_END_OF_STREAM; } int64_t timeUs; CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); int64_t nowUs = ALooper::GetNowUs(); int64_t whenUs; if (mFirstTimeRealUs < 0ll) { mFirstTimeRealUs = whenUs = nowUs; mFirstTimeMediaUs = timeUs; } else { whenUs = mFirstTimeRealUs + timeUs - mFirstTimeMediaUs; } accessUnit->meta()->setInt64("timeUs", whenUs); sp msg = new AMessage(kWhatSendMore, id()); msg->setBuffer("accessUnit", accessUnit); msg->post(whenUs - nowUs); return OK; } } // namespace android static void usage(const char *me) { fprintf(stderr, "usage: %s -c host:port\tconnect to remote host\n" " -l \tlisten\n", me); } int main(int argc, char **argv) { using namespace android; // srand(time(NULL)); ProcessState::self()->startThreadPool(); DataSource::RegisterDefaultSniffers(); bool listen = false; int32_t connectToPort = -1; AString connectToHost; int res; while ((res = getopt(argc, argv, "hc:l")) >= 0) { switch (res) { case 'c': { const char *colonPos = strrchr(optarg, ':'); if (colonPos == NULL) { usage(argv[0]); exit(1); } connectToHost.setTo(optarg, colonPos - optarg); char *end; connectToPort = strtol(colonPos + 1, &end, 10); if (*end != '\0' || end == colonPos + 1 || connectToPort < 1 || connectToPort > 65535) { fprintf(stderr, "Illegal port specified.\n"); exit(1); } break; } case 'l': { listen = true; break; } case '?': case 'h': usage(argv[0]); exit(1); } } if (!listen && connectToPort < 0) { fprintf(stderr, "You need to select either client or server mode.\n"); exit(1); } sp netSession = new ANetworkSession; netSession->start(); sp looper = new ALooper; sp handler = new TestHandler(netSession); looper->registerHandler(handler); if (listen) { handler->listen(); } if (connectToPort >= 0) { handler->connect(connectToHost.c_str(), connectToPort); } looper->start(true /* runOnCallingThread */); return 0; }