This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 74ba8699f0f77eb6617796aab75916f4588bf13f Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Jul 8 00:28:00 2025 -0400 Make replicator shutdown a bit more orderly Attempt to do a more orderly shutdown of the replication jobs. Give the jobs a limited amount of time to shut down, which might include performing another checkpoint. Notify all jobs in parallel to shut down to give them a chance to do a final checkpoint, and then move on. In total, the wait is just a bit less than the (default) worker shutdown time. This should hopefully induce a bit less load during startup as the actively replicating jobs might not need to rewind as far back. It should also reduce some logging noise since fewer jobs should crash in unpredictable ways. --- .../src/couch_replicator_scheduler.erl | 885 ++++++++++----------- 1 file changed, 434 insertions(+), 451 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index a3aa7601d..379a42b38 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -71,6 +71,13 @@ -define(DEFAULT_MAX_HISTORY, 20). -define(DEFAULT_SCHEDULER_INTERVAL, 60000). +% Worker children get a default 5 second shutdown timeout, so pick a value just +% a bit less than that: 4.5 seconds. In couch_replicator_sup our scheduler +% worker doesn't specify the timeout, so it picks up the OTP default of 5 +% seconds https://www.erlang.org/doc/system/sup_princ.html#child-specification +% +-define(TERMINATE_SHUTDOWN_TIME, 4500). + -record(state, { interval = ?DEFAULT_SCHEDULER_INTERVAL, timer, @@ -346,6 +353,7 @@ handle_info(_, State) -> {noreply, State}. terminate(_Reason, _State) -> + stop_clear_all_jobs(?TERMINATE_SHUTDOWN_TIME), couch_replicator_share:clear(), ok.
@@ -387,6 +395,33 @@ handle_config_terminate(_, _, _) ->
 
 %% Private functions
 
+% Send shutdown to all running jobs at once, clear the jobs table, then wait up
+% to TimeLeftMSec in total for them to exit. Returns ok, or timeout if any lag.
+stop_clear_all_jobs(TimeLeftMSec) ->
+    ShutdownFun = fun(#job{pid = Pid}) ->
+        Pid ! shutdown,
+        erlang:monitor(process, Pid)
+    end,
+    Refs = lists:map(ShutdownFun, running_jobs()),
+    ets:delete_all_objects(?MODULE),
+    wait_jobs_stop(TimeLeftMSec, Refs).
+
+wait_jobs_stop(_, []) ->
+    ok;
+wait_jobs_stop(TimeLeftMSec, _) when TimeLeftMSec =< 0 ->
+    % If some survive a bit longer we let them finish checkpointing.
+    timeout;
+wait_jobs_stop(TimeLeftMSec, [Ref | Refs]) ->
+    T0 = erlang:monotonic_time(),
+    receive
+        {'DOWN', Ref, _, _, _} ->
+            Dt = erlang:monotonic_time() - T0,
+            DtMSec = erlang:convert_time_unit(Dt, native, millisecond),
+            wait_jobs_stop(TimeLeftMSec - DtMSec, Refs)
+    after TimeLeftMSec ->
+        timeout
+    end.
+
 % Handle crashed jobs. Handling differs between transient and permanent jobs.
 % Transient jobs are those posted to the _replicate endpoint. They don't have a
 % db associated with them. When those jobs crash, they are not restarted. That
@@ -946,7 +981,7 @@ existing_replication(#rep{} = NewRep) ->
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
backoff_micros_test_() -> BaseInterval = ?BACKOFF_INTERVAL_MICROS, @@ -1045,464 +1080,412 @@ scheduler_test_() -> fun setup/0, fun teardown/1, [ - t_pending_jobs_simple(), - t_pending_jobs_skip_crashed(), - t_pending_jobs_skip_running(), - t_one_job_starts(), - t_no_jobs_start_if_max_is_0(), - t_one_job_starts_if_max_is_1(), - t_max_churn_does_not_throttle_initial_start(), - t_excess_oneshot_only_jobs(), - t_excess_continuous_only_jobs(), - t_excess_prefer_continuous_first(), - t_stop_oldest_first(), - t_start_oldest_first(), - t_jobs_churn_even_if_not_all_max_jobs_are_running(), - t_jobs_dont_churn_if_there_are_available_running_slots(), - t_start_only_pending_jobs_do_not_churn_existing_ones(), - t_dont_stop_if_nothing_pending(), - t_max_churn_limits_number_of_rotated_jobs(), - t_existing_jobs(), - t_if_pending_less_than_running_start_all_pending(), - t_running_less_than_pending_swap_all_running(), - t_oneshot_dont_get_rotated(), - t_rotate_continuous_only_if_mixed(), - t_oneshot_dont_get_starting_priority(), - t_oneshot_will_hog_the_scheduler(), - t_if_excess_is_trimmed_rotation_still_happens(), - t_if_transient_job_crashes_it_gets_removed(), - t_if_permanent_job_crashes_it_stays_in_ets(), - t_job_summary_running(), - t_job_summary_pending(), - t_job_summary_crashing_once(), - t_job_summary_crashing_many_times(), - t_job_summary_proxy_fields() + ?TDEF_FE(t_pending_jobs_simple), + ?TDEF_FE(t_pending_jobs_skip_crashed), + ?TDEF_FE(t_pending_jobs_skip_running), + ?TDEF_FE(t_one_job_starts), + ?TDEF_FE(t_no_jobs_start_if_max_is_0), + ?TDEF_FE(t_one_job_starts_if_max_is_1), + ?TDEF_FE(t_max_churn_does_not_throttle_initial_start), + ?TDEF_FE(t_excess_oneshot_only_jobs), + ?TDEF_FE(t_excess_continuous_only_jobs), + ?TDEF_FE(t_excess_prefer_continuous_first), + ?TDEF_FE(t_stop_oldest_first), + ?TDEF_FE(t_start_oldest_first), + ?TDEF_FE(t_jobs_churn_even_if_not_all_max_jobs_are_running), + ?TDEF_FE(t_jobs_dont_churn_if_there_are_available_running_slots), + 
?TDEF_FE(t_start_only_pending_jobs_do_not_churn_existing_ones), + ?TDEF_FE(t_dont_stop_if_nothing_pending), + ?TDEF_FE(t_max_churn_limits_number_of_rotated_jobs), + ?TDEF_FE(t_existing_jobs), + ?TDEF_FE(t_if_pending_less_than_running_start_all_pending), + ?TDEF_FE(t_running_less_than_pending_swap_all_running), + ?TDEF_FE(t_oneshot_dont_get_rotated), + ?TDEF_FE(t_rotate_continuous_only_if_mixed), + ?TDEF_FE(t_oneshot_dont_get_starting_priority), + ?TDEF_FE(t_oneshot_will_hog_the_scheduler), + ?TDEF_FE(t_if_excess_is_trimmed_rotation_still_happens), + ?TDEF_FE(t_if_transient_job_crashes_it_gets_removed), + ?TDEF_FE(t_if_permanent_job_crashes_it_stays_in_ets), + ?TDEF_FE(t_stop_all_stops_jobs), + ?TDEF_FE(t_job_summary_running), + ?TDEF_FE(t_job_summary_pending), + ?TDEF_FE(t_job_summary_crashing_once), + ?TDEF_FE(t_job_summary_crashing_many_times), + ?TDEF_FE(t_job_summary_proxy_fields) ] } }. -t_pending_jobs_simple() -> - ?_test(begin - Job1 = oneshot(1), - Job2 = oneshot(2), - setup_jobs([Job2, Job1]), - ?assertEqual([], pending_jobs(0)), - ?assertEqual([Job1], pending_jobs(1)), - ?assertEqual([Job1, Job2], pending_jobs(2)), - ?assertEqual([Job1, Job2], pending_jobs(3)) - end). - -t_pending_jobs_skip_crashed() -> - ?_test(begin - Job = oneshot(1), - Ts = os:timestamp(), - History = [crashed(Ts), started(Ts) | Job#job.history], - Job1 = Job#job{history = History}, - Job2 = oneshot(2), - Job3 = oneshot(3), - setup_jobs([Job2, Job1, Job3]), - ?assertEqual([Job2], pending_jobs(1)), - ?assertEqual([Job2, Job3], pending_jobs(2)), - ?assertEqual([Job2, Job3], pending_jobs(3)) - end). - -t_pending_jobs_skip_running() -> - ?_test(begin - Job1 = continuous(1), - Job2 = continuous_running(2), - Job3 = oneshot(3), - Job4 = oneshot_running(4), - Jobs = [Job1, Job2, Job3, Job4], - setup_jobs(Jobs), - ?assertEqual([Job1, Job3], pending_jobs(4)) - end). 
- -t_one_job_starts() -> - ?_test(begin - setup_jobs([oneshot(1)]), - ?assertEqual({0, 1}, run_stop_count()), - reschedule(mock_state(?DEFAULT_MAX_JOBS)), - ?assertEqual({1, 0}, run_stop_count()) - end). - -t_no_jobs_start_if_max_is_0() -> - ?_test(begin - setup_jobs([oneshot(1)]), - reschedule(mock_state(0)), - ?assertEqual({0, 1}, run_stop_count()) - end). - -t_one_job_starts_if_max_is_1() -> - ?_test(begin - setup_jobs([oneshot(1), oneshot(2)]), - reschedule(mock_state(1)), - ?assertEqual({1, 1}, run_stop_count()) - end). - -t_max_churn_does_not_throttle_initial_start() -> - ?_test(begin - setup_jobs([oneshot(1), oneshot(2)]), - reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)), - ?assertEqual({2, 0}, run_stop_count()) - end). - -t_excess_oneshot_only_jobs() -> - ?_test(begin - setup_jobs([oneshot_running(1), oneshot_running(2)]), - ?assertEqual({2, 0}, run_stop_count()), - reschedule(mock_state(1)), - ?assertEqual({1, 1}, run_stop_count()), - reschedule(mock_state(0)), - ?assertEqual({0, 2}, run_stop_count()) - end). - -t_excess_continuous_only_jobs() -> - ?_test(begin - setup_jobs([continuous_running(1), continuous_running(2)]), - ?assertEqual({2, 0}, run_stop_count()), - reschedule(mock_state(1)), - ?assertEqual({1, 1}, run_stop_count()), - reschedule(mock_state(0)), - ?assertEqual({0, 2}, run_stop_count()) - end). - -t_excess_prefer_continuous_first() -> - ?_test(begin - Jobs = [ - continuous_running(1), - oneshot_running(2), - continuous_running(3) - ], - setup_jobs(Jobs), - ?assertEqual({3, 0}, run_stop_count()), - ?assertEqual({1, 0}, oneshot_run_stop_count()), - reschedule(mock_state(2)), - ?assertEqual({2, 1}, run_stop_count()), - ?assertEqual({1, 0}, oneshot_run_stop_count()), - reschedule(mock_state(1)), - ?assertEqual({1, 0}, oneshot_run_stop_count()), - reschedule(mock_state(0)), - ?assertEqual({0, 1}, oneshot_run_stop_count()) - end). 
- -t_stop_oldest_first() -> - ?_test(begin - Jobs = [ - continuous_running(7), - continuous_running(4), - continuous_running(5) - ], - setup_jobs(Jobs), - reschedule(mock_state(2, 1)), - ?assertEqual({2, 1}, run_stop_count()), - ?assertEqual([4], jobs_stopped()), - reschedule(mock_state(1, 1)), - ?assertEqual([7], jobs_running()) - end). - -t_start_oldest_first() -> - ?_test(begin - setup_jobs([continuous(7), continuous(2), continuous(5)]), - reschedule(mock_state(1)), - ?assertEqual({1, 2}, run_stop_count()), - ?assertEqual([2], jobs_running()), - reschedule(mock_state(2)), - ?assertEqual({2, 1}, run_stop_count()), - % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should - % be running. - ?assertEqual([2], jobs_stopped()) - end). - -t_jobs_churn_even_if_not_all_max_jobs_are_running() -> - ?_test(begin - setup_jobs([ - continuous_running(7), - continuous(2), - continuous(5) - ]), - reschedule(mock_state(2, 2)), - ?assertEqual({2, 1}, run_stop_count()), - ?assertEqual([7], jobs_stopped()) - end). - -t_jobs_dont_churn_if_there_are_available_running_slots() -> - ?_test(begin - setup_jobs([ - continuous_running(1), - continuous_running(2) - ]), - reschedule(mock_state(2, 2)), - ?assertEqual({2, 0}, run_stop_count()), - ?assertEqual([], jobs_stopped()), - ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)) - end). - -t_start_only_pending_jobs_do_not_churn_existing_ones() -> - ?_test(begin - setup_jobs([ - continuous(1), - continuous_running(2) - ]), - reschedule(mock_state(2, 2)), - ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)), - ?assertEqual([], jobs_stopped()), - ?assertEqual({2, 0}, run_stop_count()) - end). - -t_dont_stop_if_nothing_pending() -> - ?_test(begin - setup_jobs([continuous_running(1), continuous_running(2)]), - reschedule(mock_state(2)), - ?assertEqual({2, 0}, run_stop_count()) - end). 
- -t_max_churn_limits_number_of_rotated_jobs() -> - ?_test(begin - Jobs = [ - continuous(1), - continuous_running(2), - continuous(3), - continuous_running(4) - ], - setup_jobs(Jobs), - reschedule(mock_state(2, 1)), - ?assertEqual([2, 3], jobs_stopped()) - end). - -t_if_pending_less_than_running_start_all_pending() -> - ?_test(begin - Jobs = [ - continuous(1), - continuous_running(2), - continuous(3), - continuous_running(4), - continuous_running(5) - ], - setup_jobs(Jobs), - reschedule(mock_state(3)), - ?assertEqual([1, 2, 5], jobs_running()) - end). - -t_running_less_than_pending_swap_all_running() -> - ?_test(begin - Jobs = [ - continuous(1), - continuous(2), - continuous(3), - continuous_running(4), - continuous_running(5) - ], - setup_jobs(Jobs), - reschedule(mock_state(2)), - ?assertEqual([3, 4, 5], jobs_stopped()) - end). - -t_oneshot_dont_get_rotated() -> - ?_test(begin - setup_jobs([oneshot_running(1), continuous(2)]), - reschedule(mock_state(1)), - ?assertEqual([1], jobs_running()) - end). - -t_rotate_continuous_only_if_mixed() -> - ?_test(begin - setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]), - reschedule(mock_state(2)), - ?assertEqual([1, 2], jobs_running()) - end). - -t_oneshot_dont_get_starting_priority() -> - ?_test(begin - setup_jobs([continuous(1), oneshot(2), continuous_running(3)]), - reschedule(mock_state(1)), - ?assertEqual([1], jobs_running()) - end). +t_pending_jobs_simple(_) -> + Job1 = oneshot(1), + Job2 = oneshot(2), + setup_jobs([Job2, Job1]), + ?assertEqual([], pending_jobs(0)), + ?assertEqual([Job1], pending_jobs(1)), + ?assertEqual([Job1, Job2], pending_jobs(2)), + ?assertEqual([Job1, Job2], pending_jobs(3)). 
+ +t_pending_jobs_skip_crashed(_) -> + Job = oneshot(1), + Ts = os:timestamp(), + History = [crashed(Ts), started(Ts) | Job#job.history], + Job1 = Job#job{history = History}, + Job2 = oneshot(2), + Job3 = oneshot(3), + setup_jobs([Job2, Job1, Job3]), + ?assertEqual([Job2], pending_jobs(1)), + ?assertEqual([Job2, Job3], pending_jobs(2)), + ?assertEqual([Job2, Job3], pending_jobs(3)). + +t_pending_jobs_skip_running(_) -> + Job1 = continuous(1), + Job2 = continuous_running(2), + Job3 = oneshot(3), + Job4 = oneshot_running(4), + Jobs = [Job1, Job2, Job3, Job4], + setup_jobs(Jobs), + ?assertEqual([Job1, Job3], pending_jobs(4)). + +t_one_job_starts(_) -> + setup_jobs([oneshot(1)]), + ?assertEqual({0, 1}, run_stop_count()), + reschedule(mock_state(?DEFAULT_MAX_JOBS)), + ?assertEqual({1, 0}, run_stop_count()). + +t_no_jobs_start_if_max_is_0(_) -> + setup_jobs([oneshot(1)]), + reschedule(mock_state(0)), + ?assertEqual({0, 1}, run_stop_count()). + +t_one_job_starts_if_max_is_1(_) -> + setup_jobs([oneshot(1), oneshot(2)]), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()). + +t_max_churn_does_not_throttle_initial_start(_) -> + setup_jobs([oneshot(1), oneshot(2)]), + reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)), + ?assertEqual({2, 0}, run_stop_count()). + +t_excess_oneshot_only_jobs(_) -> + setup_jobs([oneshot_running(1), oneshot_running(2)]), + ?assertEqual({2, 0}, run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 2}, run_stop_count()). + +t_excess_continuous_only_jobs(_) -> + setup_jobs([continuous_running(1), continuous_running(2)]), + ?assertEqual({2, 0}, run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 2}, run_stop_count()). 
+ +t_excess_prefer_continuous_first(_) -> + Jobs = [ + continuous_running(1), + oneshot_running(2), + continuous_running(3) + ], + setup_jobs(Jobs), + ?assertEqual({3, 0}, run_stop_count()), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(2)), + ?assertEqual({2, 1}, run_stop_count()), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 1}, oneshot_run_stop_count()). + +t_stop_oldest_first(_) -> + Jobs = [ + continuous_running(7), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(2, 1)), + ?assertEqual({2, 1}, run_stop_count()), + ?assertEqual([4], jobs_stopped()), + reschedule(mock_state(1, 1)), + ?assertEqual([7], jobs_running()). + +t_start_oldest_first(_) -> + setup_jobs([continuous(7), continuous(2), continuous(5)]), + reschedule(mock_state(1)), + ?assertEqual({1, 2}, run_stop_count()), + ?assertEqual([2], jobs_running()), + reschedule(mock_state(2)), + ?assertEqual({2, 1}, run_stop_count()), + % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should + % be running. + ?assertEqual([2], jobs_stopped()). + +t_jobs_churn_even_if_not_all_max_jobs_are_running(_) -> + setup_jobs([ + continuous_running(7), + continuous(2), + continuous(5) + ]), + reschedule(mock_state(2, 2)), + ?assertEqual({2, 1}, run_stop_count()), + ?assertEqual([7], jobs_stopped()). + +t_jobs_dont_churn_if_there_are_available_running_slots(_) -> + setup_jobs([ + continuous_running(1), + continuous_running(2) + ]), + reschedule(mock_state(2, 2)), + ?assertEqual({2, 0}, run_stop_count()), + ?assertEqual([], jobs_stopped()), + ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)). 
+ +t_start_only_pending_jobs_do_not_churn_existing_ones(_) -> + setup_jobs([ + continuous(1), + continuous_running(2) + ]), + reschedule(mock_state(2, 2)), + ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)), + ?assertEqual([], jobs_stopped()), + ?assertEqual({2, 0}, run_stop_count()). + +t_dont_stop_if_nothing_pending(_) -> + setup_jobs([continuous_running(1), continuous_running(2)]), + reschedule(mock_state(2)), + ?assertEqual({2, 0}, run_stop_count()). + +t_max_churn_limits_number_of_rotated_jobs(_) -> + Jobs = [ + continuous(1), + continuous_running(2), + continuous(3), + continuous_running(4) + ], + setup_jobs(Jobs), + reschedule(mock_state(2, 1)), + ?assertEqual([2, 3], jobs_stopped()). + +t_if_pending_less_than_running_start_all_pending(_) -> + Jobs = [ + continuous(1), + continuous_running(2), + continuous(3), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(3)), + ?assertEqual([1, 2, 5], jobs_running()). + +t_running_less_than_pending_swap_all_running(_) -> + Jobs = [ + continuous(1), + continuous(2), + continuous(3), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(2)), + ?assertEqual([3, 4, 5], jobs_stopped()). + +t_oneshot_dont_get_rotated(_) -> + setup_jobs([oneshot_running(1), continuous(2)]), + reschedule(mock_state(1)), + ?assertEqual([1], jobs_running()). + +t_rotate_continuous_only_if_mixed(_) -> + setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]), + reschedule(mock_state(2)), + ?assertEqual([1, 2], jobs_running()). + +t_oneshot_dont_get_starting_priority(_) -> + setup_jobs([continuous(1), oneshot(2), continuous_running(3)]), + reschedule(mock_state(1)), + ?assertEqual([1], jobs_running()). % This tested in other test cases, it is here to mainly make explicit a property % of one-shot replications -- they can starve other jobs if they "take control" % of all the available scheduler slots. 
-t_oneshot_will_hog_the_scheduler() -> - ?_test(begin - Jobs = [ - oneshot_running(1), - oneshot_running(2), - oneshot(3), - continuous(4) - ], - setup_jobs(Jobs), - reschedule(mock_state(2)), - ?assertEqual([1, 2], jobs_running()) - end). - -t_if_excess_is_trimmed_rotation_still_happens() -> - ?_test(begin - Jobs = [ - continuous(1), - continuous_running(2), - continuous_running(3) - ], - setup_jobs(Jobs), - reschedule(mock_state(1)), - ?assertEqual([1], jobs_running()) - end). - -t_if_transient_job_crashes_it_gets_removed() -> - ?_test(begin - Pid = mock_pid(), - Rep = continuous_rep(), - Job = #job{ - id = job1, - pid = Pid, - history = [added()], - rep = Rep#rep{db_name = null} - }, - setup_jobs([Job]), - ?assertEqual(1, ets:info(?MODULE, size)), - State = #state{max_history = 3, stats_pid = self()}, - {noreply, State} = handle_info( - {'EXIT', Pid, failed}, - State - ), - ?assertEqual(0, ets:info(?MODULE, size)) - end). - -t_if_permanent_job_crashes_it_stays_in_ets() -> - ?_test(begin - Pid = mock_pid(), - Rep = continuous_rep(), - Job = #job{ - id = job1, - pid = Pid, - history = [added()], - rep = Rep#rep{db_name = <<"db1">>} - }, - setup_jobs([Job]), - ?assertEqual(1, ets:info(?MODULE, size)), - State = #state{ - max_jobs = 1, - max_history = 3, - stats_pid = self() - }, - {noreply, State} = handle_info( - {'EXIT', Pid, failed}, - State - ), - ?assertEqual(1, ets:info(?MODULE, size)), - [Job1] = ets:lookup(?MODULE, job1), - [Latest | _] = Job1#job.history, - ?assertMatch({{crashed, failed}, _}, Latest) - end). 
- -t_existing_jobs() -> - ?_test(begin - Rep0 = continuous_rep(<<"s">>, <<"t">>), - Rep = Rep0#rep{id = job1, db_name = <<"db">>}, - setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), - NewRep0 = continuous_rep(<<"s">>, <<"t">>), - NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>}, - ?assert(existing_replication(NewRep)), - ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), - ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), - ?assertNot(existing_replication(NewRep#rep{options = []})) - end). - -t_job_summary_running() -> - ?_test(begin - Rep = rep(<<"s">>, <<"t">>), - Job = #job{ - id = job1, - pid = mock_pid(), - history = [added()], - rep = Rep#rep{db_name = <<"db1">>} - }, - setup_jobs([Job]), - Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual(running, proplists:get_value(state, Summary)), - ?assertEqual(null, proplists:get_value(info, Summary)), - ?assertEqual(0, proplists:get_value(error_count, Summary)), - - Stats = [{source_seq, <<"1-abc">>}], - handle_cast({update_job_stats, job1, Stats}, mock_state(1)), - Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual({Stats}, proplists:get_value(info, Summary1)) - end). - -t_job_summary_pending() -> - ?_test(begin - Job = #job{ - id = job1, - pid = undefined, - history = [stopped(20), started(10), added()], - rep = rep(<<"s">>, <<"t">>) - }, - setup_jobs([Job]), - Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual(pending, proplists:get_value(state, Summary)), - ?assertEqual(null, proplists:get_value(info, Summary)), - ?assertEqual(0, proplists:get_value(error_count, Summary)), - - Stats = [{doc_write_failures, 1}], - handle_cast({update_job_stats, job1, Stats}, mock_state(1)), - Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual({Stats}, proplists:get_value(info, Summary1)) - end). 
- -t_job_summary_crashing_once() -> - ?_test(begin - Job = #job{ - id = job1, - history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)], - rep = rep(<<"s">>, <<"t">>) - }, - setup_jobs([Job]), - Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual(crashing, proplists:get_value(state, Summary)), - Info = proplists:get_value(info, Summary), - ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), - ?assertEqual(0, proplists:get_value(error_count, Summary)) - end). - -t_job_summary_crashing_many_times() -> - ?_test(begin - Job = #job{ - id = job1, - history = [crashed(4), started(3), crashed(2), started(1)], - rep = rep(<<"s">>, <<"t">>) - }, - setup_jobs([Job]), - Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual(crashing, proplists:get_value(state, Summary)), - Info = proplists:get_value(info, Summary), - ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), - ?assertEqual(2, proplists:get_value(error_count, Summary)) - end). - -t_job_summary_proxy_fields() -> - ?_test(begin - Src = #httpdb{ - url = "https://s", - proxy_url = "http://u:p@sproxy:12" - }, - Tgt = #httpdb{ - url = "http://t", - proxy_url = "socks5://u:p@tproxy:34" - }, - Job = #job{ - id = job1, - history = [started(10), added()], - rep = rep(Src, Tgt) - }, - setup_jobs([Job]), - Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), - ?assertEqual( - <<"http://u:*****@sproxy:12">>, - proplists:get_value(source_proxy, Summary) - ), - ?assertEqual( - <<"socks5://u:*****@tproxy:34">>, - proplists:get_value(target_proxy, Summary) - ) - end). +t_oneshot_will_hog_the_scheduler(_) -> + Jobs = [ + oneshot_running(1), + oneshot_running(2), + oneshot(3), + continuous(4) + ], + setup_jobs(Jobs), + reschedule(mock_state(2)), + ?assertEqual([1, 2], jobs_running()). 
+
+t_if_excess_is_trimmed_rotation_still_happens(_) ->
+    Jobs = [
+        continuous(1),
+        continuous_running(2),
+        continuous_running(3)
+    ],
+    setup_jobs(Jobs),
+    reschedule(mock_state(1)),
+    ?assertEqual([1], jobs_running()).
+
+t_if_transient_job_crashes_it_gets_removed(_) ->
+    Pid = mock_pid(),
+    Rep = continuous_rep(),
+    Job = #job{
+        id = job1,
+        pid = Pid,
+        history = [added()],
+        rep = Rep#rep{db_name = null}
+    },
+    setup_jobs([Job]),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    State = #state{max_history = 3, stats_pid = self()},
+    {noreply, State} = handle_info(
+        {'EXIT', Pid, failed},
+        State
+    ),
+    ?assertEqual(0, ets:info(?MODULE, size)).
+
+t_if_permanent_job_crashes_it_stays_in_ets(_) ->
+    Pid = mock_pid(),
+    Rep = continuous_rep(),
+    Job = #job{
+        id = job1,
+        pid = Pid,
+        history = [added()],
+        rep = Rep#rep{db_name = <<"db1">>}
+    },
+    setup_jobs([Job]),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    State = #state{
+        max_jobs = 1,
+        max_history = 3,
+        stats_pid = self()
+    },
+    {noreply, State} = handle_info(
+        {'EXIT', Pid, failed},
+        State
+    ),
+    ?assertEqual(1, ets:info(?MODULE, size)),
+    [Job1] = ets:lookup(?MODULE, job1),
+    [Latest | _] = Job1#job.history,
+    ?assertMatch({{crashed, failed}, _}, Latest).
+
+t_stop_all_stops_jobs(_) ->
+    Jobs = [
+        oneshot_running(1),
+        oneshot_running(2),
+        oneshot(3),
+        continuous(4)
+    ],
+    setup_jobs(Jobs),
+    ?assertEqual(ok, stop_clear_all_jobs(?TERMINATE_SHUTDOWN_TIME)),
+    ?assertEqual([], jobs_running()),
+    ?assertEqual(0, ets:info(?MODULE, size)).
+ +t_existing_jobs(_) -> + Rep0 = continuous_rep(<<"s">>, <<"t">>), + Rep = Rep0#rep{id = job1, db_name = <<"db">>}, + setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), + NewRep0 = continuous_rep(<<"s">>, <<"t">>), + NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>}, + ?assert(existing_replication(NewRep)), + ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), + ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), + ?assertNot(existing_replication(NewRep#rep{options = []})). + +t_job_summary_running(_) -> + Rep = rep(<<"s">>, <<"t">>), + Job = #job{ + id = job1, + pid = mock_pid(), + history = [added()], + rep = Rep#rep{db_name = <<"db1">>} + }, + setup_jobs([Job]), + Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual(running, proplists:get_value(state, Summary)), + ?assertEqual(null, proplists:get_value(info, Summary)), + ?assertEqual(0, proplists:get_value(error_count, Summary)), + + Stats = [{source_seq, <<"1-abc">>}], + handle_cast({update_job_stats, job1, Stats}, mock_state(1)), + Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual({Stats}, proplists:get_value(info, Summary1)). + +t_job_summary_pending(_) -> + Job = #job{ + id = job1, + pid = undefined, + history = [stopped(20), started(10), added()], + rep = rep(<<"s">>, <<"t">>) + }, + setup_jobs([Job]), + Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual(pending, proplists:get_value(state, Summary)), + ?assertEqual(null, proplists:get_value(info, Summary)), + ?assertEqual(0, proplists:get_value(error_count, Summary)), + + Stats = [{doc_write_failures, 1}], + handle_cast({update_job_stats, job1, Stats}, mock_state(1)), + Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual({Stats}, proplists:get_value(info, Summary1)). 
+ +t_job_summary_crashing_once(_) -> + Job = #job{ + id = job1, + history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)], + rep = rep(<<"s">>, <<"t">>) + }, + setup_jobs([Job]), + Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual(crashing, proplists:get_value(state, Summary)), + Info = proplists:get_value(info, Summary), + ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), + ?assertEqual(0, proplists:get_value(error_count, Summary)). + +t_job_summary_crashing_many_times(_) -> + Job = #job{ + id = job1, + history = [crashed(4), started(3), crashed(2), started(1)], + rep = rep(<<"s">>, <<"t">>) + }, + setup_jobs([Job]), + Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual(crashing, proplists:get_value(state, Summary)), + Info = proplists:get_value(info, Summary), + ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), + ?assertEqual(2, proplists:get_value(error_count, Summary)). + +t_job_summary_proxy_fields(_) -> + Src = #httpdb{ + url = "https://s", + proxy_url = "http://u:p@sproxy:12" + }, + Tgt = #httpdb{ + url = "http://t", + proxy_url = "socks5://u:p@tproxy:34" + }, + Job = #job{ + id = job1, + history = [started(10), added()], + rep = rep(Src, Tgt) + }, + setup_jobs([Job]), + Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual( + <<"http://u:*****@sproxy:12">>, + proplists:get_value(source_proxy, Summary) + ), + ?assertEqual( + <<"socks5://u:*****@tproxy:34">>, + proplists:get_value(target_proxy, Summary) + ). % Test helper functions
