From a239dd722e760fe4fd7379b454d7722e1f312928 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Mon, 18 Mar 2013 15:11:40 -0700 Subject: Change ANetworkSession implementation to optionally attach timestamps to fragments of data to be transferred and to log statistics when data is finally submitted to the POSIX layer. Change-Id: Icbfcac203cdc5c9eac1634e84d34bb380b316a01 --- .../wifi-display/ANetworkSession.cpp | 150 +++++++++++++++------ .../libstagefright/wifi-display/ANetworkSession.h | 3 +- media/libstagefright/wifi-display/MediaSender.cpp | 4 + .../libstagefright/wifi-display/rtp/RTPSender.cpp | 21 ++- media/libstagefright/wifi-display/rtp/RTPSender.h | 4 +- media/libstagefright/wifi-display/wfd.cpp | 2 + 6 files changed, 135 insertions(+), 49 deletions(-) diff --git a/media/libstagefright/wifi-display/ANetworkSession.cpp b/media/libstagefright/wifi-display/ANetworkSession.cpp index 23bb04e..df20ae2 100644 --- a/media/libstagefright/wifi-display/ANetworkSession.cpp +++ b/media/libstagefright/wifi-display/ANetworkSession.cpp @@ -81,7 +81,8 @@ struct ANetworkSession::Session : public RefBase { status_t readMore(); status_t writeMore(); - status_t sendRequest(const void *data, ssize_t size); + status_t sendRequest( + const void *data, ssize_t size, bool timeValid, int64_t timeUs); void setIsRTSPConnection(bool yesno); @@ -89,6 +90,15 @@ protected: virtual ~Session(); private: + enum { + FRAGMENT_FLAG_TIME_VALID = 1, + }; + struct Fragment { + uint32_t mFlags; + int64_t mTimeUs; + sp mBuffer; + }; + int32_t mSessionID; State mState; bool mIsRTSPConnection; @@ -96,11 +106,7 @@ private: sp mNotify; bool mSawReceiveFailure, mSawSendFailure; - // for TCP / stream data - AString mOutBuffer; - - // for UDP / datagrams - List > mOutDatagrams; + List mOutFragments; AString mInBuffer; @@ -109,6 +115,8 @@ private: void notifyError(bool send, status_t err, const char *detail); void notify(NotificationReason reason); + void dumpFragmentStats(const Fragment &frag); + DISALLOW_EVIL_CONSTRUCTORS(Session); }; //////////////////////////////////////////////////////////////////////////////// @@ -221,8 +229,8 @@ bool ANetworkSession::Session::wantsToRead() { bool ANetworkSession::Session::wantsToWrite() { return !mSawSendFailure && (mState == CONNECTING - || (mState == CONNECTED && !mOutBuffer.empty()) - || (mState == DATAGRAM && !mOutDatagrams.empty())); + || (mState == CONNECTED && !mOutFragments.empty()) + || (mState == DATAGRAM && !mOutFragments.empty())); } status_t ANetworkSession::Session::readMore() { @@ -407,13 +415,41 @@ status_t ANetworkSession::Session::readMore() { return err; } +void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) { +#if 0 + int64_t nowUs = ALooper::GetNowUs(); + int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll; + + static const int64_t kMinDelayMs = 0; + static const int64_t kMaxDelayMs = 300; + + const char *kPattern = "########################################"; + size_t kPatternSize = strlen(kPattern); + + int n = (kPatternSize * (delayMs - kMinDelayMs)) + / (kMaxDelayMs - kMinDelayMs); + + if (n < 0) { + n = 0; + } else if ((size_t)n > kPatternSize) { + n = kPatternSize; + } + + ALOGI("[%lld]: (%4lld ms) %s\n", + frag.mTimeUs / 1000, + delayMs, + kPattern + kPatternSize - n); +#endif +} + status_t ANetworkSession::Session::writeMore() { if (mState == DATAGRAM) { - CHECK(!mOutDatagrams.empty()); + CHECK(!mOutFragments.empty()); status_t err; do { - const sp &datagram = *mOutDatagrams.begin(); + const Fragment &frag = *mOutFragments.begin(); + const sp &datagram = frag.mBuffer; uint8_t *data = datagram->data(); if (data[0] == 0x80 && (data[1] & 0x7f) == 33) { @@ -441,17 +477,21 @@ status_t ANetworkSession::Session::writeMore() { err = OK; if (n > 0) { - mOutDatagrams.erase(mOutDatagrams.begin()); + if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { + dumpFragmentStats(frag); + } + + mOutFragments.erase(mOutFragments.begin()); } else if (n < 0) { err = -errno; } else if (n == 0) { err = -ECONNRESET; } - } while (err == OK && !mOutDatagrams.empty()); + } while (err == OK && !mOutFragments.empty()); if (err == -EAGAIN) { - if (!mOutDatagrams.empty()) { - ALOGI("%d datagrams remain queued.", mOutDatagrams.size()); + if (!mOutFragments.empty()) { + ALOGI("%d datagrams remain queued.", mOutFragments.size()); } err = OK; } @@ -484,23 +524,37 @@ status_t ANetworkSession::Session::writeMore() { } CHECK_EQ(mState, CONNECTED); - CHECK(!mOutBuffer.empty()); + CHECK(!mOutFragments.empty()); ssize_t n; - do { - n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0); - } while (n < 0 && errno == EINTR); + while (!mOutFragments.empty()) { + const Fragment &frag = *mOutFragments.begin(); - status_t err = OK; + do { + n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0); + } while (n < 0 && errno == EINTR); - if (n > 0) { -#if 0 - ALOGI("out:"); - hexdump(mOutBuffer.c_str(), n); -#endif + if (n <= 0) { + break; + } - mOutBuffer.erase(0, n); - } else if (n < 0) { + frag.mBuffer->setRange( + frag.mBuffer->offset() + n, frag.mBuffer->size() - n); + + if (frag.mBuffer->size() > 0) { + break; + } + + if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { + dumpFragmentStats(frag); + } + + mOutFragments.erase(mOutFragments.begin()); + } + + status_t err = OK; + + if (n < 0) { err = -errno; } else if (n == 0) { err = -ECONNRESET; @@ -537,32 +591,43 @@ status_t ANetworkSession::Session::writeMore() { return err; } -status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { +status_t ANetworkSession::Session::sendRequest( + const void *data, ssize_t size, bool timeValid, int64_t timeUs) { CHECK(mState == CONNECTED || mState == DATAGRAM); - if (mState == DATAGRAM) { - CHECK_GE(size, 0); - - sp datagram = new ABuffer(size); - memcpy(datagram->data(), data, size); + if (size < 0) { + size = strlen((const char *)data); + } - mOutDatagrams.push_back(datagram); + if (size == 0) { return OK; } + sp buffer; + if (mState == CONNECTED && !mIsRTSPConnection) { CHECK_LE(size, 65535); - uint8_t prefix[2]; - prefix[0] = size >> 8; - prefix[1] = size & 0xff; + buffer = new ABuffer(size + 2); + buffer->data()[0] = size >> 8; + buffer->data()[1] = size & 0xff; + memcpy(buffer->data() + 2, data, size); + } else { + buffer = new ABuffer(size); + memcpy(buffer->data(), data, size); + } + + Fragment frag; - mOutBuffer.append((const char *)prefix, sizeof(prefix)); + frag.mFlags = 0; + if (timeValid) { + frag.mFlags = FRAGMENT_FLAG_TIME_VALID; + frag.mTimeUs = timeUs; } - mOutBuffer.append( - (const char *)data, - (size >= 0) ? size : strlen((const char *)data)); + frag.mBuffer = buffer; + + mOutFragments.push_back(frag); return OK; } @@ -985,7 +1050,8 @@ status_t ANetworkSession::connectUDPSession( } status_t ANetworkSession::sendRequest( - int32_t sessionID, const void *data, ssize_t size) { + int32_t sessionID, const void *data, ssize_t size, + bool timeValid, int64_t timeUs) { Mutex::Autolock autoLock(mLock); ssize_t index = mSessions.indexOfKey(sessionID); @@ -996,7 +1062,7 @@ status_t ANetworkSession::sendRequest( const sp session = mSessions.valueAt(index); - status_t err = session->sendRequest(data, size); + status_t err = session->sendRequest(data, size, timeValid, timeUs); interrupt(); diff --git a/media/libstagefright/wifi-display/ANetworkSession.h b/media/libstagefright/wifi-display/ANetworkSession.h index 0d7cbd6..7c62b29 100644 --- a/media/libstagefright/wifi-display/ANetworkSession.h +++ b/media/libstagefright/wifi-display/ANetworkSession.h @@ -74,7 +74,8 @@ struct ANetworkSession : public RefBase { status_t destroySession(int32_t sessionID); status_t sendRequest( - int32_t sessionID, const void *data, ssize_t size = -1); + int32_t sessionID, const void *data, ssize_t size = -1, + bool timeValid = false, int64_t timeUs = -1ll); enum NotificationReason { kWhatError, diff --git a/media/libstagefright/wifi-display/MediaSender.cpp b/media/libstagefright/wifi-display/MediaSender.cpp index a41f81b..d13a92e 100644 --- a/media/libstagefright/wifi-display/MediaSender.cpp +++ b/media/libstagefright/wifi-display/MediaSender.cpp @@ -252,6 +252,10 @@ status_t MediaSender::queueAccessUnit( fwrite(tsPackets->data(), 1, tsPackets->size(), mLogFile); } + int64_t timeUs; + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + tsPackets->meta()->setInt64("timeUs", timeUs); + err = mTSSender->queueBuffer( tsPackets, 33 /* packetType */, diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.cpp b/media/libstagefright/wifi-display/rtp/RTPSender.cpp index 8cd712d..c8e265c 100644 --- a/media/libstagefright/wifi-display/rtp/RTPSender.cpp +++ b/media/libstagefright/wifi-display/rtp/RTPSender.cpp @@ -194,6 +194,9 @@ status_t RTPSender::queueTSPackets( const sp &tsPackets, uint8_t packetType) { CHECK_EQ(0, tsPackets->size() % 188); + int64_t timeUs; + CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs)); + const size_t numTSPackets = tsPackets->size() / 188; size_t srcOffset = 0; @@ -232,13 +235,19 @@ status_t RTPSender::queueTSPackets( memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188); udpPacket->setRange(0, 12 + numTSPackets * 188); - status_t err = sendRTPPacket(udpPacket, true /* storeInHistory */); + + srcOffset += numTSPackets * 188; + bool isLastPacket = (srcOffset == tsPackets->size()); + + status_t err = sendRTPPacket( + udpPacket, + true /* storeInHistory */, + isLastPacket /* timeValid */, + timeUs); if (err != OK) { return err; } - - srcOffset += numTSPackets * 188; } return OK; @@ -395,11 +404,13 @@ status_t RTPSender::queueAVCBuffer( } status_t RTPSender::sendRTPPacket( - const sp &buffer, bool storeInHistory) { + const sp &buffer, bool storeInHistory, + bool timeValid, int64_t timeUs) { CHECK(mRTPConnected); status_t err = mNetSession->sendRequest( - mRTPSessionID, buffer->data(), buffer->size()); + mRTPSessionID, buffer->data(), buffer->size(), + timeValid, timeUs); if (err != OK) { return err; diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.h b/media/libstagefright/wifi-display/rtp/RTPSender.h index 83c6223..90b1796 100644 --- a/media/libstagefright/wifi-display/rtp/RTPSender.h +++ b/media/libstagefright/wifi-display/rtp/RTPSender.h @@ -94,7 +94,9 @@ private: status_t queueTSPackets(const sp &tsPackets, uint8_t packetType); status_t queueAVCBuffer(const sp &accessUnit, uint8_t packetType); - status_t sendRTPPacket(const sp &packet, bool storeInHistory); + status_t sendRTPPacket( + const sp &packet, bool storeInHistory, + bool timeValid = false, int64_t timeUs = -1ll); void onNetNotify(bool isRTP, const sp &msg); diff --git a/media/libstagefright/wifi-display/wfd.cpp b/media/libstagefright/wifi-display/wfd.cpp index 3a7a6e2..4f7dcc8 100644 --- a/media/libstagefright/wifi-display/wfd.cpp +++ b/media/libstagefright/wifi-display/wfd.cpp @@ -200,6 +200,8 @@ static void createFileSource( CHECK_EQ((status_t)OK, source->start(iface.c_str())); client->waitUntilDone(); + + source->stop(); } } // namespace android -- cgit v1.1