Rebased ref, commits from common ancestor:
commit a26347e28ee97f806fcf1c363e7828cce0a48884
Author:     Michael Meeks <[email protected]>
AuthorDate: Sat Mar 21 20:03:37 2020 +0000
Commit:     Michael Meeks <[email protected]>
CommitDate: Sat Mar 21 20:03:37 2020 +0000

    Proxy: don't leave out sockets lingering around for the !flush case.
    
    Change-Id: I13ad123a6c3a068a676eae5e509367e727e9ac06

diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 6af0a33bf..6b0db2261 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -181,11 +181,14 @@ void 
ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
 int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool 
text, bool flush)
 {
     _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
-    auto sock = popOutSocket();
-    if (sock && flush)
+    if (flush)
     {
-        flushQueueTo(sock);
-        sock->shutdown();
+        auto sock = popOutSocket();
+        if (sock)
+        {
+            flushQueueTo(sock);
+            sock->shutdown();
+        }
     }
 
     return len;
commit 882a779c546e25d8446db229d3fef2be1c5c0dcf
Author:     Michael Meeks <[email protected]>
AuthorDate: Sat Mar 21 15:07:10 2020 +0000
Commit:     Michael Meeks <[email protected]>
CommitDate: Sat Mar 21 19:59:14 2020 +0000

    Proxy: improve debugging & naming.
    
    Change-Id: Ifba669a33855a67c9a4e968db42ef1a2cb301d63

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 95ff625f7..9f319215f 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -197,7 +197,7 @@
                };
                this.parseIncomingArray = function(arr) {
                        var decoder = new TextDecoder();
-                       console.debug('Parse incoming array of length ' + 
arr.length);
+                       console.debug('proxy: parse incoming array of length ' 
+ arr.length);
                        for (var i = 0; i < arr.length; ++i)
                        {
                                var left = arr.length - i;
@@ -255,7 +255,7 @@
                                        if (this.status == 200)
                                                that.parseIncomingArray(new 
Uint8Array(this.response));
                                        else
-                                               console.debug('Error on 
incoming response');
+                                               console.debug('proxy: error on 
incoming response');
                                });
                        }
                        req.send(that.sendQueue);
@@ -281,21 +281,24 @@
                                this.sendTimeout = setTimeout(this.doSend, 2 /* 
ms */);
                };
                this.close = function() {
-                       console.debug('close socket');
+                       console.debug('proxy: close socket');
                        this.readyState = 3;
                        this.onclose();
+                       clearInterval(this.waitInterval);
+                       this.waitInterval = undefined;
                };
                this.getEndPoint = function(type) {
                        var base = this.uri;
                        return base.replace(/^ws/, 'http') + '/' + type;
                };
-               console.debug('New proxy socket ' + this.id + ' ' + this.uri);
+               console.debug('proxy: new socket ' + this.id + ' ' + this.uri);
 
                // queue fetch of session id.
                this.getSessionId();
 
                // horrors ...
-               this.readInterval = setInterval(function() {
+               this.waitConnect = function() {
+                       console.debug('proxy: waiting - ' + that.readWaiting + 
' on session ' + that.sessionId);
                        if (that.readWaiting > 4) // max 4 waiting connections 
concurrently.
                                return;
                        if (that.sessionId == 'fetchsession')
@@ -310,13 +313,16 @@
                        });
                        req.addEventListener('loadend', function() {
                                that.readWaiting--;
+                               console.debug('proxy: wait ended, re-issue');
+                               that.waitConnect();
                        });
-                       req.open('GET', that.getEndPoint('read'));
+                       req.open('GET', that.getEndPoint('wait'));
                        req.setRequestHeader('SessionId', that.sessionId);
                        req.responseType = 'arraybuffer';
                        req.send('');
                        that.readWaiting++;
-               }, 250);
+               };
+               this.waitInterval = setInterval(this.waitConnect, 250);
        };
 
        if (global.socketProxy)
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 84db6ac84..05c8c1e9d 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2833,7 +2833,7 @@ private:
             none, url, docKey, _id, uriPublic);
 
         std::string fullURL = request.getURI();
-        std::string ending = "/ws/read";
+        std::string ending = "/ws/wait";
         bool isWaiting = (fullURL.size() > ending.size() &&
                           std::equal(ending.rbegin(), ending.rend(), 
fullURL.rbegin()));
         if (docBroker)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 0928b4541..6af0a33bf 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -31,7 +31,7 @@ void DocumentBroker::handleProxyRequest(
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
     {
-        LOG_TRC("Create session for " << _docKey);
+        LOG_TRC("proxy: Create session for " << _docKey);
         clientSession = createNewClientSession(
                 std::make_shared<ProxyProtocolHandler>(),
                 id, uriPublic, isReadOnly, hostNoTrust);
@@ -39,7 +39,7 @@ void DocumentBroker::handleProxyRequest(
         LOOLWSD::checkDiskSpaceAndWarnClients(true);
         LOOLWSD::checkSessionLimitsAndWarnClients();
 
-        LOG_TRC("Returning id " << clientSession->getId());
+        LOG_TRC("proxy: Returning sessionId " << clientSession->getId());
 
         std::ostringstream oss;
         oss << "HTTP/1.1 200 OK\r\n"
@@ -57,7 +57,7 @@ void DocumentBroker::handleProxyRequest(
     }
     else
     {
-        LOG_TRC("Find session for " << _docKey << " with id " << sessionId);
+        LOG_TRC("proxy: find session for " << _docKey << " with id " << 
sessionId);
         for (const auto &it : _sessions)
         {
             if (it.second->getId() == sessionId)
@@ -133,28 +133,29 @@ void ProxyProtocolHandler::handleRequest(bool isWaiting, 
const std::shared_ptr<S
 {
     auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
 
-    LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
+    LOG_INF("proxy: handle request type: " << (isWaiting ? "wait" : "respond") 
<<
+            " on socket #" << socket->getFD());
 
     if (!isWaiting)
     {
         if (!_msgHandler)
-            LOG_WRN("unusual - incoming message with no-one to handle it");
+            LOG_WRN("proxy: unusual - incoming message with no-one to handle 
it");
         else if (!parseEmitIncoming(streamSocket))
         {
             std::stringstream oss;
             streamSocket->dumpState(oss);
-            LOG_ERR("bad socket structure " << oss.str());
+            LOG_ERR("proxy: bad socket structure " << oss.str());
         }
     }
 
     if (!flushQueueTo(streamSocket) && isWaiting)
     {
-        LOG_TRC("Queue a waiting socket");
+        LOG_TRC("proxy: queue a waiting out socket #" << 
streamSocket->getFD());
         // longer running 'write socket' (marked 'read' by the client)
         _outSockets.push_back(streamSocket);
         if (_outSockets.size() > 16)
         {
-            LOG_ERR("Unexpected - client opening many concurrent waiting 
connections " << _outSockets.size());
+            LOG_ERR("proxy: Unexpected - client opening many concurrent 
waiting connections " << _outSockets.size());
             // cleanup older waiting sockets.
             auto sockWeak = _outSockets.front();
             _outSockets.erase(_outSockets.begin());
@@ -180,7 +181,7 @@ void 
ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
 int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool 
text, bool flush)
 {
     _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
-    auto sock = popWriteSocket();
+    auto sock = popOutSocket();
     if (sock && flush)
     {
         flushQueueTo(sock);
@@ -230,16 +231,24 @@ int 
ProxyProtocolHandler::getPollEvents(std::chrono::steady_clock::time_point /*
     return events;
 }
 
-void ProxyProtocolHandler::performWrites()
+/// slurp from the core to us, @returns true if there are messages to send
+bool ProxyProtocolHandler::slurpHasMessages()
 {
-    if (_msgHandler)
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
         _msgHandler->writeQueuedMessages();
-    if (_writeQueue.size() <= 0)
+
+    return _writeQueue.size() > 0;
+}
+
+void ProxyProtocolHandler::performWrites()
+{
+    if (!slurpHasMessages())
         return;
 
-    auto sock = popWriteSocket();
+    auto sock = popOutSocket();
     if (sock)
     {
+        LOG_TRC("proxy: performWrites");
         flushQueueTo(sock);
         sock->shutdown();
     }
@@ -247,9 +256,8 @@ void ProxyProtocolHandler::performWrites()
 
 bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> 
&socket)
 {
-    // slurp from the core to us.
-    if (_msgHandler && _msgHandler->hasQueuedMessages())
-        _msgHandler->writeQueuedMessages();
+    if (!slurpHasMessages())
+        return false;
 
     size_t totalSize = 0;
     for (auto it : _writeQueue)
@@ -258,6 +266,8 @@ bool ProxyProtocolHandler::flushQueueTo(const 
std::shared_ptr<StreamSocket> &soc
     if (!totalSize)
         return false;
 
+    LOG_TRC("proxy: flushQueue of size " << totalSize << " to socket #" << 
socket->getFD() << " & close");
+
     std::ostringstream oss;
     oss << "HTTP/1.1 200 OK\r\n"
         "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
@@ -276,7 +286,7 @@ bool ProxyProtocolHandler::flushQueueTo(const 
std::shared_ptr<StreamSocket> &soc
 }
 
 // LRU-ness ...
-std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popOutSocket()
 {
     std::weak_ptr<StreamSocket> sock;
     while (!_outSockets.empty())
@@ -285,8 +295,12 @@ std::shared_ptr<StreamSocket> 
ProxyProtocolHandler::popWriteSocket()
         _outSockets.erase(_outSockets.begin());
         auto realSock = sock.lock();
         if (realSock)
+        {
+            LOG_TRC("proxy: popped an out socket #" << realSock->getFD() << " 
leaving: " << _outSockets.size());
             return realSock;
+        }
     }
+    LOG_TRC("proxy: no out sockets to pop.");
     return std::shared_ptr<StreamSocket>();
 }
 
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 61f0f32be..22cd48fb2 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -61,7 +61,8 @@ public:
     void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
 
 private:
-    std::shared_ptr<StreamSocket> popWriteSocket();
+    std::shared_ptr<StreamSocket> popOutSocket();
+    bool slurpHasMessages();
     int sendMessage(const char *msg, const size_t len, bool text, bool flush);
     bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
 
_______________________________________________
Libreoffice-commits mailing list
[email protected]
https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits

Reply via email to