This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-purge-internal-replicator-client-verify in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 578c09e229e1f46d0f14a5a90b8eb615ac9f3fa2 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Nov 25 00:41:23 2025 -0500 Optimize and clean up internal replicator purge checkpoints Previously, the internal replicator created twice the number of checkpoints needed to replicate purges between nodes. An internal replication job first pulls the purges from the target to the source, then pushes the purges from the source to the target, then finally pushes the document updates to the target. During the pull operation, for example, from node `B` (the target) to node `A` (the source), it creates an `A->B` checkpoint on node `B` (the target). Then, during the push from `A` to `B` it creates an `A->B` checkpoint on node A (the source). As a result, after the job finishes there are two checkpoints: an A->B one on A, and an `A->B` one on B. It may look something like this: ``` [node A] [node B] <-------pull------ (A->B) (A->B) --------push------> ``` When the internal replication job runs on node B and _pushes_ purges to node A, it will create a `B->A` checkpoint on B. After this instant, there will be two checkpoints on B for replicating purges from B to A: one is `A->B`, from the first job, and another `B->A`, from the second job. Both of the checkpoints essentially checkpoint the same thing. It may looke like this after both replication jobs finish: ``` [node A] [node B] <-------pull------ (A->B) JOB1 (A->B) --------push------> (B->A) --------pull------> <-------push------ (B->A) JOB2 ``` On B, the checkpoints `A->B` and `B->A` could have a different purge sequence: one higher than the other, and so the lower one could delay the compactor from cleaning up purge infos. This also makes it harder to reason about the replication process, since we have an `A->B` checkpoint on `B`, but it's for sending changes _from_ B _to_ A, not like one might expect `A->B` based on its name. To fix this, make sure to use a single checkpoint per direction of replication. So, when change are pulled from B to A, the checkpoint is now B->A, and when changes are pushed from B to A the checkpoint is also B->A. It should look something like this: ``` [node A] [node B] <-------pull------ JOB1 (A->B) --------push------> --------pull------> <-------push------ (B->A) JOB2 ``` Since after this change we'll have some deprecated purge checkpoints to clean up, it's probably also a good time to introduce purge checkpoint cleanup. We have this for indexes but we didn't have it for the internal replicator. That meant that on shard map reconfigurations, or node membership changes, user would have to manually hunt down local (un-clustered) stale purge checkpoints and remove them. Now this happens automatically when we compact, and before we replicate between nodes. --- src/couch/src/couch_bt_engine_compactor.erl | 4 + src/mem3/src/mem3_rep.erl | 145 +++++++++++++++++++--------- src/mem3/src/mem3_rpc.erl | 42 ++++++-- src/mem3/test/eunit/mem3_rep_test.erl | 119 ++++++++++++++++++++++- 4 files changed, 256 insertions(+), 54 deletions(-) diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index d412c891f..85d33cf95 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -158,6 +158,10 @@ copy_purge_info(#comp_st{} = CompSt) -> retry = Retry } = CompSt, ?COMP_EVENT(purge_init), + % The minumum purge sequence calculation involves finding the lowest + % reported purge sequence across all checkpoints. Make sure to clean up any + % stale or deprecated internal replicator checkpoints beforehand. + ok = mem3_rep:cleanup_purge_checkpoints(DbName), MinPurgeSeq = couch_util:with_db(DbName, fun(Db) -> couch_db:get_minimum_purge_seq(Db) end), diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index c5157f52c..e602d0bd1 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -19,6 +19,7 @@ make_local_id/3, make_purge_id/2, verify_purge_checkpoint/2, + cleanup_purge_checkpoints/1, find_source_seq/4, find_split_target_seq/4, local_id_hash/1 @@ -56,6 +57,10 @@ }). -define(DEFAULT_REXI_TIMEOUT, 600000). +-define(CHECKPOINT_PREFIX, "_local/shard-sync-"). +-define(PURGE_PREFIX, "_local/purge-mem3-"). +-define(UUID_SIZE, 32). +-define(PURGE_TYPE, <<"internal_replication">>). go(Source, Target) -> go(Source, Target, []). @@ -148,12 +153,12 @@ make_local_id(#shard{node = SourceNode}, #shard{node = TargetNode}, Filter) -> make_local_id(SourceThing, TargetThing, F) when is_binary(F) -> S = local_id_hash(SourceThing), T = local_id_hash(TargetThing), - <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>; + <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>; make_local_id(SourceThing, TargetThing, Filter) -> S = local_id_hash(SourceThing), T = local_id_hash(TargetThing), F = filter_hash(Filter), - <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>. + <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>. filter_hash(Filter) when is_function(Filter) -> {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq), @@ -166,44 +171,98 @@ local_id_hash(Thing) -> couch_util:encodeBase64Url(couch_hash:md5_hash(?term_to_bin(Thing))). make_purge_id(SourceUUID, TargetUUID) -> - <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>. + <<?PURGE_PREFIX, SourceUUID/binary, "-", TargetUUID/binary>>. -verify_purge_checkpoint(DbName, Props) -> - try - Type = couch_util:get_value(<<"type">>, Props), - if - Type =/= <<"internal_replication">> -> - false; - true -> - SourceBin = couch_util:get_value(<<"source">>, Props), - TargetBin = couch_util:get_value(<<"target">>, Props), - Range = couch_util:get_value(<<"range">>, Props), +remote_id_to_local(<<?PURGE_PREFIX, Remote:?UUID_SIZE/binary, "-", Local:?UUID_SIZE/binary>>) -> + <<?PURGE_PREFIX, Local/binary, "-", Remote/binary>>. - Source = binary_to_existing_atom(SourceBin, latin1), - Target = binary_to_existing_atom(TargetBin, latin1), +% If the shard map changed, nodes are decomissioned, or user upgraded from a +% version before 3.6 we may have some some checkpoints to clean up. Call this +% function before compacting, right before we calculate the minimum purge +% sequence, and also before we replicate purges to/from other copies. +% +cleanup_purge_checkpoints(ShardName) when is_binary(ShardName) -> + couch_util:with_db(ShardName, fun(Db) -> cleanup_purge_checkpoints(Db) end); +cleanup_purge_checkpoints(Db) -> + Shards = shards(couch_db:name(Db)), + UUID = couch_db:get_uuid(Db), + FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> + case Id of + <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> -> + case verify_checkpoint_shard(Shards, Props) of + true -> {ok, Acc}; + false -> {ok, [Id | Acc]} + end; + <<?PURGE_PREFIX, _:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> -> + % Cleanup checkpoints not originating at the current shard. + % Previously, before version 3.6, during a pull from shard B to + % shard A we checkpointed on target B with doc ID + % mem3-purge-$AUuid-$BUuid. That created a redunant checkpoint + % which was the same as target B pushing changes to target A, + % which already had a checkpoint: mem3-purge-$BUuid-$AUuid, + % with the same direction and same purge sequence ID. So here + % we remove those reduntant checkpoints. + {ok, [Id | Acc]}; + _ -> + {stop, Acc} + end + end, + Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}], + {ok, ToDelete} = couch_db:fold_local_docs(Db, FoldFun, [], Opts), + DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end, + lists:foreach(DeleteFun, ToDelete). + +delete_checkpoint(Db, DocId) -> + DbName = couch_db:name(Db), + LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s", + couch_log:warning(LogMsg, [?MODULE, DbName, DocId]), + try couch_db:open_doc(Db, DocId, []) of + {ok, Doc = #doc{}} -> + Deleted = Doc#doc{deleted = true, body = {[]}}, + couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]); + {not_found, _} -> + ok + catch + Tag:Error -> + ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p", + couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]), + ok + end. - try - Nodes = lists:foldl( - fun(Shard, Acc) -> - case Shard#shard.range == Range of - true -> [Shard#shard.node | Acc]; - false -> Acc - end - end, - [], - mem3:shards(mem3:dbname(DbName)) - ), - lists:member(Source, Nodes) andalso lists:member(Target, Nodes) - catch - error:database_does_not_exist -> - false - end +verify_purge_checkpoint(DbName, Props) -> + try + case couch_util:get_value(<<"type">>, Props) of + ?PURGE_TYPE -> verify_checkpoint_shard(shards(DbName), Props); + _ -> false end catch - _:_ -> + Tag:Error -> + ErrLog = "~p : invalid checkpoint shard:~s props:~p error: ~p:~p", + couch_log:error(ErrLog, [?MODULE, DbName, Props, Tag, Error]), false end. +shards(DbName) -> + try + mem3:shards(mem3:dbname(DbName)) + catch + error:database_does_not_exist -> + [] + end. + +verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) -> + Range = couch_util:get_value(<<"range">>, Props), + Fun = fun(S, Acc) -> + case mem3:range(S) == Range of + true -> [mem3:node(S) | Acc]; + false -> Acc + end + end, + Nodes = lists:foldl(Fun, [], Shards), + TBin = couch_util:get_value(<<"target">>, Props), + TNode = binary_to_existing_atom(TBin, latin1), + lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()). + %% @doc Find and return the largest update_seq in SourceDb %% that the client has seen from TargetNode. %% @@ -335,6 +394,7 @@ pull_purges_multi(#acc{} = Acc0) -> hashfun = HashFun } = Acc0, with_src_db(Acc0, fun(Db) -> + cleanup_purge_checkpoints(Db), Targets = maps:map( fun(_, #tgt{} = T) -> pull_purges(Db, Count, Source, T, HashFun) @@ -365,9 +425,9 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) -> #tgt{shard = TgtShard} = Tgt0, SrcUUID = couch_db:get_uuid(Db), #shard{node = TgtNode, name = TgtDbName} = TgtShard, - {LocalPurgeId, Infos, ThroughSeq, Remaining} = + {RemoteId, Infos, ThroughSeq, Remaining} = mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count), - Tgt = Tgt0#tgt{purgeid = LocalPurgeId}, + Tgt = Tgt0#tgt{purgeid = RemoteId}, if Infos == [] -> ok; @@ -391,7 +451,7 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) -> Infos1 = lists:filter(BelongsFun, Infos), {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]), Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq), - mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body) + mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, RemoteId, Body) end, Tgt#tgt{remaining = max(0, Remaining)}. @@ -427,7 +487,8 @@ push_purges_multi(#acc{} = Acc) -> end). push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) -> - #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt, + #tgt{shard = TgtShard, purgeid = RemotePurgeId} = Tgt, + LocalPurgeId = remote_id_to_local(RemotePurgeId), #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard, StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of @@ -741,21 +802,19 @@ update_locals(Target, Db, Seq) -> {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) -> - {Mega, Secs, _} = os:timestamp(), - NowSecs = Mega * 1000000 + Secs, {[ - {<<"type">>, <<"internal_replication">>}, - {<<"updated_on">>, NowSecs}, + {<<"type">>, ?PURGE_TYPE}, + {<<"updated_on">>, os:system_time(second)}, {<<"purge_seq">>, PurgeSeq}, {<<"source">>, atom_to_binary(Source#shard.node, latin1)}, {<<"target">>, atom_to_binary(Target#shard.node, latin1)}, - {<<"range">>, Source#shard.range} + {<<"range">>, Target#shard.range} ]}. find_repl_doc(SrcDb, TgtUUIDPrefix) -> SrcUUID = couch_db:get_uuid(SrcDb), S = local_id_hash(SrcUUID), - DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>, + DocIdPrefix = <<?CHECKPOINT_PREFIX, S/binary, "-">>, FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) -> TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>), case is_prefix(DocIdPrefix, DocId) of @@ -802,7 +861,7 @@ find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) -> {ok, not_found} end end, - Options = [{start_key, <<"_local/shard-sync-">>}], + Options = [{start_key, <<?CHECKPOINT_PREFIX>>}], case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of {ok, Seqs} when is_list(Seqs) -> {ok, Seqs}; diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index c62954fc4..51c18d175 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -189,7 +189,12 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> case get_or_create_db(DbName, [?ADMIN_CTX]) of {ok, Db} -> TgtUUID = couch_db:get_uuid(Db), - PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID), + % This is the remote checkpoint running on the target to pull + % purges to the source. The changes are flowing from the target to + % the source, that's why checkpoint is from tgt to src here. This + % is also the same checkpoint used when the target pushed changes + % to the source. + PurgeDocId = mem3_rep:make_purge_id(TgtUUID, SrcUUID), StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of {ok, #doc{body = {Props}}} -> @@ -222,19 +227,36 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of {ok, Db} -> - Doc = #doc{id = PurgeDocId, body = Body}, - Resp = - try couch_db:update_doc(Db, Doc, []) of - Resp0 -> Resp0 - catch - T:R -> - {T, R} - end, - rexi:reply(Resp); + case purge_checkpoint_updated(Db, PurgeDocId, Body) of + true -> + % Checkpoint on the target updated while source pulled the + % changes. Do not update the doc then to avoid rewinding + % back. + rexi:reply({ok, stale}); + false -> + Doc = #doc{id = PurgeDocId, body = Body}, + rexi:reply( + try + couch_db:update_doc(Db, Doc, []) + catch + T:R -> {T, R} + end + ) + end; Error -> rexi:reply(Error) end. +purge_checkpoint_updated(Db, DocId, {Props}) when is_binary(DocId), is_list(Props) -> + Seq = couch_util:get_value(<<"purge_seq">>, Props), + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{body = {DocProps}}} -> + DocSeq = couch_util:get_value(<<"purge_seq">>, DocProps), + is_integer(Seq) andalso is_integer(DocSeq) andalso DocSeq > Seq; + {not_found, _} -> + false + end. + replicate_rpc(DbName, Target) -> rexi:reply( try diff --git a/src/mem3/test/eunit/mem3_rep_test.erl b/src/mem3/test/eunit/mem3_rep_test.erl index 814fd11b2..0d081a014 100644 --- a/src/mem3/test/eunit/mem3_rep_test.erl +++ b/src/mem3/test/eunit/mem3_rep_test.erl @@ -31,6 +31,7 @@ setup() -> create_db(PartSrc, [{q, 1}, {n, 1}, {props, PartProps}]), create_db(PartTgt, [{q, 2}, {n, 1}, {props, PartProps}]), create_local_db(Localdb), + meck:new(mem3, [passthrough]), #{ allsrc => AllSrc, alltgt => AllTgt, @@ -40,6 +41,7 @@ setup() -> }. teardown(#{} = Dbs) -> + meck:unload(), maps:map( fun (localdb, Db) -> delete_local_db(Db); @@ -71,7 +73,8 @@ mem3_reshard_db_test_() -> ?TDEF_FE(replicate_low_batch_count, ?TIMEOUT), ?TDEF_FE(replicate_with_partitions, ?TIMEOUT), ?TDEF_FE(replicate_to_and_from_local, ?TIMEOUT), - ?TDEF_FE(replicate_with_purges, ?TIMEOUT) + ?TDEF_FE(replicate_with_purges, ?TIMEOUT), + ?TDEF_FE(clean_purge_checkpoints, ?TIMEOUT) ] } } @@ -173,6 +176,110 @@ replicate_with_purges(#{allsrc := AllSrc, alltgt := AllTgt}) -> ?assertEqual(#{}, SDocs), ?assertEqual(#{}, get_all_docs(AllTgt)). +clean_purge_checkpoints(#{allsrc := AllSrc, alltgt := AllTgt}) -> + DocSpec = #{docs => 10, delete => [5, 9], purge => [2, 4]}, + add_test_docs(AllSrc, DocSpec), + % Add and purge some docs on target to excercise the pull_purges code path + add_test_docs(AllTgt, #{docs => 3, purge => [0, 2]}), + [Src] = lists:sort(mem3:local_shards(AllSrc)), + [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)), + #shard{name = SrcName} = Src, + + % Since we don't have multiple nodes running and are just replicating + % from one clustered db to another, we need to patch up the shard map + % during the replication so it looks targets are part of the shard maps + meck:expect(mem3, shards, fun(DbName) -> + case DbName == Src#shard.dbname of + true -> [Src, Tgt1, Tgt2]; + false -> meck:passthrough([DbName]) + end + end), + + FakeTarget = '[email protected]', + + % Add a mix of stale, invalid or deprecated purge checkpoints + [Uuid1, Uuid2, Uuid3] = [couch_uuids:random() || _ <- lists:seq(1, 3)], + + CheckpointIds = couch_util:with_db(SrcName, fun(Db) -> + Uuid = couch_db:get_uuid(Db), + Docs = [ + % This one is ok and should not be cleaned up + #doc{ + id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid1/binary>>, + body = + {[ + {<<"type">>, <<"internal_replicator">>}, + {<<"updated_on">>, os:system_time(second)}, + {<<"purge_seq">>, 10042}, + {<<"source">>, atom_to_binary(Src#shard.node, latin1)}, + {<<"target">>, atom_to_binary(Tgt1#shard.node, latin1)}, + {<<"range">>, Tgt1#shard.range} + ]} + }, + % Non-existent range. Should be cleaned up. + #doc{ + id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid2/binary>>, + body = + {[ + {<<"type">>, <<"internal_replicator">>}, + {<<"updated_on">>, os:system_time(second)}, + {<<"purge_seq">>, 10043}, + {<<"source">>, atom_to_binary(Src#shard.node, latin1)}, + {<<"target">>, atom_to_binary(Tgt1#shard.node, latin1)}, + {<<"range">>, [0, 1]} + ]} + }, + % Non-existent target. Shoudl be cleaned up. + #doc{ + id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid3/binary>>, + body = + {[ + {<<"type">>, <<"internal_replicator">>}, + {<<"updated_on">>, os:system_time(second)}, + {<<"purge_seq">>, 10044}, + {<<"source">>, atom_to_binary(Src#shard.node, latin1)}, + {<<"target">>, atom_to_binary(FakeTarget, latin1)}, + {<<"range">>, Tgt1#shard.range} + ]} + }, + % Deprecated checkpoint format. Should be cleaned up. + #doc{ + id = <<"_local/purge-mem3-", Uuid1/binary, "-", Uuid/binary>>, + body = + {[ + {<<"type">>, <<"internal_replicator">>}, + {<<"updated_on">>, os:system_time(second)}, + {<<"purge_seq">>, 10045}, + {<<"source">>, atom_to_binary(Src#shard.node, latin1)}, + {<<"target">>, atom_to_binary(Tgt1#shard.node, latin1)}, + {<<"range">>, Tgt1#shard.range} + ]} + } + ], + {ok, _} = couch_db:update_docs(Db, Docs, []), + [Id || #doc{id = Id} <- Docs] + end), + + #shard{range = R1} = Tgt1, + #shard{range = R2} = Tgt2, + TMap = #{R1 => Tgt1, R2 => Tgt2}, + Opts = [{batch_size, 1000}, {batch_count, all}], + ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)), + + SDocs = get_all_docs(AllSrc), + % Purges from the target should have been pulled and removed docs 0,1,2. + % Source should have no live docs. + ?assertEqual(#{}, SDocs), + ?assertEqual(#{}, get_all_docs(AllTgt)), + + % From the purge checkpoint doc ids we only expect the first one to survive + [Id1, Id2, Id3, Id4] = CheckpointIds, + LocalDocs = local_docs(SrcName), + ?assert(is_map_key(Id1, LocalDocs)), + ?assertNot(is_map_key(Id2, LocalDocs)), + ?assertNot(is_map_key(Id3, LocalDocs)), + ?assertNot(is_map_key(Id4, LocalDocs)). + replicate_to_and_from_local(#{localdb := LocalDb, allsrc := ClusteredDb}) -> % We'll just tests that we can pull purges from the target add_test_docs(ClusteredDb, #{docs => 6, purge => [0, 2]}), @@ -381,3 +488,13 @@ atts(Size) when is_integer(Size), Size >= 1 -> {data, Data} ]) ]. + +local_docs(DbName) -> + {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), + FoldFun = fun(#doc{id = DocId, body = Body}, Acc) -> + Map = ?JSON_DECODE(?JSON_ENCODE(Body), [return_maps]), + {ok, Acc#{DocId => Map}} + end, + {ok, Res} = couch_db:fold_local_docs(Db, FoldFun, #{}, []), + couch_db:close(Db), + Res.
