This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch tseq
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 627b904f0ef350b67c6bbb45052b5adcfa0d672e
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Sun Jul 20 23:33:05 2025 -0400

Implement an exponentially decaying time interval data structure

The data structure maps time intervals to database update sequences. The idea
is to be able to quickly determine which changes occurred in a given time
interval.

The main goal of the design is a data structure small enough to fit well under
the 4KB db header size, yet able to represent time intervals spanning up to a
few decades. This goal was accomplished by using exponentially decaying time
intervals: the further back in time we go, the longer the intervals get. This
matches how humans usually keep track of time: if we're talking about
yesterday, we may care about hours; if we talk about last month, we may care
about single days; and if we talk about last year, we may only care about the
months or quarters, and so on. If we accept this historical exponential loss
of accuracy, we can hit the design goals with only 50 time bins (integer KV
pairs) and a small, under-500-byte, on-disk representation.

The format is a KV list of up to 50 pairs (bins) which looks like:
`[{Time, Seq}, {OlderTime, OlderSeq}, ...]`. Times are rounded to whole hours.
The head of the KV list is the youngest entry. The `Time` value is the time of
the earliest sequence in that time interval. The `Seq` value is the maximum
sequence observed in the time interval before switching to the next one.
During updates, if we're into the next hour, the bins are rolled up: a new set
of bins is created starting at the current hour, and the old bins are merged
(copied) into the new ones. This way the size of the bins always stays under
the maximum. The serialized on-disk size is only a few hundred bytes and
should fit well under the 4KB header block size.
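For illustration only, the rollup and lookup behavior described above can be
sketched roughly as follows. This is a simplified Python sketch, not the
Erlang implementation: the toy 4-entry width table stands in for the real
50-bin width definition, and all names here are hypothetical.

```python
HOUR = 3600
# Toy bin-width table (hours per bin); the real structure uses 50 widths
# growing from 1 hour up to several years.
WIDTHS = [1, 1, 24, 24 * 365]

def hour(t):
    # Round a unix timestamp down to a whole hour
    return (t // HOUR) * HOUR

def empty_bins(t):
    # Bin i starts i * width(i) hours before the current hour
    h = hour(t)
    return [(h - i * HOUR * WIDTHS[i], 0) for i in range(len(WIDTHS))]

def update(bins, t, seq):
    t = hour(t)
    if seq == 0 or (bins and t < bins[0][0]):
        return bins  # ignore seq 0 and updates that jumped back in time
    if not bins:
        return [(t, seq)]
    top_t, top_seq = bins[0]
    if t == top_t:
        return [(top_t, max(seq, top_seq))] + bins[1:]
    # We're into a new hour: prepend the new entry, then roll everything
    # up into a fresh set of empty bins anchored at the current hour.
    return merge([(t, max(seq, top_seq))] + list(bins), empty_bins(t))

def merge(old, new):
    acc = []
    while old and new:
        if old[0][0] < new[0][0]:
            acc.append(new.pop(0))  # no old bin this recent; keep the new bin
        else:
            # Old bin folds into the current new bin (which may absorb many)
            new[0] = (new[0][0], max(old.pop(0)[1], new[0][1]))
    if old and acc:
        # Any old leftovers fold into the last (oldest) filled bin
        t, s = acc[-1]
        acc[-1] = (t, max([s] + [q for _, q in old]))
    return [(t, s) for t, s in acc + new if s != 0]  # drop empty bins

def since(bins, t):
    # Highest sequence known to come before time t; 0 if t predates all bins
    for bin_t, bin_seq in bins:
        if t > bin_t:
            return bin_seq
    return 0
```

Because widths grow with the bin index, several old bins can land in one new
bin during `merge`, which is exactly the exponential loss of accuracy the
design accepts.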
Example of `term_to_binary` of the time_seq data structure filled with random
sequences:

```
> Bins = couch_time_seq:empty_bins(os:system_time(second)).
> length(Bins).
50
> TSeq = #{ver => 1, bins => [{T, rand:uniform(1000000)} || {T, _} <- Bins]}.
> byte_size(term_to_binary(TSeq, [compressed])).
468
```

* `new()` : create a new time sequence (`TSeq`) context.
* `update(TSeq, Seq)` : insert a new sequence into the timeline.
* `since(TSeq, Time) -> Seq` : get the sequence before the timestamp.
* `histogram(TSeq)` : return formatted time bins and the count of updates
  which occurred in each time interval. Use this for debugging or to give
  users an idea of how many changes occurred in each interval.

Since we're relying on the operating system's knowledge of time, the solution
is not perfect. However, there are a few mitigations to help with some
scenarios:

* Time values are rounded to hours. Even if the synchronization is known to be
  off by a day, the user can simply restrict the usage of the `since`
  parameter to a larger interval (only ask about time intervals greater than a
  few days).
* Ignore updates which jumped back in time. That's done either by comparing to
  the last entry, or to a global configuration value. Users could set the
  global config to 1971, for instance, if they know their systems jump to 1970
  after boot due to some hardware default, before NTP synchronization kicks
  in. As a safer alternative to 1971, the default minimum date picked is just
  before the time this feature was implemented.
* If due to some misconfiguration time jumps far forward (say, to year 3000),
  or any other synchronization mishap happens, it's always safe to let the
  user reset the time_seq structure and simply start fresh at the new time.

There are EUnit and property tests for 100% test coverage.
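Since the bins store cumulative sequences, the histogram only has to take
differences between adjacent bins. A rough Python sketch of that differencing
(hypothetical names, for illustration; not the module's API):

```python
def histogram(bins):
    # bins are [(time, cumulative_seq), ...], newest first; the result lists
    # (time, update_count) pairs oldest first, mirroring the Erlang version.
    out = []
    for i, (t, seq) in enumerate(bins):
        older_seq = bins[i + 1][1] if i + 1 < len(bins) else 0
        out.append((t, seq - older_seq))
    return list(reversed(out))

# Two bins: the sequence reached 42 in the older hour, then 43 in the newer
# one, so the counts are 42 updates followed by 1 update:
# histogram([(7200, 43), (3600, 42)]) -> [(3600, 42), (7200, 1)]
```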
Thanks to Ilya (@iilyak) for writing the property tests
---
 rel/overlay/etc/default.ini                   |  10 +
 src/couch/src/couch_time_seq.erl              | 216 ++++++++++++++++++++++
 src/couch/test/eunit/couch_time_seq_tests.erl | 256 ++++++++++++++++++++++++++
 3 files changed, 482 insertions(+)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index dfefa62dc..f1b1ea1b7 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -1150,3 +1150,13 @@ url = {{nouveau_url}}
 ;mem3_shards = true
 ;nouveau_index_manager = true
 ;dreyfus_index_manager = true
+
+[time_seq]
+; Ignore time-sequence updates which are below this threshold. Sometimes
+; embedded systems get booted into 1970 and then get their time from NTP. To
+; mitigate that, updates are ignored if we can tell they are obviously wrong.
+; The default value is some time before the feature was developed:
+;
+; 1752724800 = 2025-07-17T00:00:00
+;
+;min_unix_time_sec = 1752724800
diff --git a/src/couch/src/couch_time_seq.erl b/src/couch/src/couch_time_seq.erl
new file mode 100644
index 000000000..2fa6aa041
--- /dev/null
+++ b/src/couch/src/couch_time_seq.erl
@@ -0,0 +1,216 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module implements exponentially decaying time intervals which map to
+% database update sequences. The idea is to be able to quickly determine the
+% set of changes which occurred in a rough time interval. The closer to the
+% last update time, the higher the accuracy.
+% For instance, going back a few days, it's only possible to target individual
+% days; then weeks, then months; and going back years can target only years.
+%
+% An example of the shape of the data structure might be:
+%
+%  +---------+   +---------+       +--------+
+%  | seq=986 |   | seq=891 |  ...  | seq=19 |
+%  +---------+   +---------+       +--------+
+%       ^             ^                ^
+%       |             |                |
+%      t=42          t=40             t=37
+%
+% The head, on the left, points to the newest (most recently updated) time
+% bin. In this example it started at t=42. The last bin, t=37, is the oldest
+% time bin.
+%
+% If we're looking for sequences starting before t=41, we'd pick seq=891 and
+% run the changes feed with since=891. If we're looking for a sequence
+% starting before t=37, we'd start with since=0. The main idea here is that
+% we'd rather err on the side of returning too many rows than not enough.
+%
+% The bins stay at a fixed size (maximum length = 50 bins) since on updates,
+% when we're forced to move to a new hour (new bin), the bins are rolled up
+% into a new set of 50 bins, starting with the latest update time. During the
+% roll-up, multiple old bins might end up fitting into a single new bin with a
+% larger width. For example, the above bins might all end up in a single bin,
+% so the data structure might now look like:
+%
+%  +---------+
+%  | seq=986 |
+%  +---------+
+%       ^
+%       |
+%      t=42
+%
+% If we're now looking for sequences starting before t=42, we'd pick seq=0. In
+% exchange for the loss of accuracy we get a fixed-size data structure with at
+% most 50 entries and only a few hundred bytes in size when serialized.
+
+-module(couch_time_seq).
+
+-export([
+    new/0,
+    new/1,
+    check/1,
+    since/2,
+    update/2,
+    update/3,
+    histogram/1,
+    bin_count/0
+]).
+
+% Bin widths definition:
+% * Base unit is hours
+% * Total bin count is 50. This is the max length of the data structure as well.
+% * Maximum time is 20 years, with decreasing granularity further back in time.
+%
+-define(D, 24).
+-define(M, 24 * 30).
+-define(Y, 24 * 365).
+%% erlfmt-ignore
+-define(BINS, {                                    % Total
+    1, 1, 1, 1, 4, 4, 4, 8,                        % 01D
+    8, 8, 8,                                       % 02D
+    12, 12,                                        % 03D
+    ?D, ?D, ?D, ?D, ?D, ?D, ?D,                    % 10D
+    ?D*5, ?D*5, ?D*5, ?D*5,                        % 01M
+    ?D*10, ?D*10, ?D*10,                           % 02M
+    ?M, ?M, ?M, ?M, ?M, ?M, ?M, ?M, ?M, ?M,        % 01Y
+    ?M*6, ?M*6, ?M*6, ?M*6,                        % 03Y
+    ?Y, ?Y, ?Y, ?Y, ?Y, ?Y, ?Y,                    % 10Y
+    ?Y*5, ?Y*5                                     % 20Y
+}).
+
+% Version number so we can upgrade the data structure in the future.
+%
+-define(VER, 1).
+
+% Users can set a minimum time boundary to avoid errors with broken clocks.
+% Sometimes embedded systems get booted into 1970 and then get their time from
+% NTP. Updates are ignored if we can tell they are obviously wrong. Use some
+% recent time as the default minimum time value:
+%
+% 1752724800 = 2025-07-17T00:00:00
+%
+-define(DEFAULT_MIN_TIME, 1752724800).
+
+new() ->
+    #{v => ?VER, bins => []}.
+
+new(#{v := ?VER, bins := Bins} = Ctx) when is_list(Bins) ->
+    % Future upgrade clauses to the next version could go here
+    Ctx.
+
+check(#{v := ?VER, bins := Bins}) when is_list(Bins) ->
+    true;
+check(_) ->
+    false.
+
+update(#{v := ?VER} = Ctx, Seq) when is_integer(Seq), Seq >= 0 ->
+    update(Ctx, os:system_time(second), Seq).
+
+update(#{v := ?VER} = Ctx, Time, Seq) ->
+    #{bins := Bins} = Ctx,
+    case Time >= min_time() of
+        true -> Ctx#{bins := update_bins(Bins, hour(Time), Seq)};
+        false -> Ctx
+    end.
+
+% Return the highest known sequence that comes before the given time. If the
+% time falls on or before the oldest bin, return 0. This might be a sequence
+% to use with a regular _changes?since=... call.
+%
+since(#{v := ?VER} = Ctx, Time) when is_integer(Time) ->
+    #{bins := Bins} = Ctx,
+    case lists:dropwhile(fun({T, _}) -> Time =< T end, Bins) of
+        [] -> 0;
+        [{_, Seq} | _] -> Seq
+    end.
+
+% Return a histogram of formatted times (RFC3339) and the number of sequence
+% updates which happened in each bin.
+% The result might be emitted as JSON like:
+%
+%   [["2025-01-02T03:04:00Z", 42], ["2025-01-02T01:01:00Z", 1], ...]
+%
+histogram(#{v := ?VER, bins := Bins}) ->
+    [[rfc3339(T), S] || {T, S} <- seq_histogram(Bins)].
+
+% Return the maximum size of the data structure. At any point in time the
+% actual number of bins may be lower than this, but it won't be higher.
+%
+bin_count() ->
+    tuple_size(?BINS).
+
+%
+% Private functions
+%
+
+update_bins(Bins, _Time, _Seq = 0) ->
+    % Ignore sequence 0 updates
+    Bins;
+update_bins([], Time, Seq) ->
+    % First update, must be a non-0 sequence
+    [{Time, Seq}];
+update_bins([{TopT, TopSeq} | Rest], Time, Seq) when Time =:= TopT ->
+    % Update the current bin. Bump up the sequence if it increased.
+    [{TopT, max(Seq, TopSeq)} | Rest];
+update_bins([{TopT, _} | _] = Bins, Time, _) when Time < TopT ->
+    % The bins are already at a later time; ignore.
+    Bins;
+update_bins([{TopT, TopSeq} | _] = Bins, Time, Seq) when Time > TopT ->
+    % We're into another hour, so roll up:
+    % * Put the new entry at the front, possibly creating bin_count() + 1 bins
+    % * Create a set of new empty bins starting at the current hour
+    % * Merge the old bins into the new empty ones
+    merge([{Time, max(Seq, TopSeq)} | Bins], empty_bins(Time)).
+
+merge(Old, New) ->
+    merge(Old, New, []).
+
+merge([], New, Acc) ->
+    remove_empty(lists:reverse(Acc) ++ New);
+merge(Old, [], [{AccT, AccMaxSeq} | AccRest]) ->
+    % The old tail's max seq goes into the last bin (Acc's head)
+    OldMaxSeq = lists:max([Seq || {_, Seq} <- Old]),
+    Acc1 = [{AccT, max(OldMaxSeq, AccMaxSeq)} | AccRest],
+    remove_empty(lists:reverse(Acc1));
+merge([{OldT, _} | _] = Old, [{NewT, _} = NewTop | Rest], Acc) when OldT < NewT ->
+    merge(Old, Rest, [NewTop | Acc]);
+merge([{OldT, OldSeq} | OldRest], [{NewT, NewSeq} | Rest], Acc) when OldT >= NewT ->
+    % The old bin fits into the new bin. A new bin might accumulate many old bins.
+    merge(OldRest, [{NewT, max(OldSeq, NewSeq)} | Rest], Acc).
+
+seq_histogram(Bins) ->
+    seq_histogram(Bins, []).
+
+seq_histogram([], Acc) ->
+    Acc;
+seq_histogram([{T, S}], Acc) ->
+    seq_histogram([], [{T, S} | Acc]);
+seq_histogram([{T1, S1}, {T2, S2} | Rest], Acc) ->
+    seq_histogram([{T2, S2} | Rest], [{T1, S1 - S2} | Acc]).
+
+hour(T) when is_integer(T) ->
+    (T div 3600) * 3600.
+
+empty_bins(T) ->
+    Hour = hour(T),
+    [{bin_start(Hour, I), 0} || I <- lists:seq(0, bin_count() - 1)].
+
+remove_empty(Bins) when is_list(Bins) ->
+    [{T, S} || {T, S} <- Bins, S =/= 0].
+
+bin_start(T, I) ->
+    hour(T) - I * 3600 * element(I + 1, ?BINS).
+
+min_time() ->
+    config:get_integer("time_seq", "min_unix_time_sec", ?DEFAULT_MIN_TIME).
+
+rfc3339(Time) when is_integer(Time) ->
+    list_to_binary(calendar:system_time_to_rfc3339(Time, [{offset, "Z"}])).
diff --git a/src/couch/test/eunit/couch_time_seq_tests.erl b/src/couch/test/eunit/couch_time_seq_tests.erl
new file mode 100644
index 000000000..8cbc4c415
--- /dev/null
+++ b/src/couch/test/eunit/couch_time_seq_tests.erl
@@ -0,0 +1,256 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_time_seq_tests).
+
+-ifdef(WITH_PROPER).
+-include_lib("couch/include/couch_eunit_proper.hrl").
+-else.
+-include_lib("couch/include/couch_eunit.hrl").
+-endif.
+
+% Some bogus time in the far future, to avoid reaching it and having that
+% interfere with the tests
+%
+-define(TEST_TIME, "3000-01-01T00:00:00Z").
+
+new_test() ->
+    New = couch_time_seq:new(),
+    ?assert(couch_time_seq:check(New)),
+    TSeq1 = couch_time_seq:update(New, 1),
+    ?assert(couch_time_seq:check(TSeq1)),
+    Upgraded = couch_time_seq:new(New),
+    ?assert(couch_time_seq:check(Upgraded)).
+
+check_test() ->
+    New = couch_time_seq:new(),
+    ?assert(couch_time_seq:check(New)),
+    ?assertNot(couch_time_seq:check(0)),
+    ?assertNot(couch_time_seq:check([])),
+    ?assertNot(couch_time_seq:check(#{})),
+    ?assertNot(couch_time_seq:check(#{v => 1, bins => junk})).
+
+min_time_limit_test() ->
+    TSeq = couch_time_seq:new(),
+    MinTime = couch_time_seq:update(TSeq, 1, 42),
+    ?assertEqual(TSeq, MinTime).
+
+seq_0_test() ->
+    TSeq = couch_time_seq:new(),
+    ZeroSeq = couch_time_seq:update(TSeq, test_time() + 42, 0),
+    ?assertEqual(TSeq, ZeroSeq).
+
+update_first_test() ->
+    TSeq = couch_time_seq:new(),
+    TSeq1 = couch_time_seq:update(TSeq, test_time(), 1),
+    ?assertNotEqual(TSeq, TSeq1),
+    ?assertEqual(1, length(maps:get(bins, TSeq1))).
+
+update_same_hour_test() ->
+    TSeq = couch_time_seq:new(),
+    T = test_time(),
+    TSeq1 = couch_time_seq:update(TSeq, T, 1),
+    TSeq2 = couch_time_seq:update(TSeq1, T, 2),
+    #{bins := Bins} = TSeq2,
+    ?assertMatch([{_, 2}], Bins),
+    TSeq3 = couch_time_seq:update(TSeq2, T, 1),
+    ?assertEqual(TSeq2, TSeq3).
+
+stale_update_test() ->
+    TSeq = couch_time_seq:new(),
+    T = test_time(),
+    TSeq1 = couch_time_seq:update(TSeq, T + hours(1), 42),
+    TSeq2 = couch_time_seq:update(TSeq1, T, 43),
+    ?assertEqual(TSeq1, TSeq2).
+
+update_new_hour_test() ->
+    TSeq = couch_time_seq:new(),
+    TSeq1 = couch_time_seq:update(TSeq, test_time() + hours(1), 42),
+    TSeq2 = couch_time_seq:update(TSeq1, test_time() + hours(2), 43),
+    ?assertNotEqual(TSeq1, TSeq2),
+    #{bins := Bins} = TSeq2,
+    ?assertEqual(2, length(Bins)),
+    ?assertMatch([{_, 43}, {_, 42}], Bins).
+
+update_large_gap_test() ->
+    TSeq = couch_time_seq:new(),
+    T = test_time(),
+    TSeq1 = couch_time_seq:update(TSeq, T + hours(1), 42),
+    TSeq2 = couch_time_seq:update(TSeq1, T + years(30), 43),
+    ?assertNotEqual(TSeq1, TSeq2),
+    #{bins := Bins} = TSeq2,
+    ?assertEqual(2, length(Bins)),
+    ?assertMatch([{_, 43}, {_, 42}], Bins).
+
+update_when_bins_are_full_test() ->
+    TSeq = fill_bins(),
+    #{bins := Bins} = TSeq,
+    ?assertEqual(couch_time_seq:bin_count(), length(Bins)),
+    #{bins := [{TopTime, TopSeq} | _]} = TSeq,
+    TSeq1 = couch_time_seq:update(TSeq, TopTime + hours(1), TopSeq + 1),
+    #{bins := Bins1} = TSeq1,
+    ?assert(length(Bins1) =< couch_time_seq:bin_count()),
+    TSeq2 = couch_time_seq:update(TSeq, TopTime + hours(2), TopSeq + 2),
+    #{bins := Bins2} = TSeq2,
+    ?assert(length(Bins2) =< couch_time_seq:bin_count()).
+
+update_large_gap_when_bins_are_full_test() ->
+    TSeq = fill_bins(),
+    #{bins := Bins} = TSeq,
+    ?assertEqual(couch_time_seq:bin_count(), length(Bins)),
+    #{bins := [{TopTime, TopSeq} | _]} = TSeq,
+    TSeq1 = couch_time_seq:update(TSeq, TopTime + years(999), TopSeq + 1),
+    #{bins := Bins1} = TSeq1,
+    % The bin count collapses to just two bins, as the ones in
+    % between will be empty (0) and will be cleaned up
+    ?assertEqual(2, length(Bins1)).
+
+before_test() ->
+    T = test_time(),
+    New = couch_time_seq:new(),
+    TSeq0 = couch_time_seq:update(New, T, 42),
+    TSeq = couch_time_seq:update(TSeq0, T + hours(1), 43),
+    % [{T + H, 43}, {T, 42}]
+    ?assertEqual(0, couch_time_seq:since(TSeq, 0)),
+    ?assertEqual(0, couch_time_seq:since(TSeq, T - 1)),
+    ?assertEqual(0, couch_time_seq:since(TSeq, T)),
+    ?assertEqual(42, couch_time_seq:since(TSeq, T + 1)),
+    ?assertEqual(42, couch_time_seq:since(TSeq, T + hours(1) - 1)),
+    ?assertEqual(42, couch_time_seq:since(TSeq, T + hours(1))),
+    ?assertEqual(43, couch_time_seq:since(TSeq, T + hours(1) + 1)).
+
+histogram_test() ->
+    New = couch_time_seq:new(),
+    ?assertEqual([], couch_time_seq:histogram(New)),
+    T = test_time(),
+    TSeq0 = couch_time_seq:update(New, T, 42),
+    % The 42 in the result is the jump from 0 to 42
+    ?assertEqual(
+        [
+            [<<"3000-01-01T00:00:00Z">>, 42]
+        ],
+        couch_time_seq:histogram(TSeq0)
+    ),
+    TSeq = couch_time_seq:update(TSeq0, T + hours(1), 43),
+    % The 1 is the jump from 42 to 43
+    ?assertEqual(
+        [
+            [<<"3000-01-01T00:00:00Z">>, 42],
+            [<<"3000-01-01T01:00:00Z">>, 1]
+        ],
+        couch_time_seq:histogram(TSeq)
+    ).
+
+% Some test helper functions
+
+test_time() ->
+    calendar:rfc3339_to_system_time(?TEST_TIME).
+
+hours(Hours) ->
+    Hours * 3600.
+
+years(Years) ->
+    hours(1) * 24 * 365 * Years.
+
+fill_bins() ->
+    fill_bins(test_time(), 1, couch_time_seq:new()).
+
+fill_bins(Time, Seq, #{bins := Bins} = TSeq) ->
+    case length(Bins) == couch_time_seq:bin_count() of
+        true ->
+            TSeq;
+        false ->
+            Time1 = Time + hours(1),
+            Seq1 = Seq + 1,
+            fill_bins(Time1, Seq1, couch_time_seq:update(TSeq, Time1, Seq1))
+    end.
+
+%
+% Property tests
+%
+
+-ifdef(WITH_PROPER).
+
+couch_time_property_test_() ->
+    ?EUNIT_QUICKCHECK(60, 10000).
+
+%
+% Properties
+%
+
+prop_sorted_after_update() ->
+    ?FORALL(
+        TSeq,
+        tseq_g(),
+        begin
+            #{bins := Bins} = TSeq,
+            Bins =:= lists:reverse(lists:ukeysort(1, Bins))
+        end
+    ).
+
+prop_sequences_are_non_decreasing_after_update() ->
+    ?FORALL(
+        TSeq,
+        tseq_g(),
+        begin
+            #{bins := Bins} = TSeq,
+            {_, Seqs} = lists:unzip(Bins),
+            Seqs =:= lists:reverse(lists:sort(Seqs))
+        end
+    ).
+
+prop_correct_size_after_update() ->
+    ?FORALL(
+        TSeq,
+        tseq_g(),
+        begin
+            #{bins := Bins} = TSeq,
+            couch_time_seq:bin_count() >= length(Bins)
+        end
+    ).
+
+prop_no_empty_bins_after_update() ->
+    ?FORALL(
+        TSeq,
+        tseq_g(),
+        begin
+            #{bins := Bins} = TSeq,
+            EmptyBins = [B || {_, 0} = B <- Bins],
+            [] =:= EmptyBins
+        end
+    ).
+
+%
+% Generators
+%
+
+hours_g() ->
+    % 10 years' worth of hours;
+    % -24 is for testing jumping backwards in time
+    range(-24, 24 * 365 * 10).
+
+seq_g() ->
+    non_neg_integer().
+
+tseq_g() ->
+    ?LET(
+        Updates,
+        list({hours_g(), seq_g()}),
+        lists:foldl(
+            fun({Hours, Seq}, TSeq) ->
+                couch_time_seq:update(TSeq, test_time() + hours(Hours), Seq)
+            end,
+            couch_time_seq:new(),
+            Updates
+        )
+    ).
+
+-endif.
