From 0792ce7e0924ebb0dbe7b7cfcd79d12cbdb03ed2 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 26 Aug 2010 11:17:32 -0700 Subject: Support for RTP packets arriving interleaved with RTSP responses. Change-Id: Ib32fba257da32a199134cf8943117cf3eaa07a25 --- media/libstagefright/rtsp/ARTPConnection.cpp | 68 ++++++++++++++++++++- media/libstagefright/rtsp/ARTPConnection.h | 7 ++- media/libstagefright/rtsp/ARTPSession.cpp | 3 +- media/libstagefright/rtsp/ARTSPConnection.cpp | 88 ++++++++++++++++++++++++--- media/libstagefright/rtsp/ARTSPConnection.h | 7 +++ media/libstagefright/rtsp/MyHandler.h | 64 ++++++++++++++++--- 6 files changed, 216 insertions(+), 21 deletions(-) (limited to 'media') diff --git a/media/libstagefright/rtsp/ARTPConnection.cpp b/media/libstagefright/rtsp/ARTPConnection.cpp index 6816c45..42a22b7 100644 --- a/media/libstagefright/rtsp/ARTPConnection.cpp +++ b/media/libstagefright/rtsp/ARTPConnection.cpp @@ -57,6 +57,8 @@ struct ARTPConnection::StreamInfo { int32_t mNumRTCPPacketsReceived; struct sockaddr_in mRemoteRTCPAddr; + + bool mIsInjected; }; ARTPConnection::ARTPConnection(uint32_t flags) @@ -72,13 +74,15 @@ void ARTPConnection::addStream( int rtpSocket, int rtcpSocket, const sp &sessionDesc, size_t index, - const sp ¬ify) { + const sp ¬ify, + bool injected) { sp msg = new AMessage(kWhatAddStream, id()); msg->setInt32("rtp-socket", rtpSocket); msg->setInt32("rtcp-socket", rtcpSocket); msg->setObject("session-desc", sessionDesc); msg->setSize("index", index); msg->setMessage("notify", notify); + msg->setInt32("injected", injected); msg->post(); } @@ -154,6 +158,12 @@ void ARTPConnection::onMessageReceived(const sp &msg) { break; } + case kWhatInjectPacket: + { + onInjectPacket(msg); + break; + } + default: { TRESPASS(); @@ -172,6 +182,11 @@ void ARTPConnection::onAddStream(const sp &msg) { CHECK(msg->findInt32("rtcp-socket", &s)); info->mRTCPSocket = s; + int32_t injected; + CHECK(msg->findInt32("injected", &injected)); + + info->mIsInjected = injected; + sp obj; CHECK(msg->findObject("session-desc", &obj)); info->mSessionDesc = static_cast(obj.get()); @@ -182,7 +197,9 @@ void ARTPConnection::onAddStream(const sp &msg) { info->mNumRTCPPacketsReceived = 0; memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); - postPollEvent(); + if (!injected) { + postPollEvent(); + } } void ARTPConnection::onRemoveStream(const sp &msg) { @@ -231,6 +248,10 @@ void ARTPConnection::onPollStreams() { int maxSocket = -1; for (List::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { + if ((*it).mIsInjected) { + continue; + } + FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTCPSocket, &rs); @@ -248,6 +269,10 @@ void ARTPConnection::onPollStreams() { if (res > 0) { for (List::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { + if ((*it).mIsInjected) { + continue; + } + if (FD_ISSET(it->mRTPSocket, &rs)) { receive(&*it, true); } @@ -301,6 +326,8 @@ void ARTPConnection::onPollStreams() { } status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { + CHECK(!s->mIsInjected); + sp buffer = new ABuffer(65536); socklen_t remoteAddrLen = @@ -559,5 +586,42 @@ sp ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { return source; } +void ARTPConnection::injectPacket(int index, const sp &buffer) { + sp msg = new AMessage(kWhatInjectPacket, id()); + msg->setInt32("index", index); + msg->setObject("buffer", buffer); + msg->post(); +} + +void ARTPConnection::onInjectPacket(const sp &msg) { + int32_t index; + CHECK(msg->findInt32("index", &index)); + + sp obj; + CHECK(msg->findObject("buffer", &obj)); + + sp buffer = static_cast(obj.get()); + + List::iterator it = mStreams.begin(); + while (it != mStreams.end() + && it->mRTPSocket != index && it->mRTCPSocket != index) { + ++it; + } + + if (it == mStreams.end()) { + TRESPASS(); + } + + StreamInfo *s = &*it; + + status_t err; + if (it->mRTPSocket == index) { + err = parseRTP(s, buffer); + } else { + ++s->mNumRTCPPacketsReceived; + err = parseRTCP(s, buffer); + } +} + } // namespace android diff --git a/media/libstagefright/rtsp/ARTPConnection.h b/media/libstagefright/rtsp/ARTPConnection.h index c535199..77f81fa 100644 --- a/media/libstagefright/rtsp/ARTPConnection.h +++ b/media/libstagefright/rtsp/ARTPConnection.h @@ -38,10 +38,13 @@ struct ARTPConnection : public AHandler { void addStream( int rtpSocket, int rtcpSocket, const sp &sessionDesc, size_t index, - const sp ¬ify); + const sp ¬ify, + bool injected); void removeStream(int rtpSocket, int rtcpSocket); + void injectPacket(int index, const sp &buffer); + // Creates a pair of UDP datagram sockets bound to adjacent ports // (the rtpSocket is bound to an even port, the rtcpSocket to the // next higher port). @@ -57,6 +60,7 @@ private: kWhatAddStream, kWhatRemoveStream, kWhatPollStreams, + kWhatInjectPacket, }; static const int64_t kSelectTimeoutUs; @@ -72,6 +76,7 @@ private: void onAddStream(const sp &msg); void onRemoveStream(const sp &msg); void onPollStreams(); + void onInjectPacket(const sp &msg); void onSendReceiverReports(); status_t receive(StreamInfo *info, bool receiveRTP); diff --git a/media/libstagefright/rtsp/ARTPSession.cpp b/media/libstagefright/rtsp/ARTPSession.cpp index e082078..d2c56f7 100644 --- a/media/libstagefright/rtsp/ARTPSession.cpp +++ b/media/libstagefright/rtsp/ARTPSession.cpp @@ -83,7 +83,8 @@ status_t ARTPSession::setup(const sp &desc) { sp notify = new AMessage(kWhatAccessUnitComplete, id()); notify->setSize("track-index", mTracks.size() - 1); - mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify); + mRTPConn->addStream( + rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */); info->mPacketSource = source; } diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp index 5f8f5fd..cbd4836 100644 --- a/media/libstagefright/rtsp/ARTSPConnection.cpp +++ b/media/libstagefright/rtsp/ARTSPConnection.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest( msg->post(); } +void ARTSPConnection::observeBinaryData(const sp &reply) { + sp msg = new AMessage(kWhatObserveBinaryData, id()); + msg->setMessage("reply", reply); + msg->post(); +} + void ARTSPConnection::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatConnect: @@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp &msg) { onReceiveResponse(); break; + case kWhatObserveBinaryData: + { + CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); + break; + } + default: TRESPASS(); break; @@ -396,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() { mReceiveResponseEventPending = true; } -bool ARTSPConnection::receiveLine(AString *line) { - line->clear(); - - bool sawCR = false; - for (;;) { - char c; - ssize_t n = recv(mSocket, &c, 1, 0); +status_t ARTSPConnection::receive(void *data, size_t size) { + size_t offset = 0; + while (offset < size) { + ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0); if (n == 0) { // Server closed the connection. - return false; + return ERROR_IO; } else if (n < 0) { if (errno == EINTR) { continue; @@ -414,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) { TRESPASS(); } + offset += (size_t)n; + } + + return OK; +} + +bool ARTSPConnection::receiveLine(AString *line) { + line->clear(); + + bool sawCR = false; + for (;;) { + char c; + if (receive(&c, 1) != OK) { + return false; + } + if (sawCR && c == '\n') { line->erase(line->size() - 1, 1); return true; @@ -421,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) { line->append(&c, 1); + if (c == '$' && line->size() == 1) { + // Special-case for interleaved binary data. + return true; + } + sawCR = (c == '\r'); } } +sp ARTSPConnection::receiveBinaryData() { + uint8_t x[3]; + if (receive(x, 3) != OK) { + return NULL; + } + + sp buffer = new ABuffer((x[1] << 8) | x[2]); + if (receive(buffer->data(), buffer->size()) != OK) { + return NULL; + } + + buffer->meta()->setInt32("index", (int32_t)x[0]); + + return buffer; +} + bool ARTSPConnection::receiveRTSPReponse() { - sp response = new ARTSPResponse; + AString statusLine; - if (!receiveLine(&response->mStatusLine)) { + if (!receiveLine(&statusLine)) { return false; } + if (statusLine == "$") { + sp buffer = receiveBinaryData(); + + if (buffer == NULL) { + return false; + } + + if (mObserveBinaryMessage != NULL) { + sp notify = mObserveBinaryMessage->dup(); + notify->setObject("buffer", buffer); + notify->post(); + } else { + LOG(WARNING) << "received binary data, but no one cares."; + } + + return true; + } + + sp response = new ARTSPResponse; + response->mStatusLine = statusLine; + LOG(INFO) << "status: " << response->mStatusLine; ssize_t space1 = response->mStatusLine.find(" "); diff --git a/media/libstagefright/rtsp/ARTSPConnection.h b/media/libstagefright/rtsp/ARTSPConnection.h index 3577a2f..96e0d5b 100644 --- a/media/libstagefright/rtsp/ARTSPConnection.h +++ b/media/libstagefright/rtsp/ARTSPConnection.h @@ -40,6 +40,8 @@ struct ARTSPConnection : public AHandler { void sendRequest(const char *request, const sp &reply); + void observeBinaryData(const sp &reply); + protected: virtual ~ARTSPConnection(); virtual void onMessageReceived(const sp &msg); @@ -57,6 +59,7 @@ private: kWhatCompleteConnection = 'comc', kWhatSendRequest = 'sreq', kWhatReceiveResponse = 'rres', + kWhatObserveBinaryData = 'obin', }; static const int64_t kSelectTimeoutUs; @@ -69,6 +72,8 @@ private: KeyedVector > mPendingRequests; + sp mObserveBinaryMessage; + void onConnect(const sp &msg); void onDisconnect(const sp &msg); void onCompleteConnection(const sp &msg); @@ -80,7 +85,9 @@ private: // Return false iff something went unrecoverably wrong. bool receiveRTSPReponse(); + status_t receive(void *data, size_t size); bool receiveLine(AString *line); + sp receiveBinaryData(); bool notifyResponseListener(const sp &response); static bool ParseURL( diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h index e248463..4c6f058 100644 --- a/media/libstagefright/rtsp/MyHandler.h +++ b/media/libstagefright/rtsp/MyHandler.h @@ -29,6 +29,8 @@ #include #include +#define USE_TCP_INTERLEAVED 0 + namespace android { struct MyHandler : public AHandler { @@ -55,6 +57,9 @@ struct MyHandler : public AHandler { mLooper->registerHandler(mConn); (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); + sp notify = new AMessage('biny', id()); + mConn->observeBinaryData(notify); + sp reply = new AMessage('conn', id()); mConn->connect(mSessionURL.c_str(), reply); } @@ -91,6 +96,8 @@ struct MyHandler : public AHandler { sp reply = new AMessage('desc', id()); mConn->sendRequest(request.c_str(), reply); + } else { + (new AMessage('disc', id()))->post(); } break; } @@ -183,8 +190,10 @@ struct MyHandler : public AHandler { if (result != OK) { if (track) { - close(track->mRTPSocket); - close(track->mRTCPSocket); + if (!track->mUsingInterleavedTCP) { + close(track->mRTPSocket); + close(track->mRTCPSocket); + } mTracks.removeItemsAt(trackIndex); } @@ -216,7 +225,7 @@ struct MyHandler : public AHandler { mRTPConn->addStream( track->mRTPSocket, track->mRTCPSocket, mSessionDesc, index, - notify); + notify, track->mUsingInterleavedTCP); mSetupTracksSuccessful = true; } @@ -263,6 +272,9 @@ struct MyHandler : public AHandler { mDoneMsg->setInt32("result", OK); mDoneMsg->post(); mDoneMsg = NULL; + + sp timeout = new AMessage('tiou', id()); + timeout->post(10000000ll); } else { sp reply = new AMessage('disc', id()); mConn->disconnect(reply); @@ -451,6 +463,29 @@ struct MyHandler : public AHandler { break; } + case 'biny': + { + sp obj; + CHECK(msg->findObject("buffer", &obj)); + sp buffer = static_cast(obj.get()); + + int32_t index; + CHECK(buffer->meta()->findInt32("index", &index)); + + mRTPConn->injectPacket(index, buffer); + break; + } + + case 'tiou': + { + if (mFirstAccessUnit) { + LOG(WARNING) << "Never received any data, disconnecting."; + + } + (new AMessage('abor', id()))->post(); + break; + } + default: TRESPASS(); break; @@ -485,6 +520,7 @@ private: struct TrackInfo { int mRTPSocket; int mRTCPSocket; + bool mUsingInterleavedTCP; sp mPacketSource; }; @@ -515,19 +551,33 @@ private: mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); info->mPacketSource = source; - - unsigned rtpPort; - ARTPConnection::MakePortPair( - &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + info->mUsingInterleavedTCP = false; AString request = "SETUP "; request.append(trackURL); request.append(" RTSP/1.0\r\n"); +#if USE_TCP_INTERLEAVED + size_t interleaveIndex = 2 * (mTracks.size() - 1); + info->mUsingInterleavedTCP = true; + info->mRTPSocket = interleaveIndex; + info->mRTCPSocket = interleaveIndex + 1; + + request.append("Transport: RTP/AVP/TCP;interleaved="); + request.append(interleaveIndex); + request.append("-"); + request.append(interleaveIndex + 1); +#else + unsigned rtpPort; + ARTPConnection::MakePortPair( + &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); + request.append("Transport: RTP/AVP/UDP;unicast;client_port="); request.append(rtpPort); request.append("-"); request.append(rtpPort + 1); +#endif + request.append("\r\n"); if (index > 1) { -- cgit v1.1