This is an automated email from the ASF dual-hosted git repository.

dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef103643f jsonrpc: Wait for the socket to be ready to read. (#11815)
5ef103643f is described below

commit 5ef103643f2913d9fc4aba15f7846c01ccc75b8e
Author: Damian Meden <[email protected]>
AuthorDate: Wed Nov 6 09:31:57 2024 +0100

    jsonrpc: Wait for the socket to be ready to read. (#11815)
    
    jsonrpc: Add poll to wait for the socket to be ready to read.
    We found out that sometimes we hit a busy loop here, so we add a timeout
    to make sure we do not hang forever here.
---
 include/shared/rpc/IPCSocketClient.h |  9 +++---
 include/shared/rpc/RPCClient.h       |  7 +++--
 src/shared/rpc/IPCSocketClient.cc    | 53 ++++++++++++++++++++++++------------
 3 files changed, 45 insertions(+), 24 deletions(-)

diff --git a/include/shared/rpc/IPCSocketClient.h 
b/include/shared/rpc/IPCSocketClient.h
index a90ea6be44..33fbdb62e8 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -43,7 +43,7 @@ using namespace std::chrono_literals;
 ///
 /// Error handling: Enclose this inside a try/catch because if any error is 
detected functions will throw.
 struct IPCSocketClient {
-  enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, STREAM_ERROR, UNKNOWN };
+  enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, READ_ERROR, TIMEOUT, 
UNKNOWN };
   using self_reference = IPCSocketClient &;
 
   IPCSocketClient(std::string path = "/tmp/jsonrpc20.sock") : 
_path{std::move(path)} { memset(&_server, 0, sizeof(_server)); }
@@ -52,13 +52,14 @@ struct IPCSocketClient {
 
   /// Connect to the configured socket path.
   /// Connection will retry every @c ms for @c attempts times if errno is 
EAGAIN
-  self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts = 
5);
+  self_reference connect(std::chrono::milliseconds wait_ms = 40ms, int 
attempts = 5);
 
   /// Send all the passed string to the socket.
   self_reference send(std::string_view data);
 
-  /// Read all the content from the socket till the message is complete.
-  ReadStatus read_all(std::string &content);
+  /// Read all the content until the fd closes or timeout( @c timeout_ms * @c 
attempts) has passed.
+  /// @return @c ReadStatus will be set accordingly with the operation result.
+  ReadStatus read_all(std::string &content, std::chrono::milliseconds 
timeout_ms = 1000ms, int attempts = 10);
 
   /// Closes the socket.
   void
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index e4492f156c..57c629f1aa 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -67,8 +67,11 @@ public:
           // responses.
           ink_assert(!"Buffer full, not enough space to read the response.");
           break;
-        case IPCSocketClient::ReadStatus::STREAM_ERROR:
-          err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while 
reading response. {}({})", std::strerror(errno), errno);
+        case IPCSocketClient::ReadStatus::READ_ERROR:
+          err_text = swoc::bwprint(err_text, "READ_ERROR: Error while reading 
response. {}({})", std::strerror(errno), errno);
+          break;
+        case IPCSocketClient::ReadStatus::TIMEOUT:
+          err_text = swoc::bwprint(err_text, "TIMEOUT: Couldn't get the 
response. {}({})", std::strerror(errno), errno);
           break;
         default:
           err_text = "Something happened, we can't read the response. Unknown 
error.";
diff --git a/src/shared/rpc/IPCSocketClient.cc 
b/src/shared/rpc/IPCSocketClient.cc
index ab04f681a7..45a67061b5 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -35,7 +35,7 @@ namespace shared::rpc
 {
 
 IPCSocketClient::self_reference
-IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
+IPCSocketClient::connect(std::chrono::milliseconds wait_ms, int attempts)
 {
   std::string text;
   int         err, tries{attempts};
@@ -67,7 +67,7 @@ IPCSocketClient::connect(std::chrono::milliseconds ms, int 
attempts)
     if (errno == EAGAIN || errno == EINPROGRESS) {
       // Connection cannot be completed immediately
       // EAGAIN for UDS should suffice, but just in case.
-      std::this_thread::sleep_for(ms);
+      std::this_thread::sleep_for(wait_ms);
       err = errno;
       continue;
     } else {
@@ -117,35 +117,52 @@ IPCSocketClient ::send(std::string_view data)
 }
 
 IPCSocketClient::ReadStatus
-IPCSocketClient::read_all(std::string &content)
+IPCSocketClient::read_all(std::string &content, std::chrono::milliseconds 
timeout_ms, int attempts)
 {
   if (this->is_closed()) {
     // we had a failure.
-    return {};
+    return ReadStatus::UNKNOWN;
   }
 
   MessageStorage<356000> bs;
-
-  ReadStatus readStatus{ReadStatus::UNKNOWN};
-  while (true) {
+  int                    attempts_left{attempts};
+  ReadStatus             readStatus{ReadStatus::NO_ERROR};
+  // Try to read all the data from the socket. If a timeout happens we retry
+  // 'attempts' times. On error we just stop.
+  while (attempts_left > 0 || readStatus == ReadStatus::NO_ERROR) {
     auto       buf     = bs.writable_data();
     const auto to_read = bs.available(); // Available in the current memory 
chunk.
-    ssize_t    ret{-1};
-    do {
-      ret = ::read(_sock, buf, to_read);
-    } while (ret < 0 && (errno == EAGAIN || errno == EINTR));
+    ssize_t    nread{-1};
 
-    if (ret > 0) {
-      bs.save(ret);
+    // Try again if timed out.
+    if (auto const r = read_ready(_sock, timeout_ms.count()); r == 0) {
+      readStatus = ReadStatus::TIMEOUT;
+      --attempts_left;
       continue;
-    } else {
-      if (bs.stored() > 0) {
-        readStatus = ReadStatus::NO_ERROR;
-        break;
+    } else if (r < 0) {
+      // No more tries.
+      readStatus = ReadStatus::READ_ERROR;
+      break;
+    }
+
+    nread = ::read(_sock, buf, to_read);
+    if (nread > 0) {
+      bs.save(nread);
+      continue;
+    } else if (nread == -1) {
+      if (errno == EAGAIN || errno == EINTR) {
+        continue;
       }
-      readStatus = ReadStatus::STREAM_ERROR;
+      readStatus = ReadStatus::READ_ERROR;
+      break;
+    }
+    // EOF
+    if (bs.stored() > 0) {
+      readStatus = ReadStatus::NO_ERROR;
       break;
     }
+    readStatus = ReadStatus::READ_ERROR;
+    break;
   }
   content = bs.str();
   return readStatus;

Reply via email to