mgorny updated this revision to Diff 455951.
mgorny added a comment.

Rebase, move `StopReadThread()` test here.


CHANGES SINCE LAST ACTION
  https://reviews.llvm.org/D132283/new/

https://reviews.llvm.org/D132283

Files:
  lldb/include/lldb/Core/Communication.h
  lldb/source/Core/Communication.cpp
  lldb/unittests/Core/CommunicationTest.cpp

Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -116,6 +116,30 @@
   CommunicationReadTest(/*use_thread=*/true);
 }
 
+TEST_F(CommunicationTest, StopReadThread) {
+  std::condition_variable finished;
+  std::mutex finished_mutex;
+
+  std::thread test_thread{[&]() {
+    std::unique_ptr<TCPSocket> a, b;
+    ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
+
+    Communication comm("test");
+    comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
+    comm.SetCloseOnEOF(true);
+
+    ASSERT_TRUE(comm.StartReadThread());
+    ASSERT_TRUE(comm.StopReadThread());
+    finished.notify_all();
+  }};
+
+  // StopReadThread() can hang, so force an external timeout.
+  std::unique_lock<std::mutex> lock{finished_mutex};
+  ASSERT_EQ(finished.wait_for(lock, std::chrono::seconds(3)),
+            std::cv_status::no_timeout);
+  test_thread.join();
+}
+
 TEST_F(CommunicationTest, SynchronizeWhileClosing) {
   std::unique_ptr<TCPSocket> a, b;
   ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -228,6 +228,8 @@
 
   m_read_thread_enabled = true;
   m_read_thread_did_exit = false;
+  // Allocate the I/O loop in main thread to avoid races.
+  m_io_loop.reset(new MainLoop());
   auto maybe_thread = ThreadLauncher::LaunchThread(
       thread_name, [this] { return ReadThread(); });
   if (maybe_thread) {
@@ -258,9 +260,13 @@
 
   BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
 
-  // error = m_read_thread.Cancel();
+  m_io_loop->AddPendingCallback(
+      [](MainLoopBase &loop) { loop.RequestTermination(); });
+  m_io_loop->TriggerPendingCallbacks();
 
   Status error = m_read_thread.Join(nullptr);
+  if (error.Success())
+    m_io_loop.reset(nullptr);
   return error.Success();
 }
 
@@ -332,56 +338,76 @@
 
   LLDB_LOG(log, "Communication({0}) thread starting...", this);
 
-  uint8_t buf[1024];
-
-  Status error;
-  ConnectionStatus status = eConnectionStatusSuccess;
-  bool done = false;
   bool disconnect = false;
-  while (!done && m_read_thread_enabled) {
-    size_t bytes_read = ReadFromConnection(
-        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
-    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
-      AppendBytesToCache(buf, bytes_read, true, status);
-
-    switch (status) {
-    case eConnectionStatusSuccess:
-      break;
-
-    case eConnectionStatusEndOfFile:
-      done = true;
-      disconnect = GetCloseOnEOF();
-      break;
-    case eConnectionStatusError: // Check GetError() for details
-      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
-        // EIO on a pipe is usually caused by remote shutdown
-        disconnect = GetCloseOnEOF();
-        done = true;
-      }
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    case eConnectionStatusInterrupted: // Synchronization signal from
-                                       // SynchronizeWithReadThread()
-      // The connection returns eConnectionStatusInterrupted only when there is
-      // no input pending to be read, so we can signal that.
-      BroadcastEvent(eBroadcastBitNoMorePendingInput);
-      break;
-    case eConnectionStatusNoConnection:   // No connection
-    case eConnectionStatusLostConnection: // Lost connection while connected to
-                                          // a valid connection
-      done = true;
-      [[fallthrough]];
-    case eConnectionStatusTimedOut: // Request timed out
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
+  ConnectionStatus status = eConnectionStatusSuccess;
+  Status error;
+  if (IsConnected()) {
+    Status loop_error;
+    auto handle = m_io_loop->RegisterReadObject(
+        m_connection_sp->GetReadObject(),
+        [this, &disconnect, &status, &error](MainLoopBase &loop) {
+          Log *log = GetLog(LLDBLog::Communication);
+
+          if (!m_read_thread_enabled) {
+            loop.RequestTermination();
+            return;
+          }
+
+          uint8_t buf[1024];
+          size_t bytes_read = ReadFromConnection(
+              buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+          if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+            AppendBytesToCache(buf, bytes_read, true, status);
+
+          switch (status) {
+          case eConnectionStatusSuccess:
+          case eConnectionStatusInterrupted:
+            break;
+
+          case eConnectionStatusEndOfFile:
+            loop.RequestTermination();
+            disconnect = GetCloseOnEOF();
+            break;
+          case eConnectionStatusError: // Check GetError() for details
+            if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+              // EIO on a pipe is usually caused by remote shutdown
+              loop.RequestTermination();
+              disconnect = GetCloseOnEOF();
+            }
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          case eConnectionStatusNoConnection:   // No connection
+          case eConnectionStatusLostConnection: // Lost connection while
+                                                // connected to a valid
+                                                // connection
+            loop.RequestTermination();
+            [[fallthrough]];
+          case eConnectionStatusTimedOut: // Request timed out
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          }
+        },
+        loop_error);
+    if (loop_error.Success())
+      loop_error = m_io_loop->Run();
+    if (!loop_error.Success()) {
+      error = std::move(loop_error);
+      status = lldb::eConnectionStatusError;
     }
+  } else {
+    if (m_connection_sp)
+      status = lldb::eConnectionStatusLostConnection;
+    else
+      status = lldb::eConnectionStatusNoConnection;
+    error.SetErrorString(ConnectionStatusAsString(status));
   }
   m_pass_status = status;
   m_pass_error = std::move(error);
+
   LLDB_LOG(log, "Communication({0}) thread exiting...", this);
 
   // Handle threads wishing to synchronize with us.
@@ -424,7 +450,10 @@
     return;
 
   // Notify the read thread.
-  m_connection_sp->InterruptRead();
+  m_io_loop->AddPendingCallback([this](MainLoopBase &loop) {
+    BroadcastEvent(eBroadcastBitNoMorePendingInput);
+  });
+  m_io_loop->TriggerPendingCallbacks();
 
   // Wait for the synchronization event.
   EventSP event_sp;
Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -10,6 +10,7 @@
 #define LLDB_CORE_COMMUNICATION_H
 
 #include "lldb/Host/HostThread.h"
+#include "lldb/Host/MainLoop.h"
 #include "lldb/Utility/Broadcaster.h"
 #include "lldb/Utility/Timeout.h"
 #include "lldb/lldb-defines.h"
@@ -318,6 +319,8 @@
                                       ///by this communications class.
   HostThread m_read_thread; ///< The read thread handle in case we need to
                             ///cancel the thread.
+  std::unique_ptr<MainLoop> m_io_loop; ///< The loop instance used by
+                                       ///the read thread.
   std::atomic<bool> m_read_thread_enabled;
   std::atomic<bool> m_read_thread_did_exit;
   std::string
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to