This is an automated email from the ASF dual-hosted git repository.
zwoop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 8f3bd99b34 rate_limit: Adds a --rate option for RPS limits (#11271)
8f3bd99b34 is described below
commit 8f3bd99b3498618376244777cba7bab79127f2d7
Author: Leif Hedstrom <[email protected]>
AuthorDate: Fri May 17 11:51:54 2024 -0600
rate_limit: Adds a --rate option for RPS limits (#11271)
---
doc/admin-guide/plugins/rate_limit.en.rst | 17 +-
plugins/experimental/rate_limit/CMakeLists.txt | 1 +
plugins/experimental/rate_limit/limiter.cc | 94 ++++++++++
plugins/experimental/rate_limit/limiter.h | 240 ++++++++++++++++++------
plugins/experimental/rate_limit/rate_limit.cc | 42 +++--
plugins/experimental/rate_limit/sni_limiter.cc | 23 ++-
plugins/experimental/rate_limit/sni_selector.cc | 7 +-
plugins/experimental/rate_limit/txn_limiter.cc | 8 +-
8 files changed, 347 insertions(+), 85 deletions(-)
diff --git a/doc/admin-guide/plugins/rate_limit.en.rst
b/doc/admin-guide/plugins/rate_limit.en.rst
index 4b54465bc3..7f3d5ccf1a 100644
--- a/doc/admin-guide/plugins/rate_limit.en.rst
+++ b/doc/admin-guide/plugins/rate_limit.en.rst
@@ -49,7 +49,13 @@ are available:
.. option:: --limit
- The maximum number of active client transactions.
+ The maximum number of active client transactions. This option can also be
+ used in conjunction with ``--rate``.
+
+.. option:: --rate
+
+ The acceptable rate, in transactions or connections per second, that we
+ will allow. This option can also be used in conjunction with ``--limit``.
.. option:: --queue
@@ -140,6 +146,7 @@ and nodes are documented below.
selector:
- sni: test1.example.com
limit: 1000
+ rate: 200
queue:
size: 1000
max-age: 30
@@ -176,7 +183,13 @@ For the top level `selector` node, the following options
are available:
.. option:: limit
- The maximum number of active client transactions.
+ The maximum number of active client transactions. This can also be used
+ in conjunction with the ``rate`` option.
+
+.. option:: rate
+
+ This limits the number of new sessions per second. It can be used in
+ conjunction with the ``limit`` option.
.. option:: aliases
diff --git a/plugins/experimental/rate_limit/CMakeLists.txt
b/plugins/experimental/rate_limit/CMakeLists.txt
index eed36d51d0..992a1555dd 100644
--- a/plugins/experimental/rate_limit/CMakeLists.txt
+++ b/plugins/experimental/rate_limit/CMakeLists.txt
@@ -19,6 +19,7 @@ project(rate_limit)
add_atsplugin(
rate_limit
+ limiter.cc
ip_reputation.cc
rate_limit.cc
sni_limiter.cc
diff --git a/plugins/experimental/rate_limit/limiter.cc
b/plugins/experimental/rate_limit/limiter.cc
new file mode 100644
index 0000000000..f51f5e9de9
--- /dev/null
+++ b/plugins/experimental/rate_limit/limiter.cc
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "limiter.h"
+
+// Order must match the RATE_LIMITER_TYPE_* enum in limiter.h
+static const char *types[] = {
+ "sni",
+ "remap",
+};
+
+// Order should match the enum in limiter.h
+static const char *suffixes[] = {
+ "queued",
+ "rejected",
+ "expired",
+ "resumed",
+};
+
+// This function will run on the dedicated thread, until the global bucket
manager is dead
+void
+BucketManager::refill_thread()
+{
+ while (_running) {
+ auto startTime = std::chrono::steady_clock::now();
+
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ for (auto &bucket : _buckets) {
+ bucket->refill();
+ }
+ }
+
+ auto sleepTime = std::chrono::milliseconds(BUCKET_REFILL_INTERVAL) -
(std::chrono::steady_clock::now() - startTime);
+
+ if (sleepTime > std::chrono::milliseconds(0)) {
+ std::this_thread::sleep_for(sleepTime);
+ }
+ }
+}
+
+void
+metric_helper(std::array<int, RATE_LIMITER_METRIC_MAX> &metrics, uint type,
const std::string &tag, const std::string &name,
+ const std::string &prefix)
+{
+ std::string metric_prefix = prefix;
+
+ metric_prefix.push_back('.');
+ metric_prefix.append(types[type]);
+
+ if (!tag.empty()) {
+ metric_prefix.push_back('.');
+ metric_prefix.append(tag);
+ } else if (!name.empty()) {
+ metric_prefix.push_back('.');
+ metric_prefix.append(name);
+ }
+
+ for (int i = 0; i < RATE_LIMITER_METRIC_MAX; i++) {
+ size_t const metricsz = metric_prefix.length() + strlen(suffixes[i]) + 2;
// padding for dot+terminator
+ char *const metric = static_cast<char *>(TSmalloc(metricsz));
+
+ snprintf(metric, metricsz, "%s.%s", metric_prefix.data(), suffixes[i]);
+ metrics[i] = TS_ERROR;
+
+ if (TSStatFindName(metric, &metrics[i]) == TS_ERROR) {
+ metrics[i] = TSStatCreate(metric, TS_RECORDDATATYPE_INT,
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM);
+ }
+
+ if (metrics[i] != TS_ERROR) {
+ Dbg(dbg_ctl, "established metric '%s' as ID %d", metric, metrics[i]);
+ } else {
+ TSError("failed to create metric '%s'", metric);
+ }
+
+ TSfree(metric);
+ }
+}
diff --git a/plugins/experimental/rate_limit/limiter.h
b/plugins/experimental/rate_limit/limiter.h
index c058f10dcf..2e40d5e140 100644
--- a/plugins/experimental/rate_limit/limiter.h
+++ b/plugins/experimental/rate_limit/limiter.h
@@ -24,24 +24,18 @@
#include <string>
#include <climits>
#include <mutex>
+#include <thread>
#include "tscore/ink_config.h"
#include "ts/ts.h"
#include <yaml-cpp/yaml.h>
#include "utilities.h"
-constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{300}; // Examine
the queue every 300ms
-using QueueTime =
std::chrono::time_point<std::chrono::system_clock>;
+constexpr auto BUCKET_REFILL_INTERVAL = std::chrono::milliseconds{25}; //
Increase rate limit buckets every 25ms
+constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{300}; //
Examine the queue every 300ms
+using QueueTime =
std::chrono::time_point<std::chrono::system_clock>;
-enum { RATE_LIMITER_TYPE_SNI = 0, RATE_LIMITER_TYPE_REMAP,
RATE_LIMITER_TYPE_MAX };
-
-// order must align with the above
-static const char *types[] = {
- "sni",
- "remap",
-};
-
-// no metric for requests we accept; accepted requests should be counted under
their usual metrics
+// No metric for requests we accept; accepted requests should be counted under
their usual metrics
enum {
RATE_LIMITER_METRIC_QUEUED = 0,
RATE_LIMITER_METRIC_REJECTED,
@@ -51,16 +45,132 @@ enum {
RATE_LIMITER_METRIC_MAX
};
-// order must align with the above
-static const char *suffixes[] = {
- "queued",
- "rejected",
- "expired",
- "resumed",
-};
+int bucket_refill_cont(TSCont cont, TSEvent event, void *edata);
+class BucketManager
+{
+ using self_type = BucketManager;
+
+public:
+ class RateBucket
+ {
+ using self_type = RateBucket;
+
+ public:
+ RateBucket(uint32_t max) : _count(0), _max(max) {}
+ ~RateBucket() = default;
+
+ RateBucket(self_type &&) = delete;
+ self_type &operator=(const self_type &) = delete;
+ self_type &operator=(self_type &&) = delete;
+
+ uint32_t
+ count() const
+ {
+ return _count.load(std::memory_order_acquire);
+ }
+
+ bool
+ consume()
+ {
+ uint32_t val = _count.load(std::memory_order_acquire);
+
+ while (val > 0) {
+ if (_count.compare_exchange_weak(val, val - 1,
std::memory_order_release, std::memory_order_acquire)) {
+ break;
+ }
+ }
+ TSReleaseAssert(val <= _max);
+
+ return val > 0;
+ }
+
+ // This should only be called from the manager, as such no locking is
needed
+ private:
+ friend class BucketManager;
+
+ void
+ refill()
+ {
+ static const uint32_t amount = _max / (1000 /
BUCKET_REFILL_INTERVAL.count());
+ uint32_t old = _count.load(std::memory_order_acquire);
+ uint32_t nval;
+
+ do {
+ nval = old + amount;
+ } while (!_count.compare_exchange_weak(old, std::min(nval, _max),
std::memory_order_release, std::memory_order_acquire));
+ }
+
+ std::atomic<uint32_t> _count;
+ uint32_t _max;
+
+ }; // End class RateBucket
+
+ BucketManager() = default;
+ ~BucketManager()
+ {
+ if (_running) {
+ _running = false;
+ _thread.join(); // Wait for the thread to finish
+ }
+ }
+
+ BucketManager(self_type &) = delete;
+ BucketManager(self_type &&) = delete;
+ self_type &operator=(const self_type &) = delete;
+ self_type &operator=(self_type &&) = delete;
+
+ static BucketManager &
+ getInstance()
+ {
+ static self_type instance;
+ return instance;
+ }
+
+ void refill_thread();
+
+ std::shared_ptr<RateBucket>
+ add(uint32_t max)
+ {
+ auto bucket = std::make_shared<RateBucket>(max);
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ if (!_running) {
+ _running = true;
+ _thread = std::thread(&BucketManager::refill_thread, this);
+ }
+
+ _buckets.push_back(bucket);
+
+ return bucket;
+ }
+
+ void
+ remove(std::shared_ptr<RateBucket> bucket)
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = std::find(_buckets.begin(),
_buckets.end(), bucket);
+
+ if (it != _buckets.end()) {
+ _buckets.erase(it);
+ }
+ }
+
+private:
+ std::vector<std::shared_ptr<RateBucket>> _buckets;
+ std::mutex _mutex; // Protect the
bucket list
+ bool _running = false; // Is the Bucket
manager thread running already ?
+ std::thread _thread; // The thread
refilling the buckets
+
+}; // End class BucketManager
+
+enum { RATE_LIMITER_TYPE_SNI = 0, RATE_LIMITER_TYPE_REMAP,
RATE_LIMITER_TYPE_MAX };
+enum class ReserveStatus { UNLIMITED = 0, RESERVED, FULL, HIGH_RATE };
static const char *RATE_LIMITER_METRIC_PREFIX = "plugin.rate_limiter";
+void metric_helper(std::array<int, RATE_LIMITER_METRIC_MAX> &metrics, uint
type, const std::string &tag, const std::string &name,
+ const std::string &prefix = RATE_LIMITER_METRIC_PREFIX);
+
///////////////////////////////////////////////////////////////////////////////
// Base class for all limiters
//
@@ -75,17 +185,28 @@ public:
self_type &operator=(const self_type &) = delete;
self_type &operator=(self_type &&) = delete;
- virtual ~RateLimiter() = default;
+ virtual ~RateLimiter() { BucketManager::getInstance().remove(_bucket); }
+
+ void
+ initializeMetrics(uint type, std::string tag, std::string prefix =
RATE_LIMITER_METRIC_PREFIX)
+ {
+ TSReleaseAssert(type < RATE_LIMITER_TYPE_MAX);
+ metric_helper(_metrics, type, tag, name(), prefix);
+ }
virtual bool
parseYaml(const YAML::Node &node)
{
if (node["limit"]) {
_limit = node["limit"].as<uint32_t>();
- } else {
- // ToDo: Should we require the limit ?
}
+ if (node["rate"]) {
+ _limit = node["rate"].as<uint32_t>();
+ }
+
+ // ToDo: One or both of these should be required
+
const YAML::Node &queue = node["queue"];
// If enabled, we default to UINT32_MAX, but the object default is still 0
(no queue)
@@ -110,60 +231,43 @@ public:
return true;
}
+ // Add a rate bucket for this limiter
void
- initializeMetrics(uint type, std::string tag, std::string prefix =
RATE_LIMITER_METRIC_PREFIX)
+ addBucket()
{
- TSReleaseAssert(type < RATE_LIMITER_TYPE_MAX);
- memset(_metrics, 0, sizeof(_metrics));
-
- std::string metric_prefix = prefix;
- metric_prefix.push_back('.');
- metric_prefix.append(types[type]);
-
- if (!tag.empty()) {
- metric_prefix.push_back('.');
- metric_prefix.append(tag);
- } else if (!name().empty()) {
- metric_prefix.push_back('.');
- metric_prefix.append(name());
- }
-
- for (int i = 0; i < RATE_LIMITER_METRIC_MAX; i++) {
- size_t const metricsz = metric_prefix.length() + strlen(suffixes[i]) +
2; // padding for dot+terminator
- char *const metric = static_cast<char *>(TSmalloc(metricsz));
- snprintf(metric, metricsz, "%s.%s", metric_prefix.data(), suffixes[i]);
-
- _metrics[i] = TS_ERROR;
+ TSAssert(_rate > 0);
+ _bucket = BucketManager::getInstance().add(_rate);
+ }
- if (TSStatFindName(metric, &_metrics[i]) == TS_ERROR) {
- _metrics[i] = TSStatCreate(metric, TS_RECORDDATATYPE_INT,
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM);
- }
+ // Reserve / release a slot from the active resource limits.
- if (_metrics[i] != TS_ERROR) {
- Dbg(dbg_ctl, "established metric '%s' as ID %d", metric, _metrics[i]);
+ ReserveStatus
+ reserve()
+ {
+ if (_rate > 0) {
+ if (!_bucket->consume()) {
+ Dbg(dbg_ctl, "Rate limit exceeded");
+ return ReserveStatus::HIGH_RATE;
} else {
- TSError("failed to create metric '%s'", metric);
+ Dbg(dbg_ctl, "Rate limit OK, count() == %u", _bucket->count());
}
+ }
- TSfree(metric);
+ if (!has_limit()) { // If we have no limits and not at rate
+ return ReserveStatus::UNLIMITED;
}
- }
- // Reserve / release a slot from the active resource limits. Reserve will
return
- // false if we are unable to reserve a slot.
- bool
- reserve()
- {
std::lock_guard<std::mutex> lock(_active_lock);
- TSReleaseAssert(_active <= limit());
- if (_active < limit()) {
+ TSReleaseAssert(_active <= _limit);
+ if (_active < _limit) {
++_active;
Dbg(dbg_ctl, "Reserving a slot, active entities == %u", _active.load());
- return true;
+
+ return ReserveStatus::RESERVED;
}
- return false;
+ return ReserveStatus::FULL;
}
void
@@ -259,6 +363,18 @@ public:
return _limit;
}
+ bool
+ has_limit() const
+ {
+ return _limit != UINT32_MAX && _limit != 0;
+ }
+
+ uint32_t
+ rate() const
+ {
+ return _rate;
+ }
+
uint32_t
max_queue() const
{
@@ -280,6 +396,7 @@ public:
protected:
std::string _name = "_limiter_"; //
The name/descr (e.g. SNI name) of this limiter
uint32_t _limit = UINT32_MAX; //
No limit unless specified ...
+ uint32_t _rate = 0; //
Rate limit (if any)
uint32_t _max_queue = 0; //
No queue by default
std::chrono::milliseconds _max_age = std::chrono::milliseconds::zero(); //
Max age (ms) in the queue
@@ -290,5 +407,6 @@ private:
std::mutex _queue_lock, _active_lock; // Resource locks
std::deque<QueueItem> _queue; // Queue for the pending
TXN's. ToDo: Should also move (see below)
- int _metrics[RATE_LIMITER_METRIC_MAX];
+ std::array<int, RATE_LIMITER_METRIC_MAX> _metrics{};
+ std::shared_ptr<BucketManager::RateBucket> _bucket; // The rate bucket
(optional)
};
diff --git a/plugins/experimental/rate_limit/rate_limit.cc
b/plugins/experimental/rate_limit/rate_limit.cc
index f67541165f..f65d451d2b 100644
--- a/plugins/experimental/rate_limit/rate_limit.cc
+++ b/plugins/experimental/rate_limit/rate_limit.cc
@@ -95,8 +95,14 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char *
/* errbuf ATS_UNUSE
limiter->initialize(argc, const_cast<const char **>(argv));
*ih = static_cast<void *>(limiter);
- Dbg(dbg_ctl, "Added active_in limiter rule (limit=%u, queue=%u,
max-age=%ldms, error=%u, conntrack=%s)", limiter->limit(),
- limiter->max_queue(), static_cast<long>(limiter->max_age().count()),
limiter->error(), limiter->conntrack() ? "yes" : "no");
+ if (limiter->rate() > 0) {
+ // Setup rate based limit, if configured (this is rate as in "requests per
second")
+ limiter->addBucket();
+ }
+
+ Dbg(dbg_ctl, "Added active_in limiter rule (limit=%u, rate=%u, queue=%u,
max-age=%ldms, error=%u, conntrack=%s)",
+ limiter->limit(), limiter->rate(), limiter->max_queue(),
static_cast<long>(limiter->max_age().count()), limiter->error(),
+ limiter->conntrack() ? "yes" : "no");
return TS_SUCCESS;
}
@@ -121,17 +127,13 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp,
TSRemapRequestInfo *rri)
}
}
- if (!limiter->reserve()) {
- if (!limiter->max_queue() || limiter->full()) {
- // We are running at limit, and the queue has reached max capacity,
give back an error and be done.
- TSHttpTxnStatusSet(txnp, static_cast<TSHttpStatus>(limiter->error()));
- limiter->setupTxnCont(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK);
- Dbg(dbg_ctl, "Rejecting request, we're at capacity and queue is full");
- } else {
- limiter->setupTxnCont(txnp, TS_HTTP_POST_REMAP_HOOK);
- Dbg(dbg_ctl, "Adding rate limiting hook, we are at capacity");
- }
- } else {
+ auto status = limiter->reserve();
+
+ switch (status) {
+ case ReserveStatus::UNLIMITED:
+ // No limits, just let it pass through
+ break;
+ case ReserveStatus::RESERVED:
if (limiter->conntrack()) {
limiter->setupSsnCont(ssnp);
Dbg(dbg_ctl, "Adding ssn-close hook, we're not at capacity");
@@ -139,6 +141,20 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp,
TSRemapRequestInfo *rri)
limiter->setupTxnCont(txnp, TS_HTTP_TXN_CLOSE_HOOK);
Dbg(dbg_ctl, "Adding txn-close hook, we're not at capacity");
}
+ break;
+
+ case ReserveStatus::FULL:
+ case ReserveStatus::HIGH_RATE:
+ if (!limiter->max_queue() || limiter->full()) {
+ // We are running at limit, and the queue has reached max capacity,
give back an error and be done.
+ TSHttpTxnStatusSet(txnp, static_cast<TSHttpStatus>(limiter->error()));
+ limiter->setupTxnCont(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK);
+ Dbg(dbg_ctl, "Rejecting request, we're at %s and queue is full",
status == ReserveStatus::FULL ? "capacity" : "high rate");
+ } else {
+ limiter->setupTxnCont(txnp, TS_HTTP_POST_REMAP_HOOK);
+ Dbg(dbg_ctl, "Adding queue delay hook, we are at %s", status ==
ReserveStatus::FULL ? "capacity" : "high rate");
+ }
+ break;
}
}
diff --git a/plugins/experimental/rate_limit/sni_limiter.cc
b/plugins/experimental/rate_limit/sni_limiter.cc
index 08e72ca940..67241ad178 100644
--- a/plugins/experimental/rate_limit/sni_limiter.cc
+++ b/plugins/experimental/rate_limit/sni_limiter.cc
@@ -126,8 +126,22 @@ sni_limit_cont(TSCont contp, TSEvent event, void *edata)
Dbg(dbg_ctl, "CLIENT_HELLO on %.*s, no IP reputation",
static_cast<int>(sni_name.length()), sni_name.data());
}
- // If we passed the IP reputation filter, continue rate limiting these
connections
- if (!limiter->reserve()) {
+ auto status = limiter->reserve();
+
+ switch (status) {
+ case ReserveStatus::UNLIMITED:
+ // Unlimited, kinda odd, but ok
+ TSUserArgSet(vc, gVCIdx, nullptr);
+ TSVConnReenable(vc);
+ break;
+ case ReserveStatus::RESERVED:
+ // Not at limit on the handshake, we can re-enable
+ TSUserArgSet(vc, gVCIdx, reinterpret_cast<void *>(limiter));
+ TSVConnReenable(vc);
+ break;
+
+ case ReserveStatus::FULL:
+ case ReserveStatus::HIGH_RATE:
if (!limiter->max_queue() || limiter->full()) {
// We are running at limit, and the queue has reached max capacity,
give back an error and be done.
Dbg(dbg_ctl, "Rejecting connection, we're at capacity and queue is
full");
@@ -143,11 +157,8 @@ sni_limit_cont(TSCont contp, TSEvent event, void *edata)
Dbg(dbg_ctl, "Queueing the VC, we are at capacity");
limiter->incrementMetric(RATE_LIMITER_METRIC_QUEUED);
}
- } else {
- // Not at limit on the handshake, we can re-enable
- TSUserArgSet(vc, gVCIdx, reinterpret_cast<void *>(limiter));
- TSVConnReenable(vc);
}
+ break;
} else {
// No limiter for this SNI at all, clear the args etc. just in case
TSUserArgSet(vc, gVCIdx, nullptr);
diff --git a/plugins/experimental/rate_limit/sni_selector.cc
b/plugins/experimental/rate_limit/sni_selector.cc
index 1ff1c4b134..7d5ea62c21 100644
--- a/plugins/experimental/rate_limit/sni_selector.cc
+++ b/plugins/experimental/rate_limit/sni_selector.cc
@@ -130,6 +130,11 @@ SniSelector::yamlParser(const std::string &yaml_file)
addLimiter(limiter);
}
+ // Setup rate based limit, if configured (this is rate as in
"requests per second")
+ if (limiter->rate() > 0) {
+ limiter->addBucket();
+ }
+
// Add aliases, if any
const YAML::Node &aliases = sni["aliases"];
@@ -214,7 +219,7 @@ sni_queue_cont(TSCont cont, TSEvent event, void *edata)
if (owner) { // Don't operate on the aliases
// Try to enable some queued VCs (if any) if there are slots available
- while (limiter->size() > 0 && limiter->reserve()) {
+ while (limiter->size() > 0 && limiter->reserve() !=
ReserveStatus::RESERVED) { // Can't be UNLIMITED here
auto [vc, contp, start_time] = limiter->pop();
std::chrono::milliseconds delay =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
diff --git a/plugins/experimental/rate_limit/txn_limiter.cc
b/plugins/experimental/rate_limit/txn_limiter.cc
index 5f79e528b7..04f48366a3 100644
--- a/plugins/experimental/rate_limit/txn_limiter.cc
+++ b/plugins/experimental/rate_limit/txn_limiter.cc
@@ -74,7 +74,7 @@ txn_queue_cont(TSCont cont, TSEvent event, void *edata)
QueueTime now = std::chrono::system_clock::now(); // Only do this once
per "loop"
// Try to enable some queued txns (if any) if there are slots available
- while (limiter->size() > 0 && limiter->reserve()) {
+ while (limiter->size() > 0 && limiter->reserve() != ReserveStatus::FULL) {
// Can't be UNLIMITED here
auto [txnp, contp, start_time] = limiter->pop();
std::chrono::milliseconds delay =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
@@ -123,6 +123,7 @@ TxnRateLimiter::initialize(int argc, const char *argv[])
{const_cast<char *>("prefix"), required_argument, nullptr, 'p' },
{const_cast<char *>("tag"), required_argument, nullptr, 't' },
{const_cast<char *>("conntrack"), no_argument, nullptr, 'c' },
+ {const_cast<char *>("rate"), required_argument, nullptr, 'R' },
// EOF
{nullptr, no_argument, nullptr, '\0'},
};
@@ -161,13 +162,16 @@ TxnRateLimiter::initialize(int argc, const char *argv[])
case 'c':
this->_conntrack = true;
break;
+ case 'R':
+ this->_rate = strtol(optarg, nullptr, 10);
+ break;
}
if (opt == -1) {
break;
}
}
- if (this->max_queue() > 0) {
+ if (this->max_queue() > 0 && this->has_limit()) { // Only setup the queue if
we have a limit
_queue_cont = TSContCreate(txn_queue_cont, TSMutexCreate());
TSReleaseAssert(_queue_cont);
TSContDataSet(_queue_cont, this);