This is an automated email from the ASF dual-hosted git repository.

dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new c206611257 JSONRPC - Use dynamic and configurable buffer for incoming 
jsonrpc messages.(#11763)
c206611257 is described below

commit c20661125764e6472763d586cd736184782865cb
Author: Damian Meden <[email protected]>
AuthorDate: Mon Sep 23 10:34:17 2024 +0200

    JSONRPC - Use dynamic and configurable buffer for incoming jsonrpc 
messages.(#11763)
---
 doc/admin-guide/files/jsonrpc.yaml.en.rst        |   4 +
 include/mgmt/rpc/server/IPCSocketServer.h        |  15 +++-
 include/shared/rpc/MessageStorage.h              | 104 +++++++++++++++++++++++
 src/mgmt/rpc/server/IPCSocketServer.cc           |  46 +++++-----
 src/mgmt/rpc/server/unit_tests/test_rpcserver.cc |  49 ++++++++---
 src/shared/rpc/IPCSocketClient.cc                |  74 +---------------
 6 files changed, 187 insertions(+), 105 deletions(-)

diff --git a/doc/admin-guide/files/jsonrpc.yaml.en.rst 
b/doc/admin-guide/files/jsonrpc.yaml.en.rst
index 26aff3942f..de622e724f 100644
--- a/doc/admin-guide/files/jsonrpc.yaml.en.rst
+++ b/doc/admin-guide/files/jsonrpc.yaml.en.rst
@@ -110,6 +110,10 @@ Field Name                            Description
                                       ``true`` by default.
                                       In case of an unauthorized call is made, 
a corresponding rpc error will be returned, you can
                                       check 
:ref:`jsonrpc-node-errors-unauthorized-action` for details about the errors.
+``incoming_request_max_size``         Maximum allowed size for the incoming 
jsonrpc request. Default ``96000`` bytes. Size must be
+                                      specified in bytes. Note that memory 
will not be allocated all at once, if incoming message
+                                      does not fit in the first chunk of 
memory(32K) an extra amount will be allocated, till the
+                                      requests size hits the max size.
 ===================================== 
=========================================================================================
 
 
diff --git a/include/mgmt/rpc/server/IPCSocketServer.h 
b/include/mgmt/rpc/server/IPCSocketServer.h
index a44dc1b373..5a9bda39ac 100644
--- a/include/mgmt/rpc/server/IPCSocketServer.h
+++ b/include/mgmt/rpc/server/IPCSocketServer.h
@@ -35,6 +35,7 @@
 
 #include "mgmt/rpc/server/CommBase.h"
 #include "mgmt/rpc/config/JsonRPCConfig.h"
+#include "shared/rpc/MessageStorage.h"
 
 namespace rpc::comm
 {
@@ -53,6 +54,8 @@ class IPCSocketServer : public BaseCommInterface
     PEER_CREDENTIALS_ERROR = 1, ///< Error while trying to read the peer 
credentials from the unix socket.
     PERMISSION_DENIED      = 2  ///< Client's socket credential didn't wasn't 
sufficient to execute the method.
   };
+  static const size_t INTERNAL_BUFFER_SIZE{32000};
+  using Buffer = MessageStorage<INTERNAL_BUFFER_SIZE>;
   ///
   /// @brief Connection abstraction class that deals with sending and 
receiving data from the connected peer.
   ///
@@ -60,7 +63,7 @@ class IPCSocketServer : public BaseCommInterface
   /// the client object around.
   struct Client {
     /// @param fd Peer's socket.
-    Client(int fd);
+    Client(int fd, size_t max_req_size);
     /// Destructor will close the socket(if opened);
     ~Client();
 
@@ -74,7 +77,7 @@ class IPCSocketServer : public BaseCommInterface
     /// The size of the buffer to be read is not defined in this function, but 
rather passed in the @c bw parameter.
     /// @return A tuple with a boolean flag indicating if the operation did 
success or not, in case of any error, a text will
     /// be added with a description.
-    std::tuple<bool, std::string> read_all(swoc::FixedBufferWriter &bw) const;
+    std::tuple<bool, std::string> read_all(Buffer &bw) const;
     /// Write the the socket with the passed data.
     /// @return std::error_code.
     void write(std::string const &data, std::error_code &ec) const;
@@ -83,8 +86,9 @@ class IPCSocketServer : public BaseCommInterface
   private:
     /// Wait for data to be ready for reading.
     /// @return true if the data is ready, false otherwise.
-    bool poll_for_data(std::chrono::milliseconds timeout) const;
-    int  _fd; ///< connected peer's socket.
+    bool   poll_for_data(std::chrono::milliseconds timeout) const;
+    int    _fd;           ///< connected peer's socket.
+    size_t _max_req_size; ///< Max incoming request size.
   };
 
 public:
@@ -114,6 +118,7 @@ protected: // unit test access
     static constexpr auto BACKLOG_KEY_STR{"backlog"};
     static constexpr auto 
MAX_RETRY_ON_TR_ERROR_KEY_STR{"max_retry_on_transient_errors"};
     static constexpr auto RESTRICTED_API{"restricted_api"};
+    static constexpr auto MAX_BUFFER_SIZE{"incoming_request_max_size"};
     // is it safe to call Layout now?
     std::string sockPathName;
     std::string lockPathName;
@@ -122,6 +127,7 @@ protected: // unit test access
     int  maxRetriesOnTransientErrors{64};
     bool restrictedAccessApi{
       NON_RESTRICTED_API}; // This config value will drive the permissions of 
the jsonrpc socket(either 0700(default) or 0777).
+    size_t incomingRequestMaxBufferSize{INTERNAL_BUFFER_SIZE * 3};
   };
 
   friend struct YAML::convert<rpc::comm::IPCSocketServer::Config>;
@@ -142,5 +148,6 @@ private:
 
   struct sockaddr_un _serverAddr;
   int                _socket{-1};
+  int                _lock_fd{-1};
 };
 } // namespace rpc::comm
diff --git a/include/shared/rpc/MessageStorage.h 
b/include/shared/rpc/MessageStorage.h
new file mode 100644
index 0000000000..8d455d35ef
--- /dev/null
+++ b/include/shared/rpc/MessageStorage.h
@@ -0,0 +1,104 @@
+/**
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+#pragma once
+
+#include <sstream>
+#include <swoc/BufferWriter.h>
+
+/// @brief Simple storage to keep the jsonrpc server's response.
+///
+///        With small content it will just use the LocalBufferWriter, if the
+///        content gets bigger, then it will just save the buffer into a string
+///        and reuse the already created LocalBufferWriter. If stored data 
fits in the
+///        original bw, then no need to create the extra string. If message 
fist in the
+///        first chunk, no extra space will be allocated into the storage 
string.
+///        This is not meant to be performant as is mainly used for single 
request
+///        data storage, which will be at some point stored into a string 
anyways.
+/// @note  User should deal with the buffer limit.
+///
+template <size_t N> class MessageStorage
+{
+  std::string                _content;
+  swoc::LocalBufferWriter<N> _bw;
+  size_t                     _written{0};
+
+public:
+  char *
+  writable_data()
+  {
+    return _bw.aux_data();
+  }
+
+  void
+  save(size_t n)
+  {
+    _bw.commit(n);
+
+    if (_bw.remaining() == 0) { // no more space available, flush what's on 
the bw
+                                // and reset it.
+      flush();
+    }
+  }
+
+  size_t
+  available() const
+  {
+    return _bw.remaining();
+  }
+
+  void
+  flush()
+  {
+    if (_bw.size() == 0) {
+      return;
+    }
+
+    if (_written == 0) {
+      _content.reserve(_bw.size());
+    } else {
+      // need more space.
+      _content.reserve(_written + _bw.size());
+    }
+
+    _content.append(_bw.data(), _bw.size());
+    _written += _bw.size();
+
+    _bw.clear();
+  }
+
+  std::string
+  str()
+  {
+    if (stored() <= _bw.size()) {
+      // just get it directly from the BW.
+      return {_bw.data(), _bw.size()};
+    }
+
+    // There is content in the bw that needs to be saved into the internal 
string.
+    flush();
+    return _content;
+  }
+
+  size_t
+  stored() const
+  {
+    return _written ? _written : _bw.size();
+  }
+};
diff --git a/src/mgmt/rpc/server/IPCSocketServer.cc 
b/src/mgmt/rpc/server/IPCSocketServer.cc
index 89641e26ab..748f7ab2f3 100644
--- a/src/mgmt/rpc/server/IPCSocketServer.cc
+++ b/src/mgmt/rpc/server/IPCSocketServer.cc
@@ -46,8 +46,7 @@
 
 namespace
 {
-constexpr size_t MAX_REQUEST_BUFFER_SIZE{32000};
-constexpr auto   logTag = "rpc.net";
+constexpr auto logTag = "rpc.net";
 
 // Quick check for errors(base on the errno);
 bool check_for_transient_errors();
@@ -207,7 +206,6 @@ IPCSocketServer::run()
 {
   _running.store(true);
 
-  swoc::LocalBufferWriter<MAX_REQUEST_BUFFER_SIZE> bw;
   while (_running) {
     // poll till socket it's ready.
     if (!this->poll_for_new_client()) {
@@ -219,11 +217,12 @@ IPCSocketServer::run()
 
     std::error_code ec;
     if (int fd = this->accept(ec); !ec) {
-      Client client{fd};
+      Client client{fd, _conf.incomingRequestMaxBufferSize};
+      Buffer bw;
 
       if (auto [ok, errStr] = client.read_all(bw); ok) {
-        const auto   json = std::string{bw.data(), bw.size()};
-        rpc::Context ctx;
+        const std::string &json = bw.str();
+        rpc::Context       ctx;
         // we want to make sure the peer's credentials are ok.
         ctx.get_auth().add_checker(
           [&](TSRPCHandlerOptions const &opt, swoc::Errata &errata) -> void { 
late_check_peer_credentials(fd, opt, errata); });
@@ -240,8 +239,6 @@ IPCSocketServer::run()
     } else {
       Debug(logTag, "Error while accepting a new connection on the socket: 
%s", ec.message().c_str());
     }
-
-    bw.clear();
   }
 
   this->close();
@@ -294,13 +291,14 @@ IPCSocketServer::accept(std::error_code &ec) const
 void
 IPCSocketServer::bind(std::error_code &ec)
 {
-  int lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
-  if (lock_fd == -1) {
+  _lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
+  if (_lock_fd == -1) {
     ec = std::make_error_code(static_cast<std::errc>(errno));
     return;
   }
 
-  int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
+  int ret = flock(_lock_fd, LOCK_EX | LOCK_NB);
+
   if (ret != 0) {
     ec = std::make_error_code(static_cast<std::errc>(errno));
     return;
@@ -346,10 +344,15 @@ IPCSocketServer::close()
     ::close(_socket);
     _socket = -1;
   }
+
+  if (_lock_fd > 0) {
+    ::close(_lock_fd);
+    _lock_fd = -1;
+  }
 }
 //// client
 
-IPCSocketServer::Client::Client(int fd) : _fd{fd} {}
+IPCSocketServer::Client::Client(int fd, size_t max_req_size) : _fd{fd}, 
_max_req_size{max_req_size} {}
 IPCSocketServer::Client::~Client()
 {
   this->close();
@@ -395,11 +398,11 @@ IPCSocketServer::Client::read(swoc::MemSpan<char> span) 
const
 }
 
 std::tuple<bool, std::string>
-IPCSocketServer::Client::read_all(swoc::FixedBufferWriter &bw) const
+IPCSocketServer::Client::read_all(Buffer &bw) const
 {
   std::string buff;
-  while (bw.remaining() > 0) {
-    auto ret = read({bw.aux_data(), bw.remaining()});
+  while (true) {
+    auto ret = read({bw.writable_data(), bw.available()});
     if (ret < 0) {
       if (check_for_transient_errors()) {
         continue;
@@ -409,20 +412,20 @@ IPCSocketServer::Client::read_all(swoc::FixedBufferWriter 
&bw) const
     }
 
     if (ret == 0) {
-      if (bw.size()) {
-        return {false, swoc::bwprint(buff, "Peer disconnected after reading {} 
bytes.", bw.size())};
+      if (bw.stored()) {
+        return {false, swoc::bwprint(buff, "Peer disconnected after reading {} 
bytes.", bw.stored())};
       }
       return {false, swoc::bwprint(buff, "Peer disconnected. EOF")};
     }
-    bw.commit(ret);
-    if (bw.remaining() > 0) {
+    bw.save(ret);
+    if (_max_req_size - bw.stored() > 0) { // we can still read more.
       using namespace std::chrono_literals;
       if (!this->poll_for_data(1ms)) {
         return {true, buff};
       }
       continue;
     } else {
-      swoc::bwprint(buff, "Buffer is full, we hit the limit: {}", 
bw.capacity());
+      swoc::bwprint(buff, "Buffer is full, we hit the limit: {}", 
_max_req_size);
       break;
     }
   }
@@ -495,6 +498,9 @@ template <> struct 
convert<rpc::comm::IPCSocketServer::Config> {
     if (auto n = node[config::RESTRICTED_API]) {
       rhs.restrictedAccessApi = n.as<bool>();
     }
+    if (auto n = node[config::MAX_BUFFER_SIZE]) {
+      rhs.incomingRequestMaxBufferSize = n.as<size_t>();
+    }
     return true;
   }
 };
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc 
b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 5b2cb2411f..138abe1dc2 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -69,6 +69,7 @@ static const std::string 
sockPath{"tests/var/jsonrpc20_test.sock"};
 static const std::string lockPath{"tests/var/jsonrpc20_test.lock"};
 static constexpr int     default_backlog{5};
 static constexpr int     default_maxRetriesOnTransientErrors{64};
+static constexpr size_t  default_incoming_req_max_size{32000 * 3};
 static constexpr auto    logTag{"rpc.test.client"};
 
 struct RPCServerTestListener : Catch::TestEventListenerBase {
@@ -80,7 +81,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const & /* testRunInfo ATS_UNUSED */) 
override
   {
     Layout::create();
-    init_diags("rpc|rpc.test", nullptr);
+    init_diags("rpc", nullptr);
     RecProcessInit();
 
     signal(SIGPIPE, SIG_IGN);
@@ -95,7 +96,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase {
     rpc::config::RPCConfig serverConfig;
 
     auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" 
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
-                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64 
}}})"};
+                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64, 
"incoming_request_max_size": 32000 }}})"};
     YAML::Node configNode = YAML::Load(confStr);
     serverConfig.load(configNode["rpc"]);
     try {
@@ -111,21 +112,36 @@ struct RPCServerTestListener : 
Catch::TestEventListenerBase {
   void
   testRunEnded(Catch::TestRunStats const & /* testRunStats ATS_UNUSED */) 
override
   {
-    // jsonrpcServer->stop_thread();
-    // delete main_thread;
     if (jsonrpcServer) {
-      delete jsonrpcServer;
+      delete jsonrpcServer; // will stop the thread
     }
   }
 
 private:
-  // std::unique_ptr<rpc::RPCServer> jrpcServer;
   std::unique_ptr<EThread> main_thread;
 };
 CATCH_REGISTER_LISTENER(RPCServerTestListener)
 
 RPCServerTestListener::~RPCServerTestListener() {}
 
+void
+restart_json_rpc_server(YAML::Node n)
+{
+  rpc::config::RPCConfig serverConfig;
+  serverConfig.load(n["rpc"]);
+
+  if (jsonrpcServer) {
+    delete jsonrpcServer;
+  }
+
+  try {
+    jsonrpcServer = new rpc::RPCServer(serverConfig);
+    jsonrpcServer->start_thread();
+  } catch (std::exception const &ex) {
+    Debug(logTag, "Oops: %s", ex.what());
+  }
+}
+
 DEFINE_JSONRPC_PROTO_FUNCTION(some_foo) // id, params
 {
   swoc::Rv<YAML::Node> resp;
@@ -320,12 +336,12 @@ TEST_CASE("Basic message sending to a running server", 
"[socket]")
 
 TEST_CASE("Sending a message bigger than the internal server's buffer. 32000", 
"[buffer][error]")
 {
-  REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
+  REQUIRE(rpc::add_method_handler("do_nothing32000", &do_nothing));
+  const int S{32000}; // + the rest of the json message.
+  auto json{R"({"jsonrpc": "2.0", "method": "do_nothing32000", "params": 
{"msg":")" + random_string(S) + R"("}, "id":"32k_1"})"};
 
   SECTION("Message larger than the the accepted size.")
   {
-    const int S{32000}; // + the rest of the json message.
-    auto      json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": 
{"msg":")" + random_string(S) + R"("}, "id":"EfGh-1"})"};
     REQUIRE_NOTHROW([&]() {
       ScopedLocalSocket rpc_client;
       auto              resp = rpc_client.query(json);
@@ -333,7 +349,19 @@ TEST_CASE("Sending a message bigger than the internal 
server's buffer. 32000", "
     }());
   }
 
-  REQUIRE(rpc::test_remove_handler("do_nothing"));
+  SECTION("Retry the big message after reconfigure(restart rpc server) the 
incoming request size limit.")
+  {
+    auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" 
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
+                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64, 
"incoming_request_max_size": 62000 }}})"};
+    YAML::Node n = YAML::Load(confStr);
+    restart_json_rpc_server(n);
+    REQUIRE_NOTHROW([&]() {
+      ScopedLocalSocket rpc_client;
+      auto              resp = rpc_client.query(json);
+      REQUIRE(resp == R"({"jsonrpc": "2.0", "result": {"size": "32000"}, "id": 
"32k_1"})");
+    }());
+  }
+  REQUIRE(rpc::test_remove_handler("do_nothing32000"));
 }
 
 TEST_CASE("Test with invalid json message", "[socket]")
@@ -510,6 +538,7 @@ TEST_CASE("Test configuration parsing from a YAML node. UDS 
values", "[string]")
   REQUIRE(socket->get_conf().maxRetriesOnTransientErrors == 
default_maxRetriesOnTransientErrors);
   REQUIRE(socket->get_conf().sockPathName == sockPath);
   REQUIRE(socket->get_conf().lockPathName == lockPath);
+  REQUIRE(socket->get_conf().incomingRequestMaxBufferSize == 
default_incoming_req_max_size);
 }
 
 TEST_CASE("Test configuration parsing from a file. UDS Server", "[file]")
diff --git a/src/shared/rpc/IPCSocketClient.cc 
b/src/shared/rpc/IPCSocketClient.cc
index 14b09710a9..ab04f681a7 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -21,84 +21,16 @@
 
 #include <stdexcept>
 #include <chrono>
-#include <sstream>
 #include <utility>
 #include <thread>
 
 #include "tsutil/ts_bw_format.h"
 
 #include "shared/rpc/IPCSocketClient.h"
+#include "shared/rpc/MessageStorage.h"
 #include <tscore/ink_assert.h>
 #include <tscore/ink_sock.h>
 
-namespace
-{
-/// @brief Simple buffer to store the jsonrpc server's response.
-///
-///        With small content it will just use the LocalBufferWriter, if the
-///        content gets bigger, then it will just save the buffer into a stream
-///        and reuse the already created BufferWritter.
-template <size_t N> class BufferStream
-{
-  std::ostringstream         _os;
-  swoc::LocalBufferWriter<N> _bw;
-  size_t                     _written{0};
-
-public:
-  char *
-  writable_data()
-  {
-    return _bw.aux_data();
-  }
-
-  void
-  save(size_t n)
-  {
-    _bw.commit(n);
-
-    if (_bw.remaining() == 0) { // no more space available, flush what's on 
the bw
-                                // and reset it.
-      flush();
-    }
-  }
-
-  size_t
-  available() const
-  {
-    return _bw.remaining();
-  }
-
-  void
-  flush()
-  {
-    if (_bw.size() == 0) {
-      return;
-    }
-    _os.write(_bw.view().data(), _bw.size());
-    _written += _bw.size();
-
-    _bw.clear();
-  }
-
-  std::string
-  str()
-  {
-    if (stored() <= _bw.size()) {
-      return {_bw.data(), _bw.size()};
-    }
-
-    flush();
-    return _os.str();
-  }
-
-  size_t
-  stored() const
-  {
-    return _written ? _written : _bw.size();
-  }
-};
-} // namespace
-
 namespace shared::rpc
 {
 
@@ -192,12 +124,12 @@ IPCSocketClient::read_all(std::string &content)
     return {};
   }
 
-  BufferStream<356000> bs;
+  MessageStorage<356000> bs;
 
   ReadStatus readStatus{ReadStatus::UNKNOWN};
   while (true) {
     auto       buf     = bs.writable_data();
-    const auto to_read = bs.available();
+    const auto to_read = bs.available(); // Available in the current memory 
chunk.
     ssize_t    ret{-1};
     do {
       ret = ::read(_sock, buf, to_read);

Reply via email to