This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d6077efe0 [pipeline](profile) Support real-time profile report in pipeline (#16772)
3d6077efe0 is described below
commit 3d6077efe0e02d0e5a1f555cfe51bea9df3e2d6e
Author: Gabriel <[email protected]>
AuthorDate: Fri Feb 17 10:01:34 2023 +0800
[pipeline](profile) Support real-time profile report in pipeline (#16772)
---
be/src/pipeline/pipeline_fragment_context.cpp | 235 +++++++++--------------
be/src/pipeline/pipeline_fragment_context.h | 54 +++++-
be/src/runtime/fragment_mgr.cpp | 261 ++++++++++++++------------
be/src/runtime/fragment_mgr.h | 24 ++-
be/src/runtime/plan_fragment_executor.cpp | 6 +-
5 files changed, 304 insertions(+), 276 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e91991061..524e1be67e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -90,7 +90,8 @@ namespace doris::pipeline {
PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
- std::function<void(RuntimeState*, Status*)> call_back)
+ const std::function<void(RuntimeState*, Status*)>& call_back,
+ const report_status_callback& report_status_cb)
: _query_id(query_id),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
@@ -98,12 +99,16 @@ PipelineFragmentContext::PipelineFragmentContext(
_exec_env(exec_env),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
_query_ctx(std::move(query_ctx)),
- _call_back(call_back) {
+ _call_back(call_back),
+ _report_thread_active(false),
+ _report_status_cb(report_status_cb),
+ _is_report_on_cancel(true) {
_fragment_watcher.start();
}
PipelineFragmentContext::~PipelineFragmentContext() {
_call_back(_runtime_state.get(), &_exec_status);
+ DCHECK(!_report_thread_active);
}
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
@@ -115,6 +120,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
}
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
_exec_status = Status::Cancelled(msg);
+ _set_is_report_on_cancel(false);
}
_runtime_state->set_is_cancelled(true);
if (_pipe != nullptr) {
@@ -269,6 +275,13 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
_runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
_runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
+ if (_is_report_success && config::status_report_interval > 0) {
+ std::unique_lock<std::mutex> l(_report_thread_lock);
+ _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+ // make sure the thread started up, otherwise report_profile() might get into a race
+ // with stop_report_thread()
+ _report_thread_started_cv.wait(l);
+ }
_prepared = true;
return Status::OK();
}
@@ -455,6 +468,68 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
return Status::OK();
}
+void PipelineFragmentContext::_stop_report_thread() {
+ if (!_report_thread_active) {
+ return;
+ }
+
+ _report_thread_active = false;
+
+ _stop_report_thread_cv.notify_one();
+ _report_thread.join();
+}
+
+void PipelineFragmentContext::report_profile() {
+ SCOPED_ATTACH_TASK(_runtime_state.get());
+ VLOG_FILE << "report_profile(): instance_id=" <<
_runtime_state->fragment_instance_id();
+
+ _report_thread_active = true;
+
+ std::unique_lock<std::mutex> l(_report_thread_lock);
+ // tell Open() that we started
+ _report_thread_started_cv.notify_one();
+
+ // Jitter the reporting time of remote fragments by a random amount between
+ // 0 and the report_interval. This way, the coordinator doesn't get all the
+ // updates at once so its better for contention as well as smoother progress
+ // reporting.
+ int report_fragment_offset = rand() % config::status_report_interval;
+ // We don't want to wait longer than it takes to run the entire fragment.
+ _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
+ while (_report_thread_active) {
+ if (config::status_report_interval > 0) {
+ // wait_for can return because the timeout occurred or the condition variable
+ // was signaled. We can't rely on its return value to distinguish between the
+ // two cases (e.g. there is a race here where the wait timed out but before grabbing
+ // the lock, the condition variable was signaled). Instead, we will use an external
+ // flag, _report_thread_active, to coordinate this.
+ _stop_report_thread_cv.wait_for(l,
+ std::chrono::seconds(config::status_report_interval));
+ } else {
+ LOG(WARNING) << "config::status_report_interval is equal to or
less than zero, exiting "
+ "reporting thread.";
+ break;
+ }
+
+ if (VLOG_FILE_IS_ON) {
+ VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " :
" ")
+ << "profile for instance " <<
_runtime_state->fragment_instance_id();
+ std::stringstream ss;
+ _runtime_state->runtime_profile()->compute_time_in_profile();
+ _runtime_state->runtime_profile()->pretty_print(&ss);
+ VLOG_FILE << ss.str();
+ }
+
+ if (!_report_thread_active) {
+ break;
+ }
+
+ send_report(false);
+ }
+
+ VLOG_FILE << "exiting reporting thread: instance_id=" <<
_runtime_state->fragment_instance_id();
+}
+
// TODO: use virtual function to do abstruct
Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) {
auto node_type = node->type();
@@ -752,6 +827,7 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
void PipelineFragmentContext::_close_action() {
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
send_report(true);
+ _stop_report_thread();
// all submitted tasks done
_exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
}
@@ -763,24 +839,11 @@ void PipelineFragmentContext::close_a_pipeline() {
}
}
-// TODO pipeline dump copy from FragmentExecState::to_http_path
-std::string PipelineFragmentContext::to_http_path(const std::string& file_name) {
- std::stringstream url;
- url << "http://" << BackendOptions::get_localhost() << ":" <<
config::webserver_port
- << "/api/_download_load?"
- << "token=" << _exec_env->token() << "&file=" << file_name;
- return url.str();
-}
-
-// TODO pipeline dump copy from FragmentExecState::coordinator_callback
-// TODO pipeline this callback should be placed in a thread pool
void PipelineFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
- if (!_exec_status.ok()) {
- exec_status = _exec_status;
- }
+ exec_status = _exec_status;
}
// If plan is done successfully, but _is_report_success is false,
@@ -789,137 +852,21 @@ void PipelineFragmentContext::send_report(bool done) {
return;
}
- Status coord_status;
- auto coord_addr = _query_ctx->coord_addr;
- FrontendServiceConnection coord(_exec_env->frontend_client_cache(), coord_addr, &coord_status);
- if (!coord_status.ok()) {
- std::stringstream ss;
- ss << "couldn't get a client for " << coord_addr << ", reason: " <<
coord_status;
- LOG(WARNING) << "query_id: " << print_id(_query_id) << ", " <<
ss.str();
- {
- std::lock_guard<std::mutex> l(_status_lock);
- if (_exec_status.ok()) {
- _exec_status = Status::InternalError(ss.str());
- }
- }
+ // If both _is_report_success and _is_report_on_cancel are false,
+ // which means no matter query is success or failed, no report is needed.
+ // This may happen when the query limit reached and
+ // a internal cancellation being processed
+ if (!_is_report_success && !_is_report_on_cancel) {
return;
}
- auto* profile = _is_report_success ? _runtime_state->runtime_profile() : nullptr;
-
- TReportExecStatusParams params;
- params.protocol_version = FrontendServiceVersion::V1;
- params.__set_query_id(_query_id);
- params.__set_backend_num(_backend_num);
- params.__set_fragment_instance_id(_fragment_instance_id);
- params.__set_fragment_id(_fragment_id);
- exec_status.set_t_status(&params);
- params.__set_done(true);
-
- auto* runtime_state = _runtime_state.get();
- DCHECK(runtime_state != nullptr);
- if (runtime_state->query_type() == TQueryType::LOAD && !done && exec_status.ok()) {
- // this is a load plan, and load is not finished, just make a brief report
- params.__set_loaded_rows(runtime_state->num_rows_load_total());
- params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
- } else {
- if (runtime_state->query_type() == TQueryType::LOAD) {
- params.__set_loaded_rows(runtime_state->num_rows_load_total());
- params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
- }
- if (profile == nullptr) {
- params.__isset.profile = false;
- } else {
- profile->to_thrift(&params.profile);
- params.__isset.profile = true;
- }
-
- if (!runtime_state->output_files().empty()) {
- params.__isset.delta_urls = true;
- for (auto& it : runtime_state->output_files()) {
- params.delta_urls.push_back(to_http_path(it));
- }
- }
- if (runtime_state->num_rows_load_total() > 0 ||
- runtime_state->num_rows_load_filtered() > 0) {
- params.__isset.load_counters = true;
-
- static std::string s_dpp_normal_all = "dpp.norm.ALL";
- static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
- static std::string s_unselected_rows = "unselected.rows";
-
- params.load_counters.emplace(s_dpp_normal_all,
- std::to_string(runtime_state->num_rows_load_success()));
- params.load_counters.emplace(s_dpp_abnormal_all,
- std::to_string(runtime_state->num_rows_load_filtered()));
- params.load_counters.emplace(s_unselected_rows,
- std::to_string(runtime_state->num_rows_load_unselected()));
- }
- if (!runtime_state->get_error_log_file_path().empty()) {
- params.__set_tracking_url(
- to_load_error_http_path(runtime_state->get_error_log_file_path()));
- }
- if (!runtime_state->export_output_files().empty()) {
- params.__isset.export_files = true;
- params.export_files = runtime_state->export_output_files();
- }
- if (!runtime_state->tablet_commit_infos().empty()) {
- params.__isset.commitInfos = true;
- params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
- for (auto& info : runtime_state->tablet_commit_infos()) {
- params.commitInfos.push_back(info);
- }
- }
- if (!runtime_state->error_tablet_infos().empty()) {
- params.__isset.errorTabletInfos = true;
- params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
- for (auto& info : runtime_state->error_tablet_infos()) {
- params.errorTabletInfos.push_back(info);
- }
- }
- // Send new errors to coordinator
- runtime_state->get_unreported_errors(&(params.error_log));
- params.__isset.error_log = (params.error_log.size() > 0);
- }
-
- if (_exec_env->master_info()->__isset.backend_id) {
- params.__set_backend_id(_exec_env->master_info()->backend_id);
- }
-
- TReportExecStatusResult res;
- Status rpc_status;
-
- VLOG_DEBUG << "reportExecStatus params is "
- << apache::thrift::ThriftDebugString(params).c_str();
- try {
- try {
- coord->reportExecStatus(res, params);
- } catch (TTransportException& e) {
- LOG(WARNING) << "Retrying ReportExecStatus. query id: " <<
print_id(_query_id)
- << ", instance id: " <<
print_id(_fragment_instance_id) << " to "
- << coord_addr << ", err: " << e.what();
- rpc_status = coord.reopen();
-
- if (!rpc_status.ok()) {
- // we need to cancel the execution of this fragment
- cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
- return;
- }
- coord->reportExecStatus(res, params);
- }
-
- rpc_status = Status(res.status);
- } catch (TException& e) {
- std::stringstream msg;
- msg << "ReportExecStatus() to " << coord_addr << " failed:\n" <<
e.what();
- LOG(WARNING) << msg.str();
- rpc_status = Status::InternalError(msg.str());
- }
-
- if (!rpc_status.ok()) {
- // we need to cancel the execution of this fragment
- cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2");
- }
+ _report_status_cb(
+ {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr,
+ done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
+ _fragment_instance_id, _backend_num, _runtime_state.get(),
+ std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
+ std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
+ std::placeholders::_2)});
}
} // namespace doris::pipeline
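
A minimal standalone sketch of the start/stop handshake the new report thread above relies on (not part of the commit; the Reporter class and its member names are illustrative only): prepare() blocks until the worker has marked itself active, and _stop_report_thread() clears the flag and signals the worker before joining, so the join never waits on a worker that is still asleep in wait_for().

    // Sketch of the report-thread lifecycle pattern; hypothetical names, C++17.
    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class Reporter {
    public:
        void start() {
            std::unique_lock<std::mutex> l(_lock);
            _thread = std::thread(&Reporter::_run, this);
            // Block until _run() has set _active, so a racing stop() cannot see
            // _active == false and skip the join.
            _started_cv.wait(l, [this] { return _active; });
        }

        void stop() {
            {
                std::lock_guard<std::mutex> l(_lock);
                if (!_active) {
                    return;
                }
                _active = false;
            }
            _stop_cv.notify_one();
            _thread.join();
        }

    private:
        void _run() {
            std::unique_lock<std::mutex> l(_lock);
            _active = true;
            _started_cv.notify_one();
            while (_active) {
                // Wakes early when stop() clears _active; otherwise reports periodically.
                _stop_cv.wait_for(l, std::chrono::seconds(5));
                std::cout << "report tick" << std::endl;
            }
        }

        std::mutex _lock;
        std::condition_variable _started_cv;
        std::condition_variable _stop_cv;
        bool _active = false;
        std::thread _thread;
    };

In the commit, start() corresponds to the block added in prepare() and stop() to the _stop_report_thread() call made from _close_action().
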
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 3a4deb5772..38b07d503e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -25,6 +25,7 @@
namespace doris {
class ExecNode;
class DataSink;
+struct ReportStatusRequest;
namespace vectorized {
template <bool is_intersect>
@@ -37,10 +38,18 @@ class PipelineTask;
class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFragmentContext> {
public:
+ // Callback to report execution status of plan fragment.
+ // 'profile' is the cumulative profile, 'done' indicates whether the execution
+ // is done or still continuing.
+ // Note: this does not take a const RuntimeProfile&, because it might need to call
+ // functions like PrettyPrint() or to_thrift(), neither of which is const
+ // because they take locks.
+ using report_status_callback = std::function<void(const ReportStatusRequest)>;
PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
const int fragment_id, int backend_num,
std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
- std::function<void(RuntimeState*, Status*)> call_back);
+ const std::function<void(RuntimeState*, Status*)>& call_back,
+ const report_status_callback& report_status_cb);
~PipelineFragmentContext();
@@ -92,7 +101,27 @@ public:
void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
+ void report_profile();
+
+ Status update_status(Status status) {
+ std::lock_guard<std::mutex> l(_status_lock);
+ if (!status.ok() && _exec_status.ok()) {
+ _exec_status = status;
+ }
+ return _exec_status;
+ }
+
private:
+ Status _create_sink(const TDataSink& t_data_sink);
+ Status _build_pipelines(ExecNode*, PipelinePtr);
+ Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request);
+ Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
+ template <bool is_intersect>
+ Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
+ void _close_action();
+ void _stop_report_thread();
+ void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
+
// Id of this query
TUniqueId _query_id;
TUniqueId _fragment_instance_id;
@@ -142,16 +171,23 @@ private:
std::shared_ptr<io::StreamLoadPipe> _pipe;
- Status _create_sink(const TDataSink& t_data_sink);
- Status _build_pipelines(ExecNode*, PipelinePtr);
- Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request);
- Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
-
- template <bool is_intersect>
- Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
std::function<void(RuntimeState*, Status*)> _call_back;
- void _close_action();
std::once_flag _close_once_flag;
+
+ std::condition_variable _report_thread_started_cv;
+ // true if we started the thread
+ bool _report_thread_active;
+ // profile reporting-related
+ report_status_callback _report_status_cb;
+ std::thread _report_thread;
+ std::mutex _report_thread_lock;
+
+ // Indicates that profile reporting thread should stop.
+ // Tied to _report_thread_lock.
+ std::condition_variable _stop_report_thread_cv;
+ // If this is set to false, and '_is_report_success' is false as well,
+ // This executor will not report status to FE on being cancelled.
+ bool _is_report_on_cancel;
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7d0d2e82b6..f2a2c9e798 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -79,14 +79,14 @@ using apache::thrift::transport::TTransportException;
class RuntimeProfile;
class FragmentExecState {
public:
+ using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
// Constructor by using QueryFragmentsCtx
FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
- ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx);
+ ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+ const report_status_callback_impl& report_status_cb_impl);
Status prepare(const TExecPlanFragmentParams& params);
- std::string to_http_path(const std::string& file_name);
-
Status execute();
Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
@@ -146,7 +146,6 @@ private:
TUniqueId _fragment_instance_id;
// Used to report to coordinator which backend is over
int _backend_num;
- ExecEnv* _exec_env;
TNetworkAddress _coord_addr;
PlanFragmentExecutor _executor;
@@ -171,22 +170,24 @@ private:
// If set the true, this plan fragment will be executed only after FE send execution start rpc.
bool _need_wait_execution_trigger = false;
+ report_status_callback_impl _report_status_cb_impl;
};
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
int backend_num,
ExecEnv* exec_env,
- std::shared_ptr<QueryFragmentsCtx> fragments_ctx)
+ std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+ const report_status_callback_impl& report_status_cb_impl)
: _query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_backend_num(backend_num),
- _exec_env(exec_env),
_executor(exec_env,
std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
this, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)),
_set_rsc_info(false),
_timeout_second(-1),
- _fragments_ctx(std::move(fragments_ctx)) {
+ _fragments_ctx(std::move(fragments_ctx)),
+ _report_status_cb_impl(report_status_cb_impl) {
_start_time = DateTimeValue::local_time();
_coord_addr = _fragments_ctx->coord_addr;
}
@@ -256,9 +257,67 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
return Status::OK();
}
-std::string FragmentExecState::to_http_path(const std::string& file_name) {
+// There can only be one of these callbacks in-flight at any moment, because
+// it is only invoked from the executor's reporting thread.
+// Also, the reported status will always reflect the most recent execution status,
+// including the final status when execution finishes.
+void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
+ bool done) {
+ _report_status_cb_impl(
+ {status, profile, done, _coord_addr, _query_id, -1, _fragment_instance_id, _backend_num,
+ _executor.runtime_state(),
+ std::bind(&FragmentExecState::update_status, this, std::placeholders::_1),
+ std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1,
+ std::placeholders::_2)});
+ DCHECK(status.ok() || done); // if !status.ok() => done
+}
+
+FragmentMgr::FragmentMgr(ExecEnv* exec_env)
+ : _exec_env(exec_env), _stop_background_threads_latch(1) {
+ _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
+ INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
+ REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
+
+ auto s = Thread::create(
+ "FragmentMgr", "cancel_timeout_plan_fragment", [this]() {
this->cancel_worker(); },
+ &_cancel_thread);
+ CHECK(s.ok()) << s.to_string();
+
+ // TODO(zc): we need a better thread-pool
+ // now one user can use all the thread pool, others have no resource.
+ s = ThreadPoolBuilder("FragmentMgrThreadPool")
+ .set_min_threads(config::fragment_pool_thread_num_min)
+ .set_max_threads(config::fragment_pool_thread_num_max)
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .build(&_thread_pool);
+
+ REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+ [this]() { return _thread_pool->get_queue_size(); });
+ CHECK(s.ok()) << s.to_string();
+}
+
+FragmentMgr::~FragmentMgr() {
+ DEREGISTER_HOOK_METRIC(plan_fragment_count);
+ DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
+ _stop_background_threads_latch.count_down();
+ if (_cancel_thread) {
+ _cancel_thread->join();
+ }
+ // Stop all the worker, should wait for a while?
+ // _thread_pool->wait_for();
+ _thread_pool->shutdown();
+
+ // Only me can delete
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ _fragment_map.clear();
+ _fragments_ctx_map.clear();
+ }
+}
+
+std::string FragmentMgr::to_http_path(const std::string& file_name) {
std::stringstream url;
- url << "http://" << get_host_port(BackendOptions::get_localhost(),
config::webserver_port)
+ url << "http://" << BackendOptions::get_localhost() << ":" <<
config::webserver_port
<< "/api/_download_load?"
<< "token=" << _exec_env->token() << "&file=" << file_name;
return url.str();
@@ -268,94 +327,96 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) {
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
-void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
- bool done) {
- DCHECK(status.ok() || done); // if !status.ok() => done
- Status exec_status = update_status(status);
+void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+ DCHECK(req.status.ok() || req.done); // if !status.ok() => done
+ Status exec_status = req.update_fn(req.status);
Status coord_status;
- FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
+ FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr,
+ &coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
- UniqueId uid(_query_id.hi, _query_id.lo);
- ss << "couldn't get a client for " << _coord_addr << ", reason: " <<
coord_status;
+ UniqueId uid(req.query_id.hi, req.query_id.lo);
+ ss << "couldn't get a client for " << req.coord_addr << ", reason: "
<< coord_status;
LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
- update_status(Status::InternalError(ss.str()));
+ req.update_fn(Status::InternalError(ss.str()));
return;
}
TReportExecStatusParams params;
params.protocol_version = FrontendServiceVersion::V1;
- params.__set_query_id(_query_id);
- params.__set_backend_num(_backend_num);
- params.__set_fragment_instance_id(_fragment_instance_id);
+ params.__set_query_id(req.query_id);
+ params.__set_backend_num(req.backend_num);
+ params.__set_fragment_instance_id(req.fragment_instance_id);
+ params.__set_fragment_id(req.fragment_id);
exec_status.set_t_status(&params);
- params.__set_done(done);
+ params.__set_done(req.done);
- RuntimeState* runtime_state = _executor.runtime_state();
- DCHECK(runtime_state != nullptr);
- if (runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) {
+ DCHECK(req.runtime_state != nullptr);
+ if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
// this is a load plan, and load is not finished, just make a brief report
- params.__set_loaded_rows(runtime_state->num_rows_load_total());
- params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
+ params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
+ params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
} else {
- if (runtime_state->query_type() == TQueryType::LOAD) {
- params.__set_loaded_rows(runtime_state->num_rows_load_total());
- params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
+ if (req.runtime_state->query_type() == TQueryType::LOAD) {
+ params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
+ params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
}
- if (profile == nullptr) {
+ if (req.profile == nullptr) {
params.__isset.profile = false;
} else {
- profile->to_thrift(&params.profile);
+ req.profile->to_thrift(&params.profile);
params.__isset.profile = true;
}
- if (!runtime_state->output_files().empty()) {
+ if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
- for (auto& it : runtime_state->output_files()) {
+ for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
- if (runtime_state->num_rows_load_total() > 0 ||
- runtime_state->num_rows_load_filtered() > 0) {
+ if (req.runtime_state->num_rows_load_total() > 0 ||
+ req.runtime_state->num_rows_load_filtered() > 0) {
params.__isset.load_counters = true;
static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";
- params.load_counters.emplace(s_dpp_normal_all,
- std::to_string(runtime_state->num_rows_load_success()));
- params.load_counters.emplace(s_dpp_abnormal_all,
- std::to_string(runtime_state->num_rows_load_filtered()));
- params.load_counters.emplace(s_unselected_rows,
- std::to_string(runtime_state->num_rows_load_unselected()));
+ params.load_counters.emplace(
+ s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success()));
+ params.load_counters.emplace(
+ s_dpp_abnormal_all,
+ std::to_string(req.runtime_state->num_rows_load_filtered()));
+ params.load_counters.emplace(
+ s_unselected_rows,
+ std::to_string(req.runtime_state->num_rows_load_unselected()));
}
- if (!runtime_state->get_error_log_file_path().empty()) {
+ if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
- to_load_error_http_path(runtime_state->get_error_log_file_path()));
+ to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
}
- if (!runtime_state->export_output_files().empty()) {
+ if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
- params.export_files = runtime_state->export_output_files();
+ params.export_files = req.runtime_state->export_output_files();
}
- if (!runtime_state->tablet_commit_infos().empty()) {
+ if (!req.runtime_state->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
- params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
- for (auto& info : runtime_state->tablet_commit_infos()) {
+ params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
+ for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
}
- if (!runtime_state->error_tablet_infos().empty()) {
+ if (!req.runtime_state->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
-
params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
- for (auto& info : runtime_state->error_tablet_infos()) {
+
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
+ for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
}
// Send new errors to coordinator
- runtime_state->get_unreported_errors(&(params.error_log));
+ req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (params.error_log.size() > 0);
}
@@ -370,22 +431,23 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.to_string()
- << " to coordinator: " << _coord_addr << ", query id: "
<< print_id(_query_id)
- << ", instance id: " << print_id(_fragment_instance_id);
+ << " to coordinator: " << req.coord_addr
+ << ", query id: " << print_id(req.query_id)
+ << ", instance id: " <<
print_id(req.fragment_instance_id);
}
try {
try {
coord->reportExecStatus(res, params);
} catch (TTransportException& e) {
- LOG(WARNING) << "Retrying ReportExecStatus. query id: " <<
print_id(_query_id)
- << ", instance id: " <<
print_id(_fragment_instance_id) << " to "
- << _coord_addr << ", err: " << e.what();
+ LOG(WARNING) << "Retrying ReportExecStatus. query id: " <<
print_id(req.query_id)
+ << ", instance id: " <<
print_id(req.fragment_instance_id) << " to "
+ << req.coord_addr << ", err: " << e.what();
rpc_status = coord.reopen();
if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
- update_status(rpc_status);
- _executor.cancel();
+ req.update_fn(rpc_status);
+ req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
return;
}
coord->reportExecStatus(res, params);
@@ -394,64 +456,22 @@ void FragmentExecState::coordinator_callback(const
Status& status, RuntimeProfil
rpc_status = Status(res.status);
} catch (TException& e) {
std::stringstream msg;
- msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" <<
e.what();
+ msg << "ReportExecStatus() to " << req.coord_addr << " failed:\n" <<
e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
}
if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
- update_status(rpc_status);
- _executor.cancel();
- }
-}
-
-FragmentMgr::FragmentMgr(ExecEnv* exec_env)
- : _exec_env(exec_env), _stop_background_threads_latch(1) {
- _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
- INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
- REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
-
- auto s = Thread::create(
- "FragmentMgr", "cancel_timeout_plan_fragment", [this]() {
this->cancel_worker(); },
- &_cancel_thread);
- CHECK(s.ok()) << s.to_string();
-
- // TODO(zc): we need a better thread-pool
- // now one user can use all the thread pool, others have no resource.
- s = ThreadPoolBuilder("FragmentMgrThreadPool")
- .set_min_threads(config::fragment_pool_thread_num_min)
- .set_max_threads(config::fragment_pool_thread_num_max)
- .set_max_queue_size(config::fragment_pool_queue_size)
- .build(&_thread_pool);
-
- REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
- [this]() { return _thread_pool->get_queue_size(); });
- CHECK(s.ok()) << s.to_string();
-}
-
-FragmentMgr::~FragmentMgr() {
- DEREGISTER_HOOK_METRIC(plan_fragment_count);
- DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
- _stop_background_threads_latch.count_down();
- if (_cancel_thread) {
- _cancel_thread->join();
- }
- // Stop all the worker, should wait for a while?
- // _thread_pool->wait_for();
- _thread_pool->shutdown();
-
- // Only me can delete
- {
- std::lock_guard<std::mutex> lock(_lock);
- _fragment_map.clear();
- _fragments_ctx_map.clear();
+ req.update_fn(rpc_status);
+ req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2");
}
}
static void empty_function(RuntimeState*, Status*) {}
-void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
+void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
+ const FinishCallback& cb) {
std::string func_name {"PlanFragmentExecutor::_exec_actual"};
#ifndef BE_TEST
auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
@@ -597,7 +617,8 @@ void FragmentMgr::remove_pipeline_context(
}
}
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
+ const FinishCallback& cb) {
auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
: telemetry::get_noop_tracer();
VLOG_ROW << "exec_plan_fragment params is "
@@ -703,9 +724,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
}
fragments_ctx->fragment_ids.push_back(fragment_instance_id);
- exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
- params.params.fragment_instance_id, params.backend_num,
- _exec_env, fragments_ctx));
+ exec_state.reset(
+ new FragmentExecState(fragments_ctx->query_id, params.params.fragment_instance_id,
+ params.backend_num, _exec_env, fragments_ctx,
+ std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
+ this, std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// set need_wait_execution_trigger means this instance will not actually being executed
// until the execPlanFragmentStart RPC trigger to start it.
@@ -756,7 +779,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, -1, params.backend_num,
- fragments_ctx, _exec_env, cb);
+ fragments_ctx, _exec_env, cb,
+ std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+ std::placeholders::_1));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
@@ -782,7 +807,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
return Status::OK();
}
-Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb) {
+Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
+ const FinishCallback& cb) {
auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
: telemetry::get_noop_tracer();
VLOG_ROW << "exec_plan_fragment params is "
@@ -889,8 +915,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi
fragments_ctx->fragment_ids.push_back(fragment_instance_id);
- exec_state.reset(new FragmentExecState(fragments_ctx->query_id, fragment_instance_id,
- local_params.backend_num, _exec_env, fragments_ctx));
+ exec_state.reset(new FragmentExecState(
+ fragments_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
+ fragments_ctx,
+ std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+ std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// set need_wait_execution_trigger means this instance will not actually being executed
// until the execPlanFragmentStart RPC trigger to start it.
@@ -906,7 +935,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, params.fragment_id,
- local_params.backend_num, fragments_ctx, _exec_env, cb);
+ local_params.backend_num, fragments_ctx, _exec_env, cb,
+ std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+ std::placeholders::_1));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params, i);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 98e6c26ddd..6508ea0659 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -61,6 +61,20 @@ class RuntimeFilterMergeController;
std::string to_load_error_http_path(const std::string& file_name);
+struct ReportStatusRequest {
+ const Status& status;
+ RuntimeProfile* profile;
+ bool done;
+ TNetworkAddress coord_addr;
+ TUniqueId query_id;
+ int fragment_id;
+ TUniqueId fragment_instance_id;
+ int backend_num;
+ RuntimeState* runtime_state;
+ std::function<Status(Status)> update_fn;
+ std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
+};
+
// This class used to manage all the fragment execute in this instance
class FragmentMgr : public RestMonitorIface {
public:
@@ -78,9 +92,9 @@ public:
std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
// TODO(zc): report this is over
- Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);
+ Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb);
- Status exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb);
+ Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb);
Status start_query_execution(const PExecPlanFragmentStartRequest* request);
@@ -116,8 +130,12 @@ public:
std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
+ std::string to_http_path(const std::string& file_name);
+
+ void coordinator_callback(const ReportStatusRequest& req);
+
private:
- void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
+ void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
void _set_scan_concurrency(const TExecPlanFragmentParams& params,
QueryFragmentsCtx* fragments_ctx);
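
The ReportStatusRequest struct introduced above is what lets a single FragmentMgr::coordinator_callback serve both the pipeline engine and the classic PlanFragmentExecutor path: each executor packs its identifiers, profile pointer, and update/cancel hooks into one value and hands it to the manager. A minimal standalone sketch of that pattern follows (hypothetical names; a plain string stands in for Status and the Thrift types):

    #include <functional>
    #include <iostream>
    #include <string>

    // Stand-in for ReportStatusRequest: everything the report path needs, plus
    // hooks back into whichever executor produced the request.
    struct ReportRequest {
        std::string status; // stand-in for Status
        bool done;
        std::function<void(const std::string&)> update_fn;
        std::function<void(const std::string&)> cancel_fn;
    };

    // Stand-in for FragmentMgr::coordinator_callback: one implementation serves
    // every execution path because it only touches what the request carries.
    void coordinator_callback(const ReportRequest& req) {
        req.update_fn(req.status);
        if (!req.done) {
            // Simulated RPC failure: ask the originating executor to cancel itself.
            req.cancel_fn("report rpc fail");
        }
    }

    int main() {
        ReportRequest req{"OK", /*done=*/false,
                          [](const std::string& s) { std::cout << "update: " << s << "\n"; },
                          [](const std::string& r) { std::cout << "cancel: " << r << "\n"; }};
        coordinator_callback(req);
        return 0;
    }

Because the callback only touches what the request carries, adding another execution engine means constructing the request differently rather than duplicating the reporting RPC logic.
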
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 96908dde30..f62bd4bae1 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -445,11 +445,7 @@ void PlanFragmentExecutor::send_report(bool done) {
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
- if (_is_report_success) {
- _report_status_cb(status, profile(), done || !status.ok());
- } else {
- _report_status_cb(status, nullptr, done || !status.ok());
- }
+ _report_status_cb(status, _is_report_success ? profile() : nullptr, done || !status.ok());
}
void PlanFragmentExecutor::stop_report_thread() {