mgorny created this revision.
mgorny added reviewers: labath, krytarowski, emaste, jingham.
Herald added a subscriber: arichardson.
Herald added a project: All.
mgorny requested review of this revision.

When read thread is enabled, perform all the writes in the thread
as well.  Most importantly, this ensures that we do not attempt to
simultaneously read and write the same fd from different threads.

Sponsored by: The FreeBSD Foundation


https://reviews.llvm.org/D132578

Files:
  lldb/include/lldb/Core/Communication.h
  lldb/source/Core/Communication.cpp
  lldb/unittests/Core/CommunicationTest.cpp

Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -118,6 +118,88 @@
   CommunicationReadTest(/*use_thread=*/true);
 }
 
+static void CommunicationWriteTest(bool use_read_thread) {
+  Pipe pipe;
+  ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+                    llvm::Succeeded());
+
+  Communication comm("test");
+  comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+      pipe.ReleaseWriteFileDescriptor(), /*owns_fd=*/true));
+  comm.SetCloseOnEOF(true);
+
+  if (use_read_thread)
+    ASSERT_TRUE(comm.StartReadThread());
+
+  // In our test case, a short Write() should be atomic.
+  lldb::ConnectionStatus status = lldb::eConnectionStatusSuccess;
+  Status error;
+  std::string test_str{"test"};
+  EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 4U);
+  EXPECT_EQ(status, lldb::eConnectionStatusSuccess);
+  EXPECT_THAT_ERROR(error.ToError(), llvm::Succeeded());
+
+  char buf[5];
+  size_t bytes_read;
+  ASSERT_THAT_ERROR(
+      pipe.ReadWithTimeout(buf, 4U, std::chrono::seconds(0), bytes_read)
+          .ToError(),
+      llvm::Succeeded());
+  buf[4] = 0;
+  EXPECT_EQ(test_str, buf);
+
+  // Test WriteAll() too.
+  test_str[3] = '2';
+  error.Clear();
+  EXPECT_EQ(comm.WriteAll(test_str.data(), test_str.size(), status, &error),
+            4U);
+  EXPECT_EQ(status, lldb::eConnectionStatusSuccess);
+  EXPECT_THAT_ERROR(error.ToError(), llvm::Succeeded());
+
+  ASSERT_THAT_ERROR(
+      pipe.ReadWithTimeout(buf, 4U, std::chrono::seconds(0), bytes_read)
+          .ToError(),
+      llvm::Succeeded());
+  buf[4] = 0;
+  EXPECT_EQ(test_str, buf);
+
+  EXPECT_TRUE(comm.StopReadThread());
+
+  // Test using Communication without a connection.
+  comm.SetConnection(nullptr);
+  pipe.CloseReadFileDescriptor();
+  if (use_read_thread)
+    ASSERT_TRUE(comm.StartReadThread());
+  error.Clear();
+  EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 0U);
+  EXPECT_EQ(status, lldb::eConnectionStatusNoConnection);
+  EXPECT_THAT_ERROR(error.ToError(), llvm::Failed());
+  EXPECT_TRUE(comm.StopReadThread());
+
+  // Test using Communication that is disconnected.
+  ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+                    llvm::Succeeded());
+  comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+      pipe.ReleaseWriteFileDescriptor(), /*owns_fd=*/true));
+  comm.SetCloseOnEOF(true);
+  ASSERT_EQ(comm.Disconnect(), lldb::eConnectionStatusSuccess);
+  if (use_read_thread)
+    ASSERT_TRUE(comm.StartReadThread());
+  error.Clear();
+  EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 0U);
+  EXPECT_EQ(status, lldb::eConnectionStatusNoConnection);
+  EXPECT_THAT_ERROR(error.ToError(), llvm::Failed());
+  EXPECT_TRUE(comm.StopReadThread());
+}
+
+TEST(CommunicationTest, Write) {
+  CommunicationWriteTest(/*use_thread=*/false);
+}
+
+TEST(CommunicationTest, WriteThread) {
+  CommunicationWriteTest(/*use_thread=*/true);
+}
+
 TEST(CommunicationTest, StopReadThread) {
   std::condition_variable finished;
   std::mutex finished_mutex;
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -186,21 +186,53 @@
 
 size_t Communication::Write(const void *src, size_t src_len,
                             ConnectionStatus &status, Status *error_ptr) {
-  lldb::ConnectionSP connection_sp(m_connection_sp);
-
   std::lock_guard<std::mutex> guard(m_write_mutex);
   LLDB_LOG(GetLog(LLDBLog::Communication),
            "{0} Communication::Write (src = {1}, src_len = {2}"
            ") connection = {3}",
-           this, src, (uint64_t)src_len, connection_sp.get());
+           this, src, (uint64_t)src_len, m_connection_sp.get());
 
-  if (connection_sp)
-    return connection_sp->Write(src, src_len, status, error_ptr);
+  if (m_read_thread_enabled) {
+    assert(m_write_bytes.empty());
 
-  if (error_ptr)
-    error_ptr->SetErrorString("Invalid connection.");
-  status = eConnectionStatusNoConnection;
-  return 0;
+    if (!m_connection_sp) {
+      if (error_ptr)
+        error_ptr->SetErrorString("Invalid connection.");
+      status = eConnectionStatusNoConnection;
+      return 0;
+    }
+
+    ListenerSP listener_sp(Listener::MakeListener("Communication::Write"));
+    listener_sp->StartListeningForEvents(
+        this, eBroadcastBitWriteDone | eBroadcastBitReadThreadDidExit);
+
+    m_write_bytes.assign(static_cast<const char *>(src), src_len);
+    m_io_loop->AddPendingCallback(
+        [this](MainLoopBase &loop) { WriteThread(); });
+    m_io_loop->TriggerPendingCallbacks();
+
+    EventSP event_sp;
+    while (m_read_thread_enabled) {
+      if (listener_sp->GetEvent(event_sp, std::chrono::seconds(5))) {
+        const uint32_t event_type = event_sp->GetType();
+        if (event_type & eBroadcastBitWriteDone) {
+          // TODO: error handling
+          assert(m_write_bytes.empty());
+          status = eConnectionStatusSuccess;
+          return src_len;
+        }
+
+        if (event_type & eBroadcastBitReadThreadDidExit)
+          break;
+      }
+    }
+
+    // If read thread exited before performing the write, fall back
+    // to writing directly.
+    m_write_bytes.clear();
+  }
+
+  return WriteToConnection(src, src_len, status, error_ptr);
 }
 
 size_t Communication::WriteAll(const void *src, size_t src_len,
@@ -331,6 +363,19 @@
   return 0;
 }
 
+size_t Communication::WriteToConnection(const void *src, size_t src_len,
+                                        ConnectionStatus &status,
+                                        Status *error_ptr) {
+  lldb::ConnectionSP connection_sp(m_connection_sp);
+  if (connection_sp)
+    return connection_sp->Write(src, src_len, status, error_ptr);
+
+  if (error_ptr)
+    error_ptr->SetErrorString("Invalid connection.");
+  status = eConnectionStatusNoConnection;
+  return 0;
+}
+
 bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; }
 
 lldb::thread_result_t Communication::ReadThread() {
@@ -430,6 +475,25 @@
   return {};
 }
 
+void Communication::WriteThread() {
+  // There should be only one pending request queued.
+  assert(!m_write_bytes.empty());
+
+  ConnectionStatus status = eConnectionStatusSuccess;
+  Status error;
+  do {
+    size_t bytes_written = WriteToConnection(
+        m_write_bytes.data(), m_write_bytes.size(), status, &error);
+    if (bytes_written > 0)
+      m_write_bytes.erase(0, bytes_written);
+  } while (!m_write_bytes.empty() && status == eConnectionStatusSuccess);
+
+  // TODO: error handling
+  assert(status == eConnectionStatusSuccess);
+
+  BroadcastEvent(eBroadcastBitWriteDone);
+}
+
 void Communication::SetReadThreadBytesReceivedCallback(
     ReadThreadBytesReceived callback, void *callback_baton) {
   m_callback = callback;
Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -99,6 +99,9 @@
       eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
                                                    ///to indicate all pending
                                                    ///input has been processed.
+      eBroadcastBitWriteDone = (1u << 6), ///< Sent by the read thread
+                                          ///to indicate that a write request
+                                          ///has been processed.
       kLoUserBroadcastBit =
           (1u << 16), ///< Subclasses can used bits 31:16 for any needed events.
       kHiUserBroadcastBit = (1u << 31),
@@ -325,6 +328,7 @@
   std::atomic<bool> m_read_thread_did_exit;
   std::string
       m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
+  std::string m_write_bytes; ///< A buffer used to pass write bytes.
   std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
                                       ///access to the cached bytes.
   lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
@@ -340,6 +344,8 @@
   size_t ReadFromConnection(void *dst, size_t dst_len,
                             const Timeout<std::micro> &timeout,
                             lldb::ConnectionStatus &status, Status *error_ptr);
+  size_t WriteToConnection(const void *src, size_t src_len,
+                           lldb::ConnectionStatus &status, Status *error_ptr);
 
   /// Append new bytes that get read from the read thread into the internal
   /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
@@ -380,6 +386,8 @@
   ///     The number of bytes extracted from the data cache.
   size_t GetCachedBytes(void *dst, size_t dst_len);
 
+  void WriteThread();
+
 private:
   Communication(const Communication &) = delete;
   const Communication &operator=(const Communication &) = delete;
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to