This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch tseq
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4313659488c55524939e22dc54c53f47cb121edf
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 21 11:56:38 2025 -0400

    Add an HTTP API to query and reset time-seq values
    
    In fabric the get/set calls are similar to how db metadata calls like get
    revs limit / set revs limit work, but to keep them all together they were
    added to the single `fabric_time_seq` module.
    
    At the HTTP API level add the ability to inspect and reset the time-seq
    structure per database. This is an escape hatch in case something went wrong
    with time synchronization. Users should always be able to reset the time seq
    structure and start from scratch.
    
    To inspect the time-seq structure add a `GET $db/_time_seq` API. For each
    shard it returns the time bins and the number of sequence updates in each
    bin:
    
    ```json
    {
        "00000000-7fffffff": {
            "[email protected]": [["2025-07-21T16:00:00Z", 1]],
            "[email protected]": [["2025-07-21T16:00:00Z", 1]],
            "[email protected]": [["2025-07-21T16:00:00Z", 1]]
        },
        "80000000-ffffffff": {
            "[email protected]": [["2025-07-21T16:00:00Z", 3]],
            "[email protected]": [["2025-07-21T16:00:00Z", 3]],
            "[email protected]": [["2025-07-21T16:00:00Z", 3]]
        }
    }
    ```
    
    The `DELETE $db/_time_seq` API endpoint will reset the data structure. After
    calling, the `GET $db/_time_seq` result will look like:
    
    ```json
    {
        "00000000-7fffffff": {
            "[email protected]": [],
            "[email protected]": [],
            "[email protected]": []
        },
        "80000000-ffffffff": {
            "[email protected]": [],
            "[email protected]": [],
            "[email protected]": []
        }
    }
    ```
    
    Noticed we repeated the `couch_util:to_hex(<<B:32/integer>>)` construct at
    least three times throughout the code, so instead of adding a fourth one,
    opted to create a small utility function in `mem3_util`.
---
 src/chttpd/src/chttpd_db.erl              |  25 +++++
 src/fabric/src/fabric.erl                 |  44 +++++++++
 src/fabric/src/fabric_db_create.erl       |   6 +-
 src/fabric/src/fabric_rpc.erl             |  15 +++
 src/fabric/src/fabric_time_seq.erl        | 151 ++++++++++++++++++++++++++++++
 src/mem3/src/mem3_httpd.erl               |   4 +-
 src/mem3/src/mem3_reshard_dbdoc.erl       |   4 +-
 src/mem3/src/mem3_util.erl                |  16 +++-
 src/mem3/test/eunit/mem3_reshard_test.erl |  29 +++++-
 9 files changed, 278 insertions(+), 16 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index a43baeae4..db28737a3 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -870,6 +870,31 @@ db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos_limit">>]} = Req
     send_json(Req, fabric:get_purge_infos_limit(Db));
 db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) ->
     send_method_not_allowed(Req, "GET,PUT");
+db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
+    Options = [{user_ctx, Req#httpd.user_ctx}],
+    case fabric:get_time_seq(Db, Options) of
+        {ok, #{} = RangeNodeToTSeq} ->
+            Props = maps:fold(
+                fun([B, E], #{} = ByNode, Acc) ->
+                    Range = list_to_binary(mem3_util:range_to_hex([B, E])),
+                    MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end,
+                    [{Range, maps:map(MapF, ByNode)} | Acc]
+                end,
+                [],
+                RangeNodeToTSeq
+            ),
+            send_json(Req, {lists:sort(Props)});
+        Error ->
+            throw(Error)
+    end;
+db_req(#httpd{method = 'DELETE', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
+    Options = [{user_ctx, Req#httpd.user_ctx}],
+    case fabric:set_time_seq(Db, couch_time_seq:new(), Options) of
+        ok -> send_json(Req, {[{<<"ok">>, true}]});
+        Error -> throw(Error)
+    end;
+db_req(#httpd{path_parts = [_, <<"_time_seq">>]} = Req, _Db) ->
+    send_method_not_allowed(Req, "GET,DELETE");
 % Special case to enable using an unencoded slash in the URL of design docs,
 % as slashes in document IDs must otherwise be URL encoded.
 db_req(
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index d552a387d..4b9204338 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -69,6 +69,15 @@
     db_uuids/1
 ]).
 
+% time_seq stuff
+-export([
+    time_seq_since/2,
+    get_time_seq/1,
+    get_time_seq/2,
+    set_time_seq/2,
+    set_time_seq/3
+]).
+
 -type dbname() :: (iodata() | tuple()).
 -type docid() :: iodata().
 -type revision() :: {integer(), binary()}.
@@ -185,6 +194,25 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc replace the time seq data structure (pass couch_time_seq:new() to reset it)
+-spec set_time_seq(dbname(), any()) -> ok | {error, any()}.
+set_time_seq(DbName, TSeq) ->
+    set_time_seq(DbName, TSeq, [?ADMIN_CTX]).
+
+-spec set_time_seq(dbname(), any(), [option()]) -> ok | {error, any()}.
+set_time_seq(DbName, TSeq, Options) ->
+    fabric_time_seq:set_time_seq(dbname(DbName), TSeq, opts(Options)).
+
+%% @doc get the time seq structures as {ok, #{Range => #{Node => TSeq}}}
+-spec get_time_seq(dbname()) -> {ok, #{list() => map()}} | {error, any()}.
+get_time_seq(DbName) ->
+    get_time_seq(DbName, [?ADMIN_CTX]).
+
+%% @doc same as get_time_seq/1 but with caller supplied options (e.g. user_ctx)
+-spec get_time_seq(dbname(), [option()]) -> {ok, #{list() => map()}} | {error, any()}.
+get_time_seq(DbName, Options) ->
+    fabric_time_seq:get_time_seq(dbname(DbName), opts(Options)).
+
 %% @doc sets the upper bound for the number of stored purge requests
 -spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
 set_purge_infos_limit(DbName, Limit, Options) when
@@ -637,6 +665,22 @@ dbname(Db) ->
 db_uuids(DbName) ->
     fabric_db_uuids:go(dbname(DbName)).
 
+%% @doc get db update sequence before a timestamp (RFC 3339 string or unix seconds >= 0)
+-spec time_seq_since(dbname(), list() | binary() | non_neg_integer()) ->
+    {ok, binary()} | {error, any()}.
+time_seq_since(DbName, Time) when is_binary(Time) ->
+    time_seq_since(DbName, binary_to_list(Time));
+time_seq_since(DbName, Time) when is_list(Time) ->
+    try calendar:rfc3339_to_system_time(Time) of
+        TimeUnix when TimeUnix >= 0 -> time_seq_since(DbName, TimeUnix);
+        _ -> {error, invalid_time_format}
+    catch
+        _:_ ->
+            {error, invalid_time_format}
+    end;
+time_seq_since(DbName, Time) when is_integer(Time), Time >= 0 ->
+    fabric_time_seq:since(dbname(DbName), Time).
+
 name(Thing) ->
     couch_util:to_binary(Thing).
 
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index f7c6e5402..ac22a4713 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -158,11 +158,7 @@ make_document([#shard{dbname = DbName} | _] = Shards, Suffix, Options) ->
     {RawOut, ByNodeOut, ByRangeOut} =
         lists:foldl(
             fun(#shard{node = N, range = [B, E]}, {Raw, ByNode, ByRange}) ->
-                Range = ?l2b([
-                    couch_util:to_hex(<<B:32/integer>>),
-                    "-",
-                    couch_util:to_hex(<<E:32/integer>>)
-                ]),
+                Range = ?l2b(mem3_util:range_to_hex([B, E])),
                 Node = couch_util:to_binary(N),
                 {
                     [[<<"add">>, Range, Node] | Raw],
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 67f529e09..35b7b68a6 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -54,6 +54,12 @@
     get_uuid/1
 ]).
 
+-export([
+    time_seq_since/2,
+    get_time_seq/2,
+    set_time_seq/3
+]).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -347,6 +353,15 @@ compact(ShardName, DesignName) ->
 get_uuid(DbName) ->
     with_db(DbName, [], {couch_db, get_uuid, []}).
 
+time_seq_since(DbName, Time) ->
+    with_db(DbName, [], {couch_db, time_seq_since, [Time]}).
+
+get_time_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_time_seq, []}).
+
+set_time_seq(DbName, TSeq, Options) ->
+    with_db(DbName, Options, {couch_db, set_time_seq, [TSeq]}).
+
 %%
 %% internal
 %%
diff --git a/src/fabric/src/fabric_time_seq.erl b/src/fabric/src/fabric_time_seq.erl
new file mode 100644
index 000000000..aa31f5e60
--- /dev/null
+++ b/src/fabric/src/fabric_time_seq.erl
@@ -0,0 +1,151 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_time_seq).
+
+-export([
+    since/2,
+    set_time_seq/3,
+    get_time_seq/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+% Since sequence RPC call.
+
+% Return a clustered changes sequence which starts before the provided timestamp argument
+
+since(DbName, Time) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, time_seq_since, [Time]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun since_handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "time_seq_since"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+since_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Counters, Resps}) ->
+    case fabric_ring:node_down(NodeRef, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+since_handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, Reason}
+    end;
+since_handle_message(Sequence, Shard, {Counters, Resps}) when is_integer(Sequence) ->
+    case fabric_ring:handle_response(Shard, Sequence, Counters, Resps) of
+        {ok, {Counters1, Resps1}} ->
+            {ok, {Counters1, Resps1}};
+        {stop, Resps1} ->
+            Seqs = fabric_dict:fold(
+                fun(S, Seq, Acc) ->
+                    [{S#shard{ref = undefined}, Seq} | Acc]
+                end,
+                [],
+                Resps1
+            ),
+            {stop, fabric_view_changes:pack_seqs(Seqs)}
+    end;
+since_handle_message(Reason, Shard, {Counters, Resps}) ->
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, Reason}
+    end.
+
+% Set time_seq RPC call
+
+set_time_seq(DbName, TSeq, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_time_seq, [TSeq, Options]),
+    Handler = fun set_time_seq_handle_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_time_seq"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+set_time_seq_handle_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+set_time_seq_handle_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+set_time_seq_handle_message(Error, _, _Acc) ->
+    {error, Error}.
+
+% Get time_seq RPC call
+
+% Return a per-range, per-node time_seq nested map result
+%    #{Range => #{Node => TSeq}}
+%
+get_time_seq(DbName, Options) when is_binary(DbName) ->
+    Shards = mem3:live_shards(DbName, [config:node_name() | nodes()]),
+    Workers = fabric_util:submit_jobs(Shards, get_time_seq, [Options]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun get_time_seq_handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "get_time_seq"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+get_time_seq_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) ->
+    case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+get_time_seq_handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end;
+get_time_seq_handle_message(#{} = TSeq, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_response(Shard, TSeq, Cntrs, Res, [all]) of
+        {ok, {Cntrs1, Res1}} ->
+            {ok, {Cntrs1, Res1}};
+        {stop, Res1} ->
+            FoldF = fun(#shard{range = R, node = N}, S, Acc) -> [{R, N, S} | Acc] end,
+            TSeqList = fabric_dict:fold(FoldF, [], Res1),
+            ByRangeKeyF = fun({R, _, _}) -> R end,
+            ByRangeValF = fun({_, N, S}) -> {N, S} end,
+            % #{Range1 => [{Node1, TSeq1}, {Node2, TSeq2}], Range2 => [...], ...}
+            TSeqsByRange = maps:groups_from_list(ByRangeKeyF, ByRangeValF, TSeqList),
+            Result = maps:map(
+                fun(_Range, NodeTimeSeqs) ->
+                    maps:from_list(NodeTimeSeqs)
+                end,
+                TSeqsByRange
+            ),
+            {stop, Result}
+    end;
+get_time_seq_handle_message(Reason, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end.
diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl
index 745fe815c..fdc1a575c 100644
--- a/src/mem3/src/mem3_httpd.erl
+++ b/src/mem3/src/mem3_httpd.erl
@@ -104,9 +104,7 @@ json_shards([], AccIn) ->
     List = dict:to_list(AccIn),
     {lists:sort(List)};
 json_shards([#shard{node = Node, range = [B, E]} | Rest], AccIn) ->
-    HexBeg = couch_util:to_hex(<<B:32/integer>>),
-    HexEnd = couch_util:to_hex(<<E:32/integer>>),
-    Range = list_to_binary(HexBeg ++ "-" ++ HexEnd),
+    Range = list_to_binary(mem3_util:range_to_hex([B, E])),
     json_shards(Rest, dict:append(Range, Node, AccIn)).
 
 sync_shard(ShardName) ->
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index cb5bf3aee..3f7e8ddff 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -205,9 +205,7 @@ node_key(#shard{node = Node}) ->
     couch_util:to_binary(Node).
 
 range_key(#shard{range = [B, E]}) ->
-    BHex = couch_util:to_hex(<<B:32/integer>>),
-    EHex = couch_util:to_hex(<<E:32/integer>>),
-    list_to_binary([BHex, "-", EHex]).
+    list_to_binary(mem3_util:range_to_hex([B, E])).
 
 shard_update_timeout_msec() ->
     config:get_integer("reshard", "shard_update_timeout_msec", 300000).
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 0ff22f07f..c53a6b4ac 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -44,7 +44,8 @@
     non_overlapping_shards/1,
     non_overlapping_shards/3,
     calculate_max_n/1,
-    calculate_max_n/3
+    calculate_max_n/3,
+    range_to_hex/1
 ]).
 
 %% do not use outside mem3.
@@ -73,9 +74,7 @@ name_shard(#ordered_shard{dbname = DbName, range = Range} = Shard, Suffix) ->
 make_name(DbName, [B, E], Suffix) ->
     [
         "shards/",
-        couch_util:to_hex(<<B:32/integer>>),
-        "-",
-        couch_util:to_hex(<<E:32/integer>>),
+        range_to_hex([B, E]),
         "/",
         DbName,
         Suffix
@@ -172,6 +171,11 @@ update_db_doc(DbName, #doc{id = Id, body = Body} = Doc, ShouldMutate) ->
         couch_db:close(Db)
     end.
 
+range_to_hex([B, E]) when is_integer(B), is_integer(E) ->
+    HexB = couch_util:to_hex(<<B:32/integer>>),
+    HexE = couch_util:to_hex(<<E:32/integer>>),
+    HexB ++ "-" ++ HexE.
+
 delete_db_doc(DocId) ->
     gen_server:cast(mem3_shards, {cache_remove, DocId}),
     delete_db_doc(mem3_sync:shards_db(), DocId, true).
@@ -780,4 +784,8 @@ calculate_max_n_custom_range_test_() ->
 shard(Begin, End) ->
     #shard{range = [Begin, End]}.
 
+range_to_hex_test() ->
+    Range = [2147483648, 4294967295],
+    ?assertEqual("80000000-ffffffff", range_to_hex(Range)).
+
 -endif.
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index 2579649f9..669ac1f58 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -103,6 +103,14 @@ split_one_shard(#{db1 := Db}) ->
             SecObj = {[{<<"foo">>, <<"bar">>}]},
             set_security(Db, SecObj),
 
+            {ok, TSeqs} = get_time_seq(Db),
+            Node = config:node_name(),
+            #{
+                [16#00000000, 16#ffffffff] := #{
+                    Node := #{bins := [{Time1, TSeq1} | _]}
+                }
+            } = TSeqs,
+
             % DbInfo is saved after setting metadata bits
             % as those could bump the update sequence
             DbInfo0 = get_db_info(Db),
@@ -153,7 +161,23 @@ split_one_shard(#{db1 := Db}) ->
 
             % Don't forget about the local but don't include internal checkpoints
             % as some of those are munged and transformed during the split
-            ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+            ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)),
+
+            % Verify the time seq structure. There should be one per range, with the
+            % same time and sequence as the source
+            {ok, TSeqs1} = get_time_seq(Db),
+            #{
+                [16#00000000, 16#7fffffff] := #{
+                    Node := #{bins := [{Time2, TSeq2} | _]}
+                },
+                [16#80000000, 16#ffffffff] := #{
+                    Node := #{bins := [{Time3, TSeq3} | _]}
+                }
+            } = TSeqs1,
+            ?assertEqual(TSeq1, TSeq2),
+            ?assertEqual(TSeq1, TSeq3),
+            ?assertEqual(Time1, Time2),
+            ?assertEqual(Time1, Time3)
         end)}.
 
 % Test to check that shard with high number of purges can be split
@@ -781,6 +805,9 @@ set_security(DbName, SecObj) ->
 get_security(DbName) ->
     with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
 
+get_time_seq(DbName) ->
+    with_proc(fun() -> fabric:get_time_seq(DbName, [?ADMIN_CTX]) end).
+
 get_db_info(DbName) ->
     with_proc(fun() ->
         {ok, Info} = fabric:get_db_info(DbName),

Reply via email to