This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch tseq
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d14cc52858cd475ab1ed0b8ff26f37c052fe8e49
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 21 12:31:02 2025 -0400

    Time-based since parameter for _changes
    
    Use the new time-seq feature to stream changes from before a point in time.
    
    This can be used for backups or any case where it helps to associate a
    range of sequence updates with a time interval. The time-seq exponentially
    decaying interval rules apply: the further back in time, the less accurate
    the time intervals will be.
    
    The API change consists in making `since` accept an RFC3339 timestamp format
    value. It already accepts `now` as a special value, so this new variant is
    implemented in the same way: before the changes request starts, it
    translates the time value to a proper `since` sequence. From then on it
    proceeds as before. There are no changes to any of the underlying changes
    processing logic steps after that.
    
    As a start, accept only proper RFC3339 timestamps. They should look like:
    `YYYY-MM-DDTHH:MM:SSZ`. In the future more versions could be accepted, such
    as `YYYY-MM-DD` or even `last-N-days`, but for now, it's better to start
    stricter and then relax the rules later.
    
    A small example with a db I updated every few hours during the day:
    
    `http get $DB/db/_time_seq`
    
    ```
    {
        "00000000-ffffffff": {
            "[email protected]": [
                ["2025-07-21T01:00:00Z", 15],
                ["2025-07-21T05:00:00Z", 2],
                ["2025-07-21T19:00:00Z", 9],
                ["2025-07-21T20:00:00Z", 5],
                ["2025-07-21T21:00:00Z", 70],
                ["2025-07-21T22:00:00Z", 10]
            ]
        }
    }
    ```
    
    `_changes?since=2025-07-21T22:00:00Z` will return documents changed since
    that last hour only:
    
    ```
    % http get $DB/db/_changes'?since=2025-07-21T22:00:00Z' | jq -r '.results[].id'
    
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    ```
    
    Even the somewhat hidden `since_seq` replication parameter should work, so 
we
    can replicate from a particular point in time:
    
    ```
    % http post 'http://adm:pass@localhost:15984/_replicate' \
      source:='"http://adm:pass@localhost:15984/db"' \
      target:='"http://adm:pass@localhost:15984/tgt"' \
      since_seq:='"2025-07-21T22:00:00Z"'
    
    {
        "history": [
            {
                "bulk_get_attempts": 10,
                "bulk_get_docs": 10,
                "doc_write_failures": 0,
                "docs_read": 10,
                "docs_written": 10,
                "end_last_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews",
                "end_time": "Mon, 21 Jul 2025 22:11:59 GMT",
                "missing_checked": 10,
                "missing_found": 10,
                "recorded_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews",
                "session_id": "19252b97e34088aeaaa6cde6694a419f",
                "start_last_seq": "2025-07-21T22:00:00Z",
                "start_time": "Mon, 21 Jul 2025 22:11:55 GMT"
            }
        ],
        "ok": true,
        "replication_id_version": 4,
        "session_id": "19252b97e34088aeaaa6cde6694a419f",
        "source_last_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews"
    }
    ```
    
    The target db now has only the documents written in that last hour:
    
    ```
    % http $DB/tgt/_all_docs | jq -r '.rows[].id'
    
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    ```
---
 src/chttpd/test/eunit/chttpd_changes_test.erl | 93 ++++++++++++++++++++++++++-
 src/fabric/src/fabric_view_changes.erl        | 21 ++++--
 2 files changed, 106 insertions(+), 8 deletions(-)

diff --git a/src/chttpd/test/eunit/chttpd_changes_test.erl 
b/src/chttpd/test/eunit/chttpd_changes_test.erl
index b93667752..f155b8446 100644
--- a/src/chttpd/test/eunit/chttpd_changes_test.erl
+++ b/src/chttpd/test/eunit/chttpd_changes_test.erl
@@ -83,7 +83,8 @@ changes_test_() ->
             ?TDEF(t_selector_filter),
             ?TDEF(t_design_filter),
             ?TDEF(t_docs_id_filter),
-            ?TDEF(t_docs_id_filter_over_limit)
+            ?TDEF(t_docs_id_filter_over_limit),
+            ?TDEF(t_time_since)
         ])
     }.
 
@@ -109,7 +110,8 @@ changes_q8_test_() ->
             ?TDEF(t_reverse_limit_one_q8),
             ?TDEF(t_selector_filter),
             ?TDEF(t_design_filter),
-            ?TDEF(t_docs_id_filter_q8)
+            ?TDEF(t_docs_id_filter_q8),
+            ?TDEF(t_time_since_q8)
         ])
     }.
 
@@ -493,6 +495,93 @@ t_docs_id_filter_over_limit({_, DbUrl}) ->
         Rows
     ).
 
+t_time_since({_, DbUrl}) ->
+    % Far into the future, we should get nothing
+    Params1 = "?since=3000-02-03T04:05:00Z",
+    {Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
+    ?assertEqual(8, Seq1),
+    ?assertEqual(0, Pending1),
+    ?assertEqual([], Rows1),
+
+    % Before the feature is released, should same as since=0
+    Params2 = "?since=2025-01-01T04:05:00Z",
+    Res2 = {Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
+    ?assertEqual(8, Seq2),
+    ?assertEqual(0, Pending2),
+    ?assertEqual(
+        [
+            {6, {?DOC1, <<"2-c">>}, ?LEAFREV},
+            {7, {?DOC3, <<"2-b">>}, ?DELETED},
+            {8, {?DDOC2, <<"2-c">>}, ?LEAFREV}
+        ],
+        Rows2
+    ),
+    % Verify we're getting the same resulta as since=0
+    ?assertEqual(Res2, changes(DbUrl, "?since=0")),
+
+    % Invalid time
+    Params3 = "?since=2025-01-01Txx:yy:00Z",
+    {InvalCode, InvalRes} = reqraw(get, DbUrl ++ "/_changes" ++ Params3),
+    ?assertEqual(400, InvalCode),
+    ?assertMatch(
+        #{
+            <<"error">> := <<"bad_request">>,
+            <<"reason">> := <<"invalid_time_format">>
+        },
+        json(InvalRes)
+    ),
+
+    % Check we can get _time_seq
+    {TSeqCode, TSeqRes} = reqraw(get, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqCode),
+    Year = integer_to_binary(element(1, date())),
+    Node = atom_to_binary(config:node_name()),
+    ?assertMatch(
+        #{
+            <<"00000000-ffffffff">> := #{
+                Node := [[<<Year:4/binary, _/binary>>, 8]]
+            }
+        },
+        json(TSeqRes)
+    ),
+
+    % Reset the time seq info
+    {TSeqResetCode, TSeqResetRes} = reqraw(delete, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqResetCode),
+    ?assertMatch(#{<<"ok">> := true}, json(TSeqResetRes)),
+
+    % Check that it took effect
+    {TSeqCodeVerify, TSeqResVerify} = reqraw(get, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqCodeVerify),
+    ?assertMatch(#{<<"00000000-ffffffff">> := #{Node := []}}, 
json(TSeqResVerify)),
+
+    % Changes feeds still work after a reset
+    ?assertEqual(Res2, changes(DbUrl, Params2)).
+
+t_time_since_q8({_, DbUrl}) ->
+    % Far into the future, we should get nothing
+    Params1 = "?since=3000-02-03T04:05:00Z",
+    {Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
+    ?assertEqual(8, Seq1),
+    ?assertEqual(0, Pending1),
+    ?assertEqual([], Rows1),
+
+    % Before the feature is released, should get everything
+    Params2 = "?since=2025-01-01T04:05:00Z",
+    {Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
+    {Seqs, Revs, _Deleted} = lists:unzip3(Rows2),
+    ?assertEqual(8, Seq2),
+    ?assertEqual(0, Pending2),
+    ?assertEqual(
+        [
+            {?DDOC2, <<"2-c">>},
+            {?DOC1, <<"2-c">>},
+            {?DOC3, <<"2-b">>}
+        ],
+        lists:sort(Revs)
+    ),
+    ?assertEqual(Seqs, lists:sort(Seqs)).
+
 t_js_filter({_, DbUrl}) ->
     DDocId = "_design/filters",
     FilterFun = <<"function(doc, req) {return (doc._id == 'doc3')}">>,
diff --git a/src/fabric/src/fabric_view_changes.erl 
b/src/fabric/src/fabric_view_changes.erl
index f6695f163..73c05163d 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -27,13 +27,15 @@
 
 -import(fabric_db_update_listener, [wait_db_updated/1]).
 
+-define(RFC3339_TIME, [_, _, _, _, $-, _, _, $-, _, _, $T, _, _, $:, _, _, $:, 
_, _, $Z]).
+
 go(DbName, Feed, Options, Callback, Acc0) when
     Feed == "continuous" orelse
         Feed == "longpoll" orelse Feed == "eventsource"
 ->
     Args = make_changes_args(Options),
     Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
+    case validate_start_seq(Since) of
         ok ->
             {ok, Acc} = Callback(start, Acc0),
             {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
@@ -69,7 +71,7 @@ go(DbName, Feed, Options, Callback, Acc0) when
 go(DbName, "normal", Options, Callback, Acc0) ->
     Args = make_changes_args(Options),
     Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
+    case validate_start_seq(Since) of
         ok ->
             {ok, Acc} = Callback(start, Acc0),
             {ok, Collector} = send_changes(
@@ -369,6 +371,11 @@ get_start_seq(DbName, #changes_args{dir = Dir, since = 
Since}) when
 ->
     {ok, Info} = fabric:get_db_info(DbName),
     couch_util:get_value(update_seq, Info);
+get_start_seq(DbName, #changes_args{dir = fwd, since = ?RFC3339_TIME = Since}) 
->
+    case fabric:time_seq_since(DbName, Since) of
+        {ok, SinceSeq} -> SinceSeq;
+        {error, Error} -> {error, Error}
+    end;
 get_start_seq(_DbName, #changes_args{dir = fwd, since = Since}) ->
     Since.
 
@@ -728,11 +735,11 @@ make_split_seq({Num, Uuid, Node}, RepCount) when RepCount 
> 1 ->
 make_split_seq(Seq, _) ->
     Seq.
 
-validate_start_seq(_DbName, 0) ->
+validate_start_seq(0) ->
     ok;
-validate_start_seq(_DbName, "0") ->
+validate_start_seq("0") ->
     ok;
-validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) ->
+validate_start_seq(Seq) when is_list(Seq) orelse is_binary(Seq) ->
     try
         Opaque = unpack_seq_regex_match(Seq),
         unpack_seq_decode_term(Opaque),
@@ -741,7 +748,9 @@ validate_start_seq(_DbName, Seq) when is_list(Seq) orelse 
is_binary(Seq) ->
         _:_ ->
             Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
             {error, {bad_request, Reason}}
-    end.
+    end;
+validate_start_seq({error, Error}) ->
+    {error, {bad_request, Error}}.
 
 get_changes_epoch() ->
     case application:get_env(fabric, changes_epoch) of

Reply via email to