This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef103643f jsonrpc: Wait for the socket to be ready to read. (#11815)
5ef103643f is described below
commit 5ef103643f2913d9fc4aba15f7846c01ccc75b8e
Author: Damian Meden <[email protected]>
AuthorDate: Wed Nov 6 09:31:57 2024 +0100
jsonrpc: Wait for the socket to be ready to read. (#11815)
jsonrpc: Add poll to wait for the socket to be ready to read.
We found out that we sometimes hit a busy loop here, so add a timeout
to make sure we do not hang forever here.
---
include/shared/rpc/IPCSocketClient.h | 9 +++---
include/shared/rpc/RPCClient.h | 7 +++--
src/shared/rpc/IPCSocketClient.cc | 53 ++++++++++++++++++++++++------------
3 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/include/shared/rpc/IPCSocketClient.h
b/include/shared/rpc/IPCSocketClient.h
index a90ea6be44..33fbdb62e8 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -43,7 +43,7 @@ using namespace std::chrono_literals;
///
/// Error handling: Enclose this inside a try/catch because if any error is
detected functions will throw.
struct IPCSocketClient {
- enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, STREAM_ERROR, UNKNOWN };
+ enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, READ_ERROR, TIMEOUT,
UNKNOWN };
using self_reference = IPCSocketClient &;
IPCSocketClient(std::string path = "/tmp/jsonrpc20.sock") :
_path{std::move(path)} { memset(&_server, 0, sizeof(_server)); }
@@ -52,13 +52,14 @@ struct IPCSocketClient {
/// Connect to the configured socket path.
/// Connection will retry every @c ms for @c attempts times if errno is
EAGAIN
- self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts =
5);
+ self_reference connect(std::chrono::milliseconds wait_ms = 40ms, int
attempts = 5);
/// Send all the passed string to the socket.
self_reference send(std::string_view data);
- /// Read all the content from the socket till the message is complete.
- ReadStatus read_all(std::string &content);
+ /// Read all the content until the fd closes or timeout( @c timeout_ms * @c
attempts) has passed.
+ /// @return @c ReadStatus will be set accordingly with the operation result.
+ ReadStatus read_all(std::string &content, std::chrono::milliseconds
timeout_ms = 1000ms, int attempts = 10);
/// Closes the socket.
void
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index e4492f156c..57c629f1aa 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -67,8 +67,11 @@ public:
// responses.
ink_assert(!"Buffer full, not enough space to read the response.");
break;
- case IPCSocketClient::ReadStatus::STREAM_ERROR:
- err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while
reading response. {}({})", std::strerror(errno), errno);
+ case IPCSocketClient::ReadStatus::READ_ERROR:
+ err_text = swoc::bwprint(err_text, "READ_ERROR: Error while reading
response. {}({})", std::strerror(errno), errno);
+ break;
+ case IPCSocketClient::ReadStatus::TIMEOUT:
+ err_text = swoc::bwprint(err_text, "TIMEOUT: Couldn't get the
response. {}({})", std::strerror(errno), errno);
break;
default:
err_text = "Something happened, we can't read the response. Unknown
error.";
diff --git a/src/shared/rpc/IPCSocketClient.cc
b/src/shared/rpc/IPCSocketClient.cc
index ab04f681a7..45a67061b5 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -35,7 +35,7 @@ namespace shared::rpc
{
IPCSocketClient::self_reference
-IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
+IPCSocketClient::connect(std::chrono::milliseconds wait_ms, int attempts)
{
std::string text;
int err, tries{attempts};
@@ -67,7 +67,7 @@ IPCSocketClient::connect(std::chrono::milliseconds ms, int
attempts)
if (errno == EAGAIN || errno == EINPROGRESS) {
// Connection cannot be completed immediately
// EAGAIN for UDS should suffice, but just in case.
- std::this_thread::sleep_for(ms);
+ std::this_thread::sleep_for(wait_ms);
err = errno;
continue;
} else {
@@ -117,35 +117,52 @@ IPCSocketClient ::send(std::string_view data)
}
IPCSocketClient::ReadStatus
-IPCSocketClient::read_all(std::string &content)
+IPCSocketClient::read_all(std::string &content, std::chrono::milliseconds
timeout_ms, int attempts)
{
if (this->is_closed()) {
// we had a failure.
- return {};
+ return ReadStatus::UNKNOWN;
}
MessageStorage<356000> bs;
-
- ReadStatus readStatus{ReadStatus::UNKNOWN};
- while (true) {
+ int attempts_left{attempts};
+ ReadStatus readStatus{ReadStatus::NO_ERROR};
+ // Try to read all the data from the socket. If a timeout happens we retry
+ // 'attempts' times. On error we just stop.
+ while (attempts_left > 0 || readStatus == ReadStatus::NO_ERROR) {
auto buf = bs.writable_data();
const auto to_read = bs.available(); // Available in the current memory
chunk.
- ssize_t ret{-1};
- do {
- ret = ::read(_sock, buf, to_read);
- } while (ret < 0 && (errno == EAGAIN || errno == EINTR));
+ ssize_t nread{-1};
- if (ret > 0) {
- bs.save(ret);
+ // Try again if timed out.
+ if (auto const r = read_ready(_sock, timeout_ms.count()); r == 0) {
+ readStatus = ReadStatus::TIMEOUT;
+ --attempts_left;
continue;
- } else {
- if (bs.stored() > 0) {
- readStatus = ReadStatus::NO_ERROR;
- break;
+ } else if (r < 0) {
+ // No more tries.
+ readStatus = ReadStatus::READ_ERROR;
+ break;
+ }
+
+ nread = ::read(_sock, buf, to_read);
+ if (nread > 0) {
+ bs.save(nread);
+ continue;
+ } else if (nread == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ continue;
}
- readStatus = ReadStatus::STREAM_ERROR;
+ readStatus = ReadStatus::READ_ERROR;
+ break;
+ }
+ // EOF
+ if (bs.stored() > 0) {
+ readStatus = ReadStatus::NO_ERROR;
break;
}
+ readStatus = ReadStatus::READ_ERROR;
+ break;
}
content = bs.str();
return readStatus;