augusto2112 created this revision.
augusto2112 added reviewers: JDevlieghere, bulbazord.
Herald added a project: All.
augusto2112 requested review of this revision.
Herald added a project: LLDB.
Herald added a subscriber: lldb-commits.
This patch picks up where https://reviews.llvm.org/D157159 left off: it
still allows for concurrent reads/writes, but protects setting up and
tearing down the underlying Connection object.
Repository:
rG LLVM Github Monorepo
https://reviews.llvm.org/D157760
Files:
lldb/include/lldb/Core/Communication.h
lldb/include/lldb/Core/ThreadedCommunication.h
lldb/source/Core/Communication.cpp
lldb/source/Core/ThreadedCommunication.cpp
lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
Index: lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
===================================================================
--- lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
+++ lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
@@ -557,7 +557,6 @@
}
}
StopAsyncThread();
- m_comm.Clear();
SetPrivateState(eStateDetached);
ResumePrivateStateThread();
Index: lldb/source/Core/ThreadedCommunication.cpp
===================================================================
--- lldb/source/Core/ThreadedCommunication.cpp
+++ lldb/source/Core/ThreadedCommunication.cpp
@@ -61,11 +61,6 @@
this, GetBroadcasterName());
}
-void ThreadedCommunication::Clear() {
- SetReadThreadBytesReceivedCallback(nullptr, nullptr);
- StopReadThread(nullptr);
- Communication::Clear();
-}
ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
@@ -77,6 +72,7 @@
const Timeout<std::micro> &timeout,
ConnectionStatus &status,
Status *error_ptr) {
+ std::shared_lock guard(m_connection_mutex);
Log *log = GetLog(LLDBLog::Communication);
LLDB_LOG(
log,
@@ -152,7 +148,7 @@
// We aren't using a read thread, just read the data synchronously in this
// thread.
- return Communication::Read(dst, dst_len, timeout, status, error_ptr);
+ return Communication::ReadUnlocked(dst, dst_len, timeout, status, error_ptr);
}
bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
@@ -273,46 +269,50 @@
ConnectionStatus status = eConnectionStatusSuccess;
bool done = false;
bool disconnect = false;
- while (!done && m_read_thread_enabled) {
- size_t bytes_read = ReadFromConnection(
- buf, sizeof(buf), std::chrono::seconds(5), status, &error);
- if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
- AppendBytesToCache(buf, bytes_read, true, status);
-
- switch (status) {
- case eConnectionStatusSuccess:
- break;
-
- case eConnectionStatusEndOfFile:
- done = true;
- disconnect = GetCloseOnEOF();
- break;
- case eConnectionStatusError: // Check GetError() for details
- if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
- // EIO on a pipe is usually caused by remote shutdown
+
+ {
+ std::shared_lock guard(m_connection_mutex);
+ while (!done && m_read_thread_enabled) {
+ size_t bytes_read = ReadUnlocked(buf, sizeof(buf),
+ std::chrono::seconds(5), status, &error);
+ if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+ AppendBytesToCache(buf, bytes_read, true, status);
+
+ switch (status) {
+ case eConnectionStatusSuccess:
+ break;
+
+ case eConnectionStatusEndOfFile:
+ done = true;
disconnect = GetCloseOnEOF();
+ break;
+ case eConnectionStatusError: // Check GetError() for details
+ if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+ // EIO on a pipe is usually caused by remote shutdown
+ disconnect = GetCloseOnEOF();
+ done = true;
+ }
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ ThreadedCommunication::ConnectionStatusAsString(status));
+ break;
+ case eConnectionStatusInterrupted: // Synchronization signal from
+ // SynchronizeWithReadThread()
+ // The connection returns eConnectionStatusInterrupted only when there
+ // is no input pending to be read, so we can signal that.
+ BroadcastEvent(eBroadcastBitNoMorePendingInput);
+ break;
+ case eConnectionStatusNoConnection: // No connection
+ case eConnectionStatusLostConnection: // Lost connection while connected
+ // to a valid connection
done = true;
+ [[fallthrough]];
+ case eConnectionStatusTimedOut: // Request timed out
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ ThreadedCommunication::ConnectionStatusAsString(status));
+ break;
}
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- ThreadedCommunication::ConnectionStatusAsString(status));
- break;
- case eConnectionStatusInterrupted: // Synchronization signal from
- // SynchronizeWithReadThread()
- // The connection returns eConnectionStatusInterrupted only when there is
- // no input pending to be read, so we can signal that.
- BroadcastEvent(eBroadcastBitNoMorePendingInput);
- break;
- case eConnectionStatusNoConnection: // No connection
- case eConnectionStatusLostConnection: // Lost connection while connected to
- // a valid connection
- done = true;
- [[fallthrough]];
- case eConnectionStatusTimedOut: // Request timed out
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- ThreadedCommunication::ConnectionStatusAsString(status));
- break;
}
}
m_pass_status = status;
@@ -361,8 +361,12 @@
if (!m_read_thread_enabled || m_read_thread_did_exit)
return;
- // Notify the read thread.
- m_connection_sp->InterruptRead();
+ {
+ // Notify the read thread.
+ std::shared_lock guard(m_connection_mutex);
+ if (m_connection_sp)
+ m_connection_sp->InterruptRead();
+ }
// Wait for the synchronization event.
EventSP event_sp;
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -27,110 +27,110 @@
using namespace lldb_private;
Communication::Communication()
- : m_connection_sp(), m_write_mutex(), m_close_on_eof(true) {
-}
-
-Communication::~Communication() {
- Clear();
-}
+ : m_connection_sp(), m_connection_mutex(), m_close_on_eof(true) {}
-void Communication::Clear() {
- Disconnect(nullptr);
-}
+Communication::~Communication() { Disconnect(nullptr); }
ConnectionStatus Communication::Connect(const char *url, Status *error_ptr) {
- Clear();
+ std::unique_lock guard(m_connection_mutex);
LLDB_LOG(GetLog(LLDBLog::Communication),
"{0} Communication::Connect (url = {1})", this, url);
- lldb::ConnectionSP connection_sp(m_connection_sp);
- if (connection_sp)
- return connection_sp->Connect(url, error_ptr);
+ DisconnectUnlocked();
+
+ if (m_connection_sp)
+ return m_connection_sp->Connect(url, error_ptr);
if (error_ptr)
error_ptr->SetErrorString("Invalid connection.");
return eConnectionStatusNoConnection;
}
ConnectionStatus Communication::Disconnect(Status *error_ptr) {
+ std::unique_lock guard(m_connection_mutex);
+ return DisconnectUnlocked(error_ptr);
+}
+
+ConnectionStatus Communication::DisconnectUnlocked(Status *error_ptr) {
LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()",
this);
- lldb::ConnectionSP connection_sp(m_connection_sp);
- if (connection_sp) {
- ConnectionStatus status = connection_sp->Disconnect(error_ptr);
- // We currently don't protect connection_sp with any mutex for multi-
- // threaded environments. So lets not nuke our connection class without
- // putting some multi-threaded protections in. We also probably don't want
- // to pay for the overhead it might cause if every time we access the
- // connection we have to take a lock.
- //
- // This unique pointer will cleanup after itself when this object goes
- // away, so there is no need to currently have it destroy itself
- // immediately upon disconnect.
- // connection_sp.reset();
+ if (m_connection_sp) {
+ ConnectionStatus status = m_connection_sp->Disconnect(error_ptr);
return status;
}
return eConnectionStatusNoConnection;
}
bool Communication::IsConnected() const {
- lldb::ConnectionSP connection_sp(m_connection_sp);
- return (connection_sp ? connection_sp->IsConnected() : false);
+ std::shared_lock guard(m_connection_mutex);
+ return (m_connection_sp ? m_connection_sp->IsConnected() : false);
}
bool Communication::HasConnection() const {
+ std::shared_lock guard(m_connection_mutex);
return m_connection_sp.get() != nullptr;
}
size_t Communication::Read(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
ConnectionStatus &status, Status *error_ptr) {
- Log *log = GetLog(LLDBLog::Communication);
- LLDB_LOG(
- log,
- "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
- this, dst, dst_len, timeout, m_connection_sp.get());
-
- return ReadFromConnection(dst, dst_len, timeout, status, error_ptr);
+ std::shared_lock guard(m_connection_mutex);
+ return ReadUnlocked(dst, dst_len, timeout, status, error_ptr);
}
size_t Communication::Write(const void *src, size_t src_len,
ConnectionStatus &status, Status *error_ptr) {
- lldb::ConnectionSP connection_sp(m_connection_sp);
+ // We need to lock the write mutex so no concurrent writes happen, but also
+ // lock the connection mutex so it's not reset mid write. We need both mutexes
+ // because reads and writes from the connection can happen concurrently.
+ std::shared_lock guard(m_connection_mutex);
+ std::lock_guard<std::mutex> guard_write(m_write_mutex);
+ return WriteUnlocked(src, src_len, status, error_ptr);
+}
+
+size_t Communication::WriteUnlocked(const void *src, size_t src_len,
+ ConnectionStatus &status,
+ Status *error_ptr) {
+ if (!m_connection_sp) {
+ if (error_ptr)
+ error_ptr->SetErrorString("Invalid connection.");
+ status = eConnectionStatusNoConnection;
+ return 0;
+ }
- std::lock_guard<std::mutex> guard(m_write_mutex);
LLDB_LOG(GetLog(LLDBLog::Communication),
"{0} Communication::Write (src = {1}, src_len = {2}"
") connection = {3}",
- this, src, (uint64_t)src_len, connection_sp.get());
-
- if (connection_sp)
- return connection_sp->Write(src, src_len, status, error_ptr);
+ this, src, (uint64_t)src_len, m_connection_sp.get());
- if (error_ptr)
- error_ptr->SetErrorString("Invalid connection.");
- status = eConnectionStatusNoConnection;
- return 0;
+ return m_connection_sp->Write(src, src_len, status, error_ptr);
}
size_t Communication::WriteAll(const void *src, size_t src_len,
ConnectionStatus &status, Status *error_ptr) {
+ std::shared_lock guard(m_connection_mutex);
+ std::lock_guard<std::mutex> guard_write(m_write_mutex);
size_t total_written = 0;
do
- total_written += Write(static_cast<const char *>(src) + total_written,
- src_len - total_written, status, error_ptr);
+ total_written +=
+ WriteUnlocked(static_cast<const char *>(src) + total_written,
+ src_len - total_written, status, error_ptr);
while (status == eConnectionStatusSuccess && total_written < src_len);
return total_written;
}
-size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
- const Timeout<std::micro> &timeout,
- ConnectionStatus &status,
- Status *error_ptr) {
- lldb::ConnectionSP connection_sp(m_connection_sp);
- if (connection_sp)
- return connection_sp->Read(dst, dst_len, timeout, status, error_ptr);
+size_t Communication::ReadUnlocked(void *dst, size_t dst_len,
+ const Timeout<std::micro> &timeout,
+ ConnectionStatus &status,
+ Status *error_ptr) {
+ Log *log = GetLog(LLDBLog::Communication);
+ LLDB_LOG(
+ log,
+ "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
+ this, dst, dst_len, timeout, m_connection_sp.get());
+ if (m_connection_sp)
+ return m_connection_sp->Read(dst, dst_len, timeout, status, error_ptr);
if (error_ptr)
error_ptr->SetErrorString("Invalid connection.");
@@ -139,7 +139,8 @@
}
void Communication::SetConnection(std::unique_ptr<Connection> connection) {
- Disconnect(nullptr);
+ std::unique_lock guard(m_connection_mutex);
+ DisconnectUnlocked(nullptr);
m_connection_sp = std::move(connection);
}
Index: lldb/include/lldb/Core/ThreadedCommunication.h
===================================================================
--- lldb/include/lldb/Core/ThreadedCommunication.h
+++ lldb/include/lldb/Core/ThreadedCommunication.h
@@ -97,8 +97,6 @@
/// The destructor is virtual since this class gets subclassed.
~ThreadedCommunication() override;
- void Clear() override;
-
/// Disconnect the communications connection if one is currently connected.
///
/// \return
Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -16,6 +16,7 @@
#include "lldb/lldb-types.h"
#include <mutex>
+#include <shared_mutex>
#include <string>
namespace lldb_private {
@@ -46,8 +47,6 @@
/// The destructor is virtual since this class gets subclassed.
virtual ~Communication();
- virtual void Clear();
-
/// Connect using the current connection by passing \a url to its connect
/// function. string.
///
@@ -84,7 +83,10 @@
bool HasConnection() const;
- lldb_private::Connection *GetConnection() { return m_connection_sp.get(); }
+ lldb_private::Connection *GetConnection() {
+ std::shared_lock guard(m_connection_mutex);
+ return m_connection_sp.get();
+ }
/// Read bytes from the current connection.
///
@@ -169,13 +171,24 @@
///by this communications class.
std::mutex
m_write_mutex; ///< Don't let multiple threads write at the same time...
+ mutable std::shared_mutex m_connection_mutex;
bool m_close_on_eof;
+ /// Same as read but with m_connection_mutex unlocked.
+ size_t ReadUnlocked(void *dst, size_t dst_len,
+ const Timeout<std::micro> &timeout,
+ lldb::ConnectionStatus &status, Status *error_ptr);
+
size_t ReadFromConnection(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);
private:
+ /// Same as Disconnect but with m_connection_mutex unlocked.
+ lldb::ConnectionStatus DisconnectUnlocked(Status *error_ptr = nullptr);
+ /// Same as Write but with both m_write_mutex and m_connection_mutex unlocked.
+ size_t WriteUnlocked(const void *src, size_t src_len,
+ lldb::ConnectionStatus &status, Status *error_ptr);
Communication(const Communication &) = delete;
const Communication &operator=(const Communication &) = delete;
};
_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits