This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 74ba8699f0f77eb6617796aab75916f4588bf13f
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Jul 8 00:28:00 2025 -0400

    Make replicator shutdown a bit more orderly
    
    Attempt to do a more orderly shutdown of the replication jobs. Give the 
jobs a
limited amount of time to shut down, which might include performing another
    checkpoint.
    
    Notify all jobs in parallel to shutdown to give them a chance to do a final
    checkpoint, and then move on. In total the wait is just a bit less than
    the (default) worker shutdown time.
    
    This should hopefully induce a bit less load during startup as the actively
    replicating jobs might not need to rewind as far back. It should also reduce
    some logging noise since fewer jobs should crash in unpredictable ways.
---
 .../src/couch_replicator_scheduler.erl             | 885 ++++++++++-----------
 1 file changed, 434 insertions(+), 451 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl 
b/src/couch_replicator/src/couch_replicator_scheduler.erl
index a3aa7601d..379a42b38 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -71,6 +71,13 @@
 -define(DEFAULT_MAX_HISTORY, 20).
 -define(DEFAULT_SCHEDULER_INTERVAL, 60000).
 
+% Worker children get a default 5 second shutdown timeout, so pick a value just
+% a bit less than that: 4.5 seconds. In couch_replicator_sup our scheduler
+% worker doesn't specify the timeout, so it picks up the OTP default of 5
+% seconds https://www.erlang.org/doc/system/sup_princ.html#child-specification
+%
+-define(TERMINATE_SHUTDOWN_TIME, 4500).
+
 -record(state, {
     interval = ?DEFAULT_SCHEDULER_INTERVAL,
     timer,
@@ -346,6 +353,7 @@ handle_info(_, State) ->
     {noreply, State}.
 
 terminate(_Reason, _State) ->
+    stop_clear_all_jobs(?TERMINATE_SHUTDOWN_TIME),
     couch_replicator_share:clear(),
     ok.
 
@@ -387,6 +395,33 @@ handle_config_terminate(_, _, _) ->
 
 %% Private functions
 
+% Stop jobs in parallel
+%
+stop_clear_all_jobs(TimeLeftMSec) ->
+    ShutdownFun = fun(#job{pid = Pid}) ->
+        Pid ! shutdown,
+        erlang:monitor(process, Pid)
+    end,
+    Refs = lists:map(ShutdownFun, running_jobs()),
+    ets:delete_all_objects(?MODULE),
+    wait_jobs_stop(TimeLeftMSec, Refs).
+
+wait_jobs_stop(_, []) ->
+    ok;
+wait_jobs_stop(TimeLeftMSec, _) when TimeLeftMSec =< 0 ->
+    % If some survive a bit longer we let them finish checkpointing.
+    timeout;
+wait_jobs_stop(TimeLeftMsec, [Ref | Refs]) ->
+    T0 = erlang:monotonic_time(),
+    receive
+        {'DOWN', Ref, _, _, _} ->
+            Dt = erlang:monotonic_time() - T0,
+            DtMSec = erlang:convert_time_unit(Dt, native, millisecond),
+            wait_jobs_stop(TimeLeftMsec - DtMSec, Refs)
+    after TimeLeftMsec ->
+        ok
+    end.
+
 % Handle crashed jobs. Handling differs between transient and permanent jobs.
 % Transient jobs are those posted to the _replicate endpoint. They don't have a
 % db associated with them. When those jobs crash, they are not restarted. That
@@ -946,7 +981,7 @@ existing_replication(#rep{} = NewRep) ->
 
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
 
 backoff_micros_test_() ->
     BaseInterval = ?BACKOFF_INTERVAL_MICROS,
@@ -1045,464 +1080,412 @@ scheduler_test_() ->
             fun setup/0,
             fun teardown/1,
             [
-                t_pending_jobs_simple(),
-                t_pending_jobs_skip_crashed(),
-                t_pending_jobs_skip_running(),
-                t_one_job_starts(),
-                t_no_jobs_start_if_max_is_0(),
-                t_one_job_starts_if_max_is_1(),
-                t_max_churn_does_not_throttle_initial_start(),
-                t_excess_oneshot_only_jobs(),
-                t_excess_continuous_only_jobs(),
-                t_excess_prefer_continuous_first(),
-                t_stop_oldest_first(),
-                t_start_oldest_first(),
-                t_jobs_churn_even_if_not_all_max_jobs_are_running(),
-                t_jobs_dont_churn_if_there_are_available_running_slots(),
-                t_start_only_pending_jobs_do_not_churn_existing_ones(),
-                t_dont_stop_if_nothing_pending(),
-                t_max_churn_limits_number_of_rotated_jobs(),
-                t_existing_jobs(),
-                t_if_pending_less_than_running_start_all_pending(),
-                t_running_less_than_pending_swap_all_running(),
-                t_oneshot_dont_get_rotated(),
-                t_rotate_continuous_only_if_mixed(),
-                t_oneshot_dont_get_starting_priority(),
-                t_oneshot_will_hog_the_scheduler(),
-                t_if_excess_is_trimmed_rotation_still_happens(),
-                t_if_transient_job_crashes_it_gets_removed(),
-                t_if_permanent_job_crashes_it_stays_in_ets(),
-                t_job_summary_running(),
-                t_job_summary_pending(),
-                t_job_summary_crashing_once(),
-                t_job_summary_crashing_many_times(),
-                t_job_summary_proxy_fields()
+                ?TDEF_FE(t_pending_jobs_simple),
+                ?TDEF_FE(t_pending_jobs_skip_crashed),
+                ?TDEF_FE(t_pending_jobs_skip_running),
+                ?TDEF_FE(t_one_job_starts),
+                ?TDEF_FE(t_no_jobs_start_if_max_is_0),
+                ?TDEF_FE(t_one_job_starts_if_max_is_1),
+                ?TDEF_FE(t_max_churn_does_not_throttle_initial_start),
+                ?TDEF_FE(t_excess_oneshot_only_jobs),
+                ?TDEF_FE(t_excess_continuous_only_jobs),
+                ?TDEF_FE(t_excess_prefer_continuous_first),
+                ?TDEF_FE(t_stop_oldest_first),
+                ?TDEF_FE(t_start_oldest_first),
+                ?TDEF_FE(t_jobs_churn_even_if_not_all_max_jobs_are_running),
+                
?TDEF_FE(t_jobs_dont_churn_if_there_are_available_running_slots),
+                ?TDEF_FE(t_start_only_pending_jobs_do_not_churn_existing_ones),
+                ?TDEF_FE(t_dont_stop_if_nothing_pending),
+                ?TDEF_FE(t_max_churn_limits_number_of_rotated_jobs),
+                ?TDEF_FE(t_existing_jobs),
+                ?TDEF_FE(t_if_pending_less_than_running_start_all_pending),
+                ?TDEF_FE(t_running_less_than_pending_swap_all_running),
+                ?TDEF_FE(t_oneshot_dont_get_rotated),
+                ?TDEF_FE(t_rotate_continuous_only_if_mixed),
+                ?TDEF_FE(t_oneshot_dont_get_starting_priority),
+                ?TDEF_FE(t_oneshot_will_hog_the_scheduler),
+                ?TDEF_FE(t_if_excess_is_trimmed_rotation_still_happens),
+                ?TDEF_FE(t_if_transient_job_crashes_it_gets_removed),
+                ?TDEF_FE(t_if_permanent_job_crashes_it_stays_in_ets),
+                ?TDEF_FE(t_stop_all_stops_jobs),
+                ?TDEF_FE(t_job_summary_running),
+                ?TDEF_FE(t_job_summary_pending),
+                ?TDEF_FE(t_job_summary_crashing_once),
+                ?TDEF_FE(t_job_summary_crashing_many_times),
+                ?TDEF_FE(t_job_summary_proxy_fields)
             ]
         }
     }.
 
-t_pending_jobs_simple() ->
-    ?_test(begin
-        Job1 = oneshot(1),
-        Job2 = oneshot(2),
-        setup_jobs([Job2, Job1]),
-        ?assertEqual([], pending_jobs(0)),
-        ?assertEqual([Job1], pending_jobs(1)),
-        ?assertEqual([Job1, Job2], pending_jobs(2)),
-        ?assertEqual([Job1, Job2], pending_jobs(3))
-    end).
-
-t_pending_jobs_skip_crashed() ->
-    ?_test(begin
-        Job = oneshot(1),
-        Ts = os:timestamp(),
-        History = [crashed(Ts), started(Ts) | Job#job.history],
-        Job1 = Job#job{history = History},
-        Job2 = oneshot(2),
-        Job3 = oneshot(3),
-        setup_jobs([Job2, Job1, Job3]),
-        ?assertEqual([Job2], pending_jobs(1)),
-        ?assertEqual([Job2, Job3], pending_jobs(2)),
-        ?assertEqual([Job2, Job3], pending_jobs(3))
-    end).
-
-t_pending_jobs_skip_running() ->
-    ?_test(begin
-        Job1 = continuous(1),
-        Job2 = continuous_running(2),
-        Job3 = oneshot(3),
-        Job4 = oneshot_running(4),
-        Jobs = [Job1, Job2, Job3, Job4],
-        setup_jobs(Jobs),
-        ?assertEqual([Job1, Job3], pending_jobs(4))
-    end).
-
-t_one_job_starts() ->
-    ?_test(begin
-        setup_jobs([oneshot(1)]),
-        ?assertEqual({0, 1}, run_stop_count()),
-        reschedule(mock_state(?DEFAULT_MAX_JOBS)),
-        ?assertEqual({1, 0}, run_stop_count())
-    end).
-
-t_no_jobs_start_if_max_is_0() ->
-    ?_test(begin
-        setup_jobs([oneshot(1)]),
-        reschedule(mock_state(0)),
-        ?assertEqual({0, 1}, run_stop_count())
-    end).
-
-t_one_job_starts_if_max_is_1() ->
-    ?_test(begin
-        setup_jobs([oneshot(1), oneshot(2)]),
-        reschedule(mock_state(1)),
-        ?assertEqual({1, 1}, run_stop_count())
-    end).
-
-t_max_churn_does_not_throttle_initial_start() ->
-    ?_test(begin
-        setup_jobs([oneshot(1), oneshot(2)]),
-        reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
-        ?assertEqual({2, 0}, run_stop_count())
-    end).
-
-t_excess_oneshot_only_jobs() ->
-    ?_test(begin
-        setup_jobs([oneshot_running(1), oneshot_running(2)]),
-        ?assertEqual({2, 0}, run_stop_count()),
-        reschedule(mock_state(1)),
-        ?assertEqual({1, 1}, run_stop_count()),
-        reschedule(mock_state(0)),
-        ?assertEqual({0, 2}, run_stop_count())
-    end).
-
-t_excess_continuous_only_jobs() ->
-    ?_test(begin
-        setup_jobs([continuous_running(1), continuous_running(2)]),
-        ?assertEqual({2, 0}, run_stop_count()),
-        reschedule(mock_state(1)),
-        ?assertEqual({1, 1}, run_stop_count()),
-        reschedule(mock_state(0)),
-        ?assertEqual({0, 2}, run_stop_count())
-    end).
-
-t_excess_prefer_continuous_first() ->
-    ?_test(begin
-        Jobs = [
-            continuous_running(1),
-            oneshot_running(2),
-            continuous_running(3)
-        ],
-        setup_jobs(Jobs),
-        ?assertEqual({3, 0}, run_stop_count()),
-        ?assertEqual({1, 0}, oneshot_run_stop_count()),
-        reschedule(mock_state(2)),
-        ?assertEqual({2, 1}, run_stop_count()),
-        ?assertEqual({1, 0}, oneshot_run_stop_count()),
-        reschedule(mock_state(1)),
-        ?assertEqual({1, 0}, oneshot_run_stop_count()),
-        reschedule(mock_state(0)),
-        ?assertEqual({0, 1}, oneshot_run_stop_count())
-    end).
-
-t_stop_oldest_first() ->
-    ?_test(begin
-        Jobs = [
-            continuous_running(7),
-            continuous_running(4),
-            continuous_running(5)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(2, 1)),
-        ?assertEqual({2, 1}, run_stop_count()),
-        ?assertEqual([4], jobs_stopped()),
-        reschedule(mock_state(1, 1)),
-        ?assertEqual([7], jobs_running())
-    end).
-
-t_start_oldest_first() ->
-    ?_test(begin
-        setup_jobs([continuous(7), continuous(2), continuous(5)]),
-        reschedule(mock_state(1)),
-        ?assertEqual({1, 2}, run_stop_count()),
-        ?assertEqual([2], jobs_running()),
-        reschedule(mock_state(2)),
-        ?assertEqual({2, 1}, run_stop_count()),
-        % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
-        % be running.
-        ?assertEqual([2], jobs_stopped())
-    end).
-
-t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
-    ?_test(begin
-        setup_jobs([
-            continuous_running(7),
-            continuous(2),
-            continuous(5)
-        ]),
-        reschedule(mock_state(2, 2)),
-        ?assertEqual({2, 1}, run_stop_count()),
-        ?assertEqual([7], jobs_stopped())
-    end).
-
-t_jobs_dont_churn_if_there_are_available_running_slots() ->
-    ?_test(begin
-        setup_jobs([
-            continuous_running(1),
-            continuous_running(2)
-        ]),
-        reschedule(mock_state(2, 2)),
-        ?assertEqual({2, 0}, run_stop_count()),
-        ?assertEqual([], jobs_stopped()),
-        ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, 
start_link, 1))
-    end).
-
-t_start_only_pending_jobs_do_not_churn_existing_ones() ->
-    ?_test(begin
-        setup_jobs([
-            continuous(1),
-            continuous_running(2)
-        ]),
-        reschedule(mock_state(2, 2)),
-        ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, 
start_link, 1)),
-        ?assertEqual([], jobs_stopped()),
-        ?assertEqual({2, 0}, run_stop_count())
-    end).
-
-t_dont_stop_if_nothing_pending() ->
-    ?_test(begin
-        setup_jobs([continuous_running(1), continuous_running(2)]),
-        reschedule(mock_state(2)),
-        ?assertEqual({2, 0}, run_stop_count())
-    end).
-
-t_max_churn_limits_number_of_rotated_jobs() ->
-    ?_test(begin
-        Jobs = [
-            continuous(1),
-            continuous_running(2),
-            continuous(3),
-            continuous_running(4)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(2, 1)),
-        ?assertEqual([2, 3], jobs_stopped())
-    end).
-
-t_if_pending_less_than_running_start_all_pending() ->
-    ?_test(begin
-        Jobs = [
-            continuous(1),
-            continuous_running(2),
-            continuous(3),
-            continuous_running(4),
-            continuous_running(5)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(3)),
-        ?assertEqual([1, 2, 5], jobs_running())
-    end).
-
-t_running_less_than_pending_swap_all_running() ->
-    ?_test(begin
-        Jobs = [
-            continuous(1),
-            continuous(2),
-            continuous(3),
-            continuous_running(4),
-            continuous_running(5)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(2)),
-        ?assertEqual([3, 4, 5], jobs_stopped())
-    end).
-
-t_oneshot_dont_get_rotated() ->
-    ?_test(begin
-        setup_jobs([oneshot_running(1), continuous(2)]),
-        reschedule(mock_state(1)),
-        ?assertEqual([1], jobs_running())
-    end).
-
-t_rotate_continuous_only_if_mixed() ->
-    ?_test(begin
-        setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
-        reschedule(mock_state(2)),
-        ?assertEqual([1, 2], jobs_running())
-    end).
-
-t_oneshot_dont_get_starting_priority() ->
-    ?_test(begin
-        setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
-        reschedule(mock_state(1)),
-        ?assertEqual([1], jobs_running())
-    end).
+t_pending_jobs_simple(_) ->
+    Job1 = oneshot(1),
+    Job2 = oneshot(2),
+    setup_jobs([Job2, Job1]),
+    ?assertEqual([], pending_jobs(0)),
+    ?assertEqual([Job1], pending_jobs(1)),
+    ?assertEqual([Job1, Job2], pending_jobs(2)),
+    ?assertEqual([Job1, Job2], pending_jobs(3)).
+
+t_pending_jobs_skip_crashed(_) ->
+    Job = oneshot(1),
+    Ts = os:timestamp(),
+    History = [crashed(Ts), started(Ts) | Job#job.history],
+    Job1 = Job#job{history = History},
+    Job2 = oneshot(2),
+    Job3 = oneshot(3),
+    setup_jobs([Job2, Job1, Job3]),
+    ?assertEqual([Job2], pending_jobs(1)),
+    ?assertEqual([Job2, Job3], pending_jobs(2)),
+    ?assertEqual([Job2, Job3], pending_jobs(3)).
+
+t_pending_jobs_skip_running(_) ->
+    Job1 = continuous(1),
+    Job2 = continuous_running(2),
+    Job3 = oneshot(3),
+    Job4 = oneshot_running(4),
+    Jobs = [Job1, Job2, Job3, Job4],
+    setup_jobs(Jobs),
+    ?assertEqual([Job1, Job3], pending_jobs(4)).
+
+t_one_job_starts(_) ->
+    setup_jobs([oneshot(1)]),
+    ?assertEqual({0, 1}, run_stop_count()),
+    reschedule(mock_state(?DEFAULT_MAX_JOBS)),
+    ?assertEqual({1, 0}, run_stop_count()).
+
+t_no_jobs_start_if_max_is_0(_) ->
+    setup_jobs([oneshot(1)]),
+    reschedule(mock_state(0)),
+    ?assertEqual({0, 1}, run_stop_count()).
+
+t_one_job_starts_if_max_is_1(_) ->
+    setup_jobs([oneshot(1), oneshot(2)]),
+    reschedule(mock_state(1)),
+    ?assertEqual({1, 1}, run_stop_count()).
+
+t_max_churn_does_not_throttle_initial_start(_) ->
+    setup_jobs([oneshot(1), oneshot(2)]),
+    reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
+    ?assertEqual({2, 0}, run_stop_count()).
+
+t_excess_oneshot_only_jobs(_) ->
+    setup_jobs([oneshot_running(1), oneshot_running(2)]),
+    ?assertEqual({2, 0}, run_stop_count()),
+    reschedule(mock_state(1)),
+    ?assertEqual({1, 1}, run_stop_count()),
+    reschedule(mock_state(0)),
+    ?assertEqual({0, 2}, run_stop_count()).
+
+t_excess_continuous_only_jobs(_) ->
+    setup_jobs([continuous_running(1), continuous_running(2)]),
+    ?assertEqual({2, 0}, run_stop_count()),
+    reschedule(mock_state(1)),
+    ?assertEqual({1, 1}, run_stop_count()),
+    reschedule(mock_state(0)),
+    ?assertEqual({0, 2}, run_stop_count()).
+
+t_excess_prefer_continuous_first(_) ->
+    Jobs = [
+        continuous_running(1),
+        oneshot_running(2),
+        continuous_running(3)
+    ],
+    setup_jobs(Jobs),
+    ?assertEqual({3, 0}, run_stop_count()),
+    ?assertEqual({1, 0}, oneshot_run_stop_count()),
+    reschedule(mock_state(2)),
+    ?assertEqual({2, 1}, run_stop_count()),
+    ?assertEqual({1, 0}, oneshot_run_stop_count()),
+    reschedule(mock_state(1)),
+    ?assertEqual({1, 0}, oneshot_run_stop_count()),
+    reschedule(mock_state(0)),
+    ?assertEqual({0, 1}, oneshot_run_stop_count()).
+
+t_stop_oldest_first(_) ->
+    Jobs = [
+        continuous_running(7),
+        continuous_running(4),
+        continuous_running(5)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(2, 1)),
+    ?assertEqual({2, 1}, run_stop_count()),
+    ?assertEqual([4], jobs_stopped()),
+    reschedule(mock_state(1, 1)),
+    ?assertEqual([7], jobs_running()).
+
+t_start_oldest_first(_) ->
+    setup_jobs([continuous(7), continuous(2), continuous(5)]),
+    reschedule(mock_state(1)),
+    ?assertEqual({1, 2}, run_stop_count()),
+    ?assertEqual([2], jobs_running()),
+    reschedule(mock_state(2)),
+    ?assertEqual({2, 1}, run_stop_count()),
+    % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
+    % be running.
+    ?assertEqual([2], jobs_stopped()).
+
+t_jobs_churn_even_if_not_all_max_jobs_are_running(_) ->
+    setup_jobs([
+        continuous_running(7),
+        continuous(2),
+        continuous(5)
+    ]),
+    reschedule(mock_state(2, 2)),
+    ?assertEqual({2, 1}, run_stop_count()),
+    ?assertEqual([7], jobs_stopped()).
+
+t_jobs_dont_churn_if_there_are_available_running_slots(_) ->
+    setup_jobs([
+        continuous_running(1),
+        continuous_running(2)
+    ]),
+    reschedule(mock_state(2, 2)),
+    ?assertEqual({2, 0}, run_stop_count()),
+    ?assertEqual([], jobs_stopped()),
+    ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, start_link, 
1)).
+
+t_start_only_pending_jobs_do_not_churn_existing_ones(_) ->
+    setup_jobs([
+        continuous(1),
+        continuous_running(2)
+    ]),
+    reschedule(mock_state(2, 2)),
+    ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, start_link, 
1)),
+    ?assertEqual([], jobs_stopped()),
+    ?assertEqual({2, 0}, run_stop_count()).
+
+t_dont_stop_if_nothing_pending(_) ->
+    setup_jobs([continuous_running(1), continuous_running(2)]),
+    reschedule(mock_state(2)),
+    ?assertEqual({2, 0}, run_stop_count()).
+
+t_max_churn_limits_number_of_rotated_jobs(_) ->
+    Jobs = [
+        continuous(1),
+        continuous_running(2),
+        continuous(3),
+        continuous_running(4)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(2, 1)),
+    ?assertEqual([2, 3], jobs_stopped()).
+
+t_if_pending_less_than_running_start_all_pending(_) ->
+    Jobs = [
+        continuous(1),
+        continuous_running(2),
+        continuous(3),
+        continuous_running(4),
+        continuous_running(5)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(3)),
+    ?assertEqual([1, 2, 5], jobs_running()).
+
+t_running_less_than_pending_swap_all_running(_) ->
+    Jobs = [
+        continuous(1),
+        continuous(2),
+        continuous(3),
+        continuous_running(4),
+        continuous_running(5)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(2)),
+    ?assertEqual([3, 4, 5], jobs_stopped()).
+
+t_oneshot_dont_get_rotated(_) ->
+    setup_jobs([oneshot_running(1), continuous(2)]),
+    reschedule(mock_state(1)),
+    ?assertEqual([1], jobs_running()).
+
+t_rotate_continuous_only_if_mixed(_) ->
+    setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
+    reschedule(mock_state(2)),
+    ?assertEqual([1, 2], jobs_running()).
+
+t_oneshot_dont_get_starting_priority(_) ->
+    setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
+    reschedule(mock_state(1)),
+    ?assertEqual([1], jobs_running()).
 
 % This tested in other test cases, it is here to mainly make explicit a 
property
 % of one-shot replications -- they can starve other jobs if they "take control"
 % of all the available scheduler slots.
-t_oneshot_will_hog_the_scheduler() ->
-    ?_test(begin
-        Jobs = [
-            oneshot_running(1),
-            oneshot_running(2),
-            oneshot(3),
-            continuous(4)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(2)),
-        ?assertEqual([1, 2], jobs_running())
-    end).
-
-t_if_excess_is_trimmed_rotation_still_happens() ->
-    ?_test(begin
-        Jobs = [
-            continuous(1),
-            continuous_running(2),
-            continuous_running(3)
-        ],
-        setup_jobs(Jobs),
-        reschedule(mock_state(1)),
-        ?assertEqual([1], jobs_running())
-    end).
-
-t_if_transient_job_crashes_it_gets_removed() ->
-    ?_test(begin
-        Pid = mock_pid(),
-        Rep = continuous_rep(),
-        Job = #job{
-            id = job1,
-            pid = Pid,
-            history = [added()],
-            rep = Rep#rep{db_name = null}
-        },
-        setup_jobs([Job]),
-        ?assertEqual(1, ets:info(?MODULE, size)),
-        State = #state{max_history = 3, stats_pid = self()},
-        {noreply, State} = handle_info(
-            {'EXIT', Pid, failed},
-            State
-        ),
-        ?assertEqual(0, ets:info(?MODULE, size))
-    end).
-
-t_if_permanent_job_crashes_it_stays_in_ets() ->
-    ?_test(begin
-        Pid = mock_pid(),
-        Rep = continuous_rep(),
-        Job = #job{
-            id = job1,
-            pid = Pid,
-            history = [added()],
-            rep = Rep#rep{db_name = <<"db1">>}
-        },
-        setup_jobs([Job]),
-        ?assertEqual(1, ets:info(?MODULE, size)),
-        State = #state{
-            max_jobs = 1,
-            max_history = 3,
-            stats_pid = self()
-        },
-        {noreply, State} = handle_info(
-            {'EXIT', Pid, failed},
-            State
-        ),
-        ?assertEqual(1, ets:info(?MODULE, size)),
-        [Job1] = ets:lookup(?MODULE, job1),
-        [Latest | _] = Job1#job.history,
-        ?assertMatch({{crashed, failed}, _}, Latest)
-    end).
-
-t_existing_jobs() ->
-    ?_test(begin
-        Rep0 = continuous_rep(<<"s">>, <<"t">>),
-        Rep = Rep0#rep{id = job1, db_name = <<"db">>},
-        setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
-        NewRep0 = continuous_rep(<<"s">>, <<"t">>),
-        NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>},
-        ?assert(existing_replication(NewRep)),
-        ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
-        ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
-        ?assertNot(existing_replication(NewRep#rep{options = []}))
-    end).
-
-t_job_summary_running() ->
-    ?_test(begin
-        Rep = rep(<<"s">>, <<"t">>),
-        Job = #job{
-            id = job1,
-            pid = mock_pid(),
-            history = [added()],
-            rep = Rep#rep{db_name = <<"db1">>}
-        },
-        setup_jobs([Job]),
-        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(running, proplists:get_value(state, Summary)),
-        ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
-        Stats = [{source_seq, <<"1-abc">>}],
-        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
-        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
-    end).
-
-t_job_summary_pending() ->
-    ?_test(begin
-        Job = #job{
-            id = job1,
-            pid = undefined,
-            history = [stopped(20), started(10), added()],
-            rep = rep(<<"s">>, <<"t">>)
-        },
-        setup_jobs([Job]),
-        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(pending, proplists:get_value(state, Summary)),
-        ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
-        Stats = [{doc_write_failures, 1}],
-        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
-        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
-    end).
-
-t_job_summary_crashing_once() ->
-    ?_test(begin
-        Job = #job{
-            id = job1,
-            history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
-            rep = rep(<<"s">>, <<"t">>)
-        },
-        setup_jobs([Job]),
-        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(crashing, proplists:get_value(state, Summary)),
-        Info = proplists:get_value(info, Summary),
-        ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
-        ?assertEqual(0, proplists:get_value(error_count, Summary))
-    end).
-
-t_job_summary_crashing_many_times() ->
-    ?_test(begin
-        Job = #job{
-            id = job1,
-            history = [crashed(4), started(3), crashed(2), started(1)],
-            rep = rep(<<"s">>, <<"t">>)
-        },
-        setup_jobs([Job]),
-        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(crashing, proplists:get_value(state, Summary)),
-        Info = proplists:get_value(info, Summary),
-        ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
-        ?assertEqual(2, proplists:get_value(error_count, Summary))
-    end).
-
-t_job_summary_proxy_fields() ->
-    ?_test(begin
-        Src = #httpdb{
-            url = "https://s";,
-            proxy_url = "http://u:p@sproxy:12";
-        },
-        Tgt = #httpdb{
-            url = "http://t";,
-            proxy_url = "socks5://u:p@tproxy:34"
-        },
-        Job = #job{
-            id = job1,
-            history = [started(10), added()],
-            rep = rep(Src, Tgt)
-        },
-        setup_jobs([Job]),
-        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(
-            <<"http://u:*****@sproxy:12";>>,
-            proplists:get_value(source_proxy, Summary)
-        ),
-        ?assertEqual(
-            <<"socks5://u:*****@tproxy:34">>,
-            proplists:get_value(target_proxy, Summary)
-        )
-    end).
+t_oneshot_will_hog_the_scheduler(_) ->
+    Jobs = [
+        oneshot_running(1),
+        oneshot_running(2),
+        oneshot(3),
+        continuous(4)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(2)),
+    ?assertEqual([1, 2], jobs_running()).
+
+t_if_excess_is_trimmed_rotation_still_happens(_) ->
+    Jobs = [
+        continuous(1),
+        continuous_running(2),
+        continuous_running(3)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(1)).
+
+t_if_transient_job_crashes_it_gets_removed(_) ->
+    Pid = mock_pid(),
+    Rep = continuous_rep(),
+    Job = #job{
+        id = job1,
+        pid = Pid,
+        history = [added()],
+        rep = Rep#rep{db_name = null}
+    },
+    setup_jobs([Job]),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    State = #state{max_history = 3, stats_pid = self()},
+    {noreply, State} = handle_info(
+        {'EXIT', Pid, failed},
+        State
+    ),
+    ?assertEqual(0, ets:info(?MODULE, size)).
+
+t_if_permanent_job_crashes_it_stays_in_ets(_) ->
+    Pid = mock_pid(),
+    Rep = continuous_rep(),
+    Job = #job{
+        id = job1,
+        pid = Pid,
+        history = [added()],
+        rep = Rep#rep{db_name = <<"db1">>}
+    },
+    setup_jobs([Job]),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    State = #state{
+        max_jobs = 1,
+        max_history = 3,
+        stats_pid = self()
+    },
+    {noreply, State} = handle_info(
+        {'EXIT', Pid, failed},
+        State
+    ),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    [Job1] = ets:lookup(?MODULE, job1),
+    [Latest | _] = Job1#job.history,
+    ?assertMatch({{crashed, failed}, _}, Latest).
+
+t_stop_all_stops_jobs(_) ->
+    Jobs = [
+        oneshot_running(1),
+        oneshot_running(2),
+        oneshot(3),
+        continuous(4)
+    ],
+    setup_jobs(Jobs),
+    ?assertEqual(ok, stop_clear_all_jobs(?TERMINATE_SHUTDOWN_TIME)),
+    ?assertEqual([], jobs_running()),
+    ?assertEqual(0, ets:info(?MODULE, size)).
+
+t_existing_jobs(_) ->
+    Rep0 = continuous_rep(<<"s">>, <<"t">>),
+    Rep = Rep0#rep{id = job1, db_name = <<"db">>},
+    setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
+    NewRep0 = continuous_rep(<<"s">>, <<"t">>),
+    NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>},
+    ?assert(existing_replication(NewRep)),
+    ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
+    ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
+    ?assertNot(existing_replication(NewRep#rep{options = []})).
+
+t_job_summary_running(_) ->
+    Rep = rep(<<"s">>, <<"t">>),
+    Job = #job{
+        id = job1,
+        pid = mock_pid(),
+        history = [added()],
+        rep = Rep#rep{db_name = <<"db1">>}
+    },
+    setup_jobs([Job]),
+    Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual(running, proplists:get_value(state, Summary)),
+    ?assertEqual(null, proplists:get_value(info, Summary)),
+    ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+    Stats = [{source_seq, <<"1-abc">>}],
+    handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+    Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual({Stats}, proplists:get_value(info, Summary1)).
+
+t_job_summary_pending(_) ->
+    Job = #job{
+        id = job1,
+        pid = undefined,
+        history = [stopped(20), started(10), added()],
+        rep = rep(<<"s">>, <<"t">>)
+    },
+    setup_jobs([Job]),
+    Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual(pending, proplists:get_value(state, Summary)),
+    ?assertEqual(null, proplists:get_value(info, Summary)),
+    ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+    Stats = [{doc_write_failures, 1}],
+    handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+    Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual({Stats}, proplists:get_value(info, Summary1)).
+
+t_job_summary_crashing_once(_) ->
+    Job = #job{
+        id = job1,
+        history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
+        rep = rep(<<"s">>, <<"t">>)
+    },
+    setup_jobs([Job]),
+    Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual(crashing, proplists:get_value(state, Summary)),
+    Info = proplists:get_value(info, Summary),
+    ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
+    ?assertEqual(0, proplists:get_value(error_count, Summary)).
+
+t_job_summary_crashing_many_times(_) ->
+    Job = #job{
+        id = job1,
+        history = [crashed(4), started(3), crashed(2), started(1)],
+        rep = rep(<<"s">>, <<"t">>)
+    },
+    setup_jobs([Job]),
+    Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual(crashing, proplists:get_value(state, Summary)),
+    Info = proplists:get_value(info, Summary),
+    ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
+    ?assertEqual(2, proplists:get_value(error_count, Summary)).
+
+t_job_summary_proxy_fields(_) ->
+    Src = #httpdb{
+        url = "https://s";,
+        proxy_url = "http://u:p@sproxy:12";
+    },
+    Tgt = #httpdb{
+        url = "http://t";,
+        proxy_url = "socks5://u:p@tproxy:34"
+    },
+    Job = #job{
+        id = job1,
+        history = [started(10), added()],
+        rep = rep(Src, Tgt)
+    },
+    setup_jobs([Job]),
+    Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+    ?assertEqual(
+        <<"http://u:*****@sproxy:12";>>,
+        proplists:get_value(source_proxy, Summary)
+    ),
+    ?assertEqual(
+        <<"socks5://u:*****@tproxy:34">>,
+        proplists:get_value(target_proxy, Summary)
+    ).
 
 % Test helper functions
 

Reply via email to