diff options
Diffstat (limited to 'media/libstagefright/rtsp/ARTSPConnection.cpp')
-rw-r--r-- | media/libstagefright/rtsp/ARTSPConnection.cpp | 117 |
1 files changed, 101 insertions, 16 deletions
diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp index 9826990..fd6aa62 100644 --- a/media/libstagefright/rtsp/ARTSPConnection.cpp +++ b/media/libstagefright/rtsp/ARTSPConnection.cpp @@ -19,6 +19,7 @@ #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaErrors.h> #include <arpa/inet.h> #include <fcntl.h> @@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest( msg->post(); } +void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) { + sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id()); + msg->setMessage("reply", reply); + msg->post(); +} + void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case kWhatConnect: @@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { onReceiveResponse(); break; + case kWhatObserveBinaryData: + { + CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); + break; + } + default: TRESPASS(); break; @@ -136,6 +149,20 @@ bool ARTSPConnection::ParseURL( return true; } +static void MakeSocketBlocking(int s, bool blocking) { + // Make socket non-blocking. + int flags = fcntl(s, F_GETFL, 0); + CHECK_NE(flags, -1); + + if (blocking) { + flags &= ~O_NONBLOCK; + } else { + flags |= O_NONBLOCK; + } + + CHECK_NE(fcntl(s, F_SETFL, flags), -1); +} + void ARTSPConnection::onConnect(const sp<AMessage> &msg) { ++mConnectionID; @@ -150,10 +177,7 @@ void ARTSPConnection::onConnect(const sp<AMessage> &msg) { mSocket = socket(AF_INET, SOCK_STREAM, 0); - // Make socket non-blocking. - int flags = fcntl(mSocket, F_GETFL, 0); - CHECK_NE(flags, -1); - CHECK_NE(fcntl(mSocket, F_SETFL, flags | O_NONBLOCK), -1); + MakeSocketBlocking(mSocket, false); AString url; CHECK(msg->findString("url", &url)); @@ -210,7 +234,7 @@ void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) { mSocket = -1; flushPendingRequests(); - } + } sp<AMessage> reply; CHECK(msg->findMessage("reply", &reply)); @@ -347,7 +371,13 @@ void ARTSPConnection::onReceiveResponse() { CHECK_GE(res, 0); if (res == 1) { - if (!receiveRTSPReponse()) { + MakeSocketBlocking(mSocket, true); + + bool success = receiveRTSPReponse(); + + MakeSocketBlocking(mSocket, false); + + if (!success) { // Something horrible, irreparable has happened. flushPendingRequests(); return; @@ -379,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() { mReceiveResponseEventPending = true; } -bool ARTSPConnection::receiveLine(AString *line) { - line->clear(); - - bool sawCR = false; - for (;;) { - char c; - ssize_t n = recv(mSocket, &c, 1, 0); +status_t ARTSPConnection::receive(void *data, size_t size) { + size_t offset = 0; + while (offset < size) { + ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0); if (n == 0) { // Server closed the connection. - return false; + return ERROR_IO; } else if (n < 0) { if (errno == EINTR) { continue; @@ -397,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) { TRESPASS(); } + offset += (size_t)n; + } + + return OK; +} + +bool ARTSPConnection::receiveLine(AString *line) { + line->clear(); + + bool sawCR = false; + for (;;) { + char c; + if (receive(&c, 1) != OK) { + return false; + } + if (sawCR && c == '\n') { line->erase(line->size() - 1, 1); return true; @@ -404,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) { line->append(&c, 1); + if (c == '$' && line->size() == 1) { + // Special-case for interleaved binary data. + return true; + } + sawCR = (c == '\r'); } } +sp<ABuffer> ARTSPConnection::receiveBinaryData() { + uint8_t x[3]; + if (receive(x, 3) != OK) { + return NULL; + } + + sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]); + if (receive(buffer->data(), buffer->size()) != OK) { + return NULL; + } + + buffer->meta()->setInt32("index", (int32_t)x[0]); + + return buffer; +} + bool ARTSPConnection::receiveRTSPReponse() { - sp<ARTSPResponse> response = new ARTSPResponse; + AString statusLine; - if (!receiveLine(&response->mStatusLine)) { + if (!receiveLine(&statusLine)) { return false; } + if (statusLine == "$") { + sp<ABuffer> buffer = receiveBinaryData(); + + if (buffer == NULL) { + return false; + } + + if (mObserveBinaryMessage != NULL) { + sp<AMessage> notify = mObserveBinaryMessage->dup(); + notify->setObject("buffer", buffer); + notify->post(); + } else { + LOG(WARNING) << "received binary data, but no one cares."; + } + + return true; + } + + sp<ARTSPResponse> response = new ARTSPResponse; + response->mStatusLine = statusLine; + LOG(INFO) << "status: " << response->mStatusLine; ssize_t space1 = response->mStatusLine.find(" "); |