This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new c206611257 JSONRPC - Use dynamic and configurable buffer for incoming
jsonrpc messages.(#11763)
c206611257 is described below
commit c20661125764e6472763d586cd736184782865cb
Author: Damian Meden <[email protected]>
AuthorDate: Mon Sep 23 10:34:17 2024 +0200
JSONRPC - Use dynamic and configurable buffer for incoming jsonrpc
messages.(#11763)
---
doc/admin-guide/files/jsonrpc.yaml.en.rst | 4 +
include/mgmt/rpc/server/IPCSocketServer.h | 15 +++-
include/shared/rpc/MessageStorage.h | 104 +++++++++++++++++++++++
src/mgmt/rpc/server/IPCSocketServer.cc | 46 +++++-----
src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 49 ++++++++---
src/shared/rpc/IPCSocketClient.cc | 74 +---------------
6 files changed, 187 insertions(+), 105 deletions(-)
diff --git a/doc/admin-guide/files/jsonrpc.yaml.en.rst
b/doc/admin-guide/files/jsonrpc.yaml.en.rst
index 26aff3942f..de622e724f 100644
--- a/doc/admin-guide/files/jsonrpc.yaml.en.rst
+++ b/doc/admin-guide/files/jsonrpc.yaml.en.rst
@@ -110,6 +110,10 @@ Field Name Description
``true`` by default.
In case of an unauthorized call is made,
a corresponding rpc error will be returned, you can
check
:ref:`jsonrpc-node-errors-unauthorized-action` for details about the errors.
+``incoming_request_max_size`` Maximum allowed size for the incoming
jsonrpc request. Default ``96000`` bytes. Size must be
+ specified in bytes. Note that memory
will not be allocated all at once, if incoming message
+ does not fit in the first chunk of
memory(32K) an extra amount will be allocated, till the
+ requests size hits the max size.
=====================================
=========================================================================================
diff --git a/include/mgmt/rpc/server/IPCSocketServer.h
b/include/mgmt/rpc/server/IPCSocketServer.h
index a44dc1b373..5a9bda39ac 100644
--- a/include/mgmt/rpc/server/IPCSocketServer.h
+++ b/include/mgmt/rpc/server/IPCSocketServer.h
@@ -35,6 +35,7 @@
#include "mgmt/rpc/server/CommBase.h"
#include "mgmt/rpc/config/JsonRPCConfig.h"
+#include "shared/rpc/MessageStorage.h"
namespace rpc::comm
{
@@ -53,6 +54,8 @@ class IPCSocketServer : public BaseCommInterface
PEER_CREDENTIALS_ERROR = 1, ///< Error while trying to read the peer
credentials from the unix socket.
PERMISSION_DENIED = 2 ///< Client's socket credential didn't wasn't
sufficient to execute the method.
};
+ static const size_t INTERNAL_BUFFER_SIZE{32000};
+ using Buffer = MessageStorage<INTERNAL_BUFFER_SIZE>;
///
/// @brief Connection abstraction class that deals with sending and
receiving data from the connected peer.
///
@@ -60,7 +63,7 @@ class IPCSocketServer : public BaseCommInterface
/// the client object around.
struct Client {
/// @param fd Peer's socket.
- Client(int fd);
+ Client(int fd, size_t max_req_size);
/// Destructor will close the socket(if opened);
~Client();
@@ -74,7 +77,7 @@ class IPCSocketServer : public BaseCommInterface
/// The size of the buffer to be read is not defined in this function, but
rather passed in the @c bw parameter.
/// @return A tuple with a boolean flag indicating if the operation did
success or not, in case of any error, a text will
/// be added with a description.
- std::tuple<bool, std::string> read_all(swoc::FixedBufferWriter &bw) const;
+ std::tuple<bool, std::string> read_all(Buffer &bw) const;
/// Write the the socket with the passed data.
/// @return std::error_code.
void write(std::string const &data, std::error_code &ec) const;
@@ -83,8 +86,9 @@ class IPCSocketServer : public BaseCommInterface
private:
/// Wait for data to be ready for reading.
/// @return true if the data is ready, false otherwise.
- bool poll_for_data(std::chrono::milliseconds timeout) const;
- int _fd; ///< connected peer's socket.
+ bool poll_for_data(std::chrono::milliseconds timeout) const;
+ int _fd; ///< connected peer's socket.
+ size_t _max_req_size; ///< Max incoming request size.
};
public:
@@ -114,6 +118,7 @@ protected: // unit test access
static constexpr auto BACKLOG_KEY_STR{"backlog"};
static constexpr auto
MAX_RETRY_ON_TR_ERROR_KEY_STR{"max_retry_on_transient_errors"};
static constexpr auto RESTRICTED_API{"restricted_api"};
+ static constexpr auto MAX_BUFFER_SIZE{"incoming_request_max_size"};
// is it safe to call Layout now?
std::string sockPathName;
std::string lockPathName;
@@ -122,6 +127,7 @@ protected: // unit test access
int maxRetriesOnTransientErrors{64};
bool restrictedAccessApi{
NON_RESTRICTED_API}; // This config value will drive the permissions of
the jsonrpc socket(either 0700(default) or 0777).
+ size_t incomingRequestMaxBufferSize{INTERNAL_BUFFER_SIZE * 3};
};
friend struct YAML::convert<rpc::comm::IPCSocketServer::Config>;
@@ -142,5 +148,6 @@ private:
struct sockaddr_un _serverAddr;
int _socket{-1};
+ int _lock_fd{-1};
};
} // namespace rpc::comm
diff --git a/include/shared/rpc/MessageStorage.h
b/include/shared/rpc/MessageStorage.h
new file mode 100644
index 0000000000..8d455d35ef
--- /dev/null
+++ b/include/shared/rpc/MessageStorage.h
@@ -0,0 +1,104 @@
+/**
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+#pragma once
+
+#include <sstream>
+#include <swoc/BufferWriter.h>
+
+/// @brief Simple storage to keep the jsonrpc server's response.
+///
+/// With small content it will just use the LocalBufferWriter, if the
+/// content gets bigger, then it will just save the buffer into a string
+/// and reuse the already created LocalBufferWriter. If stored data
fits in the
+/// original bw, then no need to create the extra string. If message
fist in the
+/// first chunk, no extra space will be allocated into the storage
string.
+/// This is not meant to be performant as is mainly used for single
request
+/// data storage, which will be at some point stored into a string
anyways.
+/// @note User should deal with the buffer limit.
+///
+template <size_t N> class MessageStorage
+{
+ std::string _content;
+ swoc::LocalBufferWriter<N> _bw;
+ size_t _written{0};
+
+public:
+ char *
+ writable_data()
+ {
+ return _bw.aux_data();
+ }
+
+ void
+ save(size_t n)
+ {
+ _bw.commit(n);
+
+ if (_bw.remaining() == 0) { // no more space available, flush what's on
the bw
+ // and reset it.
+ flush();
+ }
+ }
+
+ size_t
+ available() const
+ {
+ return _bw.remaining();
+ }
+
+ void
+ flush()
+ {
+ if (_bw.size() == 0) {
+ return;
+ }
+
+ if (_written == 0) {
+ _content.reserve(_bw.size());
+ } else {
+ // need more space.
+ _content.reserve(_written + _bw.size());
+ }
+
+ _content.append(_bw.data(), _bw.size());
+ _written += _bw.size();
+
+ _bw.clear();
+ }
+
+ std::string
+ str()
+ {
+ if (stored() <= _bw.size()) {
+ // just get it directly from the BW.
+ return {_bw.data(), _bw.size()};
+ }
+
+ // There is content in the bw that needs to be saved into the internal
string.
+ flush();
+ return _content;
+ }
+
+ size_t
+ stored() const
+ {
+ return _written ? _written : _bw.size();
+ }
+};
diff --git a/src/mgmt/rpc/server/IPCSocketServer.cc
b/src/mgmt/rpc/server/IPCSocketServer.cc
index 89641e26ab..748f7ab2f3 100644
--- a/src/mgmt/rpc/server/IPCSocketServer.cc
+++ b/src/mgmt/rpc/server/IPCSocketServer.cc
@@ -46,8 +46,7 @@
namespace
{
-constexpr size_t MAX_REQUEST_BUFFER_SIZE{32000};
-constexpr auto logTag = "rpc.net";
+constexpr auto logTag = "rpc.net";
// Quick check for errors(base on the errno);
bool check_for_transient_errors();
@@ -207,7 +206,6 @@ IPCSocketServer::run()
{
_running.store(true);
- swoc::LocalBufferWriter<MAX_REQUEST_BUFFER_SIZE> bw;
while (_running) {
// poll till socket it's ready.
if (!this->poll_for_new_client()) {
@@ -219,11 +217,12 @@ IPCSocketServer::run()
std::error_code ec;
if (int fd = this->accept(ec); !ec) {
- Client client{fd};
+ Client client{fd, _conf.incomingRequestMaxBufferSize};
+ Buffer bw;
if (auto [ok, errStr] = client.read_all(bw); ok) {
- const auto json = std::string{bw.data(), bw.size()};
- rpc::Context ctx;
+ const std::string &json = bw.str();
+ rpc::Context ctx;
// we want to make sure the peer's credentials are ok.
ctx.get_auth().add_checker(
[&](TSRPCHandlerOptions const &opt, swoc::Errata &errata) -> void {
late_check_peer_credentials(fd, opt, errata); });
@@ -240,8 +239,6 @@ IPCSocketServer::run()
} else {
Debug(logTag, "Error while accepting a new connection on the socket:
%s", ec.message().c_str());
}
-
- bw.clear();
}
this->close();
@@ -294,13 +291,14 @@ IPCSocketServer::accept(std::error_code &ec) const
void
IPCSocketServer::bind(std::error_code &ec)
{
- int lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
- if (lock_fd == -1) {
+ _lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
+ if (_lock_fd == -1) {
ec = std::make_error_code(static_cast<std::errc>(errno));
return;
}
- int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
+ int ret = flock(_lock_fd, LOCK_EX | LOCK_NB);
+
if (ret != 0) {
ec = std::make_error_code(static_cast<std::errc>(errno));
return;
@@ -346,10 +344,15 @@ IPCSocketServer::close()
::close(_socket);
_socket = -1;
}
+
+ if (_lock_fd > 0) {
+ ::close(_lock_fd);
+ _lock_fd = -1;
+ }
}
//// client
-IPCSocketServer::Client::Client(int fd) : _fd{fd} {}
+IPCSocketServer::Client::Client(int fd, size_t max_req_size) : _fd{fd},
_max_req_size{max_req_size} {}
IPCSocketServer::Client::~Client()
{
this->close();
@@ -395,11 +398,11 @@ IPCSocketServer::Client::read(swoc::MemSpan<char> span)
const
}
std::tuple<bool, std::string>
-IPCSocketServer::Client::read_all(swoc::FixedBufferWriter &bw) const
+IPCSocketServer::Client::read_all(Buffer &bw) const
{
std::string buff;
- while (bw.remaining() > 0) {
- auto ret = read({bw.aux_data(), bw.remaining()});
+ while (true) {
+ auto ret = read({bw.writable_data(), bw.available()});
if (ret < 0) {
if (check_for_transient_errors()) {
continue;
@@ -409,20 +412,20 @@ IPCSocketServer::Client::read_all(swoc::FixedBufferWriter
&bw) const
}
if (ret == 0) {
- if (bw.size()) {
- return {false, swoc::bwprint(buff, "Peer disconnected after reading {}
bytes.", bw.size())};
+ if (bw.stored()) {
+ return {false, swoc::bwprint(buff, "Peer disconnected after reading {}
bytes.", bw.stored())};
}
return {false, swoc::bwprint(buff, "Peer disconnected. EOF")};
}
- bw.commit(ret);
- if (bw.remaining() > 0) {
+ bw.save(ret);
+ if (_max_req_size - bw.stored() > 0) { // we can still read more.
using namespace std::chrono_literals;
if (!this->poll_for_data(1ms)) {
return {true, buff};
}
continue;
} else {
- swoc::bwprint(buff, "Buffer is full, we hit the limit: {}",
bw.capacity());
+ swoc::bwprint(buff, "Buffer is full, we hit the limit: {}",
_max_req_size);
break;
}
}
@@ -495,6 +498,9 @@ template <> struct
convert<rpc::comm::IPCSocketServer::Config> {
if (auto n = node[config::RESTRICTED_API]) {
rhs.restrictedAccessApi = n.as<bool>();
}
+ if (auto n = node[config::MAX_BUFFER_SIZE]) {
+ rhs.incomingRequestMaxBufferSize = n.as<size_t>();
+ }
return true;
}
};
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 5b2cb2411f..138abe1dc2 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -69,6 +69,7 @@ static const std::string
sockPath{"tests/var/jsonrpc20_test.sock"};
static const std::string lockPath{"tests/var/jsonrpc20_test.lock"};
static constexpr int default_backlog{5};
static constexpr int default_maxRetriesOnTransientErrors{64};
+static constexpr size_t default_incoming_req_max_size{32000 * 3};
static constexpr auto logTag{"rpc.test.client"};
struct RPCServerTestListener : Catch::TestEventListenerBase {
@@ -80,7 +81,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase {
testRunStarting(Catch::TestRunInfo const & /* testRunInfo ATS_UNUSED */)
override
{
Layout::create();
- init_diags("rpc|rpc.test", nullptr);
+ init_diags("rpc", nullptr);
RecProcessInit();
signal(SIGPIPE, SIG_IGN);
@@ -95,7 +96,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase {
rpc::config::RPCConfig serverConfig;
auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")"
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
- R"(", "backlog": 5,"max_retry_on_transient_errors": 64
}}})"};
+ R"(", "backlog": 5,"max_retry_on_transient_errors": 64,
"incoming_request_max_size": 32000 }}})"};
YAML::Node configNode = YAML::Load(confStr);
serverConfig.load(configNode["rpc"]);
try {
@@ -111,21 +112,36 @@ struct RPCServerTestListener :
Catch::TestEventListenerBase {
void
testRunEnded(Catch::TestRunStats const & /* testRunStats ATS_UNUSED */)
override
{
- // jsonrpcServer->stop_thread();
- // delete main_thread;
if (jsonrpcServer) {
- delete jsonrpcServer;
+ delete jsonrpcServer; // will stop the thread
}
}
private:
- // std::unique_ptr<rpc::RPCServer> jrpcServer;
std::unique_ptr<EThread> main_thread;
};
CATCH_REGISTER_LISTENER(RPCServerTestListener)
RPCServerTestListener::~RPCServerTestListener() {}
+void
+restart_json_rpc_server(YAML::Node n)
+{
+ rpc::config::RPCConfig serverConfig;
+ serverConfig.load(n["rpc"]);
+
+ if (jsonrpcServer) {
+ delete jsonrpcServer;
+ }
+
+ try {
+ jsonrpcServer = new rpc::RPCServer(serverConfig);
+ jsonrpcServer->start_thread();
+ } catch (std::exception const &ex) {
+ Debug(logTag, "Oops: %s", ex.what());
+ }
+}
+
DEFINE_JSONRPC_PROTO_FUNCTION(some_foo) // id, params
{
swoc::Rv<YAML::Node> resp;
@@ -320,12 +336,12 @@ TEST_CASE("Basic message sending to a running server",
"[socket]")
TEST_CASE("Sending a message bigger than the internal server's buffer. 32000",
"[buffer][error]")
{
- REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
+ REQUIRE(rpc::add_method_handler("do_nothing32000", &do_nothing));
+ const int S{32000}; // + the rest of the json message.
+ auto json{R"({"jsonrpc": "2.0", "method": "do_nothing32000", "params":
{"msg":")" + random_string(S) + R"("}, "id":"32k_1"})"};
SECTION("Message larger than the the accepted size.")
{
- const int S{32000}; // + the rest of the json message.
- auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params":
{"msg":")" + random_string(S) + R"("}, "id":"EfGh-1"})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
auto resp = rpc_client.query(json);
@@ -333,7 +349,19 @@ TEST_CASE("Sending a message bigger than the internal
server's buffer. 32000", "
}());
}
- REQUIRE(rpc::test_remove_handler("do_nothing"));
+ SECTION("Retry the big message after reconfigure(restart rpc server) the
incoming request size limit.")
+ {
+ auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")"
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
+ R"(", "backlog": 5,"max_retry_on_transient_errors": 64,
"incoming_request_max_size": 62000 }}})"};
+ YAML::Node n = YAML::Load(confStr);
+ restart_json_rpc_server(n);
+ REQUIRE_NOTHROW([&]() {
+ ScopedLocalSocket rpc_client;
+ auto resp = rpc_client.query(json);
+ REQUIRE(resp == R"({"jsonrpc": "2.0", "result": {"size": "32000"}, "id":
"32k_1"})");
+ }());
+ }
+ REQUIRE(rpc::test_remove_handler("do_nothing32000"));
}
TEST_CASE("Test with invalid json message", "[socket]")
@@ -510,6 +538,7 @@ TEST_CASE("Test configuration parsing from a YAML node. UDS
values", "[string]")
REQUIRE(socket->get_conf().maxRetriesOnTransientErrors ==
default_maxRetriesOnTransientErrors);
REQUIRE(socket->get_conf().sockPathName == sockPath);
REQUIRE(socket->get_conf().lockPathName == lockPath);
+ REQUIRE(socket->get_conf().incomingRequestMaxBufferSize ==
default_incoming_req_max_size);
}
TEST_CASE("Test configuration parsing from a file. UDS Server", "[file]")
diff --git a/src/shared/rpc/IPCSocketClient.cc
b/src/shared/rpc/IPCSocketClient.cc
index 14b09710a9..ab04f681a7 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -21,84 +21,16 @@
#include <stdexcept>
#include <chrono>
-#include <sstream>
#include <utility>
#include <thread>
#include "tsutil/ts_bw_format.h"
#include "shared/rpc/IPCSocketClient.h"
+#include "shared/rpc/MessageStorage.h"
#include <tscore/ink_assert.h>
#include <tscore/ink_sock.h>
-namespace
-{
-/// @brief Simple buffer to store the jsonrpc server's response.
-///
-/// With small content it will just use the LocalBufferWriter, if the
-/// content gets bigger, then it will just save the buffer into a stream
-/// and reuse the already created BufferWritter.
-template <size_t N> class BufferStream
-{
- std::ostringstream _os;
- swoc::LocalBufferWriter<N> _bw;
- size_t _written{0};
-
-public:
- char *
- writable_data()
- {
- return _bw.aux_data();
- }
-
- void
- save(size_t n)
- {
- _bw.commit(n);
-
- if (_bw.remaining() == 0) { // no more space available, flush what's on
the bw
- // and reset it.
- flush();
- }
- }
-
- size_t
- available() const
- {
- return _bw.remaining();
- }
-
- void
- flush()
- {
- if (_bw.size() == 0) {
- return;
- }
- _os.write(_bw.view().data(), _bw.size());
- _written += _bw.size();
-
- _bw.clear();
- }
-
- std::string
- str()
- {
- if (stored() <= _bw.size()) {
- return {_bw.data(), _bw.size()};
- }
-
- flush();
- return _os.str();
- }
-
- size_t
- stored() const
- {
- return _written ? _written : _bw.size();
- }
-};
-} // namespace
-
namespace shared::rpc
{
@@ -192,12 +124,12 @@ IPCSocketClient::read_all(std::string &content)
return {};
}
- BufferStream<356000> bs;
+ MessageStorage<356000> bs;
ReadStatus readStatus{ReadStatus::UNKNOWN};
while (true) {
auto buf = bs.writable_data();
- const auto to_read = bs.available();
+ const auto to_read = bs.available(); // Available in the current memory
chunk.
ssize_t ret{-1};
do {
ret = ::read(_sock, buf, to_read);