This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch tseq in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d14cc52858cd475ab1ed0b8ff26f37c052fe8e49 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Jul 21 12:31:02 2025 -0400 Time-based since parameter for _changes Use the new time-seq feature to stream changes from before a point in time. This can be used for backups or any case when it helps to associate a range of sequence updates to a time interval. The time-seq exponential decaying interval rules apply: the further back in time, the less accurate the time intervals will be. The API change consists in making `since` accept an RFC3339 timestamp format value. It already accepts `now` as special value, so this new variant is implemented in the same way: before the changes request starts, it translates the time value to a proper `since` sequence. From then on it proceeds as before. There are no changes to any of the underlying changes processing logic steps after that. As a start, accept only proper RFC3339 timestamps. They should look like: `YYYY-MM-DDTHH:MM:SSZ`. In the future more versions could be accepted such as `YYYY-MM-DD` or even `last-N-days`, but for now, it's better to start stricter and then relax the rules later. 
A small example with a db I updated every few hours during the day: `http get $DB/db/_time_seq` ``` { "00000000-ffffffff": { "[email protected]": [ ["2025-07-21T01:00:00Z", 15], ["2025-07-21T05:00:00Z", 2], ["2025-07-21T19:00:00Z", 9], ["2025-07-21T20:00:00Z", 5], ["2025-07-21T21:00:00Z", 70], ["2025-07-21T22:00:00Z", 10] ] } } ``` `_changes?since=2025-07-21T22:00:00Z` will return documents changed since that last hour only: ``` % http get $DB/db/_changes'?since=2025-07-21T22:00:00Z' | jq -r '.results[].id' 101 102 103 104 105 106 107 108 109 110 ``` Even the somewhat hidden `since_seq` replication parameter should work, so we can replicate from a particular point in time: ``` % http post 'http://adm:pass@localhost:15984/_replicate' \ source:='"http://adm:pass@localhost:15984/db"' \ target:='"http://adm:pass@localhost:15984/tgt"' \ since_seq:='"2025-07-21T22:00:00Z"' { "history": [ { "bulk_get_attempts": 10, "bulk_get_docs": 10, "doc_write_failures": 0, "docs_read": 10, "docs_written": 10, "end_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "end_time": "Mon, 21 Jul 2025 22:11:59 GMT", "missing_checked": 10, "missing_found": 10, "recorded_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "session_id": "19252b97e34088aeaaa6cde6694a419f", "start_last_seq": "2025-07-21T22:00:00Z", "start_time": "Mon, 21 Jul 2025 22:11:55 GMT" } ], "ok": true, "replication_id_version": 4, "session_id": "19252b97e34088aeaaa6cde6694a419f", "source_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews" } ``` The target db now has only the documents written in that last hour: ``` % http $DB/tgt/_all_docs | jq -r '.rows[].id' 101 102 103 104 105 106 107 108 109 110 ``` --- src/chttpd/test/eunit/chttpd_changes_test.erl | 93 ++++++++++++++++++++++++++- src/fabric/src/fabric_view_changes.erl | 21 ++++-- 2 files changed, 106 
insertions(+), 8 deletions(-) diff --git a/src/chttpd/test/eunit/chttpd_changes_test.erl b/src/chttpd/test/eunit/chttpd_changes_test.erl index b93667752..f155b8446 100644 --- a/src/chttpd/test/eunit/chttpd_changes_test.erl +++ b/src/chttpd/test/eunit/chttpd_changes_test.erl @@ -83,7 +83,8 @@ changes_test_() -> ?TDEF(t_selector_filter), ?TDEF(t_design_filter), ?TDEF(t_docs_id_filter), - ?TDEF(t_docs_id_filter_over_limit) + ?TDEF(t_docs_id_filter_over_limit), + ?TDEF(t_time_since) ]) }. @@ -109,7 +110,8 @@ changes_q8_test_() -> ?TDEF(t_reverse_limit_one_q8), ?TDEF(t_selector_filter), ?TDEF(t_design_filter), - ?TDEF(t_docs_id_filter_q8) + ?TDEF(t_docs_id_filter_q8), + ?TDEF(t_time_since_q8) ]) }. @@ -493,6 +495,93 @@ t_docs_id_filter_over_limit({_, DbUrl}) -> Rows ). +t_time_since({_, DbUrl}) -> + % Far into the future, we should get nothing + Params1 = "?since=3000-02-03T04:05:00Z", + {Seq1, Pending1, Rows1} = changes(DbUrl, Params1), + ?assertEqual(8, Seq1), + ?assertEqual(0, Pending1), + ?assertEqual([], Rows1), + + % Before the feature is released, should same as since=0 + Params2 = "?since=2025-01-01T04:05:00Z", + Res2 = {Seq2, Pending2, Rows2} = changes(DbUrl, Params2), + ?assertEqual(8, Seq2), + ?assertEqual(0, Pending2), + ?assertEqual( + [ + {6, {?DOC1, <<"2-c">>}, ?LEAFREV}, + {7, {?DOC3, <<"2-b">>}, ?DELETED}, + {8, {?DDOC2, <<"2-c">>}, ?LEAFREV} + ], + Rows2 + ), + % Verify we're getting the same resulta as since=0 + ?assertEqual(Res2, changes(DbUrl, "?since=0")), + + % Invalid time + Params3 = "?since=2025-01-01Txx:yy:00Z", + {InvalCode, InvalRes} = reqraw(get, DbUrl ++ "/_changes" ++ Params3), + ?assertEqual(400, InvalCode), + ?assertMatch( + #{ + <<"error">> := <<"bad_request">>, + <<"reason">> := <<"invalid_time_format">> + }, + json(InvalRes) + ), + + % Check we can get _time_seq + {TSeqCode, TSeqRes} = reqraw(get, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqCode), + Year = integer_to_binary(element(1, date())), + Node = 
atom_to_binary(config:node_name()), + ?assertMatch( + #{ + <<"00000000-ffffffff">> := #{ + Node := [[<<Year:4/binary, _/binary>>, 8]] + } + }, + json(TSeqRes) + ), + + % Reset the time seq info + {TSeqResetCode, TSeqResetRes} = reqraw(delete, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqResetCode), + ?assertMatch(#{<<"ok">> := true}, json(TSeqResetRes)), + + % Check that it took effect + {TSeqCodeVerify, TSeqResVerify} = reqraw(get, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqCodeVerify), + ?assertMatch(#{<<"00000000-ffffffff">> := #{Node := []}}, json(TSeqResVerify)), + + % Changes feeds still work after a reset + ?assertEqual(Res2, changes(DbUrl, Params2)). + +t_time_since_q8({_, DbUrl}) -> + % Far into the future, we should get nothing + Params1 = "?since=3000-02-03T04:05:00Z", + {Seq1, Pending1, Rows1} = changes(DbUrl, Params1), + ?assertEqual(8, Seq1), + ?assertEqual(0, Pending1), + ?assertEqual([], Rows1), + + % Before the feature is released, should get everything + Params2 = "?since=2025-01-01T04:05:00Z", + {Seq2, Pending2, Rows2} = changes(DbUrl, Params2), + {Seqs, Revs, _Deleted} = lists:unzip3(Rows2), + ?assertEqual(8, Seq2), + ?assertEqual(0, Pending2), + ?assertEqual( + [ + {?DDOC2, <<"2-c">>}, + {?DOC1, <<"2-c">>}, + {?DOC3, <<"2-b">>} + ], + lists:sort(Revs) + ), + ?assertEqual(Seqs, lists:sort(Seqs)). + t_js_filter({_, DbUrl}) -> DDocId = "_design/filters", FilterFun = <<"function(doc, req) {return (doc._id == 'doc3')}">>, diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index f6695f163..73c05163d 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -27,13 +27,15 @@ -import(fabric_db_update_listener, [wait_db_updated/1]). +-define(RFC3339_TIME, [_, _, _, _, $-, _, _, $-, _, _, $T, _, _, $:, _, _, $:, _, _, $Z]). 
+ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Feed == "longpoll" orelse Feed == "eventsource" -> Args = make_changes_args(Options), Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of + case validate_start_seq(Since) of ok -> {ok, Acc} = Callback(start, Acc0), {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), @@ -69,7 +71,7 @@ go(DbName, Feed, Options, Callback, Acc0) when go(DbName, "normal", Options, Callback, Acc0) -> Args = make_changes_args(Options), Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of + case validate_start_seq(Since) of ok -> {ok, Acc} = Callback(start, Acc0), {ok, Collector} = send_changes( @@ -369,6 +371,11 @@ get_start_seq(DbName, #changes_args{dir = Dir, since = Since}) when -> {ok, Info} = fabric:get_db_info(DbName), couch_util:get_value(update_seq, Info); +get_start_seq(DbName, #changes_args{dir = fwd, since = ?RFC3339_TIME = Since}) -> + case fabric:time_seq_since(DbName, Since) of + {ok, SinceSeq} -> SinceSeq; + {error, Error} -> {error, Error} + end; get_start_seq(_DbName, #changes_args{dir = fwd, since = Since}) -> Since. @@ -728,11 +735,11 @@ make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> make_split_seq(Seq, _) -> Seq. -validate_start_seq(_DbName, 0) -> +validate_start_seq(0) -> ok; -validate_start_seq(_DbName, "0") -> +validate_start_seq("0") -> ok; -validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) -> +validate_start_seq(Seq) when is_list(Seq) orelse is_binary(Seq) -> try Opaque = unpack_seq_regex_match(Seq), unpack_seq_decode_term(Opaque), @@ -741,7 +748,9 @@ validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) -> _:_ -> Reason = <<"Malformed sequence supplied in 'since' parameter.">>, {error, {bad_request, Reason}} - end. + end; +validate_start_seq({error, Error}) -> + {error, {bad_request, Error}}. 
get_changes_epoch() -> case application:get_env(fabric, changes_epoch) of
