This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f3bd99b34 rate_limit: Adds a --rate option for RPS limits (#11271)
8f3bd99b34 is described below

commit 8f3bd99b3498618376244777cba7bab79127f2d7
Author: Leif Hedstrom <[email protected]>
AuthorDate: Fri May 17 11:51:54 2024 -0600

    rate_limit: Adds a --rate option for RPS limits (#11271)
---
 doc/admin-guide/plugins/rate_limit.en.rst       |  17 +-
 plugins/experimental/rate_limit/CMakeLists.txt  |   1 +
 plugins/experimental/rate_limit/limiter.cc      |  94 ++++++++++
 plugins/experimental/rate_limit/limiter.h       | 240 ++++++++++++++++++------
 plugins/experimental/rate_limit/rate_limit.cc   |  42 +++--
 plugins/experimental/rate_limit/sni_limiter.cc  |  23 ++-
 plugins/experimental/rate_limit/sni_selector.cc |   7 +-
 plugins/experimental/rate_limit/txn_limiter.cc  |   8 +-
 8 files changed, 347 insertions(+), 85 deletions(-)

diff --git a/doc/admin-guide/plugins/rate_limit.en.rst 
b/doc/admin-guide/plugins/rate_limit.en.rst
index 4b54465bc3..7f3d5ccf1a 100644
--- a/doc/admin-guide/plugins/rate_limit.en.rst
+++ b/doc/admin-guide/plugins/rate_limit.en.rst
@@ -49,7 +49,13 @@ are available:
 
 .. option:: --limit
 
-   The maximum number of active client transactions.
+   The maximum number of active client transactions. This option can also be
+   used in conjunction with ``--rate``.
+
+.. option:: --rate
+
+   The acceptable rate, in transactions or connections per second, that we will
+   allow. This option can also be used in conjunction with ``--limit``.
 
 .. option:: --queue
 
@@ -140,6 +146,7 @@ and nodes are documented below.
       selector:
          - sni: test1.example.com
             limit: 1000
+            rate: 200
             queue:
                size: 1000
                max-age: 30
@@ -176,7 +183,13 @@ For the top level `selector` node, the following options 
are available:
 
 .. option:: limit
 
-   The maximum number of active client transactions.
+   The maximum number of active client transactions. This can also be used
+   in conjunction with the ``rate`` option.
+
+.. option:: rate
+
+   This limits the number of new sessions per second. It can be used in
+   conjunction with the ``limit`` option.
 
 .. option:: aliases
 
diff --git a/plugins/experimental/rate_limit/CMakeLists.txt 
b/plugins/experimental/rate_limit/CMakeLists.txt
index eed36d51d0..992a1555dd 100644
--- a/plugins/experimental/rate_limit/CMakeLists.txt
+++ b/plugins/experimental/rate_limit/CMakeLists.txt
@@ -19,6 +19,7 @@ project(rate_limit)
 
 add_atsplugin(
   rate_limit
+  limiter.cc
   ip_reputation.cc
   rate_limit.cc
   sni_limiter.cc
diff --git a/plugins/experimental/rate_limit/limiter.cc 
b/plugins/experimental/rate_limit/limiter.cc
new file mode 100644
index 0000000000..f51f5e9de9
--- /dev/null
+++ b/plugins/experimental/rate_limit/limiter.cc
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "limiter.h"
+
+// Metric name component for each limiter type, indexed by the limiter type value.
+// Order must match the RATE_LIMITER_TYPE_* enum in limiter.h.
+static const char *types[] = {
+  "sni",
+  "remap",
+};
+
+// Metric name suffix for each metric, indexed by the metric value.
+// Order should match the RATE_LIMITER_METRIC_* enum in limiter.h.
+static const char *suffixes[] = {
+  "queued",
+  "rejected",
+  "expired",
+  "resumed",
+};
+
+// Body of the dedicated refill thread. Once per BUCKET_REFILL_INTERVAL it tops up
+// every registered bucket, and it keeps going until the manager clears _running.
+void
+BucketManager::refill_thread()
+{
+  while (_running) {
+    const auto deadline = std::chrono::steady_clock::now() + BUCKET_REFILL_INTERVAL;
+
+    {
+      std::lock_guard<std::mutex> guard(_mutex);
+
+      for (auto &entry : _buckets) {
+        entry->refill();
+      }
+    }
+
+    // Returns immediately if the refill pass already used up the whole interval
+    std::this_thread::sleep_until(deadline);
+  }
+}
+
+// Look up (or create) the stat for every rate limiter metric and store its ID.
+//
+//   metrics - receives one stat ID per RATE_LIMITER_METRIC_* entry, TS_ERROR on failure
+//   type    - limiter type, used as an index into types[]
+//   tag     - optional tag to name the metric after; takes precedence over name
+//   name    - limiter name, used when no tag is given
+//   prefix  - leading component of every metric name
+//
+// The resulting metric names are "<prefix>.<type>[.<tag or name>].<suffix>".
+void
+metric_helper(std::array<int, RATE_LIMITER_METRIC_MAX> &metrics, uint type, const std::string &tag, const std::string &name,
+              const std::string &prefix)
+{
+  static_assert(sizeof(suffixes) / sizeof(suffixes[0]) >= RATE_LIMITER_METRIC_MAX, "every metric needs a name suffix");
+
+  std::string metric_prefix = prefix;
+
+  metric_prefix.push_back('.');
+  metric_prefix.append(types[type]);
+
+  if (!tag.empty()) {
+    metric_prefix.push_back('.');
+    metric_prefix.append(tag);
+  } else if (!name.empty()) {
+    metric_prefix.push_back('.');
+    metric_prefix.append(name);
+  }
+
+  for (size_t i = 0; i < metrics.size(); ++i) {
+    // std::string owns the buffer, so there is no size arithmetic and nothing to free
+    const std::string metric = metric_prefix + '.' + suffixes[i];
+
+    metrics[i] = TS_ERROR;
+
+    // Reuse an existing stat (e.g. after a config reload) before creating a new one
+    if (TSStatFindName(metric.c_str(), &metrics[i]) == TS_ERROR) {
+      metrics[i] = TSStatCreate(metric.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM);
+    }
+
+    if (metrics[i] != TS_ERROR) {
+      Dbg(dbg_ctl, "established metric '%s' as ID %d", metric.c_str(), metrics[i]);
+    } else {
+      TSError("failed to create metric '%s'", metric.c_str());
+    }
+  }
+}
diff --git a/plugins/experimental/rate_limit/limiter.h 
b/plugins/experimental/rate_limit/limiter.h
index c058f10dcf..2e40d5e140 100644
--- a/plugins/experimental/rate_limit/limiter.h
+++ b/plugins/experimental/rate_limit/limiter.h
@@ -24,24 +24,18 @@
 #include <string>
 #include <climits>
 #include <mutex>
+#include <thread>
 
 #include "tscore/ink_config.h"
 #include "ts/ts.h"
 #include <yaml-cpp/yaml.h>
 #include "utilities.h"
 
-constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{300}; // Examine 
the queue every 300ms
-using QueueTime                 = 
std::chrono::time_point<std::chrono::system_clock>;
+constexpr auto BUCKET_REFILL_INTERVAL = std::chrono::milliseconds{25};  // 
Increase rate limit buckets every 25ms
+constexpr auto QUEUE_DELAY_TIME       = std::chrono::milliseconds{300}; // 
Examine the queue every 300ms
+using QueueTime                       = 
std::chrono::time_point<std::chrono::system_clock>;
 
-enum { RATE_LIMITER_TYPE_SNI = 0, RATE_LIMITER_TYPE_REMAP, 
RATE_LIMITER_TYPE_MAX };
-
-// order must align with the above
-static const char *types[] = {
-  "sni",
-  "remap",
-};
-
-// no metric for requests we accept; accepted requests should be counted under 
their usual metrics
+// No metric for requests we accept; accepted requests should be counted under 
their usual metrics
 enum {
   RATE_LIMITER_METRIC_QUEUED = 0,
   RATE_LIMITER_METRIC_REJECTED,
@@ -51,16 +45,132 @@ enum {
   RATE_LIMITER_METRIC_MAX
 };
 
-// order must align with the above
-static const char *suffixes[] = {
-  "queued",
-  "rejected",
-  "expired",
-  "resumed",
-};
+int bucket_refill_cont(TSCont cont, TSEvent event, void *edata);
+// Owns every rate bucket plus the single background thread that refills them.
+// The thread is started lazily by the first add() and joined by the destructor.
+class BucketManager
+{
+  using self_type = BucketManager;
+
+public:
+  // Token bucket holding at most `max` tokens. consume() takes one token; the
+  // manager thread puts tokens back at a pace of `max` per second.
+  class RateBucket
+  {
+    using self_type = RateBucket;
+
+  public:
+    explicit RateBucket(uint32_t max) : _count(0), _max(max) {}
+    ~RateBucket() = default;
+
+    RateBucket(const self_type &)           = delete;
+    RateBucket(self_type &&)                = delete;
+    self_type &operator=(const self_type &) = delete;
+    self_type &operator=(self_type &&)      = delete;
+
+    // Number of tokens currently available.
+    uint32_t
+    count() const
+    {
+      return _count.load(std::memory_order_acquire);
+    }
+
+    // Take one token. Returns false, leaving the bucket untouched, when it is empty.
+    bool
+    consume()
+    {
+      uint32_t val = _count.load(std::memory_order_acquire);
+
+      // A failed compare_exchange_weak reloads val, so the loop re-tests the fresh value
+      while (val > 0) {
+        if (_count.compare_exchange_weak(val, val - 1, std::memory_order_release, std::memory_order_acquire)) {
+          break;
+        }
+      }
+      TSReleaseAssert(val <= _max);
+
+      return val > 0;
+    }
+
+  private:
+    friend class BucketManager;
+
+    // How many times per second the manager thread calls refill()
+    static constexpr uint32_t REFILLS_PER_SECOND = static_cast<uint32_t>(1000 / BUCKET_REFILL_INTERVAL.count());
+    static_assert(REFILLS_PER_SECOND > 0, "BUCKET_REFILL_INTERVAL must not exceed one second");
+
+    // Add one interval's worth of tokens, capped at _max. Only the manager thread
+    // calls this, so _carry needs no synchronization of its own.
+    void
+    refill()
+    {
+      // Computed per bucket: a function-local static here would freeze the amount
+      // of whichever bucket happened to be refilled first.
+      uint32_t amount = _max / REFILLS_PER_SECOND;
+
+      // Carry the division remainder forward so rates that are not a multiple of
+      // REFILLS_PER_SECOND (including rates below it) still average _max per second.
+      _carry += _max % REFILLS_PER_SECOND;
+      if (_carry >= REFILLS_PER_SECOND) {
+        _carry -= REFILLS_PER_SECOND;
+        ++amount;
+      }
+
+      if (amount == 0) {
+        return;
+      }
+
+      uint32_t old = _count.load(std::memory_order_acquire);
+      uint32_t nval;
+
+      do {
+        // Saturate at _max without letting old + amount wrap around
+        nval = (old >= _max || _max - old < amount) ? _max : old + amount;
+      } while (!_count.compare_exchange_weak(old, nval, std::memory_order_release, std::memory_order_acquire));
+    }
+
+    std::atomic<uint32_t> _count;
+    uint32_t              _max;
+    uint32_t              _carry = 0; // Fractional tokens accumulated between refills
+
+  }; // End class RateBucket
+
+  BucketManager() = default;
+  ~BucketManager()
+  {
+    // exchange() makes the check-and-clear one step; the thread sees the change on its next loop test
+    if (_running.exchange(false)) {
+      _thread.join(); // Wait for the thread to finish
+    }
+  }
+
+  BucketManager(const self_type &)        = delete;
+  BucketManager(self_type &&)             = delete;
+  self_type &operator=(const self_type &) = delete;
+  self_type &operator=(self_type &&)      = delete;
+
+  // The process-wide manager instance.
+  static BucketManager &
+  getInstance()
+  {
+    static self_type instance;
+    return instance;
+  }
+
+  void refill_thread();
+
+  // Create a bucket allowing `max` tokens per second and register it for refills.
+  // The first call also starts the refill thread.
+  std::shared_ptr<RateBucket>
+  add(uint32_t max)
+  {
+    auto                        bucket = std::make_shared<RateBucket>(max);
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    if (!_running) {
+      _running = true;
+      _thread  = std::thread(&BucketManager::refill_thread, this);
+    }
+
+    _buckets.push_back(bucket);
+
+    return bucket;
+  }
+
+  // Stop refilling `bucket`. Unknown or null buckets are ignored.
+  void
+  remove(const std::shared_ptr<RateBucket> &bucket)
+  {
+    std::lock_guard<std::mutex> lock(_mutex);
+    auto                        it = std::find(_buckets.begin(), _buckets.end(), bucket);
+
+    if (it != _buckets.end()) {
+      _buckets.erase(it);
+    }
+  }
+
+private:
+  std::vector<std::shared_ptr<RateBucket>> _buckets;
+  std::mutex                               _mutex;          // Protect the bucket list
+  std::atomic<bool>                        _running{false}; // Is the refill thread running? Read by that thread without _mutex
+  std::thread                              _thread;         // The thread refilling the buckets
+
+}; // End class BucketManager
+
+// Limiter types; the value indexes the type names used when building metric names,
+// and RATE_LIMITER_TYPE_MAX is the upper bound checked by initializeMetrics().
+enum { RATE_LIMITER_TYPE_SNI = 0, RATE_LIMITER_TYPE_REMAP, 
RATE_LIMITER_TYPE_MAX };
+// Outcome of RateLimiter::reserve():
+//   UNLIMITED - no active-entity limit is configured (and any rate check passed)
+//   RESERVED  - a slot was taken from the active-entity limit
+//   FULL      - the active-entity limit has been reached
+//   HIGH_RATE - the rate bucket had no token left
+enum class ReserveStatus { UNLIMITED = 0, RESERVED, FULL, HIGH_RATE };
 
 static const char *RATE_LIMITER_METRIC_PREFIX = "plugin.rate_limiter";
 
+void metric_helper(std::array<int, RATE_LIMITER_METRIC_MAX> &metrics, uint 
type, const std::string &tag, const std::string &name,
+                   const std::string &prefix = RATE_LIMITER_METRIC_PREFIX);
+
 ///////////////////////////////////////////////////////////////////////////////
 // Base class for all limiters
 //
@@ -75,17 +185,28 @@ public:
   self_type &operator=(const self_type &) = delete;
   self_type &operator=(self_type &&)      = delete;
 
-  virtual ~RateLimiter() = default;
+  // Unregister this limiter's rate bucket from the global manager so it is no longer refilled
+  virtual ~RateLimiter() { BucketManager::getInstance().remove(_bucket); }
+
+  // Set up the metric IDs for this limiter. `type` must be one of the
+  // RATE_LIMITER_TYPE_* values; `tag`, this limiter's name() and `prefix` are
+  // handed to metric_helper() to build the metric names.
+  void
+  initializeMetrics(uint type, std::string tag, std::string prefix = 
RATE_LIMITER_METRIC_PREFIX)
+  {
+    // The type is used as an array index by metric_helper(), so reject anything out of range
+    TSReleaseAssert(type < RATE_LIMITER_TYPE_MAX);
+    metric_helper(_metrics, type, tag, name(), prefix);
+  }
 
   virtual bool
   parseYaml(const YAML::Node &node)
   {
     if (node["limit"]) {
       _limit = node["limit"].as<uint32_t>();
-    } else {
-      // ToDo: Should we require the limit ?
     }
 
+    if (node["rate"]) {
+      _rate = node["rate"].as<uint32_t>(); // Per-second rate; must not overwrite the active limit
+    }
+
+    // ToDo: One or both of these should be required
+
     const YAML::Node &queue = node["queue"];
 
     // If enabled, we default to UINT32_MAX, but the object default is still 0 
(no queue)
@@ -110,60 +231,43 @@ public:
     return true;
   }
 
+  // Add a rate bucket for this limiter
   void
-  initializeMetrics(uint type, std::string tag, std::string prefix = 
RATE_LIMITER_METRIC_PREFIX)
+  addBucket()
   {
-    TSReleaseAssert(type < RATE_LIMITER_TYPE_MAX);
-    memset(_metrics, 0, sizeof(_metrics));
-
-    std::string metric_prefix = prefix;
-    metric_prefix.push_back('.');
-    metric_prefix.append(types[type]);
-
-    if (!tag.empty()) {
-      metric_prefix.push_back('.');
-      metric_prefix.append(tag);
-    } else if (!name().empty()) {
-      metric_prefix.push_back('.');
-      metric_prefix.append(name());
-    }
-
-    for (int i = 0; i < RATE_LIMITER_METRIC_MAX; i++) {
-      size_t const metricsz = metric_prefix.length() + strlen(suffixes[i]) + 
2; // padding for dot+terminator
-      char *const  metric   = static_cast<char *>(TSmalloc(metricsz));
-      snprintf(metric, metricsz, "%s.%s", metric_prefix.data(), suffixes[i]);
-
-      _metrics[i] = TS_ERROR;
+    TSAssert(_rate > 0);
+    _bucket = BucketManager::getInstance().add(_rate);
+  }
 
-      if (TSStatFindName(metric, &_metrics[i]) == TS_ERROR) {
-        _metrics[i] = TSStatCreate(metric, TS_RECORDDATATYPE_INT, 
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM);
-      }
+  // Reserve / release a slot from the active resource limits.
 
-      if (_metrics[i] != TS_ERROR) {
-        Dbg(dbg_ctl, "established metric '%s' as ID %d", metric, _metrics[i]);
+  ReserveStatus
+  reserve()
+  {
+    if (_rate > 0) {
+      if (!_bucket->consume()) {
+        Dbg(dbg_ctl, "Rate limit exceeded");
+        return ReserveStatus::HIGH_RATE;
       } else {
-        TSError("failed to create metric '%s'", metric);
+        Dbg(dbg_ctl, "Rate limit OK, count() == %u", _bucket->count());
       }
+    }
 
-      TSfree(metric);
+    if (!has_limit()) { // If we have no limits and not at rate
+      return ReserveStatus::UNLIMITED;
     }
-  }
 
-  // Reserve / release a slot from the active resource limits. Reserve will 
return
-  // false if we are unable to reserve a slot.
-  bool
-  reserve()
-  {
     std::lock_guard<std::mutex> lock(_active_lock);
 
-    TSReleaseAssert(_active <= limit());
-    if (_active < limit()) {
+    TSReleaseAssert(_active <= _limit);
+    if (_active < _limit) {
       ++_active;
       Dbg(dbg_ctl, "Reserving a slot, active entities == %u", _active.load());
-      return true;
+
+      return ReserveStatus::RESERVED;
     }
 
-    return false;
+    return ReserveStatus::FULL;
   }
 
   void
@@ -259,6 +363,18 @@ public:
     return _limit;
   }
 
+  // True when an active-entity limit is configured, i.e. _limit is neither
+  // zero nor the UINT32_MAX "no limit" default.
+  bool
+  has_limit() const
+  {
+    const bool unset = (_limit == 0 || _limit == UINT32_MAX);
+
+    return !unset;
+  }
+
+  // The configured rate limit; 0 means no rate limit is set
+  uint32_t
+  rate() const
+  {
+    return _rate;
+  }
+
   uint32_t
   max_queue() const
   {
@@ -280,6 +396,7 @@ public:
 protected:
   std::string               _name      = "_limiter_";                       // 
The name/descr (e.g. SNI name) of this limiter
   uint32_t                  _limit     = UINT32_MAX;                        // 
No limit unless specified ...
+  uint32_t                  _rate      = 0;                                 // 
Rate limit (if any)
   uint32_t                  _max_queue = 0;                                 // 
No queue by default
   std::chrono::milliseconds _max_age   = std::chrono::milliseconds::zero(); // 
Max age (ms) in the queue
 
@@ -290,5 +407,6 @@ private:
   std::mutex            _queue_lock, _active_lock; // Resource locks
   std::deque<QueueItem> _queue;                    // Queue for the pending 
TXN's. ToDo: Should also move (see below)
 
-  int _metrics[RATE_LIMITER_METRIC_MAX];
+  std::array<int, RATE_LIMITER_METRIC_MAX>   _metrics{};
+  std::shared_ptr<BucketManager::RateBucket> _bucket; // The rate bucket 
(optional)
 };
diff --git a/plugins/experimental/rate_limit/rate_limit.cc 
b/plugins/experimental/rate_limit/rate_limit.cc
index f67541165f..f65d451d2b 100644
--- a/plugins/experimental/rate_limit/rate_limit.cc
+++ b/plugins/experimental/rate_limit/rate_limit.cc
@@ -95,8 +95,14 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * 
/* errbuf ATS_UNUSE
   limiter->initialize(argc, const_cast<const char **>(argv));
   *ih = static_cast<void *>(limiter);
 
-  Dbg(dbg_ctl, "Added active_in limiter rule (limit=%u, queue=%u, 
max-age=%ldms, error=%u, conntrack=%s)", limiter->limit(),
-      limiter->max_queue(), static_cast<long>(limiter->max_age().count()), 
limiter->error(), limiter->conntrack() ? "yes" : "no");
+  if (limiter->rate() > 0) {
+    // Setup rate based limit, if configured (this is rate as in "requests per 
second")
+    limiter->addBucket();
+  }
+
+  Dbg(dbg_ctl, "Added active_in limiter rule (limit=%u, rate=%u, queue=%u, 
max-age=%ldms, error=%u, conntrack=%s)",
+      limiter->limit(), limiter->rate(), limiter->max_queue(), 
static_cast<long>(limiter->max_age().count()), limiter->error(),
+      limiter->conntrack() ? "yes" : "no");
 
   return TS_SUCCESS;
 }
@@ -121,17 +127,13 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, 
TSRemapRequestInfo *rri)
       }
     }
 
-    if (!limiter->reserve()) {
-      if (!limiter->max_queue() || limiter->full()) {
-        // We are running at limit, and the queue has reached max capacity, 
give back an error and be done.
-        TSHttpTxnStatusSet(txnp, static_cast<TSHttpStatus>(limiter->error()));
-        limiter->setupTxnCont(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK);
-        Dbg(dbg_ctl, "Rejecting request, we're at capacity and queue is full");
-      } else {
-        limiter->setupTxnCont(txnp, TS_HTTP_POST_REMAP_HOOK);
-        Dbg(dbg_ctl, "Adding rate limiting hook, we are at capacity");
-      }
-    } else {
+    auto status = limiter->reserve();
+
+    switch (status) {
+    case ReserveStatus::UNLIMITED:
+      // No limits, just let it pass through
+      break;
+    case ReserveStatus::RESERVED:
       if (limiter->conntrack()) {
         limiter->setupSsnCont(ssnp);
         Dbg(dbg_ctl, "Adding ssn-close hook, we're not at capacity");
@@ -139,6 +141,20 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, 
TSRemapRequestInfo *rri)
         limiter->setupTxnCont(txnp, TS_HTTP_TXN_CLOSE_HOOK);
         Dbg(dbg_ctl, "Adding txn-close hook, we're not at capacity");
       }
+      break;
+
+    case ReserveStatus::FULL:
+    case ReserveStatus::HIGH_RATE:
+      if (!limiter->max_queue() || limiter->full()) {
+        // We are running at limit, and the queue has reached max capacity, 
give back an error and be done.
+        TSHttpTxnStatusSet(txnp, static_cast<TSHttpStatus>(limiter->error()));
+        limiter->setupTxnCont(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK);
+        Dbg(dbg_ctl, "Rejecting request, we're at %s and queue is full", 
status == ReserveStatus::FULL ? "capacity" : "high rate");
+      } else {
+        limiter->setupTxnCont(txnp, TS_HTTP_POST_REMAP_HOOK);
+        Dbg(dbg_ctl, "Adding queue delay hook, we are at %s", status == 
ReserveStatus::FULL ? "capacity" : "high rate");
+      }
+      break;
     }
   }
 
diff --git a/plugins/experimental/rate_limit/sni_limiter.cc 
b/plugins/experimental/rate_limit/sni_limiter.cc
index 08e72ca940..67241ad178 100644
--- a/plugins/experimental/rate_limit/sni_limiter.cc
+++ b/plugins/experimental/rate_limit/sni_limiter.cc
@@ -126,8 +126,22 @@ sni_limit_cont(TSCont contp, TSEvent event, void *edata)
         Dbg(dbg_ctl, "CLIENT_HELLO on %.*s, no IP reputation", 
static_cast<int>(sni_name.length()), sni_name.data());
       }
 
-      // If we passed the IP reputation filter, continue rate limiting these 
connections
-      if (!limiter->reserve()) {
+      auto status = limiter->reserve();
+
+      switch (status) {
+      case ReserveStatus::UNLIMITED:
+        // Unlimited, kinda odd, but ok
+        TSUserArgSet(vc, gVCIdx, nullptr);
+        TSVConnReenable(vc);
+        break;
+      case ReserveStatus::RESERVED:
+        // Not at limit on the handshake, we can re-enable
+        TSUserArgSet(vc, gVCIdx, reinterpret_cast<void *>(limiter));
+        TSVConnReenable(vc);
+        break;
+
+      case ReserveStatus::FULL:
+      case ReserveStatus::HIGH_RATE:
         if (!limiter->max_queue() || limiter->full()) {
           // We are running at limit, and the queue has reached max capacity, 
give back an error and be done.
           Dbg(dbg_ctl, "Rejecting connection, we're at capacity and queue is 
full");
@@ -143,11 +157,8 @@ sni_limit_cont(TSCont contp, TSEvent event, void *edata)
           Dbg(dbg_ctl, "Queueing the VC, we are at capacity");
           limiter->incrementMetric(RATE_LIMITER_METRIC_QUEUED);
         }
-      } else {
-        // Not at limit on the handshake, we can re-enable
-        TSUserArgSet(vc, gVCIdx, reinterpret_cast<void *>(limiter));
-        TSVConnReenable(vc);
       }
+      break;
     } else {
       // No limiter for this SNI at all, clear the args etc. just in case
       TSUserArgSet(vc, gVCIdx, nullptr);
diff --git a/plugins/experimental/rate_limit/sni_selector.cc 
b/plugins/experimental/rate_limit/sni_selector.cc
index 1ff1c4b134..7d5ea62c21 100644
--- a/plugins/experimental/rate_limit/sni_selector.cc
+++ b/plugins/experimental/rate_limit/sni_selector.cc
@@ -130,6 +130,11 @@ SniSelector::yamlParser(const std::string &yaml_file)
             addLimiter(limiter);
           }
 
+          // Setup rate based limit, if configured (this is rate as in 
"requests per second")
+          if (limiter->rate() > 0) {
+            limiter->addBucket();
+          }
+
           // Add aliases, if any
           const YAML::Node &aliases = sni["aliases"];
 
@@ -214,7 +219,7 @@ sni_queue_cont(TSCont cont, TSEvent event, void *edata)
 
     if (owner) { // Don't operate on the aliases
       // Try to enable some queued VCs (if any) if there are slots available
-      while (limiter->size() > 0 && limiter->reserve()) {
+      while (limiter->size() > 0 && limiter->reserve() == ReserveStatus::RESERVED) { // Dequeue only once a slot is reserved; can't be UNLIMITED here
         auto [vc, contp, start_time]    = limiter->pop();
         std::chrono::milliseconds delay = 
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
 
diff --git a/plugins/experimental/rate_limit/txn_limiter.cc 
b/plugins/experimental/rate_limit/txn_limiter.cc
index 5f79e528b7..04f48366a3 100644
--- a/plugins/experimental/rate_limit/txn_limiter.cc
+++ b/plugins/experimental/rate_limit/txn_limiter.cc
@@ -74,7 +74,7 @@ txn_queue_cont(TSCont cont, TSEvent event, void *edata)
   QueueTime now     = std::chrono::system_clock::now(); // Only do this once 
per "loop"
 
   // Try to enable some queued txns (if any) if there are slots available
-  while (limiter->size() > 0 && limiter->reserve()) {
+  while (limiter->size() > 0 && limiter->reserve() == ReserveStatus::RESERVED) { // Dequeue only once a slot is reserved; can't be UNLIMITED here
     auto [txnp, contp, start_time]  = limiter->pop();
     std::chrono::milliseconds delay = 
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
 
@@ -123,6 +123,7 @@ TxnRateLimiter::initialize(int argc, const char *argv[])
     {const_cast<char *>("prefix"),    required_argument, nullptr, 'p' },
     {const_cast<char *>("tag"),       required_argument, nullptr, 't' },
     {const_cast<char *>("conntrack"), no_argument,       nullptr, 'c' },
+    {const_cast<char *>("rate"),      required_argument, nullptr, 'R' },
     // EOF
     {nullptr,                         no_argument,       nullptr, '\0'},
   };
@@ -161,13 +162,16 @@ TxnRateLimiter::initialize(int argc, const char *argv[])
     case 'c':
       this->_conntrack = true;
       break;
+    case 'R':
+      this->_rate = strtol(optarg, nullptr, 10);
+      break;
     }
     if (opt == -1) {
       break;
     }
   }
 
-  if (this->max_queue() > 0) {
+  if (this->max_queue() > 0 && this->has_limit()) { // Only setup the queue if 
we have a limit
     _queue_cont = TSContCreate(txn_queue_cont, TSMutexCreate());
     TSReleaseAssert(_queue_cont);
     TSContDataSet(_queue_cont, this);

Reply via email to