diff options
author | Andreas Huber <andih@google.com> | 2012-08-28 09:47:45 -0700 |
---|---|---|
committer | Andreas Huber <andih@google.com> | 2012-08-28 09:52:36 -0700 |
commit | bb197f84c4119651e5face418285688ddaf08ea3 (patch) | |
tree | ba20bed635c17735d57a62ce20173e02db1392e6 | |
parent | 690921927f289da73556ea71c28981194af8ffcd (diff) | |
download | frameworks_av-bb197f84c4119651e5face418285688ddaf08ea3.zip frameworks_av-bb197f84c4119651e5face418285688ddaf08ea3.tar.gz frameworks_av-bb197f84c4119651e5face418285688ddaf08ea3.tar.bz2 |
Test to measure UDP roundtrip time between two devices on the same network.
Change-Id: I4c9a5190efe18da8b6be7d68bda91df878c4118c
-rw-r--r-- | media/libstagefright/wifi-display/ANetworkSession.cpp | 994 | ||||
-rw-r--r-- | media/libstagefright/wifi-display/ANetworkSession.h | 109 | ||||
-rw-r--r-- | media/libstagefright/wifi-display/Android.mk | 22 | ||||
-rw-r--r-- | media/libstagefright/wifi-display/ParsedMessage.cpp | 284 | ||||
-rw-r--r-- | media/libstagefright/wifi-display/ParsedMessage.h | 58 | ||||
-rw-r--r-- | media/libstagefright/wifi-display/udptest.cpp | 355 |
6 files changed, 1822 insertions, 0 deletions
diff --git a/media/libstagefright/wifi-display/ANetworkSession.cpp b/media/libstagefright/wifi-display/ANetworkSession.cpp new file mode 100644 index 0000000..ee0600c --- /dev/null +++ b/media/libstagefright/wifi-display/ANetworkSession.cpp @@ -0,0 +1,994 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "NetworkSession" +#include <utils/Log.h> + +#include "ANetworkSession.h" +#include "ParsedMessage.h" + +#include <arpa/inet.h> +#include <fcntl.h> +#include <net/if.h> +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> + +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/Utils.h> + +namespace android { + +static const size_t kMaxUDPSize = 1500; + +struct ANetworkSession::NetworkThread : public Thread { + NetworkThread(ANetworkSession *session); + +protected: + virtual ~NetworkThread(); + +private: + ANetworkSession *mSession; + + virtual bool threadLoop(); + + DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); +}; + +struct ANetworkSession::Session : public RefBase { + enum State { + CONNECTING, + CONNECTED, + LISTENING, + DATAGRAM, + }; + + Session(int32_t sessionID, + State state, + int s, + const sp<AMessage> ¬ify); + + int32_t sessionID() const; + int socket() const; + sp<AMessage> getNotificationMessage() const; + + bool isListening() const; + + bool wantsToRead(); + bool wantsToWrite(); + + status_t readMore(); + status_t writeMore(); + + status_t sendRequest(const void *data, ssize_t size); + +protected: + virtual ~Session(); + +private: + int32_t mSessionID; + State mState; + int mSocket; + sp<AMessage> mNotify; + bool mSawReceiveFailure, mSawSendFailure; + + AString mOutBuffer; + List<size_t> mOutBufferSizes; + + AString mInBuffer; + + void notifyError(bool send, status_t err, const char *detail); + void notify(NotificationReason reason); + + DISALLOW_EVIL_CONSTRUCTORS(Session); +}; +//////////////////////////////////////////////////////////////////////////////// + +ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) + : mSession(session) { +} + +ANetworkSession::NetworkThread::~NetworkThread() { +} + +bool ANetworkSession::NetworkThread::threadLoop() { + mSession->threadLoop(); + + return true; +} + +//////////////////////////////////////////////////////////////////////////////// + +ANetworkSession::Session::Session( + int32_t sessionID, + State state, + int s, + const sp<AMessage> ¬ify) + : mSessionID(sessionID), + mState(state), + mSocket(s), + mNotify(notify), + mSawReceiveFailure(false), + mSawSendFailure(false) { + if (mState == CONNECTED) { + struct sockaddr_in localAddr; + socklen_t localAddrLen = sizeof(localAddr); + + int res = getsockname( + mSocket, (struct sockaddr *)&localAddr, &localAddrLen); + CHECK_GE(res, 0); + + struct sockaddr_in remoteAddr; + socklen_t remoteAddrLen = sizeof(remoteAddr); + + res = getpeername( + mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); + CHECK_GE(res, 0); + + in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); + AString localAddrString = StringPrintf( + "%d.%d.%d.%d", + (addr >> 24), + (addr >> 16) & 0xff, + (addr >> 8) & 0xff, + addr & 0xff); + + addr = ntohl(remoteAddr.sin_addr.s_addr); + AString remoteAddrString = StringPrintf( + "%d.%d.%d.%d", + (addr >> 24), + (addr >> 16) & 0xff, + (addr >> 8) & 0xff, + addr & 0xff); + + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("sessionID", mSessionID); + msg->setInt32("reason", kWhatClientConnected); + msg->setString("server-ip", localAddrString.c_str()); + msg->setInt32("server-port", ntohs(localAddr.sin_port)); + msg->setString("client-ip", remoteAddrString.c_str()); + msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); + msg->post(); + } +} + +ANetworkSession::Session::~Session() { + ALOGI("Session %d gone", mSessionID); + + close(mSocket); + mSocket = -1; +} + +int32_t ANetworkSession::Session::sessionID() const { + return mSessionID; +} + +int ANetworkSession::Session::socket() const { + return mSocket; +} + +sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { + return mNotify; +} + +bool ANetworkSession::Session::isListening() const { + return mState == LISTENING; +} + +bool ANetworkSession::Session::wantsToRead() { + return !mSawReceiveFailure && mState != CONNECTING; +} + +bool ANetworkSession::Session::wantsToWrite() { + return !mSawSendFailure + && (mState == CONNECTING + || ((mState == CONNECTED || mState == DATAGRAM) + && !mOutBuffer.empty())); +} + +status_t ANetworkSession::Session::readMore() { + if (mState == DATAGRAM) { + status_t err; + do { + sp<ABuffer> buf = new ABuffer(kMaxUDPSize); + + struct sockaddr_in remoteAddr; + socklen_t remoteAddrLen = sizeof(remoteAddr); + + ssize_t n; + do { + n = recvfrom( + mSocket, buf->data(), buf->capacity(), 0, + (struct sockaddr *)&remoteAddr, &remoteAddrLen); + } while (n < 0 && errno == EINTR); + + err = OK; + if (n < 0) { + err = -errno; + } else if (n == 0) { + err = -ECONNRESET; + } else { + buf->setRange(0, n); + + int64_t nowUs = ALooper::GetNowUs(); + buf->meta()->setInt64("arrivalTimeUs", nowUs); + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("sessionID", mSessionID); + notify->setInt32("reason", kWhatDatagram); + + uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); + notify->setString( + "fromAddr", + StringPrintf( + "%u.%u.%u.%u", + ip >> 24, + (ip >> 16) & 0xff, + (ip >> 8) & 0xff, + ip & 0xff).c_str()); + + notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); + + notify->setBuffer("data", buf); + notify->post(); + } + } while (err == OK); + + if (err == -EAGAIN) { + err = OK; + } + + if (err != OK) { + notifyError(false /* send */, err, "Recvfrom failed."); + mSawReceiveFailure = true; + } + + return err; + } + + char tmp[512]; + ssize_t n; + do { + n = recv(mSocket, tmp, sizeof(tmp), 0); + } while (n < 0 && errno == EINTR); + + status_t err = OK; + + if (n > 0) { + mInBuffer.append(tmp, n); + +#if 0 + ALOGI("in:"); + hexdump(tmp, n); +#endif + } else if (n < 0) { + err = -errno; + } else { + err = -ECONNRESET; + } + + for (;;) { + size_t length; + + if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { + if (mInBuffer.size() < 4) { + break; + } + + length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); + + if (mInBuffer.size() < 4 + length) { + break; + } + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("sessionID", mSessionID); + notify->setInt32("reason", kWhatBinaryData); + notify->setInt32("channel", mInBuffer.c_str()[1]); + + sp<ABuffer> data = new ABuffer(length); + memcpy(data->data(), mInBuffer.c_str() + 4, length); + + int64_t nowUs = ALooper::GetNowUs(); + data->meta()->setInt64("arrivalTimeUs", nowUs); + + notify->setBuffer("data", data); + notify->post(); + + mInBuffer.erase(0, 4 + length); + continue; + } + + sp<ParsedMessage> msg = + ParsedMessage::Parse( + mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); + + if (msg == NULL) { + break; + } + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("sessionID", mSessionID); + notify->setInt32("reason", kWhatData); + notify->setObject("data", msg); + notify->post(); + +#if 1 + // XXX The dongle sends the wrong content length header on a + // SET_PARAMETER request that signals a "wfd_idr_request". + // (17 instead of 19). + const char *content = msg->getContent(); + if (content && !memcmp(content, "wfd_idr_request\r\n", 17)) { + length += 2; + } +#endif + + mInBuffer.erase(0, length); + + if (err != OK) { + break; + } + } + + if (err != OK) { + notifyError(false /* send */, err, "Recv failed."); + mSawReceiveFailure = true; + } + + return err; +} + +status_t ANetworkSession::Session::writeMore() { + if (mState == DATAGRAM) { + CHECK(!mOutBufferSizes.empty()); + + status_t err; + do { + size_t size = *mOutBufferSizes.begin(); + + CHECK_GE(mOutBuffer.size(), size); + + int n; + do { + n = send(mSocket, mOutBuffer.c_str(), size, 0); + } while (n < 0 && errno == EINTR); + + err = OK; + + if (n > 0) { + mOutBufferSizes.erase(mOutBufferSizes.begin()); + mOutBuffer.erase(0, n); + } else if (n < 0) { + err = -errno; + } else if (n == 0) { + err = -ECONNRESET; + } + } while (err == OK && !mOutBufferSizes.empty()); + + if (err == -EAGAIN) { + err = OK; + } + + if (err != OK) { + notifyError(true /* send */, err, "Send datagram failed."); + mSawSendFailure = true; + } + + return err; + } + + if (mState == CONNECTING) { + int err; + socklen_t optionLen = sizeof(err); + CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); + CHECK_EQ(optionLen, (socklen_t)sizeof(err)); + + if (err != 0) { + notifyError(kWhatError, -err, "Connection failed"); + mSawSendFailure = true; + + return UNKNOWN_ERROR; + } + + mState = CONNECTED; + notify(kWhatConnected); + + return OK; + } + + CHECK_EQ(mState, CONNECTED); + CHECK(!mOutBuffer.empty()); + + ssize_t n; + do { + n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0); + } while (n < 0 && errno == EINTR); + + status_t err = OK; + + if (n > 0) { + ALOGI("out:"); + hexdump(mOutBuffer.c_str(), n); + + mOutBuffer.erase(0, n); + } else if (n < 0) { + err = -errno; + } else if (n == 0) { + err = -ECONNRESET; + } + + if (err != OK) { + notifyError(true /* send */, err, "Send failed."); + mSawSendFailure = true; + } + + return err; +} + +status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { + CHECK(mState == CONNECTED || mState == DATAGRAM); + + mOutBuffer.append( + (const char *)data, + (size >= 0) ? size : strlen((const char *)data)); + + if (mState == DATAGRAM) { + CHECK_GE(size, 0); + mOutBufferSizes.push_back(size); + } + + return OK; +} + +void ANetworkSession::Session::notifyError( + bool send, status_t err, const char *detail) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("sessionID", mSessionID); + msg->setInt32("reason", kWhatError); + msg->setInt32("send", send); + msg->setInt32("err", err); + msg->setString("detail", detail); + msg->post(); +} + +void ANetworkSession::Session::notify(NotificationReason reason) { + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("sessionID", mSessionID); + msg->setInt32("reason", reason); + msg->post(); +} + +//////////////////////////////////////////////////////////////////////////////// + +ANetworkSession::ANetworkSession() + : mNextSessionID(1) { + mPipeFd[0] = mPipeFd[1] = -1; +} + +ANetworkSession::~ANetworkSession() { + stop(); +} + +status_t ANetworkSession::start() { + if (mThread != NULL) { + return INVALID_OPERATION; + } + + int res = pipe(mPipeFd); + if (res != 0) { + mPipeFd[0] = mPipeFd[1] = -1; + return -errno; + } + + mThread = new NetworkThread(this); + + status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); + + if (err != OK) { + mThread.clear(); + + close(mPipeFd[0]); + close(mPipeFd[1]); + mPipeFd[0] = mPipeFd[1] = -1; + + return err; + } + + return OK; +} + +status_t ANetworkSession::stop() { + if (mThread == NULL) { + return INVALID_OPERATION; + } + + mThread->requestExit(); + interrupt(); + mThread->requestExitAndWait(); + + mThread.clear(); + + close(mPipeFd[0]); + close(mPipeFd[1]); + mPipeFd[0] = mPipeFd[1] = -1; + + return OK; +} + +status_t ANetworkSession::createRTSPClient( + const char *host, unsigned port, const sp<AMessage> ¬ify, + int32_t *sessionID) { + return createClientOrServer( + kModeCreateRTSPClient, + 0 /* port */, + host, + port, + notify, + sessionID); +} + +status_t ANetworkSession::createRTSPServer( + unsigned port, const sp<AMessage> ¬ify, int32_t *sessionID) { + return createClientOrServer( + kModeCreateRTSPServer, + port, + NULL /* remoteHost */, + 0 /* remotePort */, + notify, + sessionID); +} + +status_t ANetworkSession::createUDPSession( + unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { + return createUDPSession(localPort, NULL, 0, notify, sessionID); +} + +status_t ANetworkSession::createUDPSession( + unsigned localPort, + const char *remoteHost, + unsigned remotePort, + const sp<AMessage> ¬ify, + int32_t *sessionID) { + return createClientOrServer( + kModeCreateUDPSession, + localPort, + remoteHost, + remotePort, + notify, + sessionID); +} + +status_t ANetworkSession::destroySession(int32_t sessionID) { + Mutex::Autolock autoLock(mLock); + + ssize_t index = mSessions.indexOfKey(sessionID); + + if (index < 0) { + return -ENOENT; + } + + mSessions.removeItemsAt(index); + + interrupt(); + + return OK; +} + +// static +status_t ANetworkSession::MakeSocketNonBlocking(int s) { + int flags = fcntl(s, F_GETFL, 0); + if (flags < 0) { + flags = 0; + } + + int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); + if (res < 0) { + return -errno; + } + + return OK; +} + +status_t ANetworkSession::createClientOrServer( + Mode mode, + unsigned port, + const char *remoteHost, + unsigned remotePort, + const sp<AMessage> ¬ify, + int32_t *sessionID) { + Mutex::Autolock autoLock(mLock); + + *sessionID = 0; + status_t err = OK; + int s, res; + sp<Session> session; + + s = socket( + AF_INET, + (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, + 0); + + if (s < 0) { + err = -errno; + goto bail; + } + + if (mode == kModeCreateRTSPServer) { + const int yes = 1; + res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + + if (res < 0) { + err = -errno; + goto bail2; + } + } + + if (mode == kModeCreateUDPSession) { + int size = 256 * 1024; + + res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); + + if (res < 0) { + err = -errno; + goto bail2; + } + + res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); + + if (res < 0) { + err = -errno; + goto bail2; + } + } + + err = MakeSocketNonBlocking(s); + + if (err != OK) { + goto bail2; + } + + struct sockaddr_in addr; + memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); + addr.sin_family = AF_INET; + + if (mode == kModeCreateRTSPClient) { + struct hostent *ent= gethostbyname(remoteHost); + if (ent == NULL) { + err = -h_errno; + goto bail2; + } + + addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + addr.sin_port = htons(remotePort); + } else { + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + } + + if (mode == kModeCreateRTSPClient) { + res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); + + CHECK_LT(res, 0); + if (errno == EINPROGRESS) { + res = 0; + } + } else { + res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); + + if (res == 0) { + if (mode == kModeCreateRTSPServer) { + res = listen(s, 4); + } else { + CHECK_EQ(mode, kModeCreateUDPSession); + + if (remoteHost != NULL) { + struct sockaddr_in remoteAddr; + memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(remotePort); + + struct hostent *ent= gethostbyname(remoteHost); + if (ent == NULL) { + err = -h_errno; + goto bail2; + } + + remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + + res = connect( + s, + (const struct sockaddr *)&remoteAddr, + sizeof(remoteAddr)); + } + } + } + } + + if (res < 0) { + err = -errno; + goto bail2; + } + + Session::State state; + switch (mode) { + case kModeCreateRTSPClient: + state = Session::CONNECTING; + break; + + case kModeCreateRTSPServer: + state = Session::LISTENING; + break; + + default: + CHECK_EQ(mode, kModeCreateUDPSession); + state = Session::DATAGRAM; + break; + } + + session = new Session( + mNextSessionID++, + state, + s, + notify); + + mSessions.add(session->sessionID(), session); + + interrupt(); + + *sessionID = session->sessionID(); + + goto bail; + +bail2: + close(s); + s = -1; + +bail: + return err; +} + +status_t ANetworkSession::connectUDPSession( + int32_t sessionID, const char *remoteHost, unsigned remotePort) { + Mutex::Autolock autoLock(mLock); + + ssize_t index = mSessions.indexOfKey(sessionID); + + if (index < 0) { + return -ENOENT; + } + + const sp<Session> session = mSessions.valueAt(index); + int s = session->socket(); + + struct sockaddr_in remoteAddr; + memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(remotePort); + + status_t err = OK; + struct hostent *ent= gethostbyname(remoteHost); + if (ent == NULL) { + err = -h_errno; + } else { + remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; + + int res = connect( + s, + (const struct sockaddr *)&remoteAddr, + sizeof(remoteAddr)); + + if (res < 0) { + err = -errno; + } + } + + return err; +} + +status_t ANetworkSession::sendRequest( + int32_t sessionID, const void *data, ssize_t size) { + Mutex::Autolock autoLock(mLock); + + ssize_t index = mSessions.indexOfKey(sessionID); + + if (index < 0) { + return -ENOENT; + } + + const sp<Session> session = mSessions.valueAt(index); + + status_t err = session->sendRequest(data, size); + + interrupt(); + + return err; +} + +void ANetworkSession::interrupt() { + static const char dummy = 0; + + ssize_t n; + do { + n = write(mPipeFd[1], &dummy, 1); + } while (n < 0 && errno == EINTR); + + if (n < 0) { + ALOGW("Error writing to pipe (%s)", strerror(errno)); + } +} + +void ANetworkSession::threadLoop() { + fd_set rs, ws; + FD_ZERO(&rs); + FD_ZERO(&ws); + + FD_SET(mPipeFd[0], &rs); + int maxFd = mPipeFd[0]; + + { + Mutex::Autolock autoLock(mLock); + + for (size_t i = 0; i < mSessions.size(); ++i) { + const sp<Session> &session = mSessions.valueAt(i); + + int s = session->socket(); + + if (s < 0) { + continue; + } + + if (session->wantsToRead()) { + FD_SET(s, &rs); + if (s > maxFd) { + maxFd = s; + } + } + + if (session->wantsToWrite()) { + FD_SET(s, &ws); + if (s > maxFd) { + maxFd = s; + } + } + } + } + + int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); + + if (res == 0) { + return; + } + + if (res < 0) { + if (errno == EINTR) { + return; + } + + ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); + return; + } + + if (FD_ISSET(mPipeFd[0], &rs)) { + char c; + ssize_t n; + do { + n = read(mPipeFd[0], &c, 1); + } while (n < 0 && errno == EINTR); + + if (n < 0) { + ALOGW("Error reading from pipe (%s)", strerror(errno)); + } + + --res; + } + + { + Mutex::Autolock autoLock(mLock); + + List<sp<Session> > sessionsToAdd; + + for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { + const sp<Session> &session = mSessions.valueAt(i); + + int s = session->socket(); + + if (s < 0) { + continue; + } + + if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { + --res; + } + + if (FD_ISSET(s, &rs)) { + if (session->isListening()) { + struct sockaddr_in remoteAddr; + socklen_t remoteAddrLen = sizeof(remoteAddr); + + int clientSocket = accept( + s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); + + if (clientSocket >= 0) { + status_t err = MakeSocketNonBlocking(clientSocket); + + if (err != OK) { + ALOGE("Unable to make client socket non blocking, " + "failed w/ error %d (%s)", + err, strerror(-err)); + + close(clientSocket); + clientSocket = -1; + } else { + in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); + + ALOGI("incoming connection from %d.%d.%d.%d:%d " + "(socket %d)", + (addr >> 24), + (addr >> 16) & 0xff, + (addr >> 8) & 0xff, + addr & 0xff, + ntohs(remoteAddr.sin_port), + clientSocket); + + sp<Session> clientSession = + // using socket sd as sessionID + new Session( + mNextSessionID++, + Session::CONNECTED, + clientSocket, + session->getNotificationMessage()); + + sessionsToAdd.push_back(clientSession); + } + } else { + ALOGE("accept returned error %d (%s)", + errno, strerror(errno)); + } + } else { + status_t err = session->readMore(); + if (err != OK) { + ALOGI("readMore on socket %d failed w/ error %d (%s)", + s, err, strerror(-err)); + } + } + } + + if (FD_ISSET(s, &ws)) { + status_t err = session->writeMore(); + if (err != OK) { + ALOGI("writeMore on socket %d failed w/ error %d (%s)", + s, err, strerror(-err)); + } + } + } + + while (!sessionsToAdd.empty()) { + sp<Session> session = *sessionsToAdd.begin(); + sessionsToAdd.erase(sessionsToAdd.begin()); + + mSessions.add(session->sessionID(), session); + + ALOGI("added clientSession %d", session->sessionID()); + } + } +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/ANetworkSession.h b/media/libstagefright/wifi-display/ANetworkSession.h new file mode 100644 index 0000000..0402317 --- /dev/null +++ b/media/libstagefright/wifi-display/ANetworkSession.h @@ -0,0 +1,109 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef A_NETWORK_SESSION_H_ + +#define A_NETWORK_SESSION_H_ + +#include <media/stagefright/foundation/ABase.h> +#include <utils/KeyedVector.h> +#include <utils/RefBase.h> +#include <utils/Thread.h> + +namespace android { + +struct AMessage; + +struct ANetworkSession : public RefBase { + ANetworkSession(); + + status_t start(); + status_t stop(); + + status_t createRTSPClient( + const char *host, unsigned port, const sp<AMessage> ¬ify, + int32_t *sessionID); + + status_t createRTSPServer( + unsigned port, const sp<AMessage> ¬ify, int32_t *sessionID); + + status_t createUDPSession( + unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID); + + status_t createUDPSession( + unsigned localPort, + const char *remoteHost, + unsigned remotePort, + const sp<AMessage> ¬ify, + int32_t *sessionID); + + status_t connectUDPSession( + int32_t sessionID, const char *remoteHost, unsigned remotePort); + + status_t destroySession(int32_t sessionID); + + status_t sendRequest( + int32_t sessionID, const void *data, ssize_t size = -1); + + enum NotificationReason { + kWhatError, + kWhatConnected, + kWhatClientConnected, + kWhatData, + kWhatDatagram, + kWhatBinaryData, + }; + +protected: + virtual ~ANetworkSession(); + +private: + struct NetworkThread; + struct Session; + + Mutex mLock; + sp<Thread> mThread; + + int32_t mNextSessionID; + + int mPipeFd[2]; + + KeyedVector<int32_t, sp<Session> > mSessions; + + enum Mode { + kModeCreateUDPSession, + kModeCreateRTSPServer, + kModeCreateRTSPClient, + }; + status_t createClientOrServer( + Mode mode, + unsigned port, + const char *remoteHost, + unsigned remotePort, + const sp<AMessage> ¬ify, + int32_t *sessionID); + + void threadLoop(); + void interrupt(); + + static status_t MakeSocketNonBlocking(int s); + + DISALLOW_EVIL_CONSTRUCTORS(ANetworkSession); +}; + +} // namespace android + +#endif // A_NETWORK_SESSION_H_ diff --git a/media/libstagefright/wifi-display/Android.mk b/media/libstagefright/wifi-display/Android.mk new file mode 100644 index 0000000..114ff62 --- /dev/null +++ b/media/libstagefright/wifi-display/Android.mk @@ -0,0 +1,22 @@ +LOCAL_PATH:= $(call my-dir) + +include $(CLEAR_VARS) + +LOCAL_SRC_FILES:= \ + udptest.cpp \ + ANetworkSession.cpp \ + ParsedMessage.cpp \ + +LOCAL_SHARED_LIBRARIES:= \ + libbinder \ + libgui \ + libmedia \ + libstagefright \ + libstagefright_foundation \ + libutils \ + +LOCAL_MODULE:= udptest + +LOCAL_MODULE_TAGS := debug + +include $(BUILD_EXECUTABLE) diff --git a/media/libstagefright/wifi-display/ParsedMessage.cpp b/media/libstagefright/wifi-display/ParsedMessage.cpp new file mode 100644 index 0000000..c0e60c3 --- /dev/null +++ b/media/libstagefright/wifi-display/ParsedMessage.cpp @@ -0,0 +1,284 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ParsedMessage.h" + +#include <ctype.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> + +namespace android { + +// static +sp<ParsedMessage> ParsedMessage::Parse( + const char *data, size_t size, bool noMoreData, size_t *length) { + sp<ParsedMessage> msg = new ParsedMessage; + ssize_t res = msg->parse(data, size, noMoreData); + + if (res < 0) { + *length = 0; + return NULL; + } + + *length = res; + return msg; +} + +ParsedMessage::ParsedMessage() { +} + +ParsedMessage::~ParsedMessage() { +} + +bool ParsedMessage::findString(const char *name, AString *value) const { + AString key = name; + key.tolower(); + + ssize_t index = mDict.indexOfKey(key); + + if (index < 0) { + value->clear(); + + return false; + } + + *value = mDict.valueAt(index); + return true; +} + +bool ParsedMessage::findInt32(const char *name, int32_t *value) const { + AString stringValue; + + if (!findString(name, &stringValue)) { + return false; + } + + char *end; + *value = strtol(stringValue.c_str(), &end, 10); + + if (end == stringValue.c_str() || *end != '\0') { + *value = 0; + return false; + } + + return true; +} + +const char *ParsedMessage::getContent() const { + return mContent.c_str(); +} + +ssize_t ParsedMessage::parse(const char *data, size_t size, bool noMoreData) { + if (size == 0) { + return -1; + } + + ssize_t lastDictIndex = -1; + + size_t offset = 0; + while (offset < size) { + size_t lineEndOffset = offset; + while (lineEndOffset + 1 < size + && (data[lineEndOffset] != '\r' + || data[lineEndOffset + 1] != '\n')) { + ++lineEndOffset; + } + + if (lineEndOffset + 1 >= size) { + return -1; + } + + AString line(&data[offset], lineEndOffset - offset); + + if (offset == 0) { + // Special handling for the request/status line. + + mDict.add(AString("_"), line); + offset = lineEndOffset + 2; + + continue; + } + + if (lineEndOffset == offset) { + offset += 2; + break; + } + + if (line.c_str()[0] == ' ' || line.c_str()[0] == '\t') { + // Support for folded header values. + + if (lastDictIndex >= 0) { + // Otherwise it's malformed since the first header line + // cannot continue anything... + + AString &value = mDict.editValueAt(lastDictIndex); + value.append(line); + } + + offset = lineEndOffset + 2; + continue; + } + + ssize_t colonPos = line.find(":"); + if (colonPos >= 0) { + AString key(line, 0, colonPos); + key.trim(); + key.tolower(); + + line.erase(0, colonPos + 1); + + lastDictIndex = mDict.add(key, line); + } + + offset = lineEndOffset + 2; + } + + for (size_t i = 0; i < mDict.size(); ++i) { + mDict.editValueAt(i).trim(); + } + + // Found the end of headers. + + int32_t contentLength; + if (!findInt32("content-length", &contentLength) || contentLength < 0) { + contentLength = 0; + } + + size_t totalLength = offset + contentLength; + + if (size < totalLength) { + return -1; + } + + mContent.setTo(&data[offset], contentLength); + + return totalLength; +} + +void ParsedMessage::getRequestField(size_t index, AString *field) const { + AString line; + CHECK(findString("_", &line)); + + size_t prevOffset = 0; + size_t offset = 0; + for (size_t i = 0; i <= index; ++i) { + ssize_t spacePos = line.find(" ", offset); + + if (spacePos < 0) { + spacePos = line.size(); + } + + prevOffset = offset; + offset = spacePos + 1; + } + + field->setTo(line, prevOffset, offset - prevOffset - 1); +} + +bool ParsedMessage::getStatusCode(int32_t *statusCode) const { + AString statusCodeString; + getRequestField(1, &statusCodeString); + + char *end; + *statusCode = strtol(statusCodeString.c_str(), &end, 10); + + if (*end != '\0' || end == statusCodeString.c_str() + || (*statusCode) < 100 || (*statusCode) > 999) { + *statusCode = 0; + return false; + } + + return true; +} + +AString ParsedMessage::debugString() const { + AString line; + CHECK(findString("_", &line)); + + line.append("\n"); + + for (size_t i = 0; i < mDict.size(); ++i) { + const AString &key = mDict.keyAt(i); + const AString &value = mDict.valueAt(i); + + if (key == AString("_")) { + continue; + } + + line.append(key); + line.append(": "); + line.append(value); + line.append("\n"); + } + + line.append("\n"); + line.append(mContent); + + return line; +} + +// static +bool ParsedMessage::GetAttribute( + const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + +// static +bool ParsedMessage::GetInt32Attribute( + const char *s, const char *key, int32_t *value) { + AString stringValue; + if (!GetAttribute(s, key, &stringValue)) { + *value = 0; + return false; + } + + char *end; + *value = strtol(stringValue.c_str(), &end, 10); + + if (end == stringValue.c_str() || *end != '\0') { + *value = 0; + return false; + } + + return true; +} + +} // namespace android + diff --git a/media/libstagefright/wifi-display/ParsedMessage.h b/media/libstagefright/wifi-display/ParsedMessage.h new file mode 100644 index 0000000..00f578f --- /dev/null +++ b/media/libstagefright/wifi-display/ParsedMessage.h @@ -0,0 +1,58 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <media/stagefright/foundation/ABase.h> +#include <media/stagefright/foundation/AString.h> +#include <utils/KeyedVector.h> +#include <utils/RefBase.h> + +namespace android { + +struct ParsedMessage : public RefBase { + static sp<ParsedMessage> Parse( + const char *data, size_t size, bool noMoreData, size_t *length); + + bool findString(const char *name, AString *value) const; + bool findInt32(const char *name, int32_t *value) const; + + const char *getContent() const; + + void getRequestField(size_t index, AString *field) const; + bool getStatusCode(int32_t *statusCode) const; + + AString debugString() const; + + static bool GetAttribute(const char *s, const char *key, AString *value); + + static bool GetInt32Attribute( + const char *s, const char *key, int32_t *value); + + +protected: + virtual ~ParsedMessage(); + +private: + KeyedVector<AString, AString> mDict; + AString mContent; + + ParsedMessage(); + + ssize_t parse(const char *data, size_t size, bool noMoreData); + + DISALLOW_EVIL_CONSTRUCTORS(ParsedMessage); +}; + +} // namespace android diff --git a/media/libstagefright/wifi-display/udptest.cpp b/media/libstagefright/wifi-display/udptest.cpp new file mode 100644 index 0000000..1cd82c3 --- /dev/null +++ b/media/libstagefright/wifi-display/udptest.cpp @@ -0,0 +1,355 @@ +/* + * Copyright 2012, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NEBUG 0 +#define LOG_TAG "udptest" +#include <utils/Log.h> + +#include "ANetworkSession.h" + +#include <binder/ProcessState.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AHandler.h> +#include <media/stagefright/foundation/ALooper.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/Utils.h> + +namespace android { + +struct TestHandler : public AHandler { + TestHandler(const sp<ANetworkSession> &netSession); + + void startServer(unsigned localPort); + void startClient(const char *remoteHost, unsigned remotePort); + +protected: + virtual ~TestHandler(); + + virtual void onMessageReceived(const sp<AMessage> &msg); + +private: + enum { + kWhatStartServer, + kWhatStartClient, + kWhatUDPNotify, + kWhatSendPacket, + }; + + sp<ANetworkSession> mNetSession; + + bool mIsServer; + bool mConnected; + int32_t mUDPSession; + uint32_t mSeqNo; + double mTotalTimeUs; + int32_t mCount; + + void postSendPacket(int64_t delayUs = 0ll); + + DISALLOW_EVIL_CONSTRUCTORS(TestHandler); +}; + +TestHandler::TestHandler(const sp<ANetworkSession> &netSession) + : mNetSession(netSession), + mIsServer(false), + mConnected(false), + mUDPSession(0), + mSeqNo(0), + mTotalTimeUs(0.0), + mCount(0) { +} + +TestHandler::~TestHandler() { +} + +void TestHandler::startServer(unsigned localPort) { + sp<AMessage> msg = new AMessage(kWhatStartServer, id()); + msg->setInt32("localPort", localPort); + msg->post(); +} + +void TestHandler::startClient(const char *remoteHost, unsigned remotePort) { + sp<AMessage> msg = new AMessage(kWhatStartClient, id()); + msg->setString("remoteHost", remoteHost); + msg->setInt32("remotePort", remotePort); + msg->post(); +} + +void TestHandler::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatStartClient: + { + AString remoteHost; + CHECK(msg->findString("remoteHost", &remoteHost)); + + int32_t remotePort; + CHECK(msg->findInt32("remotePort", &remotePort)); + + sp<AMessage> notify = new AMessage(kWhatUDPNotify, id()); + + CHECK_EQ((status_t)OK, + mNetSession->createUDPSession( + 0 /* localPort */, + remoteHost.c_str(), + remotePort, + notify, + &mUDPSession)); + + postSendPacket(); + break; + } + + case kWhatStartServer: + { + mIsServer = true; + + int32_t localPort; + CHECK(msg->findInt32("localPort", &localPort)); + + sp<AMessage> notify = new AMessage(kWhatUDPNotify, id()); + + CHECK_EQ((status_t)OK, + mNetSession->createUDPSession( + localPort, notify, &mUDPSession)); + + break; + } + + case kWhatSendPacket: + { + char buffer[12]; + memset(buffer, 0, sizeof(buffer)); + + buffer[0] = mSeqNo >> 24; + buffer[1] = (mSeqNo >> 16) & 0xff; + buffer[2] = (mSeqNo >> 8) & 0xff; + buffer[3] = mSeqNo & 0xff; + ++mSeqNo; + + int64_t nowUs = ALooper::GetNowUs(); + buffer[4] = nowUs >> 56; + buffer[5] = (nowUs >> 48) & 0xff; + buffer[6] = (nowUs >> 40) & 0xff; + buffer[7] = (nowUs >> 32) & 0xff; + buffer[8] = (nowUs >> 24) & 0xff; + buffer[9] = (nowUs >> 16) & 0xff; + buffer[10] = (nowUs >> 8) & 0xff; + buffer[11] = nowUs & 0xff; + + CHECK_EQ((status_t)OK, + mNetSession->sendRequest( + mUDPSession, buffer, sizeof(buffer))); + + postSendPacket(20000ll); + break; + } + + case kWhatUDPNotify: + { + int32_t reason; + CHECK(msg->findInt32("reason", &reason)); + + switch (reason) { + case ANetworkSession::kWhatError: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + int32_t err; + CHECK(msg->findInt32("err", &err)); + + AString detail; + CHECK(msg->findString("detail", &detail)); + + ALOGE("An error occurred in session %d (%d, '%s/%s').", + sessionID, + err, + detail.c_str(), + strerror(-err)); + + mNetSession->destroySession(sessionID); + break; + } + + case ANetworkSession::kWhatDatagram: + { + int32_t sessionID; + CHECK(msg->findInt32("sessionID", &sessionID)); + + sp<ABuffer> data; + CHECK(msg->findBuffer("data", &data)); + + if (mIsServer) { + if (!mConnected) { + AString fromAddr; + CHECK(msg->findString("fromAddr", &fromAddr)); + + int32_t fromPort; + CHECK(msg->findInt32("fromPort", &fromPort)); + + CHECK_EQ((status_t)OK, + mNetSession->connectUDPSession( + mUDPSession, fromAddr.c_str(), fromPort)); + + mConnected = true; + } + + int64_t nowUs = ALooper::GetNowUs(); + + sp<ABuffer> buffer = new ABuffer(data->size() + 8); + memcpy(buffer->data(), data->data(), data->size()); + + uint8_t *ptr = buffer->data() + data->size(); + + *ptr++ = nowUs >> 56; + *ptr++ = (nowUs >> 48) & 0xff; + *ptr++ = (nowUs >> 40) & 0xff; + *ptr++ = (nowUs >> 32) & 0xff; + *ptr++ = (nowUs >> 24) & 0xff; + *ptr++ = (nowUs >> 16) & 0xff; + *ptr++ = (nowUs >> 8) & 0xff; + *ptr++ = nowUs & 0xff; + + CHECK_EQ((status_t)OK, + mNetSession->sendRequest( + mUDPSession, buffer->data(), buffer->size())); + } else { + CHECK_EQ(data->size(), 20u); + + uint32_t seqNo = U32_AT(data->data()); + int64_t t1 = U64_AT(data->data() + 4); + int64_t t2 = U64_AT(data->data() + 12); + + int64_t t3; + CHECK(data->meta()->findInt64("arrivalTimeUs", &t3)); + +#if 0 + printf("roundtrip seqNo %u, time = %lld us\n", + seqNo, t3 - t1); +#else + mTotalTimeUs += t3 - t1; + ++mCount; + printf("avg. roundtrip time %.2f us\n", mTotalTimeUs / mCount); +#endif + } + break; + } + + default: + TRESPASS(); + } + + break; + } + + default: + TRESPASS(); + } +} + +void TestHandler::postSendPacket(int64_t delayUs) { + (new AMessage(kWhatSendPacket, id()))->post(delayUs); +} + +} // namespace android + +static void usage(const char *me) { + fprintf(stderr, + "usage: %s -c host[:port]\tconnect to test server\n" + " -l \tcreate a test server\n", + me); +} + +int main(int argc, char **argv) { + using namespace android; + + ProcessState::self()->startThreadPool(); + + int32_t localPort = -1; + int32_t connectToPort = -1; + AString connectToHost; + + int res; + while ((res = getopt(argc, argv, "hc:l:")) >= 0) { + switch (res) { + case 'c': + { + const char *colonPos = strrchr(optarg, ':'); + + if (colonPos == NULL) { + connectToHost = optarg; + connectToPort = 49152; + } else { + connectToHost.setTo(optarg, colonPos - optarg); + + char *end; + connectToPort = strtol(colonPos + 1, &end, 10); + + if (*end != '\0' || end == colonPos + 1 + || connectToPort < 1 || connectToPort > 65535) { + fprintf(stderr, "Illegal port specified.\n"); + exit(1); + } + } + break; + } + + case 'l': + { + char *end; + localPort = strtol(optarg, &end, 10); + + if (*end != '\0' || end == optarg + || localPort < 1 || localPort > 65535) { + fprintf(stderr, "Illegal port specified.\n"); + exit(1); + } + break; + } + + case '?': + case 'h': + usage(argv[0]); + exit(1); + } + } + + if (localPort < 0 && connectToPort < 0) { + fprintf(stderr, + "You need to select either client or server mode.\n"); + exit(1); + } + + sp<ANetworkSession> netSession = new ANetworkSession; + netSession->start(); + + sp<ALooper> looper = new ALooper; + + sp<TestHandler> handler = new TestHandler(netSession); + looper->registerHandler(handler); + + if (localPort >= 0) { + handler->startServer(localPort); + } else { + handler->startClient(connectToHost.c_str(), connectToPort); + } + + looper->start(true /* runOnCallingThread */); + + return 0; +} + |