This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fix-purge-checkpoint-creation-race-condition
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 70e863bcc12960fad4a661af45237cd0b6768e7d
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Dec 18 03:36:41 2025 -0500

    Fix race condition during purge checkpoint creation
    
    Previously, when the purge checkpoints were first created concurrently with
    compaction running, it was possible for compaction to finish first and
    remove too many purge infos before the internal replicator checkpointed.
    In that case we could end up with a "hole" between the minimum
    (checkpointed) purge sequence and the oldest purge sequence. Subsequently,
    the internal replicator would start crashing, since when fetching the
    minimum purge sequence it would correctly detect that one of the purge
    clients is asking for a sequence that's too low (that is, it "skipped" and
    hasn't processed intermediate purge sequences). The tell-tale sign of this
    in production is repeated `invalid_start_purge_seq` errors emitted in the
    logs. One way to get out of this state would be to delete the checkpoint
    docs and let them be re-created.
    
    To fix the race condition, when compaction starts check if all the expected
    checkpoints from the other shard copies are created first, and only then use
    the minimum version, otherwise use the oldest purge sequence version.
---
 src/couch/src/couch_bt_engine_compactor.erl |  15 ++-
 src/mem3/src/mem3_rep.erl                   | 165 +++++++++++++++++++++++++++-
 2 files changed, 177 insertions(+), 3 deletions(-)

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 85d33cf95..2414285b4 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -163,7 +163,20 @@ copy_purge_info(#comp_st{} = CompSt) ->
     % stale or deprecated internal replicator checkpoints beforehand.
     ok = mem3_rep:cleanup_purge_checkpoints(DbName),
     MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
-        couch_db:get_minimum_purge_seq(Db)
+        % If we don't (yet) have all the expected internal replicator purge
+        % checkpoints, use the oldest purge sequence instead of the minimum.
+        % This is to avoid removing some purge infos too early, before the
+        % checkpoint is created. For example, if the oldest sequence = 1,
+        % minimum sequence = 1000, and current purge sequence = 2000, we can
+        % compact and remove all the purge infos from 1 to 1000. While
+        % compaction happens, a checkpoint is created with sequence = 500. In
+        % that case we'd end up with a "hole" between 500 and 1001 -- a new
+        % minimum purge sequence of 500, but the oldest purge sequence would
+        % be 1001.
+        case mem3_rep:have_all_purge_checkpoints(Db) of
+            true -> couch_db:get_minimum_purge_seq(Db);
+            false -> couch_db:get_oldest_purge_seq(Db)
+        end
     end),
     OldPSTree = OldSt#st.purge_seq_tree,
     StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1,
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 38a940e49..4345d792d 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -20,6 +20,7 @@
     make_purge_id/2,
     verify_purge_checkpoint/2,
     cleanup_purge_checkpoints/1,
+    have_all_purge_checkpoints/1,
     find_source_seq/4,
     find_split_target_seq/4,
     local_id_hash/1
@@ -207,11 +208,59 @@ cleanup_purge_checkpoints(Db) ->
                 {stop, Acc}
         end
     end,
-    Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}],
-    {ok, ToDelete} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+    ToDelete = fold_purge_checkpoints(Db, FoldFun, []),
     DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
     lists:foreach(DeleteFun, ToDelete).
 
+% Tell whether every expected internal replicator purge checkpoint exists.
+% Takes a shard name (binary), which is re-dispatched through
+% couch_util:with_db/2, or a db handle. Intended to run before compaction so
+% purge infos are not removed ahead of the first replicator checkpoint.
+have_all_purge_checkpoints(DbName) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun have_all_purge_checkpoints/1);
+have_all_purge_checkpoints(Db) ->
+    DbShards = shards(couch_db:name(Db)),
+    PurgesReplicated = config:get_boolean("mem3", "replicate_purges", true),
+    have_all_purge_checkpoints(PurgesReplicated, Db, DbShards).
+
+% True when each remote, range-overlapping shard copy has a checkpoint doc.
+have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
+    UUID = couch_db:get_uuid(Db),
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+                case verify_checkpoint_shard(Shards, Props) of
+                    true ->
+                        Range = couch_util:get_value(<<"range">>, Props),
+                        TBin = couch_util:get_value(<<"target">>, Props),
+                        TNode = binary_to_existing_atom(TBin, latin1),
+                        {ok, sets:add_element({TNode, Range}, Acc)};
+                    false ->
+                        {ok, Acc}
+                end;
+            _ ->
+                {stop, Acc}
+        end
+    end,
+    Checkpoints = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()),
+    % Keep only shard copies: shards on other nodes whose range overlaps ours
+    % (not just equal ranges, since shards may have been split). Copies share
+    % our shard name, so only the node can tell them apart from this shard.
+    SrcRange = mem3:range(couch_db:name(Db)),
+    LocalNode = config:node_name(),
+    IsCopy = fun(#shard{node = Node, range = Range}) ->
+        Node =/= LocalNode andalso mem3_util:range_overlap(SrcRange, Range)
+    end,
+    Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)],
+    Copies1 = couch_util:set_from_list(Copies),
+    sets:size(sets:subtract(Copies1, Checkpoints)) == 0;
+have_all_purge_checkpoints(false, _Db, _Shards) ->
+    % If purges are not replicated then we assume we have all (0) checkpoints.
+    true;
+have_all_purge_checkpoints(_, _Db, []) ->
+    % For an unsharded db we also assume we have all (0) checkpoints.
+    true.
+
 delete_checkpoint(Db, DocId) ->
     DbName = couch_db:name(Db),
     LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
@@ -229,6 +278,11 @@ delete_checkpoint(Db, DocId) ->
             ok
     end.
 
+% Fold FoldFun over the db's local docs, starting at the ?PURGE_PREFIX key,
+% and return the final accumulator.
+fold_purge_checkpoints(Db, FoldFun, Acc0) ->
+    StartKey = list_to_binary(?PURGE_PREFIX),
+    FoldOpts = [{start_key, StartKey}],
+    {ok, Result} = couch_db:fold_local_docs(Db, FoldFun, Acc0, FoldOpts),
+    Result.
+
 verify_purge_checkpoint(DbName, Props) ->
     try
         case couch_util:get_value(<<"type">>, Props) of
@@ -1232,4 +1286,111 @@ target_not_in_shard_map(_) ->
     ?assertEqual(1, map_size(Map)),
     ?assertMatch(#{R0f := #shard{name = Name, node = 'n3'}}, Map).
 
+% Fixture for the have_all_purge_checkpoints/1 tests. Each test gets a freshly
+% started couch with "replicate_purges" set to "true" and mem3:nodes/0 mocked.
+purge_checkpoints_test_() ->
+    Setup = fun() ->
+        Ctx = test_util:start_couch([mem3, fabric]),
+        config:set("mem3", "replicate_purges", "true", false),
+        meck:new(mem3, [passthrough]),
+        meck:expect(mem3, nodes, 0, [node(), n2, n3]),
+        Ctx
+    end,
+    Teardown = fun(Ctx) ->
+        meck:unload(),
+        config:delete("mem3", "replicate_purges", false),
+        test_util:stop_couch(Ctx)
+    end,
+    Tests = [
+        ?TDEF_FE(t_not_sharded),
+        ?TDEF_FE(t_purges_not_replicated),
+        ?TDEF_FE(t_have_all_checkpoints)
+    ],
+    {foreach, Setup, Teardown, Tests}.
+
+% With mem3:shards/1 mocked to raise database_does_not_exist, the db is
+% expected to report having all purge checkpoints.
+t_not_sharded(_) ->
+    meck:expect(mem3, shards, 1, meck:raise(error, database_does_not_exist)),
+    Suffix = couch_uuids:random(),
+    DbName = <<"mem3_rep_test", Suffix/binary>>,
+    {ok, Db} = couch_server:create(DbName, [?ADMIN_CTX]),
+    couch_db:close(Db),
+    ?assert(have_all_purge_checkpoints(DbName)),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+% With no checkpoint docs saved, the check fails while purge replication is
+% on, and passes for the same shards once "replicate_purges" is "false".
+t_purges_not_replicated(_) ->
+    R07 = [16#00000000, 16#7fffffff],
+    R8f = [16#80000000, 16#ffffffff],
+    R0f = [16#00000000, 16#ffffffff],
+
+    NodeRanges = [
+        {node(), R07},
+        {node(), R8f},
+        {'n2', R07},
+        {'n2', R8f},
+        {'n3', R0f}
+    ],
+    Shards = [#shard{node = N, range = R} || {N, R} <- NodeRanges],
+    meck:expect(mem3, shards, 1, Shards),
+
+    SrcName = <<"shards/00000000-7fffffff/d.1551893550">>,
+    {ok, Db} = couch_server:create(SrcName, [?ADMIN_CTX]),
+    couch_db:close(Db),
+    ?assertNot(have_all_purge_checkpoints(SrcName)),
+    config:set("mem3", "replicate_purges", "false", false),
+    ?assert(have_all_purge_checkpoints(SrcName)),
+    ok = couch_server:delete(SrcName, [?ADMIN_CTX]).
+
+% Save purge checkpoint docs for the n2 and n3 targets one at a time and check
+% that all checkpoints are reported only after the third one is written.
+t_have_all_checkpoints(_) ->
+    R07 = [16#00000000, 16#7fffffff],
+    R8f = [16#80000000, 16#ffffffff],
+    R0f = [16#00000000, 16#ffffffff],
+
+    meck:expect(mem3, shards, 1, [
+        #shard{node = node(), range = R07},
+        #shard{node = node(), range = R8f},
+        #shard{node = 'n2', range = R07},
+        #shard{node = 'n2', range = R8f},
+        #shard{node = 'n3', range = R0f}
+    ]),
+
+    SrcName = <<"shards/00000000-7fffffff/d.1551893551">>,
+    TgtName1 = <<"shards/00000000-7fffffff/d.1551893551">>,
+    TgtName2 = <<"shards/80000000-ffffffff/d.1551893551">>,
+    TgtName3 = <<"shards/00000000-ffffffff/d.1551893551">>,
+
+    Src = #shard{name = SrcName, node = node(), range = R07},
+    Tgt1 = #shard{name = TgtName1, node = 'n2', range = R07},
+    Tgt2 = #shard{name = TgtName2, node = 'n2', range = R8f},
+    Tgt3 = #shard{name = TgtName3, node = 'n3', range = R0f},
+
+    {ok, Db} = couch_server:create(SrcName, [?ADMIN_CTX]),
+    SrcUuid = couch_db:get_uuid(Db),
+
+    % Save a checkpoint doc from Src to Tgt under a random target uuid
+    SaveCheckpoint = fun(Tgt, PurgeSeq) ->
+        DocId = make_purge_id(SrcUuid, couch_uuids:random()),
+        Doc = #doc{id = DocId, body = purge_cp_body(Src, Tgt, PurgeSeq)},
+        {ok, _} = couch_db:update_doc(Db, Doc, [?ADMIN_CTX])
+    end,
+
+    SaveCheckpoint(Tgt1, 42),
+    % Not enough checkpoints
+    ?assertNot(have_all_purge_checkpoints(SrcName)),
+
+    SaveCheckpoint(Tgt2, 43),
+    % Still not enough checkpoints
+    ?assertNot(have_all_purge_checkpoints(SrcName)),
+
+    SaveCheckpoint(Tgt3, 44),
+    % Now should have all the checkpoints
+    ?assert(have_all_purge_checkpoints(SrcName)),
+
+    couch_db:close(Db),
+    ok = couch_server:delete(SrcName, [?ADMIN_CTX]).
+
 -endif.

Reply via email to