nickva commented on code in PR #5014:
URL: https://github.com/apache/couchdb/pull/5014#discussion_r1556404940


##########
src/couch_scanner/src/couch_scanner_plugin.erl:
##########
@@ -0,0 +1,662 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Scanner plugin runner process
+%
+% This is the process which is spawned and run for each enabled plugin.
+%
+% A number of these processes are managed by the couch_scanner_server via
+% spawn_link/2 and stop/1 functions. After a plugin runner is spawned, the only
+% thing couch_scanner_server does is wait for it to exit.
+%
+% The plugin runner process may exit normally, crash, or exit with {shutdown,
+% {reschedule, TSec}} if it wants to reschedule to run again at some point in
+% the future (next day, a week later, etc).
+%
+% After the process starts, it will load and validate the plugin module. Then,
+% it will start scanning all the dbs and docs on the local node. Shard ranges
+% will be scanned only on one of the cluster nodes to avoid duplicating work.
+% For instance, if there are 2 shard ranges, 0-7, 8-f, with copies on nodes n1,
+% n2, n3. Then, 0-7 might be scanned on n1 only, and 8-f on n3.
+%
+% The plugin API is defined in the behavior definition section.
+%
+% The start/2 function is called when the plugin starts running. It returns
+% some context (St), which can be any Erlang term. All subsequent function
+% calls will be called with the same St object, and may return an updated
+% version of it.
+%
+% If the plugin hasn't finished running and has resumed running after the node
+% was restarted or an error happened, the resume/2 function will be called.
+% That's the difference between start and resume: start/2 is called when the
+% scan starts from the beginning (first db, first shard, ...), and resume/2 is
+% called when the scanning hasn't finished and has to continue.
+%
+% If start/2 or resume/2 returns `reset` then the checkpoint will be reset and
+% the plugin will be restarted. This may be useful in cases when the plugin
+% detects configuration changes since last scanning session had already
+% started, or when the plugin module was updated and the checkpoint version is
+% stale.
+%
+% The checkpoint/1 callback is periodically called to checkpoint the scanning
+% progress. The start/2 and resume/2 functions will be called with the last
+% saved checkpoint map value.
+%
+% The complete/1 callback is called when the scan has finished. The complete
+% callback should return the final checkpoint map object. The last checkpoint
+% will then be written.
+%
+% As the cluster dbs, shards, ddocs and individual docs are discovered during
+% scanning, the appropriate callbacks will be called. Most callbacks, besides
+% the updated St object, can reply with ok, skip, stop or reset tags. The
+% meanings of those are:
+%
+%   * ok  - continue to the next object
+%
+%   * skip - skip the current object and don't scan its internals (ex: skip a
+%     db and don't scan its ddocs, but continue with the next db)
+%
+%   * stop - stop scanning any remaining objects of that type (ex: don't scan
+%     any more dbs)
+%
+%   * reset - stop, reset the checkpoint data and restart, this may be useful
+%     if the configuration changes and it's best to just restart with the new
+%     settings
+
+-module(couch_scanner_plugin).
+
+-export([
+    % Main plugin process API
+    spawn_link/2,
+    stop/1,
+    % Internal export
+    run/2
+]).
+
+-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+% Behaviour callback definitions
+
+-callback start(ScanId :: binary(), EJson :: #{}) ->
+    {ok, St :: term()} | skip | reset.
+
+-callback resume(ScanId :: binary(), EJson :: #{}) ->
+    {ok, St :: term()} | skip | reset.
+
+% Optional. Called when the scan finishes; returns the final checkpoint map.
+-callback complete(St :: term()) ->
+    {ok, EJson :: #{}}.
+
+% Optional. Called periodically to checkpoint the scanning progress.
+-callback checkpoint(St :: term()) ->
+    {ok, EJson :: #{}}.
+
+-callback db(St :: term(), DbName :: binary()) ->
+    {ok | skip | stop, St1 :: term()}.
+
+% Optional.
+-callback ddoc(St :: term(), DbName :: binary(), #doc{}) ->
+    {ok | stop, St1 :: term()}.
+
+% Optional. If no subsequent callbacks are defined, then the default function
+% returns [] (don't open any shards). If any subsequent callbacks are defined,
+% the default action is to return all the shards in the list.
+-callback shards(St :: term(), [#shard{}]) ->
+    {[#shard{}], St1 :: term()}.
+
+% Optional.
+-callback db_opened(St :: term(), Db :: term()) ->
+    {ok, St :: term()}.
+
+% Optional. If doc/3 is not defined, the doc_id/3 default action is {skip, St}.
+% If it is defined, the default action is {ok, St}.
+-callback doc_id(St :: term(), DocId :: binary(), Db :: term()) ->
+    {ok | skip | stop, St1 :: term()}.
+
+% Optional.
+-callback doc(St :: term(), Db :: term(), #doc{}) ->
+    {ok | stop, St1 :: term()}.
+
+% Optional.
+-callback db_closing(St :: term(), Db :: term()) ->
+    {ok, St1 :: term()}.
+
+-optional_callbacks([
+    complete/1,
+    checkpoint/1,
+    ddoc/3,
+    shards/2,
+    db_opened/2,
+    doc_id/3,
+    doc/3,
+    db_closing/2
+]).
+
+-define(CALLBACKS, [ % {Name, Arity, Fun}; Fun presumably builds the fallback when the plugin lacks Name/Arity - TODO confirm
+    {start, 2, fun required_callback/3},
+    {resume, 2, fun required_callback/3},
+    {complete, 1, fun default_complete/3},
+    {checkpoint, 1, fun default_checkpoint/3},
+    {db, 2, fun required_callback/3},
+    {ddoc, 3, fun default_ddoc/3},
+    {shards, 2, fun default_shards/3},
+    {db_opened, 2, fun default_db_opened/3},
+    {doc_id, 3, fun default_doc_id/3},
+    {doc, 3, fun default_doc/3},
+    {db_closing, 2, fun default_db_closing/3}
+]).
+
+-define(CHECKPOINT_INTERVAL_SEC, 10). % presumably the minimum gap between periodic checkpoints, in seconds - TODO confirm
+-define(STOP_TIMEOUT_SEC, 5). % seconds stop/1 waits for the runner to exit before killing it
+
+-record(st, { % state of one plugin runner process, built in run/2
+    id, % plugin id passed to run/2
+    rlimiter, % RLimiter argument of run/2 (presumably a rate limiter - confirm)
+    scan_id, % id of the current scan: newly generated or restored from the checkpoint
+    mod, % plugin module, from plugin_mod(Id)
+    callbacks = #{}, % plugin callbacks, from plugin_mod(Id)
+    pst, % plugin state returned by the start or resume callback
+    dbname,
+    cursor, % scan position; <<>> when a scan starts from the beginning
+    shards_db,
+    db,
+    checkpoint_sec = 0, % set from tsec() or 0 in init_from_checkpoint/1
+    start_sec = 0, % when the current scan started; restored from the checkpoint on resume
+    skip_dbs, % from config_match_patterns(Mod, "skip_dbs")
+    skip_ddocs, % from config_match_patterns(Mod, "skip_ddocs")
+    skip_docs % from config_match_patterns(Mod, "skip_docs")
+}).
+
+% Spawn a linked plugin runner process which executes run/2 for the plugin Id.
+spawn_link(Id, RLimiter) ->
+    RunArgs = [Id, RLimiter],
+    proc_lib:spawn_link(?MODULE, run, RunArgs).
+
+% Stop a plugin runner process and wait until it is down.
+%
+% The runner is unlinked first, then sent a `stop` message. If it has not
+% exited within ?STOP_TIMEOUT_SEC seconds it is killed. Always returns ok.
+stop(Pid) when is_pid(Pid) ->
+    unlink(Pid),
+    MonRef = erlang:monitor(process, Pid),
+    TimeoutMSec = ?STOP_TIMEOUT_SEC * 1000,
+    Pid ! stop,
+    receive
+        {'DOWN', MonRef, _, _, _} ->
+            ok
+    after TimeoutMSec ->
+        % Did not stop in time: kill it and wait for the resulting 'DOWN'
+        exit(Pid, kill),
+        receive
+            {'DOWN', MonRef, _, _, _} ->
+                ok
+        end
+    end.
+
+% Main run function
+
+% Entry point of the spawned runner process: resolve the plugin module and its
+% callbacks, build the initial state, then run the stages in order
+% (config -> checkpoint -> scan dbs -> finalize).
+run(Id, RLimiter) ->
+    {Mod, Callbacks} = plugin_mod(Id),
+    St0 = #st{id = Id, rlimiter = RLimiter, mod = Mod, callbacks = Callbacks},
+    finalize(scan_dbs(init_from_checkpoint(init_config(St0)))).
+
+% Private functions
+
+% Fill in the skip_dbs, skip_ddocs and skip_docs fields of the state from the
+% plugin module's config match patterns.
+init_config(#st{mod = Mod} = St) ->
+    SkipDbs = config_match_patterns(Mod, "skip_dbs"),
+    SkipDDocs = config_match_patterns(Mod, "skip_ddocs"),
+    SkipDocs = config_match_patterns(Mod, "skip_docs"),
+    St#st{skip_dbs = SkipDbs, skip_ddocs = SkipDDocs, skip_docs = SkipDocs}.
+
+% Read the last checkpoint saved for this plugin and initialize the scan state:
+%
+%   * no checkpoint - start a new scan, passing an empty map to the plugin and
+%     writing a start checkpoint
+%   * "finished" checkpoint - start a new scan, passing the previous pst and
+%     start time to the plugin and writing a start checkpoint
+%   * "running" checkpoint - resume the previous scan with its saved scan id,
+%     cursor and start time
+init_from_checkpoint(#st{id = Id, mod = Mod, callbacks = Cbks} = St) ->
+    case couch_scanner_checkpoint:read(Id) of
+        not_found ->
+            ScanId = couch_scanner_util:new_scan_id(),
+            NowSec = tsec(),
+            PrevStartSec = 0,
+            Cursor = <<>>,
+            PSt = start_callback(Mod, Cbks, NowSec, ScanId, PrevStartSec, #{}),
+            ok = start_checkpoint(Id, Cbks, NowSec, ScanId, Cursor, PSt),
+            St#st{
+                scan_id = ScanId,
+                start_sec = NowSec,
+                checkpoint_sec = 0,
+                cursor = Cursor,
+                pst = PSt
+            };
+        #{
+            <<"state">> := <<"finished">>,
+            <<"start_sec">> := PrevStartSec,
+            <<"pst">> := PrevEJson
+        } ->
+            ScanId = couch_scanner_util:new_scan_id(),
+            NowSec = tsec(),
+            Cursor = <<>>,
+            PSt = start_callback(Mod, Cbks, NowSec, ScanId, PrevStartSec, PrevEJson),
+            ok = start_checkpoint(Id, Cbks, NowSec, ScanId, Cursor, PSt),
+            St#st{
+                scan_id = ScanId,
+                start_sec = NowSec,
+                checkpoint_sec = NowSec,
+                cursor = Cursor,
+                pst = PSt
+            };
+        #{
+            <<"state">> := <<"running">>,
+            <<"scan_id">> := ScanId,
+            <<"start_sec">> := StartSec,
+            <<"cursor">> := Cursor,
+            <<"pst">> := EJsonPSt
+        } ->
+            NowSec = tsec(),
+            PSt = resume_callback(Cbks, ScanId, EJsonPSt),
+            St#st{
+                scan_id = ScanId,
+                start_sec = StartSec,
+                checkpoint_sec = NowSec,
+                cursor = Cursor,
+                pst = PSt
+            }
+    end.
+
+scan_dbs(#st{cursor = Cursor} = St) ->
+    DbsDbName = mem3_sync:shards_db(),

Review Comment:
   Good idea. Updated it with the better name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to