chewbranca commented on code in PR #5213: URL: https://github.com/apache/couchdb/pull/5213#discussion_r1817427800
########## src/couch_stats/src/couch_stats_resource_tracker.erl: ########## @@ -0,0 +1,912 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stats_resource_tracker). + +-behaviour(gen_server). + +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +%% PidRef API +-export([ + get_pid_ref/0, + set_pid_ref/1, + create_pid_ref/0, + close_pid_ref/0, close_pid_ref/1 +]). + +%% Context API +-export([ + create_resource/1, + create_context/5, + create_coordinator_context/2, + create_worker_context/3, + destroy_context/0, destroy_context/1, + + get_resource/0, get_resource/1, + + set_context_dbname/1, set_context_dbname/2, + set_context_handler_fun/1, set_context_handler_fun/2, + set_context_username/1, set_context_username/2 +]). + +%% stats collection api +-export([ + is_enabled/0, + + inc/1, inc/2, + maybe_inc/2, + accumulate_delta/1, + make_delta/0, + + ioq_called/0, + + should_track/1 +]). + +%% aggregate query api +-export([ + active/0, active/1, + active_coordinators/0, active_coordinators/1, + active_workers/0, active_workers/1, + + count_by/1, + group_by/2, group_by/3, + sorted/1, + sorted_by/1, sorted_by/2, sorted_by/3, + + find_by_pid/1, + find_by_pidref/1, + find_by_nonce/1, + find_workers_by_pidref/1 +]). + +%% Process lifetime reporting api +-export([ + log_process_lifetime_report/1, + is_logging_enabled/0, + logging_enabled/0, + should_log/1, should_log/2, + tracker/1 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +%% Module pdict markers +-define(DELTA_TA, csrt_delta_ta). +-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0 +-define(PID_REF, csrt_pid_ref). %% track local ID +-define(TRACKER_PID, csrt_tracker). %% tracker pid + +-define(MANGO_EVAL_MATCH, mango_eval_match). +-define(DB_OPEN_DOC, docs_read). +-define(DB_OPEN, db_open). +-define(COUCH_SERVER_OPEN, db_open). +-define(COUCH_BT_GET_KP_NODE, get_kp_node). +-define(COUCH_BT_GET_KV_NODE, get_kv_node). +-define(COUCH_BT_WRITE_KP_NODE, write_kp_node). +-define(COUCH_BT_WRITE_KV_NODE, write_kv_node). +-define(COUCH_JS_FILTER, js_filter). +-define(COUCH_JS_FILTER_ERROR, js_filter_error). +-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs). +-define(IOQ_CALLS, ioq_calls). +-define(ROWS_READ, rows_read). + +%% TODO: overlap between this and couch btree fold invocations +%% TODO: need some way to distinguish fols on views vs find vs all_docs +-define(FRPC_CHANGES_ROW, changes_processed). +-define(FRPC_CHANGES_RETURNED, changes_returned). + +-record(st, {}). + +-record(rctx, { + %% Metadata + started_at = tnow(), + updated_at = tnow(), + pid_ref, + mfa, + nonce, + from, + type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc + dbname, + username, + path, + + %% Stats counters + db_open = 0, + docs_read = 0, + rows_read = 0, + changes_processed = 0, + changes_returned = 0, + ioq_calls = 0, + io_bytes_read = 0, + io_bytes_written = 0, + js_evals = 0, + js_filter = 0, + js_filter_error = 0, + js_filtered_docs = 0, + mango_eval_match = 0, + %% TODO: switch record definitions to be macro based, eg: + %% ?COUCH_BT_GET_KP_NODE = 0, + get_kv_node = 0, + get_kp_node = 0, + write_kv_node = 0, + write_kp_node = 0 +}). + +%% +%% Public API +%% + +%% +%% PidRef operations +%% + +get_pid_ref() -> + get(?PID_REF). + +set_pid_ref(PidRef) -> + erlang:put(?PID_REF, PidRef), + PidRef. + +create_pid_ref() -> + case get_pid_ref() of + undefined -> + ok; + PidRef0 -> + %% TODO: what to do when it already exists? + throw({epidexist, PidRef0}), + close_pid_ref(PidRef0) + end, + PidRef = {self(), make_ref()}, + set_pid_ref(PidRef), + PidRef. + +close_pid_ref() -> + close_pid_ref(get_pid_ref()). + +%%close_pid_ref(undefined) -> +%% undefined; +close_pid_ref(_PidRef) -> + erase(?PID_REF). + +get_resource() -> + get_resource(get_pid_ref()). + +get_resource(undefined) -> + undefined; +get_resource(PidRef) -> + catch case ets:lookup(?MODULE, PidRef) of + [#rctx{}=Rctx] -> + Rctx; + [] -> + undefined + end. + +%% monotonic time now in millisecionds +tnow() -> + erlang:monotonic_time(millisecond). + +is_enabled() -> + config:get_boolean(?MODULE_STRING, "enabled", true). + +%% +%% Aggregate query API +%% + +active() -> active_int(all). +active_coordinators() -> active_int(coordinators). +active_workers() -> active_int(workers). + +%% active_json() or active(json)? +active(json) -> to_json_list(active_int(all)). +active_coordinators(json) -> to_json_list(active_int(coordinators)). +active_workers(json) -> to_json_list(active_int(workers)). + + +active_int(coordinators) -> + select_by_type(coordinators); +active_int(workers) -> + select_by_type(workers); +active_int(all) -> + select_by_type(all). + + +select_by_type(coordinators) -> + ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {coordinator, _, _}} = R) -> R end)); +select_by_type(workers) -> + ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end)); +select_by_type(all) -> + ets:tab2list(?MODULE). + +find_by_nonce(Nonce) -> + ets:match_object(?MODULE, ets:fun2ms(fun(#rctx{nonce = Nonce1} = R) when Nonce =:= Nonce1 -> R end)). + +find_by_pid(Pid) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})]. + +find_by_pidref(PidRef) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref=PidRef, _ = '_'})]. + +find_workers_by_pidref(PidRef) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{from=PidRef, _ = '_'})]. + +field(#rctx{pid_ref=Val}, pid_ref) -> Val; +%% NOTE: Pros and cons to doing these convert functions here +%% Ideally, this would be done later so as to prefer the core data structures +%% as long as possible, but we currently need the output of this function to +%% be jiffy:encode'able. The tricky bit is dynamically encoding the group_by +%% structure provided by the caller of *_by aggregator functions below. +%% For now, we just always return jiffy:encode'able data types. +field(#rctx{mfa=Val}, mfa) -> convert_mfa(Val); +field(#rctx{nonce=Val}, nonce) -> Val; +field(#rctx{from=Val}, from) -> Val; +field(#rctx{type=Val}, type) -> convert_type(Val); +field(#rctx{dbname=Val}, dbname) -> Val; +field(#rctx{username=Val}, username) -> Val; +field(#rctx{path=Val}, path) -> Val; +field(#rctx{db_open=Val}, db_open) -> Val; +field(#rctx{docs_read=Val}, docs_read) -> Val; +field(#rctx{rows_read=Val}, rows_read) -> Val; +field(#rctx{changes_processed=Val}, changes_processed) -> Val; +field(#rctx{changes_returned=Val}, changes_returned) -> Val; +field(#rctx{ioq_calls=Val}, ioq_calls) -> Val; +field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val; +field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val; +field(#rctx{js_evals=Val}, js_evals) -> Val; +field(#rctx{js_filter=Val}, js_filter) -> Val; +field(#rctx{js_filter_error=Val}, js_filter_error) -> Val; +field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val; +field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val; +field(#rctx{get_kv_node=Val}, get_kv_node) -> Val; +field(#rctx{get_kp_node=Val}, get_kp_node) -> Val. + +curry_field(Field) -> + fun(Ele) -> field(Ele, Field) end. + +count_by(KeyFun) -> + group_by(KeyFun, fun(_) -> 1 end). + +group_by(KeyFun, ValFun) -> + group_by(KeyFun, ValFun, fun erlang:'+'/2). + +%% eg: group_by(mfa, docs_read). +%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls). +%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls). +%% eg: group_by([username, dbname, mfa], docs_read). +%% eg: group_by([username, dbname, mfa], ioq_calls). +%% eg: group_by([username, dbname, mfa], js_filters). +group_by(KeyL, ValFun, AggFun) when is_list(KeyL) -> + KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end, + group_by(KeyFun, ValFun, AggFun); +group_by(Key, ValFun, AggFun) when is_atom(Key) -> + group_by(curry_field(Key), ValFun, AggFun); +group_by(KeyFun, Val, AggFun) when is_atom(Val) -> + group_by(KeyFun, curry_field(Val), AggFun); +group_by(KeyFun, ValFun, AggFun) -> + FoldFun = fun(Ele, Acc) -> + Key = KeyFun(Ele), + Val = ValFun(Ele), + CurrVal = maps:get(Key, Acc, 0), + NewVal = AggFun(CurrVal, Val), + %% TODO: should we skip here? how to make this optional? + case NewVal > 0 of + true -> + maps:put(Key, NewVal, Acc); + false -> + Acc + end + end, + ets:foldl(FoldFun, #{}, ?MODULE). + +%% Sorts largest first +sorted(Map) when is_map(Map) -> + lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)). + +shortened(L) -> + lists:sublist(L, 10). + +%% eg: sorted_by([username, dbname, mfa], ioq_calls) +%% eg: sorted_by([dbname, mfa], doc_reads) +sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))). +sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))). +sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, AggFun))). + +%% +%% Conversion API for outputting JSON +%% + +convert_mfa(MFA) when is_list(MFA) -> + list_to_binary(MFA); +convert_mfa({M0, F0, A0}) -> + M = atom_to_binary(M0), + F = atom_to_binary(F0), + A = integer_to_binary(A0), + <<M/binary, ":", F/binary, "/", A/binary>>; +convert_mfa(null) -> + null; +convert_mfa(undefined) -> + null. + +convert_type(Atom) when is_atom(Atom) -> + atom_to_binary(Atom); +convert_type({coordinator, Verb0, Atom0}) when is_atom(Atom0) -> + Verb = atom_to_binary(Verb0), + Atom = atom_to_binary(Atom0), + <<"coordinator:", Verb/binary, ":", Atom/binary>>; +convert_type({coordinator, Verb0, Path0}) -> + Verb = atom_to_binary(Verb0), + Path = list_to_binary(Path0), + <<"coordinator:", Verb/binary, ":", Path/binary>>; +convert_type({worker, M0, F0}) -> + M = atom_to_binary(M0), + F = atom_to_binary(F0), + <<"worker:", M/binary, ":", F/binary>>; +convert_type(null) -> + null; +convert_type(undefined) -> + null. + +convert_path(undefined) -> + null; +convert_path(Path) when is_binary(Path) -> + Path. + +convert_pidref({Parent0, ParentRef0}) -> + Parent = convert_pid(Parent0), + ParentRef = convert_ref(ParentRef0), + <<Parent/binary, ":", ParentRef/binary>>; +convert_pidref(null) -> + null; +convert_pidref(undefined) -> + null. + +convert_pid(Pid) when is_pid(Pid) -> + ?l2b(pid_to_list(Pid)). + +convert_ref(Ref) when is_reference(Ref) -> + ?l2b(ref_to_list(Ref)). + +to_json(#rctx{}=Rctx) -> + #rctx{ + updated_at = TP, + started_at = TInit, + pid_ref = PidRef, + mfa = MFA, + nonce = Nonce, + from = From, + dbname = DbName, + path = Path, + username = UserName, + db_open = DbOpens, + docs_read = DocsRead, + rows_read = RowsRead, + js_filter = JSFilters, + js_filter_error = JSFilterErrors, + js_filtered_docs = JSFilteredDocss, + type = Type, + get_kp_node = GetKpNodes, + get_kv_node = GetKvNodes, + %%write_kp_node = WriteKpNodes, + %%write_kv_node = WriteKvNodes, + changes_returned = ChangesReturned, + ioq_calls = IoqCalls + } = Rctx, + + #{ + updated_at => TP, + started_at => TInit, + pid_ref => convert_pidref(PidRef), + mfa => convert_mfa(MFA), + nonce => Nonce, + from => convert_pidref(From), + dbname => DbName, + path => convert_path(Path), + username => UserName, + db_open => DbOpens, + docs_read => DocsRead, + js_filter => JSFilters, + js_filter_error => JSFilterErrors, + js_filtered_docs => JSFilteredDocss, + rows_read => RowsRead, + type => convert_type(Type), + get_kp_nodes => GetKpNodes, + get_kv_nodes => GetKvNodes, + %%write_kp_nodes => WriteKpNodes, + %%write_kv_nodes => WriteKvNodes, + changes_returned => ChangesReturned, + ioq_calls => IoqCalls + }. + +%% +%% Context lifecycle API +%% + +create_resource(#rctx{} = Rctx) -> + catch ets:insert(?MODULE, Rctx). + +create_worker_context(From, {M,F,_A} = MFA, Nonce) -> + case is_enabled() of + true -> + create_context(MFA, {worker, M, F}, null, From, Nonce); + false -> + false + end. + +create_coordinator_context(#httpd{} = Req, Path0) -> + case is_enabled() of + true -> + #httpd{ + method = Verb, + nonce = Nonce + %%path_parts = Parts + } = Req, + %%Path = list_to_binary([$/ | io_lib:format("~p", [Parts])]), + Path = list_to_binary([$/ | Path0]), + Type = {coordinator, Verb, init}, + create_context(null, Type, Path, null, Nonce); + false -> + false + end. + +create_context(MFA, Type, Path, From, Nonce) -> + PidRef = create_pid_ref(), + Rctx = #rctx{ + from = From, + pid_ref = PidRef, + mfa = MFA, + nonce = Nonce, + path = Path, + type = Type + }, + erlang:put(?DELTA_TZ, Rctx), + create_resource(Rctx), + track(Rctx), + PidRef. + +set_context_dbname(DbName) -> + set_context_dbname(DbName, get_pid_ref()). + +set_context_dbname(_, undefined) -> + ok; +set_context_dbname(DbName, PidRef) -> + is_enabled() andalso update_element(PidRef, [{#rctx.dbname, DbName}]). + +set_context_handler_fun(Fun) when is_function(Fun) -> + set_context_handler_fun(Fun, get_pid_ref()). +set_context_handler_fun(_, undefined) -> + ok; +set_context_handler_fun(Fun, PidRef) when is_function(Fun) -> + case is_enabled() of + false -> + ok; + true -> + FunName = erlang:fun_to_list(Fun), + #rctx{type={coordinator, Verb, _}} = get_resource(), + Update = [{#rctx.type, {coordinator, Verb, FunName}}], + update_element(PidRef, Update) + end. + +set_context_username(null) -> + ok; +set_context_username(undefined) -> + ok; +set_context_username(User) -> + set_context_username(User, get_pid_ref()). + +set_context_username(null, _) -> + ok; +set_context_username(_, undefined) -> + ok; +set_context_username(#httpd{user_ctx = Ctx}, PidRef) -> + set_context_username(Ctx, PidRef); +set_context_username(#user_ctx{name = Name}, PidRef) -> + set_context_username(Name, PidRef); +set_context_username(UserName, PidRef) -> + is_enabled() andalso update_element(PidRef, [{#rctx.username, UserName}]). + +destroy_context() -> + destroy_context(get_pid_ref()). + +destroy_context(undefined) -> + ok; +destroy_context({_, _} = PidRef) -> + stop_tracker(get_tracker()), + close_pid_ref(PidRef), + ok. + +%% Stat collection API + +inc(Key) -> + inc(Key, 1). + +%% TODO: inc(io_bytes_read, N) -> +%% TODO: inc(io_bytes_written, N) -> +%% TODO: inc(js_evals, N) -> +inc(?DB_OPEN, N) -> + update_counter(#rctx.?DB_OPEN, N); +inc(?ROWS_READ, N) -> + update_counter(#rctx.?ROWS_READ, N); +inc(?FRPC_CHANGES_RETURNED, N) -> + update_counter(#rctx.?FRPC_CHANGES_RETURNED, N); +inc(?IOQ_CALLS, N) -> + update_counter(#rctx.?IOQ_CALLS, N); +inc(?COUCH_JS_FILTER, N) -> + update_counter(#rctx.?COUCH_JS_FILTER, N); +inc(?COUCH_JS_FILTER_ERROR, N) -> + update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N); +inc(?COUCH_JS_FILTERED_DOCS, N) -> + update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N); +inc(?MANGO_EVAL_MATCH, N) -> + update_counter(#rctx.?MANGO_EVAL_MATCH, N); +inc(?DB_OPEN_DOC, N) -> + update_counter(#rctx.?DB_OPEN_DOC, N); +inc(?FRPC_CHANGES_ROW, N) -> + update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read +inc(?COUCH_BT_GET_KP_NODE, N) -> + update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N); +inc(?COUCH_BT_GET_KV_NODE, N) -> + update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N); +inc(_, _) -> + %% inc needs to allow unknown types to pass for accumulate_update to handle + %% updates from nodes with newer data formats + 0. + +maybe_inc([mango, evaluate_selector], Val) -> + inc(?MANGO_EVAL_MATCH, Val); +maybe_inc([couchdb, database_reads], Val) -> + inc(?DB_OPEN_DOC, Val); +maybe_inc([fabric_rpc, changes, processed], Val) -> + inc(?FRPC_CHANGES_ROW, Val); +maybe_inc([fabric_rpc, changes, returned], Val) -> + inc(?FRPC_CHANGES_RETURNED, Val); +maybe_inc([fabric_rpc, view, rows_read], Val) -> + inc(?ROWS_READ, Val); +maybe_inc([couchdb, couch_server, open], Val) -> + inc(?DB_OPEN, Val); +maybe_inc([couchdb, btree, get_node, kp_node], Val) -> + inc(?COUCH_BT_GET_KP_NODE, Val); +maybe_inc([couchdb, btree, get_node, kv_node], Val) -> + inc(?COUCH_BT_GET_KV_NODE, Val); +%% The write_node logic won't pickup writes as none of the RPC +%% processes actually perform the write operation +%% TODO: bubble up induced work from other processes +maybe_inc([couchdb, btree, write_node, kp_node], Val) -> + inc(?COUCH_BT_WRITE_KP_NODE, Val); +maybe_inc([couchdb, btree, write_node, kv_node], Val) -> + inc(?COUCH_BT_WRITE_KV_NODE, Val); +maybe_inc([couchdb, query_server, js_filter_error], Val) -> + inc(?COUCH_JS_FILTER_ERROR, Val); +maybe_inc([couchdb, query_server, js_filter], Val) -> + inc(?COUCH_JS_FILTER, Val); +maybe_inc([couchdb, query_server, js_filtered_docs], Val) -> + inc(?COUCH_JS_FILTERED_DOCS, Val); +maybe_inc(_Metric, _Val) -> + %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), + 0. + +%% TODO: update stats_descriptions.cfg for relevant apps +should_track([fabric_rpc, all_docs, spawned]) -> + is_enabled(); +should_track([fabric_rpc, changes, spawned]) -> + is_enabled(); +should_track([fabric_rpc, changes, processed]) -> + is_enabled(); +should_track([fabric_rpc, changes, returned]) -> + is_enabled(); +should_track([fabric_rpc, map_view, spawned]) -> + is_enabled(); +should_track([fabric_rpc, reduce_view, spawned]) -> + is_enabled(); +should_track([fabric_rpc, get_all_security, spawned]) -> + is_enabled(); +should_track([fabric_rpc, open_doc, spawned]) -> + is_enabled(); +should_track([fabric_rpc, update_docs, spawned]) -> + is_enabled(); +should_track([fabric_rpc, open_shard, spawned]) -> + is_enabled(); +should_track([mango_cursor, view, all_docs]) -> + is_enabled(); +should_track([mango_cursor, view, idx]) -> + is_enabled(); +should_track(_Metric) -> + %%io:format("SKIPPING METRIC: ~p~n", [Metric]), + false. + +ioq_called() -> + inc(ioq_calls). + +accumulate_delta(Delta) when is_map(Delta) -> + %% TODO: switch to creating a batch of updates to invoke a single + %% update_counter rather than sequentially invoking it for each field + is_enabled() andalso maps:foreach(fun inc/2, Delta); +accumulate_delta(undefined) -> + ok. + +make_delta() -> + TA = case get(?DELTA_TA) of + undefined -> + %% Need to handle this better, can't just make a new T0 at T' as + %% the timestamps will be identical causing a divide by zero error. + %% + %% Realistically need to ensure that all invocations of database + %% operations sets T0 appropriately. Perhaps it's possible to do + %% this is the couch_db:open chain, and then similarly, in + %% couch_server, and uhhhh... couch_file, and... + %% + %% I think we need some type of approach for establishing a T0 that + %% doesn't result in outrageous deltas. For now zero out the + %% microseconds field, or subtract a second on the off chance that + %% microseconds is zero. I'm not uptodate on the latest Erlang time + %% libraries and don't remember how to easily get an + %% `os:timestamp()` out of now() - 100ms or some such. + %% + %% I think it's unavoidable that we'll have some codepaths that do + %% not properly instantiate the T0 at spawn resulting in needing to + %% do some time of "time warp" or ignoring the timing collection + %% entirely. Perhaps if we hoisted out the stats collection into + %% the primary flow of the database and funnel that through all the + %% function clauses we could then utilize Dialyzer to statically + %% analyze and assert all code paths that invoke database + %% operations have properly instantinated a T0 at the appropriate + %% start time such that we don't have to "fudge" deltas with a + %% missing start point, but we're a long ways from that happening + %% so I feel it necessary to address the NULL start time. + + %% Track how often we fail to initiate T0 correctly + %% Perhaps somewhat naughty we're incrementing stats from within + %% couch_stats itself? Might need to handle this differently + %% TODO: determine appropriate course of action here + %% io:format("~n**********MISSING STARTING DELTA************~n~n", []), + couch_stats:increment_counter( + [couchdb, csrt, delta_missing_t0]), + %%[couch_stats_resource_tracker, delta_missing_t0]), + + case erlang:get(?DELTA_TZ) of + undefined -> + TA0 = make_delta_base(), + %% TODO: handline missing deltas, otherwise divide by zero + set_delta_a(TA0), + TA0; + TA0 -> + TA0 + end; + #rctx{} = TA0 -> + TA0 + end, + TB = get_resource(), + Delta = make_delta(TA, TB), + set_delta_a(TB), + Delta. + +make_delta(#rctx{}=TA, #rctx{}=TB) -> + Delta = #{ + docs_read => TB#rctx.docs_read - TA#rctx.docs_read, + js_filter => TB#rctx.js_filter - TA#rctx.js_filter, + js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error, + js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs, + rows_read => TB#rctx.rows_read - TA#rctx.rows_read, + changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned, + get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node, + get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node, + db_open => TB#rctx.db_open - TA#rctx.db_open, + ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls, + dt => TB#rctx.updated_at - TA#rctx.updated_at + }, + %% TODO: reevaluate this decision + %% Only return non zero (and also positive) delta fields + maps:filter(fun(_K,V) -> V > 0 end, Delta); +make_delta(_, #rctx{}) -> + #{error => missing_beg_rctx}; +make_delta(#rctx{}, _) -> + #{error => missing_fin_rctx}. + +%% TODO: what to do when PidRef=undefined? +make_delta_base(PidRef) -> + %% TODO: extract user_ctx and db/shard from request + Now = tnow(), + #rctx{ + pid_ref = PidRef, + %% TODO: confirm this subtraction works + started_at = Now - 100, %% give us 100ms rewind time for missing T0 + updated_at = Now + }. + +make_delta_base() -> + make_delta_base(get_pid_ref()). + +set_delta_a(TA) -> + erlang:put(?DELTA_TA, TA). + +update_counter(Field, Count) -> + is_enabled() andalso update_counter(get_pid_ref(), Field, Count). + +update_counter(undefined, _Field, _Count) -> + ok; +update_counter({_Pid,_Ref}=PidRef, Field, Count) -> + %% TODO: mem3 crashes without catch, why do we lose the stats table? + is_enabled() andalso catch ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). + +update_element(undefined, _Update) -> + ok; +update_element({_Pid,_Ref}=PidRef, Update) -> + %% TODO: should we take any action when the update fails? + is_enabled() andalso catch ets:update_element(?MODULE, PidRef, Update). + +%% Process lifetime logging api + +track(#rctx{pid_ref=PidRef}) -> + case get_tracker() of + undefined -> + Pid = spawn(?MODULE, tracker, [PidRef]), + put_tracker(Pid), + Pid; + Pid when is_pid(Pid) -> + Pid + end. + +tracker({Pid, _Ref}=PidRef) -> + MonRef = erlang:monitor(process, Pid), + receive + stop -> + %% TODO: do we need cleanup here? + log_process_lifetime_report(PidRef), + catch evict(PidRef), + demonitor(MonRef), + ok; + {'DOWN', MonRef, _Type, _0DPid, _Reason0} -> + destroy_context(PidRef), + %% TODO: should we pass reason to log_process_lifetime_report? + %% Reason = case Reason0 of + %% {shutdown, Shutdown0} -> + %% Shutdown = atom_to_binary(Shutdown0), + %% <<"shutdown: ", Shutdown/binary>>; + %% Reason0 -> + %% Reason0 + %% end, + %% TODO: should we send the induced work delta to the coordinator? + log_process_lifetime_report(PidRef), + catch evict(PidRef) + end. + +log_process_lifetime_report(PidRef) -> + case is_enabled() andalso is_logging_enabled() of + true -> + Rctx = get_resource(PidRef), + case should_log(Rctx) of + true -> + couch_log:report("csrt-pid-usage-lifetime", to_json(Rctx)); + _ -> + ok + end; + false -> + ok + end. + +is_logging_enabled() -> Review Comment: Ahhhhh that's an interesting idea with using persistent_term, I need to remember that's a thing now. The purpose of the gen_server was two fold: 1) to have a way of registering and preserving the filters; 2) to have a mechanism for getting the registered filters applied to the reports as the the CSRT contexts are cleaned up. There's various ways to do 1), but for 2) my experiment above used a gen_server so that it could store the filters and then have the logs funneled through the gen_server, which likely should use sharded gen_servers like we've done elsewhere. This isn't strictly necessary though, as the current PR approach uses the dedicated monitor process to generate the report https://github.com/apache/couchdb/blob/couch-stats-resource-tracker-v2/src/couch_stats/src/couch_stats_resource_tracker.erl#L734 and avoids needing dedicated servers to filter the operations. The tricky bit was about how to get the registered filters down to the monitor processes so they could determine locally whether to log the report or not. The use of persistent terms is an interesting approach for declaring the filter matchers, especially since the output `ets:match_spec_compile` is a reference to an internal representation, so fetching the set of references across the various workers and coordinators to then apply locally seems worth exploring more, especially since it allows us to stick with the decentralized monitor processes rather than creating new high throughput gen_server's. We still have the aspect of discovery, I'm curious what you had in mind on this front @iilyak, but from the persistent term API unless we use some other mechanism for dynamically declaring the set of enabled filters, we'll have to do something with an expected naming scheme and iterate over the entries. Maybe something like ```erlang register_filter(Name, CompMatchSpec) when is_binary(Name0) -> Name = <<"csrt-logging-filter::", Name0/binary>>, persistent_term:put(Name, CompMatchSpec). get_active_filters() -> lists:filter(fun (<<"csrt-logging-filter::", _/Binary>>) -> true; (_) -> false end, persistent_term:get()). ``` Basically using https://www.erlang.org/doc/apps/erts/persistent_term.html#get/0 to iterate over all declared persistent terms and then looking for an expected prefix naming scheme. Did you have something else in mind? I suppose we could also have a gen_server for handling the state and declarations, but there still needs to be some mechanism for publishing what is active. That might also be useful if we want to have the ability to register TTL filtering, but I'm not convinced that's worth complicating the design over. I'm curious what thoughts you had on this front, how would a given CSRT monitor process query the registered filters? Hmmm... I suppose we could make it even simpler and just do `persistent_term:put(all_the_filters, #{...}` and then just store and fetch a map of those entries. That might be a decent idea, we need to iterate through _all_ the filters so we might as well grab them in one query. That also avoids having to do the dynamic naming sequences. Then our earlier functions become: ```erlang register_filter(Name, CompMatchSpec) -> Filters = persistent_term:get(all_the_filters, #{}), persistent_term:put(all_the_filters, maps:put(Name, CompMatchSpec, Filters)). get_active_filters() -> persistent_term:get(all_the_filters, #{}). ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
