This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch handle-dbs-specially-for-purge-checkpoints
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3b55d1d16582a46f887627ee75f28388fd92c31e
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Dec 23 01:38:38 2025 -0500

    Fix handling of shards db purge checkpoints in mem3_rep
    
    The previous PR [1] failed to account for the shards db itself. The shards
    db (`_dbs`) is managed differently than regular shard copies: its
    `mem3:shards(Dbs)` result is a single-element shard list with a `#shard{}`
    having `node = node()` and `range = [0, ff..]`. It is replicated in a ring
    across all nodes, so we expect to find a purge checkpoint only for pushing
    changes to the "next" node in the ring.
    
    [1] https://github.com/apache/couchdb/pull/5827
---
 src/mem3/src/mem3.erl     |   7 ++-
 src/mem3/src/mem3_rep.erl | 122 +++++++++++++++++++++++++++++++++++++---------
 2 files changed, 103 insertions(+), 26 deletions(-)

diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index f748ff7a0..6a96ae2a9 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -130,8 +130,7 @@ shards_int(DbName, Options) when is_list(DbName) ->
     shards_int(list_to_binary(DbName), Options);
 shards_int(DbName, Options) ->
     Ordered = lists:member(ordered, Options),
-    ShardDbName =
-        list_to_binary(config:get("mem3", "shards_db", "_dbs")),
+    ShardDbName = mem3_sync:shards_db(),
     case DbName of
         ShardDbName when Ordered ->
             %% shard_db is treated as a single sharded db to support calls to db_info
@@ -141,7 +140,7 @@ shards_int(DbName, Options) ->
                     node = config:node_name(),
                     name = ShardDbName,
                     dbname = ShardDbName,
-                    range = [0, (2 bsl 31) - 1],
+                    range = [0, ?RING_END],
                     order = undefined
                 }
             ];
@@ -153,7 +152,7 @@ shards_int(DbName, Options) ->
                     node = config:node_name(),
                     name = ShardDbName,
                     dbname = ShardDbName,
-                    range = [0, (2 bsl 31) - 1]
+                    range = [0, ?RING_END]
                 }
             ];
         _ ->
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 73c7fb29f..153eb28d9 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -242,18 +242,32 @@ have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
                 {stop, Acc}
         end
     end,
-    Checkpoints = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
-    % Keep only shard copies. These are not necessarily ones with a matching
-    % ranges but also overlapping ranges, since the shards may have been split.
-    SrcRange = mem3:range(ShardName),
-    IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
-        (not (Name == ShardName andalso Node == config:node_name())) andalso
-            mem3_util:range_overlap(SrcRange, Range)
-    end,
-    Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
-    Copies1 = couch_util:set_from_list(Copies),
-
-    sets:size(sets:subtract(Copies1, Checkpoints)) == 0;
+    CheckpointSet = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
+    CopySet =
+        case ShardName == mem3_sync:shards_db() of
+            true ->
+                % We're dealing with the shards db itself. By convention
+                % mem3:shards/1 returns a single #shard{} record with node =
+                % node(), name = _dbs, range = [0, ?RING_END] and it should
+                % replicate in a ring to the dbs copy on the "next" node.
+                Next = mem3_sync:find_next_node(),
+                % If we're the only node, then next == node()
+                case Next == config:node_name() of
+                    true -> couch_util:new_set();
+                    false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
+                end;
+            false ->
+                % Keep only shard copies. These are not necessarily ones with matching
+                % ranges but also overlapping ranges, since the shards may have been split.
+                SrcRange = mem3:range(ShardName),
+                IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
+                    (not (Name == ShardName andalso Node == config:node_name())) andalso
+                        mem3_util:range_overlap(SrcRange, Range)
+                end,
+                Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
+                couch_util:set_from_list(Copies)
+        end,
+    sets:size(sets:subtract(CopySet, CheckpointSet)) == 0;
 have_all_purge_checkpoints(false, _Db, _Shards) ->
     % If purges are not replicated then we assume we have all (0) checkpoints.
     true;
@@ -305,17 +319,30 @@ shards(DbName) ->
     end.
 
 verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) ->
-    Range = couch_util:get_value(<<"range">>, Props),
-    Fun = fun(S, Acc) ->
-        case mem3:range(S) == Range of
-            true -> [mem3:node(S) | Acc];
-            false -> Acc
-        end
-    end,
-    Nodes = lists:foldl(Fun, [], Shards),
     TBin = couch_util:get_value(<<"target">>, Props),
     TNode = binary_to_existing_atom(TBin, latin1),
-    lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()).
+    ShardsDb = mem3_sync:shards_db(),
+    case Shards of
+        [#shard{dbname = ShardsDb}] ->
+            % This is the shards db itself. It's a special case since its
+            % replication copies are the other shards db copies in a ring
+            Next = mem3_sync:find_next_node(),
+            % If we're the only node, then next == node()
+            case Next == config:node_name() of
+                true -> false;
+                false -> TNode == Next
+            end;
+        _ ->
+            Range = couch_util:get_value(<<"range">>, Props),
+            Fun = fun(S, Acc) ->
+                case mem3:range(S) == Range of
+                    true -> [mem3:node(S) | Acc];
+                    false -> Acc
+                end
+            end,
+            Nodes = lists:foldl(Fun, [], Shards),
+            lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes())
+    end.
 
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
@@ -1293,6 +1320,7 @@ purge_checkpoints_test_() ->
             Ctx = test_util:start_couch([mem3, fabric]),
             config:set("mem3", "replicate_purges", "true", false),
             meck:new(mem3, [passthrough]),
+            meck:new(mem3_sync, [passthrough]),
             meck:expect(mem3, nodes, 0, [node(), n2, n3]),
             Ctx
         end,
@@ -1304,7 +1332,9 @@ purge_checkpoints_test_() ->
         [
             ?TDEF_FE(t_not_sharded),
             ?TDEF_FE(t_purges_not_replicated),
-            ?TDEF_FE(t_have_all_checkpoints)
+            ?TDEF_FE(t_have_all_checkpoints),
+            ?TDEF_FE(t_have_all_shards_db),
+            ?TDEF_FE(t_verify_checkpoint_shards_db)
         ]
     }.
 
@@ -1394,4 +1424,52 @@ t_have_all_checkpoints(_) ->
     couch_db:close(Db),
     ok = couch_server:delete(SrcName, [?ADMIN_CTX]).
 
+t_have_all_shards_db(_) ->
+    Dbs = mem3_sync:shards_db(),
+    {ok, Db} = mem3_util:ensure_exists(Dbs),
+    SrcUuid = couch_db:get_uuid(Db),
+
+    Range = [0, ?RING_END],
+    Shards = [
+        #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+    ],
+    meck:expect(mem3, shards, 1, Shards),
+
+    Src1 = #shard{name = Dbs, node = node(), range = Range},
+    Tgt1 = #shard{name = Dbs, node = 'n2', range = Range},
+
+    % We're the only node: don't expect any other checkpoints
+    meck:expect(mem3_sync, find_next_node, 0, node()),
+    ?assert(have_all_purge_checkpoints(Dbs)),
+
+    % There is another node and we don't have a checkpoint for it
+    meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+    ?assert(not have_all_purge_checkpoints(Dbs)),
+
+    Body1 = purge_cp_body(Src1, Tgt1, 42),
+    TgtUuid1 = couch_uuids:random(),
+    DocId1 = make_purge_id(SrcUuid, TgtUuid1),
+    Doc1 = #doc{id = DocId1, body = Body1},
+    {ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]),
+    couch_db:close(Db),
+
+    % After adding the checkpoint for n2, we should get true again
+    ?assert(have_all_purge_checkpoints(Dbs)),
+
+    ok = couch_server:delete(Dbs, [?ADMIN_CTX]).
+
+t_verify_checkpoint_shards_db(_) ->
+    Dbs = mem3_sync:shards_db(),
+    Range = [0, ?RING_END],
+    Shards = [
+        #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+    ],
+    Props1 = [
+        {<<"target">>, atom_to_binary(n2, latin1)},
+        {<<"range">>, Range}
+    ],
+    ?assert(not verify_checkpoint_shard(Shards, Props1)),
+    meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+    ?assert(verify_checkpoint_shard(Shards, Props1)).
+
 -endif.
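
For reference, here is a minimal sketch of the rule the new special case encodes:
for the shards db the only expected purge checkpoint target is the "next" node in
the replication ring, over the full range, and none at all on a single-node
cluster. This is not part of the patch; the helper name shards_db_purge_targets/0
is hypothetical, and it assumes ?RING_END comes from mem3.hrl, which the patched
modules already include. It uses only calls that appear in the diff above.

    -include_lib("mem3/include/mem3.hrl").

    % Hypothetical helper: the {TargetNode, Range} purge checkpoint targets
    % expected for the shards db (`_dbs`) on this node.
    shards_db_purge_targets() ->
        Next = mem3_sync:find_next_node(),
        case Next == config:node_name() of
            % Single-node cluster: there is nobody to push changes to, so no
            % purge checkpoints are expected.
            true -> couch_util:new_set();
            % Otherwise expect exactly one checkpoint, toward the next node in
            % the ring, covering the whole [0, ?RING_END] range.
            false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
        end.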
