This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bf741a2b671 [Fix](Status) Handle returned Status correctly #31434 bf741a2b671 is described below commit bf741a2b6717303c41e5ebe3c53c1f2279492f91 Author: Uniqueyou <134280716+wyxxx...@users.noreply.github.com> AuthorDate: Thu Feb 29 23:47:33 2024 +0800 [Fix](Status) Handle returned Status correctly #31434 --- be/src/agent/task_worker_pool.cpp | 2 +- be/src/exec/odbc_connector.cpp | 2 +- be/src/exprs/runtime_filter.cpp | 14 ++++++++------ be/src/exprs/runtime_filter.h | 2 +- be/src/exprs/runtime_filter_slots.h | 2 +- be/src/http/action/restore_tablet_action.cpp | 3 +-- be/src/http/action/tablet_migration_action.cpp | 8 ++++---- be/src/index-tools/index_tool.cpp | 7 ++++++- be/src/olap/tablet_manager.cpp | 3 +-- be/src/olap/tablet_manager.h | 2 +- be/src/pipeline/exec/analytic_source_operator.cpp | 7 +++---- be/src/pipeline/exec/analytic_source_operator.h | 2 +- be/src/pipeline/exec/result_file_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/result_sink_operator.cpp | 4 ++-- be/src/pipeline/pipeline_x/operator.cpp | 2 +- be/src/runtime/result_buffer_mgr.cpp | 5 ++--- be/src/runtime/result_buffer_mgr.h | 2 +- be/src/vec/exec/vanalytic_eval_node.cpp | 15 +++++++-------- be/src/vec/exec/vanalytic_eval_node.h | 2 +- be/src/vec/exec/vsort_node.cpp | 2 +- be/src/vec/exec/vunion_node.cpp | 2 +- be/src/vec/functions/function_rpc.cpp | 10 ++++++---- be/src/vec/functions/function_rpc.h | 6 +++--- be/src/vec/olap/vertical_merge_iterator.cpp | 4 ++-- be/src/vec/olap/vgeneric_iterators.cpp | 4 ++-- be/src/vec/sink/async_writer_sink.h | 2 +- be/src/vec/sink/autoinc_buffer.cpp | 10 ++++++---- be/src/vec/sink/autoinc_buffer.h | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 2 +- be/src/vec/sink/vresult_file_sink.cpp | 6 +++--- be/src/vec/sink/vresult_sink.cpp | 6 +++--- be/src/vec/sink/writer/async_result_writer.cpp | 8 +++++--- be/src/vec/sink/writer/async_result_writer.h | 2 +- be/src/vec/sink/writer/vfile_result_writer.cpp | 5 ++--- be/src/vec/sink/writer/vfile_result_writer.h | 2 +- 35 files changed, 84 insertions(+), 77 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b47f79734e9..c4fc258552b 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -997,7 +997,7 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf request.__isset.tablets = true; uint64_t report_version = s_report_version; - static_cast<void>(engine.tablet_manager()->build_all_report_tablets_info(&request.tablets)); + engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); if (report_version < s_report_version) { // TODO llj This can only reduce the possibility for report error, but can't avoid it. // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp index e25d6ff62f2..0a0f240cec7 100644 --- a/be/src/exec/odbc_connector.cpp +++ b/be/src/exec/odbc_connector.cpp @@ -151,7 +151,7 @@ Status ODBCConnector::open(RuntimeState* state, bool read) { LOG(INFO) << "connect success:" << _connect_string.substr(0, _connect_string.find("Pwd=")); _is_open = true; - static_cast<void>(begin_trans()); + RETURN_IF_ERROR(begin_trans()); return Status::OK(); } diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 786876cc796..24c41613be4 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -331,7 +331,7 @@ public: return Status::OK(); } - void change_to_bloom_filter(bool need_init_bf = false) { + Status change_to_bloom_filter(bool need_init_bf = false) { CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) << "Can not change to bloom filter because of runtime filter type is " << IRuntimeFilter::to_string(_filter_type); @@ -339,11 +339,12 @@ public: BloomFilterFuncBase* bf = _context.bloom_filter_func.get(); if (need_init_bf) { // BloomFilter may be not init - static_cast<void>(bf->init_with_fixed_length()); + RETURN_IF_ERROR(bf->init_with_fixed_length()); insert_to_bloom_filter(bf); } // release in filter _context.hybrid_set.reset(); + return Status::OK(); } Status init_bloom_filter(const size_t build_bf_cardinality) { @@ -508,12 +509,12 @@ public: VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; - change_to_bloom_filter(true); + RETURN_IF_ERROR(change_to_bloom_filter(true)); } } else { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; - change_to_bloom_filter(); + RETURN_IF_ERROR(change_to_bloom_filter()); RETURN_IF_ERROR(_context.bloom_filter_func->merge( wrapper->_context.bloom_filter_func.get())); } @@ -1307,8 +1308,9 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, } } -void IRuntimeFilter::change_to_bloom_filter() { - _wrapper->change_to_bloom_filter(); +Status IRuntimeFilter::change_to_bloom_filter() { + RETURN_IF_ERROR(_wrapper->change_to_bloom_filter()); + return Status::OK(); } Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 74b2580a4e6..91456cccced 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -278,7 +278,7 @@ public: static Status create_wrapper(const UpdateRuntimeFilterParamsV2* param, RuntimePredicateWrapper** wrapper); - void change_to_bloom_filter(); + Status change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); Status update_filter(const UpdateRuntimeFilterParams* param); void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t merge_time, diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index c9c1a996064..1fadf81e809 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -105,7 +105,7 @@ public: if (over_max_in_num && runtime_filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - runtime_filter->change_to_bloom_filter(); + RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter()); } if (runtime_filter->is_bloomfilter()) { diff --git a/be/src/http/action/restore_tablet_action.cpp b/be/src/http/action/restore_tablet_action.cpp index 9b2428befaa..0413a94dd66 100644 --- a/be/src/http/action/restore_tablet_action.cpp +++ b/be/src/http/action/restore_tablet_action.cpp @@ -183,8 +183,7 @@ Status RestoreTabletAction::_restore(const std::string& key, int64_t tablet_id, Status s = _create_hard_link_recursive(latest_tablet_path, restore_schema_hash_path); if (!s.ok()) { // do not check the status of delete_directory, return status of link operation - static_cast<void>( - io::global_local_filesystem()->delete_directory(restore_schema_hash_path)); + RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(restore_schema_hash_path)); return s; } std::string restore_shard_path = store->get_absolute_shard_path(tablet_meta.shard_id()); diff --git a/be/src/http/action/tablet_migration_action.cpp b/be/src/http/action/tablet_migration_action.cpp index b24aebdfffd..3e44d985881 100644 --- a/be/src/http/action/tablet_migration_action.cpp +++ b/be/src/http/action/tablet_migration_action.cpp @@ -40,10 +40,10 @@ const static std::string HEADER_JSON = "application/json"; void TabletMigrationAction::_init_migration_action() { int32_t max_thread_num = config::max_tablet_migration_threads; int32_t min_thread_num = config::min_tablet_migration_threads; - static_cast<void>(ThreadPoolBuilder("MigrationTaskThreadPool") - .set_min_threads(min_thread_num) - .set_max_threads(max_thread_num) - .build(&_migration_thread_pool)); + THROW_IF_ERROR(ThreadPoolBuilder("MigrationTaskThreadPool") + .set_min_threads(min_thread_num) + .set_max_threads(max_thread_num) + .build(&_migration_thread_pool)); } void TabletMigrationAction::handle(HttpRequest* req) { diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp index ade72ae6809..91beb905ae2 100644 --- a/be/src/index-tools/index_tool.cpp +++ b/be/src/index-tools/index_tool.cpp @@ -265,7 +265,12 @@ int main(int argc, char** argv) { std::vector<FileInfo> files; bool exists = false; std::filesystem::path root_dir(FLAGS_directory); - static_cast<void>(fs->list(root_dir, true, &files, &exists)); + doris::Status status = fs->list(root_dir, true, &files, &exists); + if (!status.ok) { + std::cerr << "can't search from directory's all files,err : " << status + << std::endl; + return -1; + } if (!exists) { std::cerr << FLAGS_directory << " is not exists" << std::endl; return -1; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index df9a45b03c8..248acf2270d 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -993,7 +993,7 @@ Status TabletManager::report_tablet_info(TTabletInfo* tablet_info) { return res; } -Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { +void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { DCHECK(tablets_info != nullptr); VLOG_NOTICE << "begin to build all report tablets info"; @@ -1032,7 +1032,6 @@ Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet> DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( tablet_version_num_hist); LOG(INFO) << "success to build all report tablets info. tablet_count=" << tablets_info->size(); - return Status::OK(); } Status TabletManager::start_trash_sweep() { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 04f79b4f0f0..178ba50d9eb 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -132,7 +132,7 @@ public: // Status::Error<INVALID_ARGUMENT>(), if tables is null Status report_tablet_info(TTabletInfo* tablet_info); - Status build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info); + void build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info); Status start_trash_sweep(); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 2f0f827d088..cb98b2e6466 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -240,13 +240,12 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -Status AnalyticLocalState::_reset_agg_status() { +void AnalyticLocalState::_reset_agg_status() { for (size_t i = 0; i < _agg_functions_size; ++i) { _agg_functions[i]->reset( _fn_place_ptr + _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]); } - return Status::OK(); } Status AnalyticLocalState::_create_agg_status() { @@ -359,7 +358,7 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { range_end = _shared_state->current_row_position + 1; //going on calculate,add up data, no need to reset state } else { - static_cast<void>(_reset_agg_status()); + _reset_agg_status(); if (!_parent->cast<AnalyticSourceOperatorX>() ._window.__isset .window_start) { //[preceding, offset] --unbound: [preceding, following] @@ -437,7 +436,7 @@ bool AnalyticLocalState::init_next_partition(vectorized::BlockRowPos found_parti _partition_by_start = _shared_state->partition_by_end; _shared_state->partition_by_end = found_partition_end; _shared_state->current_row_position = _partition_by_start.pos; - static_cast<void>(_reset_agg_status()); + _reset_agg_status(); return true; } return false; diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index b2ab5b24b3c..eeb790ebf94 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -88,7 +88,7 @@ private: bool need_check_first = false); bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end); - Status _reset_agg_status(); + void _reset_agg_status(); Status _create_agg_status(); Status _destroy_agg_status(); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 88646ba4db3..4ed414a0774 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -187,9 +187,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); static_cast<void>(_sender->close(final_status)); } - static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time( + state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id())); + state->fragment_instance_id()); } else { if (final_status.ok()) { bool all_receiver_eof = true; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 11a208e1392..ddaf1cd4a2b 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -182,9 +182,9 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { } static_cast<void>(_sender->close(final_status)); } - static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time( + state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id())); + state->fragment_instance_id()); RETURN_IF_ERROR(Base::close(state, exec_status)); return final_status; } diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 30ff16dde80..a29516a29af 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -490,7 +490,7 @@ template <typename Writer, typename Parent> requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - _writer->start_writer(state, _profile); + RETURN_IF_ERROR(_writer->start_writer(state, _profile)); return Status::OK(); } diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index a2009c5ec3c..04831fcefb2 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -92,7 +92,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size // details see issue https://github.com/apache/doris/issues/16203 // add extra 5s for avoid corner case int64_t max_timeout = time(nullptr) + exec_timout + 5; - static_cast<void>(cancel_at_time(max_timeout, query_id)); + cancel_at_time(max_timeout, query_id); } *sender = control_block; return Status::OK(); @@ -173,7 +173,7 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) { return Status::OK(); } -Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { +void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { std::lock_guard<std::mutex> l(_timeout_lock); TimeoutMap::iterator iter = _timeout_map.find(cancel_time); @@ -184,7 +184,6 @@ Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& quer } iter->second.push_back(query_id); - return Status::OK(); } void ResultBufferMgr::cancel_thread() { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 7995496cbf9..06d13104205 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -74,7 +74,7 @@ public: Status cancel(const TUniqueId& fragment_id); // cancel one query at a future time. - Status cancel_at_time(time_t cancel_time, const TUniqueId& query_id); + void cancel_at_time(time_t cancel_time, const TUniqueId& query_id); private: using BufferMap = std::unordered_map<TUniqueId, std::shared_ptr<BufferControlBlock>>; diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 4491d291e2d..fbd49aa145a 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -203,7 +203,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { } _fn_place_ptr = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states, _align_aggregate_states); - RETURN_IF_CATCH_EXCEPTION(static_cast<void>(_create_agg_status())); + RETURN_IF_ERROR(_create_agg_status()); _executor.insert_result = std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this, std::placeholders::_1); _executor.execute = @@ -211,7 +211,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); for (const auto& ctx : _agg_expr_ctxs) { - static_cast<void>(VExpr::prepare(ctx, state, child(0)->row_desc())); + RETURN_IF_ERROR(VExpr::prepare(ctx, state, child(0)->row_desc())); } if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) { vector<TTupleId> tuple_ids; @@ -274,9 +274,9 @@ Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block return Status::OK(); } _next_partition = _init_next_partition(_found_partition_end); - static_cast<void>(_init_result_columns()); + RETURN_IF_ERROR(_init_result_columns()); size_t current_block_rows = _input_blocks[_output_block_index].rows(); - static_cast<void>(_executor.get_next(current_block_rows)); + RETURN_IF_ERROR(_executor.get_next(current_block_rows)); if (_window_end_position == current_block_rows) { break; } @@ -373,7 +373,7 @@ Status VAnalyticEvalNode::_get_next_for_rows(size_t current_block_rows) { range_end = _current_row_position + 1; //going on calculate,add up data, no need to reset state } else { - static_cast<void>(_reset_agg_status()); + _reset_agg_status(); if (!_window.__isset .window_start) { //[preceding, offset] --unbound: [preceding, following] range_start = _partition_by_start.pos; @@ -610,7 +610,7 @@ bool VAnalyticEvalNode::_init_next_partition(BlockRowPos found_partition_end) { _partition_by_start = _partition_by_end; _partition_by_end = found_partition_end; _current_row_position = _partition_by_start.pos; - static_cast<void>(_reset_agg_status()); + _reset_agg_status(); return true; } return false; @@ -730,11 +730,10 @@ Status VAnalyticEvalNode::_init_result_columns() { return Status::OK(); } -Status VAnalyticEvalNode::_reset_agg_status() { +void VAnalyticEvalNode::_reset_agg_status() { for (size_t i = 0; i < _agg_functions_size; ++i) { _agg_functions[i]->reset(_fn_place_ptr + _offsets_of_aggregate_states[i]); } - return Status::OK(); } Status VAnalyticEvalNode::_create_agg_status() { diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index 8c26c1ad79e..45f7ce5b1e8 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -98,7 +98,7 @@ private: void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, int64_t frame_end); - Status _reset_agg_status(); + void _reset_agg_status(); Status _init_result_columns(); Status _create_agg_status(); Status _destroy_agg_status(); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 4206d9b6a2f..848256a0aee 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -188,7 +188,7 @@ Status VSortNode::open(RuntimeState* state) { } while (!eos); - static_cast<void>(child(0)->close(state)); + RETURN_IF_ERROR(child(0)->close(state)); mem_tracker()->consume(_sorter->data_size()); COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size()); diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index cc7eabc3bd2..2405373a062 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -276,7 +276,7 @@ Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) { // The previous child needs to be closed if passthrough was enabled for it. In the non // passthrough case, the child was already closed in the previous call to get_next(). DCHECK(is_child_passthrough(_to_close_child_idx)); - static_cast<void>(child(_to_close_child_idx)->close(state)); + RETURN_IF_ERROR(child(_to_close_child_idx)->close(state)); _to_close_child_idx = -1; } diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index a900436ffc5..ba171ffbbc9 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -26,6 +26,7 @@ #include <memory> #include <utility> +#include "common/status.h" #include "runtime/exec_env.h" #include "util/brpc_client_cache.h" #include "vec/columns/column.h" @@ -46,7 +47,7 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnN PFunctionCallRequest request; PFunctionCallResponse response; request.set_function_name(_function_name); - _convert_block_to_proto(block, arguments, input_rows_count, &request); + RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, input_rows_count, &request)); brpc::Controller cntl; _client->fn_call(&cntl, &request, &response, nullptr); if (cntl.Failed()) { @@ -65,16 +66,17 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnN return Status::OK(); } -void RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers& arguments, - size_t input_rows_count, PFunctionCallRequest* request) { +Status RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers& arguments, + size_t input_rows_count, PFunctionCallRequest* request) { size_t row_count = std::min(block.rows(), input_rows_count); for (size_t col_idx : arguments) { PValues* arg = request->add_args(); ColumnWithTypeAndName& column = block.get_by_position(col_idx); arg->set_has_null(column.column->has_null(row_count)); auto col = column.column->convert_to_full_column_if_const(); - static_cast<void>(column.type->get_serde()->write_column_to_pb(*col, *arg, 0, row_count)); + RETURN_IF_ERROR(column.type->get_serde()->write_column_to_pb(*col, *arg, 0, row_count)); } + return Status::OK(); } void RPCFnImpl::_convert_to_block(Block& block, const PValues& result, size_t pos) { diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h index fa23e5b97cb..ae71632f974 100644 --- a/be/src/vec/functions/function_rpc.h +++ b/be/src/vec/functions/function_rpc.h @@ -52,9 +52,9 @@ public: bool available() { return _client != nullptr; } private: - void _convert_block_to_proto(vectorized::Block& block, - const vectorized::ColumnNumbers& arguments, - size_t input_rows_count, PFunctionCallRequest* request); + Status _convert_block_to_proto(vectorized::Block& block, + const vectorized::ColumnNumbers& arguments, + size_t input_rows_count, PFunctionCallRequest* request); void _convert_to_block(vectorized::Block& block, const PValues& result, size_t pos); std::shared_ptr<PFunctionService_Stub> _client; diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 5fbd1ed2c2e..49916048b5c 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -363,7 +363,7 @@ Status VerticalMergeIteratorContext::_load_next_block() { } for (auto it = _block_list.begin(); it != _block_list.end(); it++) { if (it->use_count() == 1) { - static_cast<void>(block_reset(*it)); + RETURN_IF_ERROR(block_reset(*it)); _block = *it; _block_list.erase(it); break; @@ -371,7 +371,7 @@ Status VerticalMergeIteratorContext::_load_next_block() { } if (_block == nullptr) { _block = std::make_shared<Block>(); - static_cast<void>(block_reset(_block)); + RETURN_IF_ERROR(block_reset(_block)); } Status st = _iter->next_batch(_block.get()); if (!st.ok()) { diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 0e97f1d1720..2d45c884a78 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -292,7 +292,7 @@ Status VMergeIteratorContext::_load_next_block() { } for (auto it = _block_list.begin(); it != _block_list.end(); it++) { if (it->use_count() == 1) { - static_cast<void>(block_reset(*it)); + RETURN_IF_ERROR(block_reset(*it)); _block = *it; _block_list.erase(it); break; @@ -300,7 +300,7 @@ Status VMergeIteratorContext::_load_next_block() { } if (_block == nullptr) { _block = std::make_shared<Block>(); - static_cast<void>(block_reset(_block)); + RETURN_IF_ERROR(block_reset(_block)); } Status st = _iter->next_batch(_block.get()); if (!st.ok()) { diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h index 8105ff96573..337fbcf3f99 100644 --- a/be/src/vec/sink/async_writer_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -72,7 +72,7 @@ public: // Prepare the exprs to run. RETURN_IF_ERROR(VExpr::open(_output_vexpr_ctxs, state)); if (state->enable_pipeline_exec()) { - _writer->start_writer(state, _profile); + RETURN_IF_ERROR(_writer->start_writer(state, _profile)); } else { RETURN_IF_ERROR(_writer->open(state, _profile)); } diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 844d8ed8524..39613159aca 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -19,6 +19,7 @@ #include <gen_cpp/HeartbeatService_types.h> +#include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "util/runtime_profile.h" @@ -48,7 +49,7 @@ void AutoIncIDBuffer::_wait_for_prefetching() { Status AutoIncIDBuffer::sync_request_ids(size_t length, std::vector<std::pair<int64_t, size_t>>* result) { std::unique_lock<std::mutex> lock(_mutex); - _prefetch_ids(_prefetch_size()); + RETURN_IF_ERROR(_prefetch_ids(_prefetch_size())); if (_front_buffer.second > 0) { auto min_length = std::min(_front_buffer.second, length); length -= min_length; @@ -75,13 +76,13 @@ Status AutoIncIDBuffer::sync_request_ids(size_t length, return Status::OK(); } -void AutoIncIDBuffer::_prefetch_ids(size_t length) { +Status AutoIncIDBuffer::_prefetch_ids(size_t length) { if (_front_buffer.second > _low_water_level_mark() || _is_fetching) { - return; + return Status::OK(); } TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; _is_fetching = true; - static_cast<void>(_rpc_token->submit_func([=, this]() { + RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; request.__set_db_id(_db_id); @@ -113,6 +114,7 @@ void AutoIncIDBuffer::_prefetch_ids(size_t length) { } _is_fetching = false; })); + return Status::OK(); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 80a469596fd..3ec009b0960 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -64,7 +64,7 @@ public: Status sync_request_ids(size_t length, std::vector<std::pair<int64_t, size_t>>* result); private: - void _prefetch_ids(size_t length); + Status _prefetch_ids(size_t length); [[nodiscard]] size_t _prefetch_size() const { return _batch_size * config::auto_inc_prefetch_size_ratio; } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index a7f1b4b8584..b6e4d0b962b 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -798,7 +798,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } - static_cast<void>(DataSink::close(state, exec_status)); + RETURN_IF_ERROR(DataSink::close(state, exec_status)); return final_st; } diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 08dd881bf4b..035a76a2f7f 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -132,11 +132,11 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { // close sender, this is normal path end if (_sender) { _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); - static_cast<void>(_sender->close(final_status)); + RETURN_IF_ERROR(_sender->close(final_status)); } - static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time( + state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id())); + state->fragment_instance_id()); } else { if (final_status.ok()) { auto st = _stream_sender->send(state, _output_block.get(), true); diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 59bf82483c5..a7da5a6a5ff 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -170,11 +170,11 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { if (_writer) { _sender->update_return_rows(_writer->get_written_rows()); } - static_cast<void>(_sender->close(final_status)); + RETURN_IF_ERROR(_sender->close(final_status)); } - static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time( + state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id())); + state->fragment_instance_id()); return DataSink::close(state, exec_status); } diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 0d88b9dedfa..fc431b6e863 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -17,6 +17,7 @@ #include "async_result_writer.h" +#include "common/status.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -86,7 +87,7 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() { return block; } -void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) { +Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) { // Should set to false here, to _writer_thread_closed = false; // This is a async thread, should lock the task ctx, to make sure runtimestate and profile @@ -94,7 +95,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil auto task_ctx = state->get_task_execution_context(); if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) { ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool(); - static_cast<void>(pool_ptr->submit_func([this, state, profile, task_ctx]() { + RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() { auto task_lock = task_ctx.lock(); if (task_lock == nullptr) { _writer_thread_closed = true; @@ -103,7 +104,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil this->process_block(state, profile); })); } else { - static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( + RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( [this, state, profile, task_ctx]() { auto task_lock = task_ctx.lock(); if (task_lock == nullptr) { @@ -113,6 +114,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil this->process_block(state, profile); })); } + return Status::OK(); } void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) { diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 8ed39aeb795..ac14616d333 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -76,7 +76,7 @@ public: Status sink(Block* block, bool eos); // Add the IO thread task process block() to thread pool to dispose the IO - void start_writer(RuntimeState* state, RuntimeProfile* profile); + Status start_writer(RuntimeState* state, RuntimeProfile* profile); Status get_writer_status() { std::lock_guard l(_m); diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index c30a23ec219..661afa15935 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -199,7 +199,7 @@ Status VFileResultWriter::_get_next_file_name(std::string* file_name) { // S3: {file_path}{fragment_instance_id}_ // BROKER: {file_path}{fragment_instance_id}_ -Status VFileResultWriter::_get_file_url(std::string* file_url) { +void VFileResultWriter::_get_file_url(std::string* file_url) { std::stringstream ss; if (_storage_type == TStorageBackendType::LOCAL) { ss << "file:///" << BackendOptions::get_localhost(); @@ -207,7 +207,6 @@ Status VFileResultWriter::_get_file_url(std::string* file_url) { ss << _file_opts->file_path; ss << print_id(_fragment_instance_id) << "_"; *file_url = ss.str(); - return Status::OK(); } std::string VFileResultWriter::_file_format_to_name() { @@ -306,7 +305,7 @@ Status VFileResultWriter::_send_result() { row_buffer.push_bigint(_written_rows_counter->value()); // total rows row_buffer.push_bigint(_written_data_bytes->value()); // file size std::string file_url; - static_cast<void>(_get_file_url(&file_url)); + _get_file_url(&file_url); std::stringstream ss; ss << file_url << "*"; file_url = ss.str(); diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 864d0966a77..c52050ba6f1 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -83,7 +83,7 @@ private: // get next export file name Status _get_next_file_name(std::string* file_name); Status _get_success_file_name(std::string* file_name); - Status _get_file_url(std::string* file_url); + void _get_file_url(std::string* file_url); std::string _file_format_to_name(); // close file writer, and if !done, it will create new writer for next file. Status _close_file_writer(bool done); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org