This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 2c92efdf8 Fix handling shards dbs purge checkpoints in mem3_rep
2c92efdf8 is described below
commit 2c92efdf8a1f734333d989182ae03d220d5e7d24
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Dec 23 01:38:38 2025 -0500
Fix handling shards dbs purge checkpoints in mem3_rep
Previous PR [1] failed to account for the shards db itself. The shards db
(`_dbs`) is managed differently than regular shard copies: its
`mem3:shards(Dbs)` result is a single-element shard list with a `#shard{}`
having `node = node()` and `range = [0, ff..]`. Shards db copies are
replicated in a ring across all nodes, so the only purge checkpoint we expect
to find is the one for pushing changes to the "next" node in the ring.
[1] https://github.com/apache/couchdb/pull/5827
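
For context, here is a minimal sketch (not part of the commit) of the copy set
mem3_rep now expects for the shards db. It assumes the `#shard{}` record and
the `?RING_END` macro are available from mem3.hrl, as they appear to be in the
patched modules, and it only uses helpers the patch itself relies on
(`mem3_sync:shards_db/0`, `mem3_sync:find_next_node/0`, `couch_util` set
helpers):

    -include_lib("mem3/include/mem3.hrl").

    %% Sketch only: the shards db is represented as a single local shard
    %% covering the whole ring, so the only purge checkpoint expected is the
    %% one for the "next" node in the ring (none on a single-node cluster).
    expected_shards_db_copies() ->
        ShardsDb = mem3_sync:shards_db(),
        [#shard{node = Node, range = [0, ?RING_END]}] = mem3:shards(ShardsDb),
        Node = config:node_name(),
        case mem3_sync:find_next_node() of
            Node -> couch_util:new_set();
            Next -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
        end.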
---
src/mem3/src/mem3.erl | 7 ++-
src/mem3/src/mem3_rep.erl | 122 +++++++++++++++++++++++++++++++++++++---------
2 files changed, 103 insertions(+), 26 deletions(-)
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index f748ff7a0..6a96ae2a9 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -130,8 +130,7 @@ shards_int(DbName, Options) when is_list(DbName) ->
shards_int(list_to_binary(DbName), Options);
shards_int(DbName, Options) ->
Ordered = lists:member(ordered, Options),
- ShardDbName =
- list_to_binary(config:get("mem3", "shards_db", "_dbs")),
+ ShardDbName = mem3_sync:shards_db(),
case DbName of
ShardDbName when Ordered ->
%% shard_db is treated as a single sharded db to support calls to db_info
@@ -141,7 +140,7 @@ shards_int(DbName, Options) ->
node = config:node_name(),
name = ShardDbName,
dbname = ShardDbName,
- range = [0, (2 bsl 31) - 1],
+ range = [0, ?RING_END],
order = undefined
}
];
@@ -153,7 +152,7 @@ shards_int(DbName, Options) ->
node = config:node_name(),
name = ShardDbName,
dbname = ShardDbName,
- range = [0, (2 bsl 31) - 1]
+ range = [0, ?RING_END]
}
];
_ ->
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 73c7fb29f..153eb28d9 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -242,18 +242,32 @@ have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
{stop, Acc}
end
end,
- Checkpoints = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
- % Keep only shard copies. These are not necessarily ones with a matching
- % ranges but also overlapping ranges, since the shards may have been split.
- SrcRange = mem3:range(ShardName),
- IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
- (not (Name == ShardName andalso Node == config:node_name())) andalso
- mem3_util:range_overlap(SrcRange, Range)
- end,
- Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
- Copies1 = couch_util:set_from_list(Copies),
-
- sets:size(sets:subtract(Copies1, Checkpoints)) == 0;
+ CheckpointSet = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
+ CopySet =
+ case ShardName == mem3_sync:shards_db() of
+ true ->
+ % We're dealing with the shards db itself. By convention
+ % mem3:shards/1 returns a single #shard{} record with node =
+ % node(), name = _dbs, range = [0, ?RING_END] and it should
+ % replicate in a ring to the dbs copy on "next" node in a ring.
+ Next = mem3_sync:find_next_node(),
+ % If we're the only node, then next == node()
+ case Next == config:node_name() of
+ true -> couch_util:new_set();
+ false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
+ end;
+ false ->
+ % Keep only shard copies. These are not necessarily ones with a matching
+ % ranges but also overlapping ranges, since the shards may have been split.
+ SrcRange = mem3:range(ShardName),
+ IsCopy = fun(#shard{name = Name, node = Node, range = Range}) ->
+ (not (Name == ShardName andalso Node == config:node_name())) andalso
+ mem3_util:range_overlap(SrcRange, Range)
+ end,
+ Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
+ couch_util:set_from_list(Copies)
+ end,
+ sets:size(sets:subtract(CopySet, CheckpointSet)) == 0;
have_all_purge_checkpoints(false, _Db, _Shards) ->
% If purges are not replicated then we assume we have all (0) checkpoints.
true;
@@ -305,17 +319,30 @@ shards(DbName) ->
end.
verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) ->
- Range = couch_util:get_value(<<"range">>, Props),
- Fun = fun(S, Acc) ->
- case mem3:range(S) == Range of
- true -> [mem3:node(S) | Acc];
- false -> Acc
- end
- end,
- Nodes = lists:foldl(Fun, [], Shards),
TBin = couch_util:get_value(<<"target">>, Props),
TNode = binary_to_existing_atom(TBin, latin1),
- lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()).
+ ShardsDb = mem3_sync:shards_db(),
+ case Shards of
+ [#shard{dbname = ShardsDb}] ->
+ % This is shards db itself. It's a special case since replications
+ % copies are other shard db copies replicated in a ring
+ Next = mem3_sync:find_next_node(),
+ % If we're the only node, then next == node()
+ case Next == config:node_name() of
+ true -> false;
+ false -> TNode == Next
+ end;
+ _ ->
+ Range = couch_util:get_value(<<"range">>, Props),
+ Fun = fun(S, Acc) ->
+ case mem3:range(S) == Range of
+ true -> [mem3:node(S) | Acc];
+ false -> Acc
+ end
+ end,
+ Nodes = lists:foldl(Fun, [], Shards),
+ lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes())
+ end.
%% @doc Find and return the largest update_seq in SourceDb
%% that the client has seen from TargetNode.
@@ -1293,6 +1320,7 @@ purge_checkpoints_test_() ->
Ctx = test_util:start_couch([mem3, fabric]),
config:set("mem3", "replicate_purges", "true", false),
meck:new(mem3, [passthrough]),
+ meck:new(mem3_sync, [passthrough]),
meck:expect(mem3, nodes, 0, [node(), n2, n3]),
Ctx
end,
@@ -1304,7 +1332,9 @@ purge_checkpoints_test_() ->
[
?TDEF_FE(t_not_sharded),
?TDEF_FE(t_purges_not_replicated),
- ?TDEF_FE(t_have_all_checkpoints)
+ ?TDEF_FE(t_have_all_checkpoints),
+ ?TDEF_FE(t_have_all_shards_db),
+ ?TDEF_FE(t_verify_checkpoint_shards_db)
]
}.
@@ -1394,4 +1424,52 @@ t_have_all_checkpoints(_) ->
couch_db:close(Db),
ok = couch_server:delete(SrcName, [?ADMIN_CTX]).
+t_have_all_shards_db(_) ->
+ Dbs = mem3_sync:shards_db(),
+ {ok, Db} = mem3_util:ensure_exists(Dbs),
+ SrcUuid = couch_db:get_uuid(Db),
+
+ Range = [0, ?RING_END],
+ Shards = [
+ #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+ ],
+ meck:expect(mem3, shards, 1, Shards),
+
+ Src1 = #shard{name = Dbs, node = node(), range = Range},
+ Tgt1 = #shard{name = Dbs, node = 'n2', range = Range},
+
+ % We're the only node: don't expect any other checkpoints
+ meck:expect(mem3_sync, find_next_node, 0, node()),
+ ?assert(have_all_purge_checkpoints(Dbs)),
+
+ % There is another node and we don't have a checkpoint for it
+ meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+ ?assert(not have_all_purge_checkpoints(Dbs)),
+
+ Body1 = purge_cp_body(Src1, Tgt1, 42),
+ TgtUuid1 = couch_uuids:random(),
+ DocId1 = make_purge_id(SrcUuid, TgtUuid1),
+ Doc1 = #doc{id = DocId1, body = Body1},
+ {ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]),
+ couch_db:close(Db),
+
+ % After adding the checkpoint for n2, we should get true again
+ ?assert(have_all_purge_checkpoints(Dbs)),
+
+ ok = couch_server:delete(Dbs, [?ADMIN_CTX]).
+
+t_verify_checkpoint_shards_db(_) ->
+ Dbs = mem3_sync:shards_db(),
+ Range = [0, ?RING_END],
+ Shards = [
+ #shard{node = node(), name = Dbs, dbname = Dbs, range = Range}
+ ],
+ Props1 = [
+ {<<"target">>, atom_to_binary(n2, latin1)},
+ {<<"range">>, Range}
+ ],
+ ?assert(not verify_checkpoint_shard(Shards, Props1)),
+ meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+ ?assert(verify_checkpoint_shard(Shards, Props1)).
+
-endif.
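
The new cases run with the rest of the mem3_rep suite; from an Erlang shell on
a development node where the mem3 test code is compiled in, something like the
following should exercise them:

    %% Runs purge_checkpoints_test_/0, which now also covers
    %% t_have_all_shards_db and t_verify_checkpoint_shards_db.
    eunit:test(mem3_rep, [verbose]).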