This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch tseq in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6d1c67cda54b1e17481b69a6d0f9b88800a6fdf3 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Jul 21 10:53:01 2025 -0400 Add time-seq to the db header and update it on commit Since time-seq is fixed size, with less than 500 bytes when serialized, handle it like we handle the epochs in the header. That is simpler than having a new btree, or having to juggle file term pointers. When we write the 4KB db header block most of it is empty, so we'll use a few hundred more bytes from there for the time-seq data structure. This change is downgrade-safe because it's backwards compatible with previous supported disk format versions. It's possible to safely downgrade to a previous version before this feature was added. That is achieved by re-using a very old field from the header that was set to 0 for many years. Downgraded versions will simply ignore the new data structure. This means we don't need to run compaction to upgrade anything, or create an extra intermediate release version in between to allow for safe downgrades. During shard splitting, since we're preserving the sequence numbers during the bulk copy (they just become sparser), we can just copy the time-seq data structure as is to the targets. Another case when it's important to preserve the time-seq data structure is during compaction. On compaction we don't update the structure as we're compacting; since the update happens only in couch_db_updater, we just have to ensure we copy it to the new file during the swap. 
--- src/couch/src/couch_bt_engine.erl | 13 +++++- src/couch/src/couch_bt_engine_header.erl | 46 ++++++++++++++++++---- src/couch/src/couch_db.erl | 17 ++++++++ src/couch/src/couch_db_engine.erl | 24 +++++++++++ src/couch/src/couch_db_split.erl | 4 +- src/couch/src/couch_db_updater.erl | 18 +++++++-- .../eunit/couch_bt_engine_compactor_ev_tests.erl | 37 ++++++++++++++--- 7 files changed, 139 insertions(+), 20 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 88b0de98f..7d26b8dd8 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -64,6 +64,9 @@ purge_docs/3, copy_purge_infos/2, + get_time_seq/1, + set_time_seq/2, + commit_data/1, open_write_stream/2, @@ -551,6 +554,13 @@ copy_purge_infos(#st{} = St, PurgeInfos) -> needs_commit = true }}. +get_time_seq(#st{header = Header}) -> + couch_bt_engine_header:time_seq(Header). + +set_time_seq(#st{header = Header} = St, TimeSeq) -> + NewHeader = couch_bt_engine_header:set(Header, time_seq, TimeSeq), + {ok, St#st{header = NewHeader}}. + commit_data(St) -> #st{ fd = Fd, @@ -1190,7 +1200,8 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> header = couch_bt_engine_header:set(Header, [ {compacted_seq, get_update_seq(OldSt)}, {revs_limit, get_revs_limit(OldSt)}, - {purge_infos_limit, get_purge_infos_limit(OldSt)} + {purge_infos_limit, get_purge_infos_limit(OldSt)}, + {time_seq, get_time_seq(OldSt)} ]), local_tree = NewLocal2 }), diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 3581b1e39..2ae08dea4 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -38,7 +38,8 @@ revs_limit/1, uuid/1, epochs/1, - compacted_seq/1 + compacted_seq/1, + time_seq/1 ]). -include_lib("stdlib/include/assert.hrl"). 
@@ -58,7 +59,7 @@ -record(db_header, { disk_version = ?LATEST_DISK_VERSION, update_seq = 0, - unused = 0, + time_seq, id_tree_state = nil, seq_tree_state = nil, local_tree_state = nil, @@ -79,7 +80,8 @@ new() -> #db_header{ uuid = couch_uuids:random(), - epochs = [{config:node_name(), 0}] + epochs = [{config:node_name(), 0}], + time_seq = couch_time_seq:new() }. from(Header0) -> @@ -87,7 +89,8 @@ from(Header0) -> #db_header{ uuid = Header#db_header.uuid, epochs = Header#db_header.epochs, - compacted_seq = Header#db_header.compacted_seq + compacted_seq = Header#db_header.compacted_seq, + time_seq = Header#db_header.time_seq }. is_header(Header) -> @@ -105,7 +108,8 @@ upgrade(Header) -> fun upgrade_disk_version/1, fun upgrade_uuid/1, fun upgrade_epochs/1, - fun upgrade_compacted_seq/1 + fun upgrade_compacted_seq/1, + fun upgrade_time_seq/1 ], lists:foldl( fun(F, HdrAcc) -> @@ -176,6 +180,9 @@ epochs(Header) -> compacted_seq(Header) -> get_field(Header, compacted_seq). +time_seq(Header) -> + get_field(Header, time_seq). + purge_infos_limit(Header) -> get_field(Header, purge_infos_limit). @@ -330,6 +337,19 @@ upgrade_compacted_seq(#db_header{} = Header) -> Header end. +upgrade_time_seq(#db_header{} = Header) -> + MaybeTSeq = Header#db_header.time_seq, + case couch_time_seq:check(MaybeTSeq) of + true -> + TSeq = couch_time_seq:new(MaybeTSeq), + Header#db_header{time_seq = TSeq}; + false -> + TSeq = couch_time_seq:new(), + UpdateSeq = update_seq(Header), + TSeq1 = couch_time_seq:update(TSeq, UpdateSeq), + Header#db_header{time_seq = TSeq1} + end. + latest(?LATEST_DISK_VERSION) -> true; latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION -> @@ -338,7 +358,8 @@ latest(_Else) -> undefined. -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). + +-include_lib("couch/include/couch_eunit.hrl"). 
mk_header(Vsn) -> { @@ -348,8 +369,8 @@ mk_header(Vsn) -> Vsn, % update_seq 100, - % unused - 0, + % time_seq + couch_time_seq:new(), % id_tree_state foo, % seq_tree_state @@ -478,4 +499,13 @@ get_epochs_from_old_header_test() -> Vsn5Header = mk_header(5), ?assertEqual(undefined, epochs(Vsn5Header)). +upgrade_time_seq_test() -> + Header = mk_header(8), + ?assert(couch_time_seq:check(time_seq(Header))), + % time_seq's field was reused from an old field whichj was set to 0 so + % check that we can upgrade from 0 + HeaderWith0Unused = setelement(4, Header, 0), + Upgrade = upgrade(HeaderWith0Unused), + ?assert(couch_time_seq:check(time_seq(Upgrade))). + -endif. diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 3100ecdc7..285f60a8c 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -56,6 +56,10 @@ get_oldest_purge_seq/1, get_purge_infos_limit/1, + time_seq_since/2, + get_time_seq/1, + set_time_seq/2, + is_db/1, is_system_db/1, is_clustered/1, @@ -568,6 +572,19 @@ get_oldest_purge_seq(#db{} = Db) -> get_purge_infos_limit(#db{} = Db) -> couch_db_engine:get_purge_infos_limit(Db). +time_seq_since(#db{} = Db, Time) when is_integer(Time), Time >= 0 -> + TSeq = couch_db_engine:get_time_seq(Db), + couch_time_seq:since(TSeq, Time). + +get_time_seq(#db{} = Db) -> + couch_db_engine:get_time_seq(Db). + +set_time_seq(#db{main_pid = Pid} = Db, #{} = TSeq) -> + check_is_admin(Db), + gen_server:call(Pid, {set_time_seq, TSeq}, infinity); +set_time_seq(_Db, _TSeq) -> + throw(invalid_time_seq). + get_pid(#db{main_pid = Pid}) -> Pid. diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 54f2c1482..1a6e05498 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -232,6 +232,11 @@ % Get the current properties. -callback get_props(DbHandle :: db_handle()) -> Props :: [any()]. +% Return the couch_time_seq structure. 
That is a small, fixed size, +% exponentially decaying set of time bins, mapping rough time instervals to db +% update sequences. Use couch_time_seq to access and update the data structure. +-callback get_time_seq(DbHandle :: db_handle()) -> TimeSeq :: any(). + % This information is displayed in the database info poperties. It % should just be a list of {Name::atom(), Size::non_neg_integer()} % tuples that will then be combined across shards. Currently, @@ -290,6 +295,14 @@ -callback set_props(DbHandle :: db_handle(), Props :: any()) -> {ok, NewDbHandle :: db_handle()}. +% This function is only called by couch_db_updater and +% couch_db_split, as is guaranteed to be a single threaded call. The +% database should simply store provided TimeSeq struct in the +% header somewhere. + +-callback set_time_seq(DbHandle :: db_handle(), TimeSeq :: any()) -> + {ok, NewDbHandle :: db_handle()}. + % Set the current update sequence of the database. The intention is to use this % when copying a database such that the destination update sequence should % match exactly the source update sequence. @@ -677,6 +690,7 @@ get_revs_limit/1, get_security/1, get_props/1, + get_time_seq/1, get_size_info/1, get_partition_info/2, get_update_seq/1, @@ -686,6 +700,7 @@ set_security/2, set_purge_infos_limit/2, set_props/2, + set_time_seq/2, set_update_seq/2, @@ -821,6 +836,10 @@ get_revs_limit(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_revs_limit(EngineState). +get_time_seq(#db{} = Db) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:get_time_seq(EngineState). + get_security(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_security(EngineState). @@ -865,6 +884,11 @@ set_props(#db{} = Db, Props) -> {ok, NewSt} = Engine:set_props(EngineState, Props), {ok, Db#db{engine = {Engine, NewSt}}}. 
+set_time_seq(#db{} = Db, TimeSeq) -> + #db{engine = {Engine, EngineState}} = Db, + {ok, NewSt} = Engine:set_time_seq(EngineState, TimeSeq), + {ok, Db#db{engine = {Engine, NewSt}}}. + set_update_seq(#db{} = Db, UpdateSeq) -> #db{engine = {Engine, EngineState}} = Db, {ok, NewSt} = Engine:set_update_seq(EngineState, UpdateSeq), diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl index bd325d980..3c01e576b 100644 --- a/src/couch/src/couch_db_split.erl +++ b/src/couch/src/couch_db_split.erl @@ -319,12 +319,14 @@ copy_meta(#state{source_db = SourceDb, targets = Targets} = State) -> RevsLimit = couch_db:get_revs_limit(SourceDb), {SecProps} = couch_db:get_security(SourceDb), PurgeLimit = couch_db:get_purge_infos_limit(SourceDb), + TimeSeq = couch_db:get_time_seq(SourceDb), Targets1 = maps:map( fun(_, #target{db = Db} = T) -> {ok, Db1} = couch_db_engine:set_revs_limit(Db, RevsLimit), {ok, Db2} = couch_db_engine:set_security(Db1, SecProps), {ok, Db3} = couch_db_engine:set_purge_infos_limit(Db2, PurgeLimit), - T#target{db = Db3} + {ok, Db4} = couch_db_engine:set_time_seq(Db3, TimeSeq), + T#target{db = Db4} end, Targets ), diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 3f6c8886d..a014bd47f 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -92,6 +92,11 @@ handle_call({set_purge_infos_limit, Limit}, _From, Db) -> {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), ok = couch_server:db_updated(Db2), {reply, ok, Db2}; +handle_call({set_time_seq, TSeq}, _From, Db) -> + {ok, Db1} = couch_db_engine:set_time_seq(Db, TSeq), + {ok, Db2} = couch_db_engine:commit_data(Db1), + ok = couch_server:db_updated(Db2), + {reply, ok, Db2}; handle_call({purge_docs, [], _}, _From, Db) -> {reply, {ok, []}, Db}; handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) -> @@ -876,11 +881,16 @@ apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) -> NewReplies = [{ok, 
RemovedRevs} | Replies], apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies). +update_time_seq(Db, UpdateSeq) -> + TSeq = couch_db_engine:get_time_seq(Db), + TSeq1 = couch_time_seq:update(TSeq, UpdateSeq), + couch_db_engine:set_time_seq(Db, TSeq1). + commit_data(Db) -> - {ok, Db1} = couch_db_engine:commit_data(Db), - Db1#db{ - committed_update_seq = couch_db_engine:get_update_seq(Db) - }. + UpdateSeq = couch_db_engine:get_update_seq(Db), + {ok, Db1} = update_time_seq(Db, UpdateSeq), + {ok, Db2} = couch_db_engine:commit_data(Db1), + Db2#db{committed_update_seq = UpdateSeq}. pair_write_info(Old, New) -> lists:map( diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl index 007c74d06..c81a4acc4 100644 --- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl +++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl @@ -20,6 +20,8 @@ -define(EV_MOD, couch_bt_engine_compactor_ev). -define(INIT_DOCS, 2500). -define(WRITE_DOCS, 20). +-define(TSEQ_BOGUS_TIME, "3000-01-01T00:00:00Z"). +-define(TSEQ_BOGUS_SEQ, 4242). % The idea behind the tests in this module are to attempt to % cover the number of restart/recopy events during compaction @@ -123,12 +125,16 @@ teardown(Ctx) -> start_empty_db_test(_Event) -> ?EV_MOD:clear(), DbName = ?tempdb(), - {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + % Create a predictable time seq entry in the far future + % so we can test it's going to be carried along to the + % new compaction target and not get reset + ok = add_bogus_time_seq(Db), DbName. 
start_populated_db_test(Event) -> DbName = start_empty_db_test(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), try populate_db(Db, ?INIT_DOCS) after @@ -228,7 +234,7 @@ run_static_init(Event, DbName) -> run_static(Event, DbName) -> {ok, ContinueFun} = ?EV_MOD:set_wait(init), {ok, Reason} = ?EV_MOD:set_crash(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), Ref = couch_db:monitor(Db), {ok, CPid} = couch_db:start_compact(Db), ContinueFun(CPid), @@ -247,7 +253,7 @@ run_dynamic_init(Event, DbName) -> run_dynamic(Event, DbName) -> {ok, ContinueFun} = ?EV_MOD:set_wait(init), {ok, Reason} = ?EV_MOD:set_crash(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), Ref = couch_db:monitor(Db), {ok, CPid} = couch_db:start_compact(Db), ok = populate_db(Db, 10), @@ -262,7 +268,7 @@ run_dynamic(Event, DbName) -> run_successful_compaction(DbName) -> ?EV_MOD:clear(), {ok, ContinueFun} = ?EV_MOD:set_wait(init), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), {ok, CPid} = couch_db:start_compact(Db), Ref = erlang:monitor(process, CPid), ContinueFun(CPid), @@ -296,6 +302,14 @@ wait_db_cleared(Db, N) -> end end. +add_bogus_time_seq(Db) -> + TSeq = couch_db:get_time_seq(Db), + % Set some bogus future time we'll check that compaction + % knows how to copy it properly and not reset it during compaction + Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME), + TSeq1 = couch_time_seq:update(TSeq, Time, ?TSEQ_BOGUS_SEQ), + ok = couch_db:set_time_seq(Db, TSeq1). + populate_db(_Db, NumDocs) when NumDocs =< 0 -> ok; populate_db(Db, NumDocs) -> @@ -324,7 +338,18 @@ validate_compaction(Db) -> end, {ok, {_, LastCount}} = couch_db:fold_docs(Db, FoldFun, {<<>>, 0}), ?assertEqual(DocCount + DelDocCount, LastCount), - ?assertEqual(NumChanges, LastCount). 
+ ?assertEqual(NumChanges, LastCount), + % If there were any updates to the db check that the time seq structure + % was preserved + TSeq = couch_db:get_time_seq(Db), + #{bins := Bins} = TSeq, + case NumChanges > 0 of + true -> + Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME), + ?assertEqual({Time, ?TSEQ_BOGUS_SEQ}, hd(Bins)); + false -> + ?assertEqual([], Bins) + end. purge_module() -> case code:which(couch_db_updater) of
