This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch handle-dbs-specially-for-purge-checkpoints
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3b55d1d16582a46f887627ee75f28388fd92c31e
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Dec 23 01:38:38 2025 -0500

    Fix handling of shards db purge checkpoints in mem3_rep
    
    The previous PR [1] failed to account for the shards db itself. The shards
    db (`_dbs`) is managed differently than regular shard copies: its
    `mem3:shards(Dbs)` result is a single-element shard list with a `#shard{}`
    having `node = node()` and `range = [0, ff..]`. Shards db copies are
    replicated in a ring across all nodes, so we expect to find a purge
    checkpoint only for pushing changes to the "next" node in the ring.
    
    [1] https://github.com/apache/couchdb/pull/5827
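    
    A rough sketch (not part of the patch) of the convention described above,
    written as a hypothetical helper. It assumes the `#shard{}` record and
    `?RING_END` from mem3.hrl are in scope and uses only functions already
    referenced in the diff below:
    
        expected_dbs_copies() ->
            Dbs = mem3_sync:shards_db(),
            %% mem3:shards/1 returns a single-element list for the shards db
            [#shard{name = Dbs, dbname = Dbs, range = [0, ?RING_END]}] =
                mem3:shards(Dbs),
            Next = mem3_sync:find_next_node(),
            case Next == config:node_name() of
                %% we're the only node: no other copy to checkpoint against
                true -> couch_util:new_set();
                %% otherwise only the "next" node in the ring is expected
                false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
            end.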
---
 src/mem3/src/mem3.erl     |   7 ++-
 src/mem3/src/mem3_rep.erl | 122 +++++++++++++++++++++++++++++++++++++---------
 2 files changed, 103 insertions(+), 26 deletions(-)

diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index f748ff7a0..6a96ae2a9 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -130,8 +130,7 @@ shards_int(DbName, Options) when is_list(DbName) ->
     shards_int(list_to_binary(DbName), Options);
 shards_int(DbName, Options) ->
     Ordered = lists:member(ordered, Options),
-    ShardDbName =
-        list_to_binary(config:get("mem3", "shards_db", "_dbs")),
+    ShardDbName = mem3_sync:shards_db(),
     case DbName of
         ShardDbName when Ordered ->
             %% shard_db is treated as a single sharded db to support calls to db_info
@@ -141,7 +140,7 @@ shards_int(DbName, Options) ->
                     node = config:node_name(),
                     name = ShardDbName,
                     dbname = ShardDbName,
-                    range = [0, (2 bsl 31) - 1],
+                    range = [0, ?RING_END],
                     order = undefined
                 }
             ];
@@ -153,7 +152,7 @@ shards_int(DbName, Options) ->
                     node = config:node_name(),
                     name = ShardDbName,
                     dbname = ShardDbName,
-                    range = [0, (2 bsl 31) - 1]
+                    range = [0, ?RING_END]
                 }
             ];
         _ ->
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 73c7fb29f..153eb28d9 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -242,18 +242,32 @@ have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
                 {stop, Acc}
         end
     end,
-    Checkpoints = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
-    % Keep only shard copies. These are not necessarily ones with a matching
-    % ranges but also overlapping ranges, since the shards may have been split.
-    SrcRange = mem3:range(ShardName),
-    IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
-        (not (Name == ShardName andalso Node == config:node_name())) andalso
-            mem3_util:range_overlap(SrcRange, Range)
-    end,
-    Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
-    Copies1 = couch_util:set_from_list(Copies),
-
-    sets:size(sets:subtract(Copies1, Checkpoints)) == 0;
+    CheckpointSet = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
+    CopySet =
+        case ShardName == mem3_sync:shards_db() of
+            true ->
+                % We're dealing with the shards db itself. By convention,
+                % mem3:shards/1 returns a single #shard{} record with node =
+                % node(), name = _dbs, range = [0, ?RING_END], and it should
+                % replicate to the dbs copy on the "next" node in the ring.
+                Next = mem3_sync:find_next_node(),
+                % If we're the only node, then next == node()
+                case Next == config:node_name() of
+                    true -> couch_util:new_set();
+                    false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
+                end;
+            false ->
+                % Keep only shard copies. These are not necessarily ones with matching
+                % ranges but also ones with overlapping ranges, since the shards may have been split.
+                SrcRange = mem3:range(ShardName),
+                IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
+                    (not (Name == ShardName andalso Node == config:node_name())) andalso
+                        mem3_util:range_overlap(SrcRange, Range)
+                end,
+                Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
+                couch_util:set_from_list(Copies)
+        end,
+    sets:size(sets:subtract(CopySet, CheckpointSet)) == 0;
 have_all_purge_checkpoints(false, _Db, _Shards) ->
     % If purges are not replicated then we assume we have all (0) checkpoints.
     true;
@@ -305,17 +319,30 @@ shards(DbName) ->
     end.
 
 verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) ->
-    Range = couch_util:get_value(<<"range">>, Props),
-    Fun = fun(S, Acc) ->
-        case mem3:range(S) == Range of
-            true -> [mem3:node(S) | Acc];
-            false -> Acc
-        end
-    end,
-    Nodes = lists:foldl(Fun, [], Shards),
     TBin = couch_util:get_value(<<"target">>, Props),
     TNode = binary_to_existing_atom(TBin, latin1),
-    lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()).
+    ShardsDb = mem3_sync:shards_db(),
+    case Shards of
+        [#shard{dbname = ShardsDb}] ->
+            % This is the shards db itself. It's a special case since its
+            % replication copies are other shards db copies replicated in a ring.
+            Next = mem3_sync:find_next_node(),
+            % If we're the only node, then next == node()
+            case Next == config:node_name() of
+                true -> false;
+                false -> TNode == Next
+            end;
+        _ ->
+            Range = couch_util:get_value(<<"range">>, Props),
+            Fun = fun(S, Acc) ->
+                case mem3:range(S) == Range of
+                    true -> [mem3:node(S) | Acc];
+                    false -> Acc
+                end
+            end,
+            Nodes = lists:foldl(Fun, [], Shards),
+            lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes())
+    end.
 
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
@@ -1293,6 +1320,7 @@ purge_checkpoints_test_() ->
             Ctx = test_util:start_couch([mem3, fabric]),
             config:set("mem3", "replicate_purges", "true", false),
             meck:new(mem3, [passthrough]),
+            meck:new(mem3_sync, [passthrough]),
             meck:expect(mem3, nodes, 0, [node(), n2, n3]),
             Ctx
         end,
@@ -1304,7 +1332,9 @@ purge_checkpoints_test_() ->
         [
             ?TDEF_FE(t_not_sharded),
             ?TDEF_FE(t_purges_not_replicated),
-            ?TDEF_FE(t_have_all_checkpoints)
+            ?TDEF_FE(t_have_all_checkpoints),
+            ?TDEF_FE(t_have_all_shards_db),
+            ?TDEF_FE(t_verify_checkpoint_shards_db)
         ]
     }.
 
@@ -1394,4 +1424,52 @@ t_have_all_checkpoints(_) ->
     couch_db:close(Db),
     ok = couch_server:delete(SrcName, [?ADMIN_CTX]).
 
+t_have_all_shards_db(_) ->
+    Dbs = mem3_sync:shards_db(),
+    {ok, Db} = mem3_util:ensure_exists(Dbs),
+    SrcUuid = couch_db:get_uuid(Db),
+
+    Range = [0, ?RING_END],
+    Shards = [
+        #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+    ],
+    meck:expect(mem3, shards, 1, Shards),
+
+    Src1 = #shard{name = Dbs, node = node(), range = Range},
+    Tgt1 = #shard{name = Dbs, node = 'n2', range = Range},
+
+    % We're the only node: don't expect any other checkpoints
+    meck:expect(mem3_sync, find_next_node, 0, node()),
+    ?assert(have_all_purge_checkpoints(Dbs)),
+
+    % There is another node and we don't have a checkpoint for it
+    meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+    ?assert(not have_all_purge_checkpoints(Dbs)),
+
+    Body1 = purge_cp_body(Src1, Tgt1, 42),
+    TgtUuid1 = couch_uuids:random(),
+    DocId1 = make_purge_id(SrcUuid, TgtUuid1),
+    Doc1 = #doc{id = DocId1, body = Body1},
+    {ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]),
+    couch_db:close(Db),
+
+    % After adding the checkpoint for n2, we should get true again
+    ?assert(have_all_purge_checkpoints(Dbs)),
+
+    ok = couch_server:delete(Dbs, [?ADMIN_CTX]).
+
+t_verify_checkpoint_shards_db(_) ->
+    Dbs = mem3_sync:shards_db(),
+    Range = [0, ?RING_END],
+    Shards = [
+        #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+    ],
+    Props1 = [
+        {<<"target">>, atom_to_binary(n2, latin1)},
+        {<<"range">>, Range}
+    ],
+    ?assert(not verify_checkpoint_shard(Shards, Props1)),
+    meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+    ?assert(verify_checkpoint_shard(Shards, Props1)).
+
 -endif.
