This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 03fdbd68f Consider previous node replications for _dbs purge
checkpoints
03fdbd68f is described below
commit 03fdbd68f8bbf4435bc747a5408077297e455745
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Dec 24 01:25:42 2025 -0500
Consider previous node replications for _dbs purge checkpoints
In the previous PR [1] we added special handling for shards dbs since it
has a
custom ring replication topology. In PR [1] we considered only the
checkpoint
for pushes from the current node to the "next" node in the ring. However, we
should also consider the checkpoints created by the "previous" node when it
pulls purges from the current node, so that is what we fix in this PR.
As a reminder, a replication job from node A to node B will:
1) pull purges from B to A (checkpoint on B with a B->A purge checkpoint
doc)
2) push purges from A to B (checkpoint on A with a A->B purge checkpoint
doc)
3) push doc updates from A to B (checkpoint on A with A->B sync checkpoint
doc, and
on B also with an A->B sync checkpoint doc)
[1] https://github.com/apache/couchdb/pull/5832
---
src/mem3/src/mem3_rep.erl | 49 ++++++++++++++++++++++++++++++++++++++--------
src/mem3/src/mem3_sync.erl | 43 ++++++++++++++++++++++++++++++++++------
2 files changed, 78 insertions(+), 14 deletions(-)
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 153eb28d9..6413c72a4 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -250,11 +250,20 @@ have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
% mem3:shards/1 returns a single #shard{} record with node =
% node(), name = _dbs, range = [0, ?RING_END] and it should
% replicate in a ring to the dbs copy on "next" node in a ring.
+ % We push purges to the next node and the previous node pulls
+ % purges from us, so we expect to have only two purge
+ % checkpoints for the next and previous nodes.
Next = mem3_sync:find_next_node(),
- % If we're the only node, then next == node()
+ Prev = mem3_sync:find_previous_node(),
+ % If we're the only node, then next == node() and prev ==
node()
case Next == config:node_name() of
- true -> couch_util:new_set();
- false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
+ true ->
+ couch_util:new_set();
+ false ->
+ couch_util:set_from_list([
+ {Next, [0, ?RING_END]},
+ {Prev, [0, ?RING_END]}
+ ])
end;
false ->
% Keep only shard copies. These are not necessarily ones with
a matching
@@ -325,12 +334,15 @@ verify_checkpoint_shard(Shards, Props) when
is_list(Shards), is_list(Props) ->
case Shards of
[#shard{dbname = ShardsDb}] ->
% This is shards db itself. It's a special case since replications
- % copies are other shard db copies replicated in a ring
+ % copies are other shard db copies replicated in a ring. We push
+ % purges to the next node and the previous node pulls purges
+ % from us. So we expect to have two purge replication checkpoints.
Next = mem3_sync:find_next_node(),
+ Prev = mem3_sync:find_previous_node(),
% If we're the only node, the next == node()
case Next == config:node_name() of
true -> false;
- false -> TNode == Next
+ false -> TNode == Next orelse TNode == Prev
end;
_ ->
Range = couch_util:get_value(<<"range">>, Props),
@@ -1437,13 +1449,16 @@ t_have_all_shards_db(_) ->
Src1 = #shard{name = Dbs, node = node(), range = Range},
Tgt1 = #shard{name = Dbs, node = 'n2', range = Range},
+ Tgt2 = #shard{name = Dbs, node = 'n3', range = Range},
% We're the only node: don't expect any other checkpoints
meck:expect(mem3_sync, find_next_node, 0, node()),
+ meck:expect(mem3_sync, find_previous_node, 0, node()),
?assert(have_all_purge_checkpoints(Dbs)),
% There is another node and we don't have a checkpoint for it
meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+ meck:expect(mem3_sync, find_previous_node, 0, 'n3'),
?assert(not have_all_purge_checkpoints(Dbs)),
Body1 = purge_cp_body(Src1, Tgt1, 42),
@@ -1451,11 +1466,21 @@ t_have_all_shards_db(_) ->
DocId1 = make_purge_id(SrcUuid, TgtUuid1),
Doc1 = #doc{id = DocId1, body = Body1},
{ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]),
- couch_db:close(Db),
- % After adding the checkpoint for n2, we should get true again
+ % After adding the checkpoint for n2, we should still get false because
+ % there is no previous checkpoint for n3 pull purges from us
+ ?assert(not have_all_purge_checkpoints(Dbs)),
+
+ Body2 = purge_cp_body(Src1, Tgt2, 43),
+ TgtUuid2 = couch_uuids:random(),
+ DocId2 = make_purge_id(SrcUuid, TgtUuid2),
+ Doc2 = #doc{id = DocId2, body = Body2},
+ {ok, _} = couch_db:update_doc(Db, Doc2, [?ADMIN_CTX]),
+
+ % After adding the checkpoint for n3, we should get true
?assert(have_all_purge_checkpoints(Dbs)),
+ couch_db:close(Db),
ok = couch_server:delete(Dbs, [?ADMIN_CTX]).
t_verify_checkpoint_shards_db(_) ->
@@ -1470,6 +1495,14 @@ t_verify_checkpoint_shards_db(_) ->
],
?assert(not verify_checkpoint_shard(Shards, Props1)),
meck:expect(mem3_sync, find_next_node, 0, 'n2'),
- ?assert(verify_checkpoint_shard(Shards, Props1)).
+ ?assert(verify_checkpoint_shard(Shards, Props1)),
+
+ Props2 = [
+ {<<"target">>, atom_to_binary(n3, latin1)},
+ {<<"range">>, Range}
+ ],
+ ?assert(not verify_checkpoint_shard(Shards, Props2)),
+ meck:expect(mem3_sync, find_previous_node, 0, 'n3'),
+ ?assert(verify_checkpoint_shard(Shards, Props2)).
-endif.
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 67eb77181..363b3e0b7 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -32,7 +32,8 @@
nodes_db/0,
shards_db/0,
users_db/0,
- find_next_node/0
+ find_next_node/0,
+ find_previous_node/0
]).
-export([
local_dbs/0
@@ -307,10 +308,19 @@ find_next_node() ->
Self = node(),
LiveNodes = [Self | nodes()],
Mem3Nodes = mem3:nodes(),
- find_next_node(Self, LiveNodes, Mem3Nodes).
+ find_next_node(Self, LiveNodes, lists:sort(Mem3Nodes)).
-find_next_node(Self, LiveNodes, Mem3Nodes) ->
- SortedMem3Nodes = lists:sort(Mem3Nodes),
+find_previous_node() ->
+ Self = node(),
+ LiveNodes = [Self | nodes()],
+ Mem3Nodes = mem3:nodes(),
+ % Previous node is the "next" node in the reverse sorted list
+ find_previous_node(Self, LiveNodes, lists:sort(Mem3Nodes)).
+
+find_previous_node(Self, LiveNodes, SortedMem3Nodes) ->
+ find_next_node(Self, LiveNodes, lists:reverse(SortedMem3Nodes)).
+
+find_next_node(Self, LiveNodes, SortedMem3Nodes) ->
LiveMem3Nodes = [N || N <- SortedMem3Nodes, lists:member(N, LiveNodes)],
case LiveMem3Nodes of
[] ->
@@ -404,13 +414,34 @@ is_job_current(#job{name = Name, node = Node},
ConnectedNodes, Mem3Nodes) ->
find_next_node_test() ->
?assertEqual(n, find_next_node(n, [n], [])),
+ ?assertEqual(n, find_previous_node(n, [], [])),
+
?assertEqual(n, find_next_node(n, [n], [n])),
+ ?assertEqual(n, find_previous_node(n, [n], [n])),
+
+ % We're in the middle
?assertEqual(x, find_next_node(n, [a, n, x], [a, n, x])),
+ ?assertEqual(a, find_previous_node(n, [a, n, x], [a, n, x])),
+
+ % Two nodes, we're at the end (start, for previous)
?assertEqual(a, find_next_node(n, [a, n], [a, n])),
+ ?assertEqual(a, find_previous_node(n, [a, n], [a, n])),
+
+ % We're on a node that's not in mem3:nodes() so next/prev is ourselves.
?assertEqual(n, find_next_node(n, [a, n], [a])),
- ?assertEqual(x, find_next_node(n, [n, x], [x, n])),
+ ?assertEqual(n, find_previous_node(n, [a, n], [a])),
+
+ % Two nodes, we're at the start (end, for previous). Live nodes unsorted
+ ?assertEqual(x, find_next_node(n, [x, n], [n, x])),
+ ?assertEqual(x, find_previous_node(n, [x, n], [n, x])),
+
+ % Two nodes, we're at the start (end, for previous). Live nodes are sorted
?assertEqual(x, find_next_node(n, [n, x], [n, x])),
- ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])).
+ ?assertEqual(x, find_previous_node(n, [n, x], [n, x])),
+
+ % node x is not in mem3:nodes() and node y is not live
+ ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])),
+ ?assertEqual(a, find_previous_node(n, [a, n, x], [a, n, y])).
is_job_current_test_() ->
{