loolwsd/Common.hpp | 1 loolwsd/LOOLBroker.cpp | 255 +++++++++++++++++++++++++++---------------------- loolwsd/Util.cpp | 5 3 files changed, 148 insertions(+), 113 deletions(-)
New commits: commit 0632da5edba5dc876ead45ea7ffcd4c79d2f6f41 Author: Ashod Nakashian <[email protected]> Date: Wed Jan 13 19:28:51 2016 -0500 loolwsd: reworked child management and load balancing in broker Change-Id: I92874d1aeb8fe46f3bbc1cb9d3b2b9d46632f4b9 Reviewed-on: https://gerrit.libreoffice.org/21473 Reviewed-by: Ashod Nakashian <[email protected]> Tested-by: Ashod Nakashian <[email protected]> diff --git a/loolwsd/Common.hpp b/loolwsd/Common.hpp index 10c3d7e..87585d3 100644 --- a/loolwsd/Common.hpp +++ b/loolwsd/Common.hpp @@ -17,6 +17,7 @@ constexpr int DEFAULT_CLIENT_PORT_NUMBER = 9980; constexpr int MASTER_PORT_NUMBER = 9981; constexpr int INTERVAL_PROBES = 10; constexpr int MAINTENANCE_INTERVAL = 1; +constexpr int CHILD_TIMEOUT_SECS = 10; constexpr int POLL_TIMEOUT = 1000000; // Pipe buffer is in function of URL size, a big URL will be handled in several // work loads. diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp index 575338a..eefa8e9 100644 --- a/loolwsd/LOOLBroker.cpp +++ b/loolwsd/LOOLBroker.cpp @@ -68,9 +68,9 @@ static int readerChild = -1; static int readerBroker = -1; static std::atomic<unsigned> forkCounter; -static std::atomic<std::chrono::seconds> maintenance; +static std::chrono::steady_clock::time_point lastMaintenanceTime = std::chrono::steady_clock::now(); static unsigned int childCounter = 0; -static unsigned int numPreSpawnedChildren = 0; +static signed numPreSpawnedChildren = 0; static std::mutex forkMutex; static std::map<Process::PID, int> _childProcesses; @@ -78,6 +78,31 @@ static std::map<std::string, Process::PID> _cacheURL; namespace { + /// Safely looks up the pipe descriptor + /// of a child. Returns -1 on error. + int getChildPipe(const Process::PID pid) + { + std::lock_guard<std::mutex> lock(forkMutex); + const auto it = _childProcesses.find(pid); + return (it != _childProcesses.end() ? it->second : -1); + } + + /// Safely removes a child process and + /// invalidates the URL cache. + void removeChild(const Process::PID pid) + { + std::lock_guard<std::mutex> lock(forkMutex); + const auto it = _childProcesses.find(pid); + if (it != _childProcesses.end()) + { + // Close the write pipe. + close(it->second); + _childProcesses.erase(it); + _cacheURL.clear(); + ++forkCounter; + } + } + ThreadLocal<std::string> sourceForLinkOrCopy; ThreadLocal<Path> destinationForLinkOrCopy; @@ -177,37 +202,45 @@ public: ssize_t nBytes = -1; aLine.clear(); - while (true) + try { - if ( _pStart == _pEnd ) + while (true) { - nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer)); - if ( nBytes < 0 ) + if ( _pStart == _pEnd ) { - _pStart = _pEnd = nullptr; - break; - } - - _pStart = _aBuffer; - _pEnd = _aBuffer + nBytes; - } + nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer)); + if ( nBytes < 0 ) + { + _pStart = _pEnd = nullptr; + break; + } - if ( _pStart != _pEnd ) - { - char aChar = *_pStart++; - while (_pStart != _pEnd && aChar != '\r' && aChar != '\n') - { - aLine += aChar; - aChar = *_pStart++; + _pStart = _aBuffer; + _pEnd = _aBuffer + nBytes; } - if ( aChar == '\r' && *_pStart == '\n') + if ( _pStart != _pEnd ) { - _pStart++; - break; + char aChar = *_pStart++; + while (_pStart != _pEnd && aChar != '\r' && aChar != '\n') + { + aLine += aChar; + aChar = *_pStart++; + } + + if ( aChar == '\r' && *_pStart == '\n') + { + _pStart++; + break; + } } } } + catch (const std::exception& exc) + { + Log::error(std::string("Exception: ") + exc.what()); + return -1; + } return nBytes; } @@ -238,22 +271,23 @@ public: return nBytes; } - ssize_t createThread(Process::PID nPID, const std::string& aTID, const std::string& aURL) + ssize_t createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL) { const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n"; - return sendMessage(_childProcesses[nPID], aMessage); + return sendMessage(getChildPipe(nPID), aMessage); } void verifyChilds() { + Log::trace("Verifying Childs."); std::string aMessage; bool bError = false; // sanity cache - for (auto it =_cacheURL.cbegin(); it != _cacheURL.cend(); ) + for (auto it = _cacheURL.cbegin(); it != _cacheURL.cend(); ) { aMessage = "search " + it->first + "\r\n"; - if (sendMessage(_childProcesses[it->second], aMessage) < 0) + if (sendMessage(getChildPipe(it->second), aMessage) < 0) { bError = true; break; @@ -346,13 +380,9 @@ public: return; } - else - { - Log::debug("URL [" + aURL + "] is not in cache, searching " + - std::to_string(_childProcesses.size()) + " kits."); - } // not found in cache, full search. + Log::debug("URL [" + aURL + "] is not in cache"); const Process::PID nPID = searchURL(aURL); if ( nPID > 0 ) { @@ -367,8 +397,8 @@ public: } else { - Log::info("No children available, creating [" + std::to_string(numPreSpawnedChildren) + "] childs"); - forkCounter = numPreSpawnedChildren; + Log::info("No children available."); + ++forkCounter; } } } @@ -440,15 +470,16 @@ public: { pStart++; - forkMutex.lock(); - if (maintenance.load() > std::chrono::seconds(10)) + Log::trace("Recv: " + aMessage); + const auto duration = (std::chrono::steady_clock::now() - lastMaintenanceTime); + if (duration >= std::chrono::seconds(10)) { - maintenance = std::chrono::seconds::zero(); verifyChilds(); + lastMaintenanceTime = std::chrono::steady_clock::now(); } + handleInput(aMessage); aMessage.clear(); - forkMutex.unlock(); } } } @@ -493,7 +524,8 @@ static bool globalPreinit(const std::string &loSubPath) return preInit(("/" + loSubPath + "/program").c_str(), "file:///user") == 0; } -static int createLibreOfficeKit(const bool sharePages, const std::string& loSubPath, +static int createLibreOfficeKit(const bool sharePages, + const std::string& loSubPath, const std::string& jailId) { Poco::UInt64 child; @@ -556,45 +588,25 @@ static int createLibreOfficeKit(const bool sharePages, const std::string& loSubP return -1; } - Log::info() << "Adding Kit #" << childCounter << " PID " << child << Log::end; + Log::info() << "Adding Kit #" << childCounter << ", PID: " << child << Log::end; _childProcesses[child] = nFIFOWriter; + ++forkCounter; return child; } -static int startupLibreOfficeKit(const bool sharePages, const int nLOKits, - const std::string& loSubPath, const std::string& jailId) +static bool waitForTerminationChild(const Process::PID aPID, signed count = CHILD_TIMEOUT_SECS) { - Process::PID pId = -1; - - Log::info() << "Starting " << nLOKits << " LoKit instaces." << Log::end; - for (int nCntr = nLOKits; nCntr; nCntr--) - { - if ((pId = createLibreOfficeKit(sharePages, loSubPath, jailId)) < 0) - { - Log::error("Error: failed to create LibreOfficeKit."); - break; - } - } - - return pId; -} - - -static bool waitForTerminationChild(const Process::PID aPID) -{ - int status; - short nCntr = 3; - - while (nCntr-- > 0) + while (count-- > 0) { + int status; waitpid(aPID, &status, WUNTRACED | WNOHANG); if (WIFEXITED(status)) - break; + return true; sleep(MAINTENANCE_INTERVAL); } - return nCntr; + return false; } // Broker process @@ -691,7 +703,7 @@ int main(int argc, char** argv) exit(-1); } - if ( !numPreSpawnedChildren ) + if (numPreSpawnedChildren < 1) { Log::error("Error: --numprespawns is 0"); exit(-1); @@ -801,9 +813,11 @@ int main(int argc, char** argv) exit(-1); } + // Initialize LoKit and hope we can fork and save memory by sharing pages. const bool sharePages = globalPreinit(loSubPath); - if ( startupLibreOfficeKit(sharePages, numPreSpawnedChildren, loSubPath, jailId) < 0 ) + // We must have at least one child, more is created dynamically. + if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0) { Log::error("Error: failed to create children."); exit(-1); @@ -823,74 +837,95 @@ int main(int argc, char** argv) Log::info("loolbroker is ready."); unsigned timeoutCounter = 0; - while (!TerminationFlag && !_childProcesses.empty()) + while (!TerminationFlag) { + if (forkCounter > 0) + { + std::lock_guard<std::mutex> lock(forkMutex); + + // Figure out how many children we need. + const signed total = _childProcesses.size(); + const signed used = _cacheURL.size(); + const signed extra = total - used; + if (extra < numPreSpawnedChildren) + { + if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0) + Log::error("Error: fork failed."); + } + else + forkCounter = 0; + } + int status; const pid_t pid = waitpid(-1, &status, WUNTRACED | WNOHANG); if (pid > 0) { - if ( _childProcesses.find(pid) != _childProcesses.end() ) + if (WIFEXITED(status)) { - if ((WIFEXITED(status) || WIFSIGNALED(status) || WTERMSIG(status) ) ) - { - Log::error("Child [" + std::to_string(pid) + "] processes died."); - - forkMutex.lock(); - _childProcesses.erase(pid); - _cacheURL.clear(); - forkMutex.unlock(); - } - - if ( WCOREDUMP(status) ) - Log::error("Child [" + std::to_string(pid) + "] produced a core dump."); - - if ( WIFSTOPPED(status) ) - Log::error("Child [" + std::to_string(pid) + "] process was stopped by delivery of a signal."); + Log::info() << "Child process [" << pid << "] exited with code: " + << WEXITSTATUS(status) << "." << Log::end; - if ( WSTOPSIG(status) ) - Log::error("Child [" + std::to_string(pid) + "] process was stopped."); + removeChild(pid); + } + else + if (WIFSIGNALED(status)) + { + std::string fate = "died"; +#ifdef WCOREDUMP + if (WCOREDUMP(status)) + fate = "core-dumped"; +#endif + Log::error() << "Child process [" << pid << "] " << fate + << " with " << Util::signalName(WTERMSIG(status)) + << " signal. " << Log::end; - if ( WIFCONTINUED(status) ) - Log::error("Child [" + std::to_string(pid) + "] process was resumed."); + removeChild(pid); + } + else if (WIFSTOPPED(status)) + { + Log::info() << "Child process [" << pid << "] stopped with " + << Util::signalName(WSTOPSIG(status)) + << " signal. " << Log::end; + } + else if (WIFCONTINUED(status)) + { + Log::info() << "Child process [" << pid << "] resumed with SIGCONT." + << Log::end; } else { - Log::error("None of our known child processes died. PID: " + std::to_string(pid)); + Log::warn() << "Unknown status returned by waitpid: " + << std::hex << status << "." << Log::end; } } else if (pid < 0) - Log::error("Error: Child error."); - - if (forkCounter > 0) - { - forkMutex.lock(); - --forkCounter; - - if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0) - Log::error("Error: fork failed."); - - forkMutex.unlock(); - } + Log::error("Error: waitpid failed."); if (timeoutCounter++ == INTERVAL_PROBES) { timeoutCounter = 0; sleep(MAINTENANCE_INTERVAL); - maintenance.store( ++maintenance.load() ); } } // Terminate child processes - for (auto i : _childProcesses) + for (auto& it : _childProcesses) { - Log::info("Requesting child process " + std::to_string(i.first) + " to terminate."); - close(i.second); - Process::requestTermination(i.first); - if (!waitForTerminationChild(i.first)) + Log::info("Requesting child process " + std::to_string(it.first) + " to terminate."); + Process::requestTermination(it.first); + } + + // Wait and kill child processes + for (auto& it : _childProcesses) + { + if (!waitForTerminationChild(it.first)) { - Log::info("Forcing a child process " + std::to_string(i.first) + " to terminate."); - Process::kill(i.first); + Log::info("Forcing child process " + std::to_string(it.first) + " to terminate."); + Process::kill(it.first); } + + // Close the write pipe. + close(it.second); } aPipe.join(); diff --git a/loolwsd/Util.cpp b/loolwsd/Util.cpp index af263f0..5da99de 100644 --- a/loolwsd/Util.cpp +++ b/loolwsd/Util.cpp @@ -402,9 +402,9 @@ namespace Util aPoll.events = POLLIN; aPoll.revents = 0; - int nPoll = poll(&aPoll, 1, 3000); + const int nPoll = poll(&aPoll, 1, CHILD_TIMEOUT_SECS * 1000); if ( nPoll < 0 ) - goto ErrorPoll; + return -1; if ( nPoll == 0 ) errno = ETIME; @@ -412,7 +412,6 @@ namespace Util if( (aPoll.revents & POLLIN) != 0 ) nBytes = readFIFO(nPipe, pBuffer, nSize); - ErrorPoll: return nBytes; } _______________________________________________ Libreoffice-commits mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
