This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit 99615e31cbaf1fd8ca3df501c6bf9d800a50eb00 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Jun 9 21:06:59 2023 +0800 [improvement](exception-safe) create and prepare node/sink support exception safe (#20551) --- be/src/common/exception.h | 22 ++++++++++ be/src/exec/data_sink.cpp | 49 ++++++++-------------- be/src/olap/rowset/segment_v2/page_io.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/runtime/plan_fragment_executor.cpp | 13 +++--- be/src/vec/core/block.cpp | 4 +- be/src/vec/data_types/data_type_decimal.cpp | 11 +++-- be/src/vec/data_types/data_type_time_v2.cpp | 4 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 4 +- .../exec/format/parquet/vparquet_group_reader.cpp | 4 +- be/src/vec/exec/scan/vfile_scanner.cpp | 13 ++---- be/src/vec/exec/vsort_node.cpp | 4 +- 12 files changed, 70 insertions(+), 62 deletions(-) diff --git a/be/src/common/exception.h b/be/src/common/exception.h index 55eef571bc..3166d83844 100644 --- a/be/src/common/exception.h +++ b/be/src/common/exception.h @@ -92,3 +92,25 @@ inline std::string Exception::to_string() const { } \ } \ } while (0) + +#define RETURN_IF_ERROR_OR_CATCH_EXCEPTION(stmt) \ + do { \ + try { \ + doris::enable_thread_catch_bad_alloc++; \ + Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \ + { \ + Status _status_ = (stmt); \ + if (UNLIKELY(!_status_.ok())) { \ + return _status_; \ + } \ + } \ + } catch (const doris::Exception& e) { \ + if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \ + return Status::MemoryLimitExceeded(fmt::format( \ + "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \ + e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \ + } else { \ + return Status::Error(e.code(), e.to_string()); \ + } \ + } \ + } while (0) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index b0d75067e0..c5d5446611 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -50,8 +50,6 @@ Status 
DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink const TPlanFragmentExecParams& params, const RowDescriptor& row_desc, RuntimeState* state, std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) { - DataSink* tmp_sink = nullptr; - switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: { if (!thrift_sink.__isset.stream_sink) { @@ -62,11 +60,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink ? params.send_query_statistics_with_every_batch : false; // TODO: figure out good buffer size based on size of output row - tmp_sink = new vectorized::VDataStreamSender( + sink->reset(new vectorized::VDataStreamSender( state, pool, params.sender_id, row_desc, thrift_sink.stream_sink, - params.destinations, 16 * 1024, send_query_statistics_with_every_batch); + params.destinations, 16 * 1024, send_query_statistics_with_every_batch)); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); - sink->reset(tmp_sink); break; } case TDataSinkType::RESULT_SINK: { @@ -75,9 +72,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } // TODO: figure out good buffer size based on size of output row - tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, 4096); - sink->reset(tmp_sink); + sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, + thrift_sink.result_sink, 4096)); break; } case TDataSinkType::RESULT_FILE_SINK: { @@ -92,17 +88,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink : false; // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { - tmp_sink = new doris::vectorized::VResultFileSink( + sink->reset(new doris::vectorized::VResultFileSink( pool, params.sender_id, row_desc, thrift_sink.result_file_sink, params.destinations, 16 * 1024, send_query_statistics_with_every_batch, - output_exprs, desc_tbl); + 
output_exprs, desc_tbl)); } else { - tmp_sink = new doris::vectorized::VResultFileSink( + sink->reset(new doris::vectorized::VResultFileSink( pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, - send_query_statistics_with_every_batch, output_exprs); + send_query_statistics_with_every_batch, output_exprs)); } - - sink->reset(tmp_sink); break; } case TDataSinkType::MEMORY_SCRATCH_SINK: { @@ -110,8 +104,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } - tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs); - sink->reset(tmp_sink); + sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs)); break; } case TDataSinkType::MYSQL_TABLE_SINK: { @@ -193,7 +186,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink const size_t& local_param_idx, const RowDescriptor& row_desc, RuntimeState* state, std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) { - DataSink* tmp_sink = nullptr; const auto& local_params = params.local_params[local_param_idx]; switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: { @@ -205,11 +197,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink ? 
params.send_query_statistics_with_every_batch : false; // TODO: figure out good buffer size based on size of output row - tmp_sink = new vectorized::VDataStreamSender( + sink->reset(new vectorized::VDataStreamSender( state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink, - params.destinations, 16 * 1024, send_query_statistics_with_every_batch); + params.destinations, 16 * 1024, send_query_statistics_with_every_batch)); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); - sink->reset(tmp_sink); break; } case TDataSinkType::RESULT_SINK: { @@ -218,9 +209,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } // TODO: figure out good buffer size based on size of output row - tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, 4096); - sink->reset(tmp_sink); + sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, + thrift_sink.result_sink, 4096)); break; } case TDataSinkType::RESULT_FILE_SINK: { @@ -235,17 +225,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink : false; // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { - tmp_sink = new doris::vectorized::VResultFileSink( + sink->reset(new doris::vectorized::VResultFileSink( pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink, params.destinations, 16 * 1024, send_query_statistics_with_every_batch, - output_exprs, desc_tbl); + output_exprs, desc_tbl)); } else { - tmp_sink = new doris::vectorized::VResultFileSink( + sink->reset(new doris::vectorized::VResultFileSink( pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, - send_query_statistics_with_every_batch, output_exprs); + send_query_statistics_with_every_batch, output_exprs)); } - - sink->reset(tmp_sink); break; } case TDataSinkType::MEMORY_SCRATCH_SINK: { @@ -253,8 +241,7 @@ Status 
DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } - tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs); - sink->reset(tmp_sink); + sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs)); break; } case TDataSinkType::MYSQL_TABLE_SINK: { diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 2f27375fe5..3016542372 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -51,7 +51,7 @@ Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space size_t uncompressed_size = Slice::compute_total_size(body); if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) { faststring buf; - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress(body, uncompressed_size, &buf))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf)); double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size; // return compressed body only when it saves more than min_space_saving if (space_saving > 0 && space_saving >= min_space_saving) { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index ebb30fe384..edf0dc20cd 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -309,7 +309,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->set_num_per_fragment_instances(request.num_senders); if (request.fragment.__isset.output_sink) { - RETURN_IF_ERROR(DataSink::create_data_sink( + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, request, idx, _root_plan->row_desc(), _runtime_state.get(), &_sink, *desc_tbl)); diff --git 
a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 079719927b..49b1afdc50 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -160,8 +160,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // set up plan DCHECK(request.__isset.fragment); - RETURN_IF_ERROR(ExecNode::create_tree(_runtime_state.get(), obj_pool(), request.fragment.plan, - *desc_tbl, &_plan)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree( + _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan)); // set #senders of exchange nodes before calling Prepare() std::vector<ExecNode*> exch_nodes; @@ -173,6 +173,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, static_cast<doris::vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders); } + // TODO Is it exception safe? RETURN_IF_ERROR(_plan->prepare(_runtime_state.get())); // set scan ranges std::vector<ExecNode*> scan_nodes; @@ -211,10 +212,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // set up sink, if required if (request.fragment.__isset.output_sink) { - RETURN_IF_ERROR(DataSink::create_data_sink(obj_pool(), request.fragment.output_sink, - request.fragment.output_exprs, params, - row_desc(), runtime_state(), &_sink, *desc_tbl)); - RETURN_IF_ERROR(_sink->prepare(runtime_state())); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( + obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params, + row_desc(), runtime_state(), &_sink, *desc_tbl)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state())); RuntimeProfile* sink_profile = _sink->profile(); if (sink_profile != nullptr) { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 2422ca9b16..caa5d2acd5 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -828,8 +828,8 @@ Status 
Block::serialize(int be_exec_version, PBlock* pblock, RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); faststring buf_compressed; - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress( - Slice(column_values.data(), content_uncompressed_size), &buf_compressed))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress( + Slice(column_values.data(), content_uncompressed_size), &buf_compressed)); size_t compressed_size = buf_compressed.size(); if (LIKELY(compressed_size < content_uncompressed_size)) { pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp index c3bafba8aa..cf31bd1e40 100644 --- a/be/src/vec/data_types/data_type_decimal.cpp +++ b/be/src/vec/data_types/data_type_decimal.cpp @@ -164,13 +164,16 @@ bool DataTypeDecimal<T>::parse_from_string(const std::string& str, T* res) const DataTypePtr create_decimal(UInt64 precision_value, UInt64 scale_value, bool use_v2) { if (precision_value < min_decimal_precision() || precision_value > max_decimal_precision<Decimal128>()) { - LOG(WARNING) << "Wrong precision " << precision_value; - return nullptr; + throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR, + "Wrong precision {}, min: {}, max: {}", precision_value, + min_decimal_precision(), max_decimal_precision<Decimal128>()); } if (static_cast<UInt64>(scale_value) > precision_value) { - LOG(WARNING) << "Negative scales and scales larger than precision are not supported"; - return nullptr; + throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR, + "Negative scales and scales larger than precision are not " + "supported, scale_value: {}, precision_value: {}", + scale_value, precision_value); } if (use_v2) { diff --git a/be/src/vec/data_types/data_type_time_v2.cpp b/be/src/vec/data_types/data_type_time_v2.cpp index 8baf8a6776..e5b537ca34 100644 --- a/be/src/vec/data_types/data_type_time_v2.cpp +++ 
b/be/src/vec/data_types/data_type_time_v2.cpp @@ -202,8 +202,8 @@ void DataTypeDateTimeV2::cast_to_date_v2(const UInt64 from, UInt32& to) { DataTypePtr create_datetimev2(UInt64 scale_value) { if (scale_value > 6) { - LOG(WARNING) << "Wrong scale " << scale_value; - return nullptr; + throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR, "scale_value > 6 {}", + scale_value); } return std::make_shared<DataTypeDateTimeV2>(scale_value); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 945cf09087..c3d80b6422 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1302,8 +1302,8 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s for (auto& conjunct : _lazy_read_ctx.conjuncts) { filter_conjuncts.push_back(conjunct); } - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts( - filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( + filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all)); if (_lazy_read_ctx.resize_first_column) { block->get_by_position(0).column->assume_mutable()->clear(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index b50dc1d0e0..06dd72eb7b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -785,8 +785,8 @@ Status RowGroupReader::_rewrite_dict_predicates() { // The following process may be tricky and time-consuming, but we have no other way. 
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block( + ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)); if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index b0faa7f799..43fe98ea04 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -309,16 +309,11 @@ Status VFileScanner::_init_src_block(Block* block) { auto it = _name_to_col_type.find(slot->col_name()); if (it == _name_to_col_type.end() || _is_dynamic_schema) { // not exist in file, using type from _input_tuple_desc - data_type = - DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable()); + RETURN_IF_CATCH_EXCEPTION(data_type = DataTypeFactory::instance().create_data_type( + slot->type(), slot->is_nullable())); } else { - data_type = DataTypeFactory::instance().create_data_type(it->second, true); - } - if (data_type == nullptr) { - return Status::NotSupported("Not support data type {} for column {}", - it == _name_to_col_type.end() ? 
slot->type().debug_string() - : it->second.debug_string(), - slot->col_name()); + RETURN_IF_CATCH_EXCEPTION( + data_type = DataTypeFactory::instance().create_data_type(it->second, true)); } MutableColumnPtr data_column = data_type->create_column(); _src_block.insert( diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 05b94b733a..b7ad692413 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -179,7 +179,7 @@ Status VSortNode::open(RuntimeState* state) { ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(sink(state, upstream_block.get(), eos))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink(state, upstream_block.get(), eos)); } while (!eos); child(0)->close(state); @@ -191,7 +191,7 @@ Status VSortNode::open(RuntimeState* state) { } Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_sorter->get_next(state, output_block, eos))); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sorter->get_next(state, output_block, eos)); reached_limit(output_block, eos); if (*eos) { _runtime_profile->add_info_string("Spilled", _sorter->is_spilled() ? "true" : "false"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org