This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch tseq
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6d1c67cda54b1e17481b69a6d0f9b88800a6fdf3
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 21 10:53:01 2025 -0400

    Add time-seq to the db header and update it on commit
    
    Since time-seq is fixed size, with less than 500 bytes when serialized, 
handle
    it like we handle the epochs in the header. That is simpler than having a 
new
    btree, or having to juggle file term pointers. When we write the 4KB db 
header
block, most of it is empty, so we'll use a few hundred more bytes from there
    for time-seq data structure.
    
    This change is downgrade-safe because it's backwards compatible with 
previous
    supported disk format versions. It's possible to safely downgrade to a 
previous
    version before this feature was added. That is achieved by re-using a very 
old
    field from the header that was set to 0 for many years. Downgraded versions
    will simply ignore the new data structure. This means we don't need to run
    compaction to upgrade anything, or create an extra intermediate release 
version
    in between to allow for safe downgrades.
    
    During shard splitting, since we're preserving the sequence numbers during 
the
    bulk copy (they just become sparser), we can just copy the time-seq data
    structure as is to the targets.
    
    Another case when it's important to preserve the time-seq data structure is
    during compaction. On compaction we don't update the structure as we're
    compacting, since the update happens only in couch_db_updater, we just have 
to
    ensure we copy it to the new file during the swap.
---
 src/couch/src/couch_bt_engine.erl                  | 13 +++++-
 src/couch/src/couch_bt_engine_header.erl           | 46 ++++++++++++++++++----
 src/couch/src/couch_db.erl                         | 17 ++++++++
 src/couch/src/couch_db_engine.erl                  | 24 +++++++++++
 src/couch/src/couch_db_split.erl                   |  4 +-
 src/couch/src/couch_db_updater.erl                 | 18 +++++++--
 .../eunit/couch_bt_engine_compactor_ev_tests.erl   | 37 ++++++++++++++---
 7 files changed, 139 insertions(+), 20 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl 
b/src/couch/src/couch_bt_engine.erl
index 88b0de98f..7d26b8dd8 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -64,6 +64,9 @@
     purge_docs/3,
     copy_purge_infos/2,
 
+    get_time_seq/1,
+    set_time_seq/2,
+
     commit_data/1,
 
     open_write_stream/2,
@@ -551,6 +554,13 @@ copy_purge_infos(#st{} = St, PurgeInfos) ->
         needs_commit = true
     }}.
 
+get_time_seq(#st{header = Header}) ->
+    couch_bt_engine_header:time_seq(Header).
+
+set_time_seq(#st{header = Header} = St, TimeSeq) ->
+    NewHeader = couch_bt_engine_header:set(Header, time_seq, TimeSeq),
+    {ok, St#st{header = NewHeader}}.
+
 commit_data(St) ->
     #st{
         fd = Fd,
@@ -1190,7 +1200,8 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
             {revs_limit, get_revs_limit(OldSt)},
-            {purge_infos_limit, get_purge_infos_limit(OldSt)}
+            {purge_infos_limit, get_purge_infos_limit(OldSt)},
+            {time_seq, get_time_seq(OldSt)}
         ]),
         local_tree = NewLocal2
     }),
diff --git a/src/couch/src/couch_bt_engine_header.erl 
b/src/couch/src/couch_bt_engine_header.erl
index 3581b1e39..2ae08dea4 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -38,7 +38,8 @@
     revs_limit/1,
     uuid/1,
     epochs/1,
-    compacted_seq/1
+    compacted_seq/1,
+    time_seq/1
 ]).
 
 -include_lib("stdlib/include/assert.hrl").
@@ -58,7 +59,7 @@
 -record(db_header, {
     disk_version = ?LATEST_DISK_VERSION,
     update_seq = 0,
-    unused = 0,
+    time_seq,
     id_tree_state = nil,
     seq_tree_state = nil,
     local_tree_state = nil,
@@ -79,7 +80,8 @@
 new() ->
     #db_header{
         uuid = couch_uuids:random(),
-        epochs = [{config:node_name(), 0}]
+        epochs = [{config:node_name(), 0}],
+        time_seq = couch_time_seq:new()
     }.
 
 from(Header0) ->
@@ -87,7 +89,8 @@ from(Header0) ->
     #db_header{
         uuid = Header#db_header.uuid,
         epochs = Header#db_header.epochs,
-        compacted_seq = Header#db_header.compacted_seq
+        compacted_seq = Header#db_header.compacted_seq,
+        time_seq = Header#db_header.time_seq
     }.
 
 is_header(Header) ->
@@ -105,7 +108,8 @@ upgrade(Header) ->
         fun upgrade_disk_version/1,
         fun upgrade_uuid/1,
         fun upgrade_epochs/1,
-        fun upgrade_compacted_seq/1
+        fun upgrade_compacted_seq/1,
+        fun upgrade_time_seq/1
     ],
     lists:foldl(
         fun(F, HdrAcc) ->
@@ -176,6 +180,9 @@ epochs(Header) ->
 compacted_seq(Header) ->
     get_field(Header, compacted_seq).
 
+time_seq(Header) ->
+    get_field(Header, time_seq).
+
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
@@ -330,6 +337,19 @@ upgrade_compacted_seq(#db_header{} = Header) ->
             Header
     end.
 
+upgrade_time_seq(#db_header{} = Header) ->
+    MaybeTSeq = Header#db_header.time_seq,
+    case couch_time_seq:check(MaybeTSeq) of
+        true ->
+            TSeq = couch_time_seq:new(MaybeTSeq),
+            Header#db_header{time_seq = TSeq};
+        false ->
+            TSeq = couch_time_seq:new(),
+            UpdateSeq = update_seq(Header),
+            TSeq1 = couch_time_seq:update(TSeq, UpdateSeq),
+            Header#db_header{time_seq = TSeq1}
+    end.
+
 latest(?LATEST_DISK_VERSION) ->
     true;
 latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
@@ -338,7 +358,8 @@ latest(_Else) ->
     undefined.
 
 -ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
+
+-include_lib("couch/include/couch_eunit.hrl").
 
 mk_header(Vsn) ->
     {
@@ -348,8 +369,8 @@ mk_header(Vsn) ->
         Vsn,
         % update_seq
         100,
-        % unused
-        0,
+        % time_seq
+        couch_time_seq:new(),
         % id_tree_state
         foo,
         % seq_tree_state
@@ -478,4 +499,13 @@ get_epochs_from_old_header_test() ->
     Vsn5Header = mk_header(5),
     ?assertEqual(undefined, epochs(Vsn5Header)).
 
+upgrade_time_seq_test() ->
+    Header = mk_header(8),
+    ?assert(couch_time_seq:check(time_seq(Header))),
+    % time_seq's field was reused from an old field which was set to 0 so
+    % check that we can upgrade from 0
+    HeaderWith0Unused = setelement(4, Header, 0),
+    Upgrade = upgrade(HeaderWith0Unused),
+    ?assert(couch_time_seq:check(time_seq(Upgrade))).
+
 -endif.
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 3100ecdc7..285f60a8c 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -56,6 +56,10 @@
     get_oldest_purge_seq/1,
     get_purge_infos_limit/1,
 
+    time_seq_since/2,
+    get_time_seq/1,
+    set_time_seq/2,
+
     is_db/1,
     is_system_db/1,
     is_clustered/1,
@@ -568,6 +572,19 @@ get_oldest_purge_seq(#db{} = Db) ->
 get_purge_infos_limit(#db{} = Db) ->
     couch_db_engine:get_purge_infos_limit(Db).
 
+time_seq_since(#db{} = Db, Time) when is_integer(Time), Time >= 0 ->
+    TSeq = couch_db_engine:get_time_seq(Db),
+    couch_time_seq:since(TSeq, Time).
+
+get_time_seq(#db{} = Db) ->
+    couch_db_engine:get_time_seq(Db).
+
+set_time_seq(#db{main_pid = Pid} = Db, #{} = TSeq) ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_time_seq, TSeq}, infinity);
+set_time_seq(_Db, _TSeq) ->
+    throw(invalid_time_seq).
+
 get_pid(#db{main_pid = Pid}) ->
     Pid.
 
diff --git a/src/couch/src/couch_db_engine.erl 
b/src/couch/src/couch_db_engine.erl
index 54f2c1482..1a6e05498 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -232,6 +232,11 @@
 % Get the current properties.
 -callback get_props(DbHandle :: db_handle()) -> Props :: [any()].
 
+% Return the couch_time_seq structure. That is a small, fixed size,
+% exponentially decaying set of time bins, mapping rough time intervals to db
+% update sequences. Use couch_time_seq to access and update the data structure.
+-callback get_time_seq(DbHandle :: db_handle()) -> TimeSeq :: any().
+
 % This information is displayed in the database info poperties. It
 % should just be a list of {Name::atom(), Size::non_neg_integer()}
 % tuples that will then be combined across shards. Currently,
@@ -290,6 +295,14 @@
 -callback set_props(DbHandle :: db_handle(), Props :: any()) ->
     {ok, NewDbHandle :: db_handle()}.
 
+% This function is only called by couch_db_updater and
+% couch_db_split, and is guaranteed to be a single-threaded call. The
+% database should simply store provided TimeSeq struct in the
+% header somewhere.
+
+-callback set_time_seq(DbHandle :: db_handle(), TimeSeq :: any()) ->
+    {ok, NewDbHandle :: db_handle()}.
+
 % Set the current update sequence of the database. The intention is to use this
 % when copying a database such that the destination update sequence should
 % match exactly the source update sequence.
@@ -677,6 +690,7 @@
     get_revs_limit/1,
     get_security/1,
     get_props/1,
+    get_time_seq/1,
     get_size_info/1,
     get_partition_info/2,
     get_update_seq/1,
@@ -686,6 +700,7 @@
     set_security/2,
     set_purge_infos_limit/2,
     set_props/2,
+    set_time_seq/2,
 
     set_update_seq/2,
 
@@ -821,6 +836,10 @@ get_revs_limit(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:get_revs_limit(EngineState).
 
+get_time_seq(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:get_time_seq(EngineState).
+
 get_security(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:get_security(EngineState).
@@ -865,6 +884,11 @@ set_props(#db{} = Db, Props) ->
     {ok, NewSt} = Engine:set_props(EngineState, Props),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+set_time_seq(#db{} = Db, TimeSeq) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:set_time_seq(EngineState, TimeSeq),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
 set_update_seq(#db{} = Db, UpdateSeq) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:set_update_seq(EngineState, UpdateSeq),
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index bd325d980..3c01e576b 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -319,12 +319,14 @@ copy_meta(#state{source_db = SourceDb, targets = Targets} 
= State) ->
     RevsLimit = couch_db:get_revs_limit(SourceDb),
     {SecProps} = couch_db:get_security(SourceDb),
     PurgeLimit = couch_db:get_purge_infos_limit(SourceDb),
+    TimeSeq = couch_db:get_time_seq(SourceDb),
     Targets1 = maps:map(
         fun(_, #target{db = Db} = T) ->
             {ok, Db1} = couch_db_engine:set_revs_limit(Db, RevsLimit),
             {ok, Db2} = couch_db_engine:set_security(Db1, SecProps),
             {ok, Db3} = couch_db_engine:set_purge_infos_limit(Db2, PurgeLimit),
-            T#target{db = Db3}
+            {ok, Db4} = couch_db_engine:set_time_seq(Db3, TimeSeq),
+            T#target{db = Db4}
         end,
         Targets
     ),
diff --git a/src/couch/src/couch_db_updater.erl 
b/src/couch/src/couch_db_updater.erl
index 3f6c8886d..a014bd47f 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -92,6 +92,11 @@ handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
     {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
     ok = couch_server:db_updated(Db2),
     {reply, ok, Db2};
+handle_call({set_time_seq, TSeq}, _From, Db) ->
+    {ok, Db1} = couch_db_engine:set_time_seq(Db, TSeq),
+    {ok, Db2} = couch_db_engine:commit_data(Db1),
+    ok = couch_server:db_updated(Db2),
+    {reply, ok, Db2};
 handle_call({purge_docs, [], _}, _From, Db) ->
     {reply, {ok, []}, Db};
 handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
@@ -876,11 +881,16 @@ apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) 
->
     NewReplies = [{ok, RemovedRevs} | Replies],
     apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies).
 
+update_time_seq(Db, UpdateSeq) ->
+    TSeq = couch_db_engine:get_time_seq(Db),
+    TSeq1 = couch_time_seq:update(TSeq, UpdateSeq),
+    couch_db_engine:set_time_seq(Db, TSeq1).
+
 commit_data(Db) ->
-    {ok, Db1} = couch_db_engine:commit_data(Db),
-    Db1#db{
-        committed_update_seq = couch_db_engine:get_update_seq(Db)
-    }.
+    UpdateSeq = couch_db_engine:get_update_seq(Db),
+    {ok, Db1} = update_time_seq(Db, UpdateSeq),
+    {ok, Db2} = couch_db_engine:commit_data(Db1),
+    Db2#db{committed_update_seq = UpdateSeq}.
 
 pair_write_info(Old, New) ->
     lists:map(
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl 
b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
index 007c74d06..c81a4acc4 100644
--- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
@@ -20,6 +20,8 @@
 -define(EV_MOD, couch_bt_engine_compactor_ev).
 -define(INIT_DOCS, 2500).
 -define(WRITE_DOCS, 20).
+-define(TSEQ_BOGUS_TIME, "3000-01-01T00:00:00Z").
+-define(TSEQ_BOGUS_SEQ, 4242).
 
 % The idea behind the tests in this module are to attempt to
 % cover the number of restart/recopy events during compaction
@@ -123,12 +125,16 @@ teardown(Ctx) ->
 start_empty_db_test(_Event) ->
     ?EV_MOD:clear(),
     DbName = ?tempdb(),
-    {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    % Create a predictable time seq entry in the far future
+    % so we can test it's going to be carried along to the
+    % new compaction target and not get reset
+    ok = add_bogus_time_seq(Db),
     DbName.
 
 start_populated_db_test(Event) ->
     DbName = start_empty_db_test(Event),
-    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
     try
         populate_db(Db, ?INIT_DOCS)
     after
@@ -228,7 +234,7 @@ run_static_init(Event, DbName) ->
 run_static(Event, DbName) ->
     {ok, ContinueFun} = ?EV_MOD:set_wait(init),
     {ok, Reason} = ?EV_MOD:set_crash(Event),
-    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
     Ref = couch_db:monitor(Db),
     {ok, CPid} = couch_db:start_compact(Db),
     ContinueFun(CPid),
@@ -247,7 +253,7 @@ run_dynamic_init(Event, DbName) ->
 run_dynamic(Event, DbName) ->
     {ok, ContinueFun} = ?EV_MOD:set_wait(init),
     {ok, Reason} = ?EV_MOD:set_crash(Event),
-    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
     Ref = couch_db:monitor(Db),
     {ok, CPid} = couch_db:start_compact(Db),
     ok = populate_db(Db, 10),
@@ -262,7 +268,7 @@ run_dynamic(Event, DbName) ->
 run_successful_compaction(DbName) ->
     ?EV_MOD:clear(),
     {ok, ContinueFun} = ?EV_MOD:set_wait(init),
-    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
     {ok, CPid} = couch_db:start_compact(Db),
     Ref = erlang:monitor(process, CPid),
     ContinueFun(CPid),
@@ -296,6 +302,14 @@ wait_db_cleared(Db, N) ->
             end
     end.
 
+add_bogus_time_seq(Db) ->
+    TSeq = couch_db:get_time_seq(Db),
+    % Set some bogus future time; we'll check that compaction
+    % knows how to copy it properly and not reset it during compaction
+    Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME),
+    TSeq1 = couch_time_seq:update(TSeq, Time, ?TSEQ_BOGUS_SEQ),
+    ok = couch_db:set_time_seq(Db, TSeq1).
+
 populate_db(_Db, NumDocs) when NumDocs =< 0 ->
     ok;
 populate_db(Db, NumDocs) ->
@@ -324,7 +338,18 @@ validate_compaction(Db) ->
     end,
     {ok, {_, LastCount}} = couch_db:fold_docs(Db, FoldFun, {<<>>, 0}),
     ?assertEqual(DocCount + DelDocCount, LastCount),
-    ?assertEqual(NumChanges, LastCount).
+    ?assertEqual(NumChanges, LastCount),
+    % If there were any updates to the db check that the time seq structure
+    % was preserved
+    TSeq = couch_db:get_time_seq(Db),
+    #{bins := Bins} = TSeq,
+    case NumChanges > 0 of
+        true ->
+            Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME),
+            ?assertEqual({Time, ?TSEQ_BOGUS_SEQ}, hd(Bins));
+        false ->
+            ?assertEqual([], Bins)
+    end.
 
 purge_module() ->
     case code:which(couch_db_updater) of

Reply via email to