mgorny created this revision. mgorny added reviewers: labath, emaste, krytarowski, jingham. Herald added a subscriber: arichardson. Herald added a project: All. mgorny requested review of this revision.
Use MainLoop to implement Communication::ReadThread() instead of blocking reads. Most importantly, this makes it possible to use the newly implemented TriggerPendingCallbacks() mechanism to immediately interrupt waiting for new data. This in turn will make it possible to write to the descriptor from the same thread. Sponsored by: The FreeBSD Foundation https://reviews.llvm.org/D132283 Files: lldb/include/lldb/Core/Communication.h lldb/source/Core/Communication.cpp lldb/unittests/Core/CommunicationTest.cpp
Index: lldb/unittests/Core/CommunicationTest.cpp =================================================================== --- lldb/unittests/Core/CommunicationTest.cpp +++ lldb/unittests/Core/CommunicationTest.cpp @@ -13,6 +13,9 @@ #include "llvm/Testing/Support/Error.h" #include "gtest/gtest.h" +#include <chrono> +#include <condition_variable> +#include <mutex> #include <thread> #if LLDB_ENABLE_POSIX @@ -88,6 +91,31 @@ CommunicationReadTest(/*use_thread=*/true); } +TEST(CommunicationTest, StopReadThread) { + std::condition_variable finished; + std::mutex finished_mutex; + + std::thread test_thread{[&finished]() { + Pipe pipe; + ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(), + llvm::Succeeded()); + + Communication comm("test"); + comm.SetConnection(std::make_unique<ConnectionFileDescriptor>( + pipe.ReleaseReadFileDescriptor(), /*owns_fd=*/true)); + comm.SetCloseOnEOF(true); + + ASSERT_TRUE(comm.StartReadThread()); + ASSERT_TRUE(comm.StopReadThread()); + finished.notify_all(); + }}; + + // StopReadThread() can hang, so force an external timeout. + std::unique_lock<std::mutex> lock{finished_mutex}; + ASSERT_EQ(finished.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout); + test_thread.join(); +} + TEST(CommunicationTest, SynchronizeWhileClosing) { // Set up a communication object reading from a pipe. 
Pipe pipe; Index: lldb/source/Core/Communication.cpp =================================================================== --- lldb/source/Core/Communication.cpp +++ lldb/source/Core/Communication.cpp @@ -262,7 +262,9 @@ BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); - // error = m_read_thread.Cancel(); + m_io_loop.AddPendingCallback( + [](MainLoopBase &loop) { loop.RequestTermination(); }); + m_io_loop.TriggerPendingCallbacks(); Status error = m_read_thread.Join(nullptr); return error.Success(); @@ -336,54 +338,70 @@ LLDB_LOG(log, "Communication({0}) thread starting...", this); - uint8_t buf[1024]; - - Status error; - ConnectionStatus status = eConnectionStatusSuccess; - bool done = false; bool disconnect = false; - while (!done && m_read_thread_enabled) { - size_t bytes_read = ReadFromConnection( - buf, sizeof(buf), std::chrono::seconds(5), status, &error); - if (bytes_read > 0 || status == eConnectionStatusEndOfFile) - AppendBytesToCache(buf, bytes_read, true, status); - - switch (status) { - case eConnectionStatusSuccess: - break; - - case eConnectionStatusEndOfFile: - done = true; - disconnect = GetCloseOnEOF(); - break; - case eConnectionStatusError: // Check GetError() for details - if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { - // EIO on a pipe is usually caused by remote shutdown - disconnect = GetCloseOnEOF(); - done = true; - } - if (error.Fail()) - LLDB_LOG(log, "error: {0}, status = {1}", error, - Communication::ConnectionStatusAsString(status)); - break; - case eConnectionStatusInterrupted: // Synchronization signal from - // SynchronizeWithReadThread() - // The connection returns eConnectionStatusInterrupted only when there is - // no input pending to be read, so we can signal that. 
- BroadcastEvent(eBroadcastBitNoMorePendingInput); - break; - case eConnectionStatusNoConnection: // No connection - case eConnectionStatusLostConnection: // Lost connection while connected to - // a valid connection - done = true; - [[fallthrough]]; - case eConnectionStatusTimedOut: // Request timed out - if (error.Fail()) - LLDB_LOG(log, "error: {0}, status = {1}", error, - Communication::ConnectionStatusAsString(status)); - break; - } + if (m_connection_sp) { + Status error; + auto handle = m_io_loop.RegisterReadObject( + m_connection_sp->GetReadObject(), + [this, &disconnect](MainLoopBase &loop) { + Log *log = GetLog(LLDBLog::Communication); + + if (!m_read_thread_enabled) { + loop.RequestTermination(); + return; + } + + uint8_t buf[1024]; + ConnectionStatus status = eConnectionStatusSuccess; + Status error; + size_t bytes_read = ReadFromConnection( + buf, sizeof(buf), std::chrono::seconds(5), status, &error); + if (bytes_read > 0 || status == eConnectionStatusEndOfFile) + AppendBytesToCache(buf, bytes_read, true, status); + + switch (status) { + case eConnectionStatusSuccess: + break; + + case eConnectionStatusEndOfFile: + loop.RequestTermination(); + disconnect = GetCloseOnEOF(); + break; + case eConnectionStatusError: // Check GetError() for details + if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { + // EIO on a pipe is usually caused by remote shutdown + loop.RequestTermination(); + disconnect = GetCloseOnEOF(); + } + if (error.Fail()) + LLDB_LOG(log, "error: {0}, status = {1}", error, + Communication::ConnectionStatusAsString(status)); + break; + case eConnectionStatusInterrupted: // Synchronization signal from + // SynchronizeWithReadThread() + // The connection returns eConnectionStatusInterrupted only when + // there is no input pending to be read, so we can signal that. 
+ BroadcastEvent(eBroadcastBitNoMorePendingInput); + break; + case eConnectionStatusNoConnection: // No connection + case eConnectionStatusLostConnection: // Lost connection while + // connected to a valid + // connection + loop.RequestTermination(); + [[fallthrough]]; + case eConnectionStatusTimedOut: // Request timed out + if (error.Fail()) + LLDB_LOG(log, "error: {0}, status = {1}", error, + Communication::ConnectionStatusAsString(status)); + break; + } + }, + error); + assert(error.Success()); + error = m_io_loop.Run(); + assert(error.Success()); } + log = GetLog(LLDBLog::Communication); LLDB_LOG(log, "Communication({0}) thread exiting...", this); Index: lldb/include/lldb/Core/Communication.h =================================================================== --- lldb/include/lldb/Core/Communication.h +++ lldb/include/lldb/Core/Communication.h @@ -10,6 +10,7 @@ #define LLDB_CORE_COMMUNICATION_H #include "lldb/Host/HostThread.h" +#include "lldb/Host/MainLoop.h" #include "lldb/Utility/Broadcaster.h" #include "lldb/Utility/Timeout.h" #include "lldb/lldb-defines.h" @@ -318,6 +319,7 @@ ///by this communications class. HostThread m_read_thread; ///< The read thread handle in case we need to ///cancel the thread. + MainLoop m_io_loop; //< Loop instance used by the read thread. std::atomic<bool> m_read_thread_enabled; std::atomic<bool> m_read_thread_did_exit; std::string
_______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits