This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 42e8ba749 Add write limiting to the scanner
42e8ba749 is described below

commit 42e8ba749aebbcb390b5cc12e467ca4ce2b8ba9e
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Sep 9 12:54:23 2025 -0400

    Add write limiting to the scanner
    
    Previously, scanner applied db, shard and doc open rate limits. However,
    plugins may want to also perform updates and still ensure they always stay
    in the background and only consume a limited amount of resources in a
    cluster. For that, add a `doc_write` rate limit option.
    
    Plugins which perform writes can use the ``couch_scanner_rate_limiter``
    explicitly: initialize, then consume tokens from it during every
    update (possibly indicating they used more than one token in a single
    operation) and then sleep the recommended amount of time provided by the
    rate limiter.
    
    Added a simple example of how it could work in the 
couch_scanner_rate_limiter
    module in the comments at the top.
---
 rel/overlay/etc/default.ini                        |  6 +++
 .../src/couch_scanner_rate_limiter.erl             | 56 ++++++++++++++++++----
 src/docs/src/config/scanner.rst                    |  9 ++++
 3 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 7b89bc3ff..63db3f854 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -1047,6 +1047,12 @@ url = {{nouveau_url}}
 ; is shared across all running plugins.
 ;doc_rate_limit = 1000
 
+; Limit the rate per second at which plugins write/update documents. The rate
+; is shared across all running plugins. Unlike other rate limits, which are
+; applied automatically by the plugin backend, this rate assumes plugins will
+; explicitly use the couch_scanner_rate_limiter API when performing writes.
+;doc_write_rate_limit = 500
+
 ; Batch size to use when fetching design documents. For lots of small design
 ; documents this value could be increased to 500 or 1000. If design documents
 ; are large (100KB+) it could make sense to decrease it a bit to 25 or 10.
diff --git a/src/couch_scanner/src/couch_scanner_rate_limiter.erl 
b/src/couch_scanner/src/couch_scanner_rate_limiter.erl
index 2717be69e..605445c19 100644
--- a/src/couch_scanner/src/couch_scanner_rate_limiter.erl
+++ b/src/couch_scanner/src/couch_scanner_rate_limiter.erl
@@ -21,6 +21,24 @@
 %
 % [1] https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
 %
+% Example of usage:
+%
+%  initialize:
+%    Limiter = couch_scanner_rate_limiter:get(),
+%
+%  use:
+%    bulk_docs(#{docs => [doc1, doc2, doc3]}),
+%    {Wait, Limiter1} = couch_scanner_rate_limiter:update(Limiter, doc_write, 3),
+%    timer:sleep(Wait)
+%       or
+%    receive .... after Wait -> ... end
+%
+%  The Type can be:
+%     * db : rate of clustered db opens
+%     * shard: rate of shard files opened
+%     * doc : rate of document reads
+%     * doc_write : rate of document writes (or other per-document updates such as purges)
+%
 
 -module(couch_scanner_rate_limiter).
 
@@ -29,7 +47,8 @@
 -export([
     start_link/0,
     get/0,
-    update/2
+    update/2,
+    update/3
 ]).
 
 % gen_server callbacks
@@ -62,16 +81,17 @@
 -define(DB_RATE_DEFAULT, 25).
 -define(SHARD_RATE_DEFAULT, 50).
 -define(DOC_RATE_DEFAULT, 1000).
+-define(DOC_WRITE_RATE_DEFAULT, 500).
 
 % Atomic ref indices. They start at 1.
--define(INDICES, #{db => 1, shard => 2, doc => 3}).
+-define(INDICES, #{db => 1, shard => 2, doc => 3, doc_write => 4}).
 
 % Record maintained by the clients. Each client will have one of these handles.
 % With each update/2 call they will update their own backoff values.
 %
 -record(client_st, {
     ref,
-    % db|shard|doc => {Backoff, UpdateTStamp}
+    % db|shard|doc|doc_write => {Backoff, UpdateTStamp}
     backoffs = #{}
 }).
 
@@ -83,13 +103,17 @@
 get() ->
     Ref = gen_server:call(?MODULE, get, infinity),
     NowMSec = erlang:monotonic_time(millisecond),
-    Backoffs = maps:from_keys([db, shard, doc], {?INIT_BACKOFF, NowMSec}),
+    Backoffs = maps:from_keys([db, shard, doc, doc_write], {?INIT_BACKOFF, 
NowMSec}),
     #client_st{ref = Ref, backoffs = Backoffs}.
 
-update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type) when
-    Type =:= db orelse Type =:= shard orelse Type =:= doc
+update(St, Type) ->
+    update(St, Type, 1).
+
+update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type, Count) when
+    (is_integer(Count) andalso Count >= 0) andalso
+        (Type =:= db orelse Type =:= shard orelse Type =:= doc orelse Type =:= 
doc_write)
 ->
-    AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), 1) =< 0,
+    AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), Count) =< 0,
     {Backoff, TStamp} = map_get(Type, Backoffs),
     NowMSec = erlang:monotonic_time(millisecond),
     case NowMSec - TStamp > ?SENSITIVITY_MSEC of
@@ -142,6 +166,7 @@ refill(#st{ref = Ref} = St) ->
     ok = atomics:put(Ref, map_get(db, ?INDICES), db_limit()),
     ok = atomics:put(Ref, map_get(shard, ?INDICES), shard_limit()),
     ok = atomics:put(Ref, map_get(doc, ?INDICES), doc_limit()),
+    ok = atomics:put(Ref, map_get(doc_write, ?INDICES), doc_write_limit()),
     schedule_refill(St).
 
 update_backoff(true, 0) ->
@@ -160,6 +185,9 @@ shard_limit() ->
 doc_limit() ->
     cfg_int("doc_rate_limit", ?DOC_RATE_DEFAULT).
 
+doc_write_limit() ->
+    cfg_int("doc_write_rate_limit", ?DOC_WRITE_RATE_DEFAULT).
+
 cfg_int(Key, Default) when is_list(Key), is_integer(Default) ->
     config:get_integer("couch_scanner", Key, Default).
 
@@ -175,6 +203,7 @@ couch_scanner_rate_limiter_test_() ->
         [
             ?TDEF_FE(t_init),
             ?TDEF_FE(t_update),
+            ?TDEF_FE(t_update_multiple),
             ?TDEF_FE(t_refill)
         ]
     }.
@@ -184,7 +213,8 @@ t_init(_) ->
     ?assertEqual(ok, refill()),
     ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
db)),
     ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
shard)),
-    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc)).
+    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc)),
+    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc_write)).
 
 t_update(_) ->
     ClientSt = ?MODULE:get(),
@@ -196,6 +226,16 @@ t_update(_) ->
     {Backoff, _} = update(ClientSt1, db),
     ?assertEqual(?MAX_BACKOFF, Backoff).
 
+t_update_multiple(_) ->
+    ClientSt = ?MODULE:get(),
+    Fun = fun(_, Acc) ->
+        {_, Acc1} = update(Acc, doc_write, 100),
+        reset_time(Acc1, doc_write)
+    end,
+    ClientSt1 = lists:foldl(Fun, ClientSt, lists:seq(1, 50)),
+    {Backoff, _} = update(ClientSt1, doc_write, 100),
+    ?assertEqual(?MAX_BACKOFF, Backoff).
+
 t_refill(_) ->
     ClientSt = ?MODULE:get(),
     Fun = fun(_, Acc) ->
diff --git a/src/docs/src/config/scanner.rst b/src/docs/src/config/scanner.rst
index d3644c23f..f36619f4a 100644
--- a/src/docs/src/config/scanner.rst
+++ b/src/docs/src/config/scanner.rst
@@ -85,6 +85,15 @@ Scanner Options
             [couch_scanner]
             doc_rate_limit = 1000
 
+    .. config:option:: doc_write_rate_limit
+
+        Limit the rate at which plugins update documents. This rate limit
+        applies to plugins which explicitly use the
+        ``couch_scanner_rate_limiter`` module for rate limiting ::
+
+            [couch_scanner]
+            doc_write_rate_limit = 500
+
     .. config:option:: ddoc_batch_size
 
         Batch size to use when fetching design documents. For lots of small

Reply via email to