This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch stream-load-vec in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/stream-load-vec by this push: new 687dc8a643 [Bug] Fix some bug in vec stream load (#9094) 687dc8a643 is described below commit 687dc8a643c4b6e3d5a2a8e5f8e60848de094a68 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Apr 21 10:59:59 2022 +0800 [Bug] Fix some bug in vec stream load (#9094) 1. mem leak in agg data in memtable 2. core dump in replace function 3. core dump in DCHECK in tablet sink Co-authored-by: lihaopeng <lihaop...@baidu.com> --- be/src/exec/tablet_sink.h | 3 - be/src/olap/memtable.cpp | 10 +-- be/src/olap/memtable.h | 8 +- .../aggregate_function_reader.cpp | 41 +++++---- .../aggregate_function_reader.h | 7 +- .../aggregate_function_simple_factory.cpp | 4 +- .../aggregate_function_window.cpp | 96 ++-------------------- .../aggregate_function_window.h | 69 +++++++++++++--- be/src/vec/olap/block_reader.cpp | 2 +- be/src/vec/sink/vtablet_sink.cpp | 23 ++++-- 10 files changed, 120 insertions(+), 143 deletions(-) diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 900f60a6a8..a2edb73b19 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -273,9 +273,6 @@ protected: // add batches finished means the last rpc has be response, used to check whether this channel can be closed std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized - // TODO(cmy): should be removed - std::atomic<bool> _last_patch_processed_finished {true}; // reuse for vectorized - bool _eos_is_produced {false}; // only for restricting producer behaviors std::unique_ptr<RowDescriptor> _row_desc; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 66d6a04fc2..2628b34007 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -28,8 +28,10 @@ #include "util/doris_metrics.h" #include "vec/core/field.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" namespace doris { + MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, @@ -77,13 +79,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) ->column(cid) .aggregation(); std::string agg_name = - TabletColumn::get_string_by_aggregation_type(agg_method); - if (agg_name=="REPLACE"){ - agg_name = "last_value"; - }else{ - agg_name += "_reader"; - } - + TabletColumn::get_string_by_aggregation_type(agg_method) + vectorized::AGG_LOAD_SUFFIX; std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), [](unsigned char c) { return std::tolower(c); }); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 02f7699719..c5f519eeeb 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -28,6 +28,7 @@ #include "vec/core/block.h" #include "vec/common/string_ref.h" #include "vec/aggregate_functions/aggregate_function.h" + namespace doris { struct ContiguousRow; @@ -37,8 +38,6 @@ class SlotDescriptor; class TabletSchema; class Tuple; class TupleDescriptor; -class RowInBlock; -class RowInBlockComparator; class MemTable { public: @@ -103,8 +102,11 @@ private: NullState null_state = is_null ? NullState::IS_NULL : NullState::NOT_NULL; return RowCursorCell(ref.data, null_state); } + ~RowInBlock() { - std::vector<vectorized::AggregateDataPtr>().swap(_agg_places); + for (auto agg_place : _agg_places) { + delete [] agg_place; + } } }; class RowInBlockComparator { diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp index ce78397794..f90515fd5e 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp @@ -20,29 +20,38 @@ namespace doris::vectorized { // auto spread at nullable condition, null value do not participate aggregate -void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory) { +void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& factory) { // add a suffix to the function name here to distinguish special functions of agg reader - auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator) { - factory.register_function(name + agg_reader_suffix, creator, false); + auto register_function = [&](const std::string& name, + const AggregateFunctionCreator& creator) { + factory.register_function(name + AGG_READER_SUFFIX, creator, false); + factory.register_function(name + AGG_LOAD_SUFFIX, creator, false); }; - register_function_reader("sum", create_aggregate_function_sum_reader); - register_function_reader("max", create_aggregate_function_max); - register_function_reader("min", create_aggregate_function_min); - register_function_reader("replace_if_not_null", create_aggregate_function_replace_if_not_null); - register_function_reader("bitmap_union", create_aggregate_function_bitmap_union); - register_function_reader("hll_union", create_aggregate_function_HLL_union<false>); + register_function("sum", create_aggregate_function_sum_reader); + register_function("max", create_aggregate_function_max); + register_function("min", create_aggregate_function_min); + register_function("bitmap_union", create_aggregate_function_bitmap_union); + register_function("hll_union", create_aggregate_function_HLL_union<false>); } -void register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& factory) { - auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator, bool nullable) { - factory.register_function(name + agg_reader_suffix, creator, nullable); +// only replace funtion in load/reader do different agg operation. +// because Doris can ensure that the data is globally ordered in reader, but cannot in load +// 1. reader, get the first value of input data. +// 2. load, get the last value of input data. +void register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& factory) { + auto register_function = [&](const std::string& name, const std::string& suffix, + const AggregateFunctionCreator& creator, bool nullable) { + factory.register_function(name + suffix, creator, nullable); }; - register_function_reader("replace", create_aggregate_function_replace, false); - register_function_reader("replace", create_aggregate_function_replace_nullable, true); + register_function("replace", AGG_READER_SUFFIX, create_aggregate_function_first<false, true>, false); + register_function("replace", AGG_READER_SUFFIX, create_aggregate_function_first<true, true>, true); + register_function("replace", AGG_LOAD_SUFFIX, create_aggregate_function_last<false, true>, false); + register_function("replace", AGG_LOAD_SUFFIX, create_aggregate_function_last<true, true>, true); + + register_function("replace_if_not_null", AGG_READER_SUFFIX, create_aggregate_function_first<false, true>, false); + register_function("replace_if_not_null", AGG_LOAD_SUFFIX, create_aggregate_function_last<false, true>, false); } } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h b/be/src/vec/aggregate_functions/aggregate_function_reader.h index f44be5ee57..86fea6f079 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h @@ -26,10 +26,11 @@ namespace doris::vectorized { -static const std::string agg_reader_suffix = "_reader"; +static auto constexpr AGG_READER_SUFFIX = "_reader"; +static auto constexpr AGG_LOAD_SUFFIX = "_load"; -void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& factory); -void register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& factory); } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp index fcf333c1bd..799f3cbb6a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp @@ -56,7 +56,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_uniq(instance); register_aggregate_function_bitmap(instance); register_aggregate_function_combinator_distinct(instance); - register_aggregate_function_reader(instance); // register aggregate function for agg reader + register_aggregate_function_reader_load(instance); // register aggregate function for agg reader register_aggregate_function_window_rank(instance); register_aggregate_function_stddev_variance_pop(instance); register_aggregate_function_topn(instance); @@ -70,7 +70,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_combinator_null(instance); register_aggregate_function_stddev_variance_samp(instance); - register_aggregate_function_reader_no_spread(instance); + register_aggregate_function_replace_reader_load(instance); register_aggregate_function_window_lead_lag(instance); register_aggregate_function_HLL_union_agg(instance); register_aggregate_function_percentile_approx(instance); diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp b/be/src/vec/aggregate_functions/aggregate_function_window.cpp index d4b7f99121..53a4c4931c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp @@ -23,7 +23,7 @@ #include "common/logging.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/aggregate_functions/factory_helpers.h" -#include "vec/aggregate_functions/helpers.h" + namespace doris::vectorized { AggregateFunctionPtr create_aggregate_function_dense_rank(const std::string& name, @@ -53,44 +53,6 @@ AggregateFunctionPtr create_aggregate_function_row_number(const std::string& nam return std::make_shared<WindowFunctionRowNumber>(argument_types); } -template <template <typename> class AggregateFunctionTemplate, template <typename> class Data, - bool is_nullable, bool is_copy = false> -static IAggregateFunction* create_function_single_value(const String& name, - const DataTypes& argument_types, - const Array& parameters) { - using StoreType = CopiedValue; - - assert_arity_at_most<3>(name, argument_types); - - auto type = argument_types[0].get(); - if (type->is_nullable()) { - type = assert_cast<const DataTypeNullable*>(type)->get_nested_type().get(); - } - WhichDataType which(*type); - -#define DISPATCH(TYPE) \ - if (which.idx == TypeIndex::TYPE) \ - return new AggregateFunctionTemplate< \ - Data<LeadAndLagData<TYPE, is_nullable, false, StoreType>>>(argument_types); - FOR_NUMERIC_TYPES(DISPATCH) -#undef DISPATCH - - if (which.is_decimal()) { - return new AggregateFunctionTemplate< - Data<LeadAndLagData<Int128, is_nullable, false, StoreType>>>(argument_types); - } - if (which.is_date_or_datetime()) { - return new AggregateFunctionTemplate< - Data<LeadAndLagData<Int64, is_nullable, false, StoreType>>>(argument_types); - } - if (which.is_string_or_fixed_string()) { - return new AggregateFunctionTemplate< - Data<LeadAndLagData<StringRef, is_nullable, true, StoreType>>>(argument_types); - } - DCHECK(false) << "with unknowed type, failed in create_aggregate_function_leadlag"; - return nullptr; -} - template <bool is_nullable> AggregateFunctionPtr create_aggregate_function_lag(const std::string& name, const DataTypes& argument_types, @@ -111,53 +73,6 @@ AggregateFunctionPtr create_aggregate_function_lead(const std::string& name, name, argument_types, parameters)); } -template <bool is_nullable> -AggregateFunctionPtr create_aggregate_function_first(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable) { - return AggregateFunctionPtr( - create_function_single_value<WindowFunctionData, WindowFunctionFirstData, is_nullable>( - name, argument_types, parameters)); -} - -template <bool is_nullable> -AggregateFunctionPtr create_aggregate_function_last(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable) { - return AggregateFunctionPtr( - create_function_single_value<WindowFunctionData, WindowFunctionLastData, is_nullable>( - name, argument_types, parameters)); -} - -AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable) { - return AggregateFunctionPtr( - create_function_single_value<WindowFunctionData, WindowFunctionFirstData, false, true>( - name, argument_types, parameters)); -} - -AggregateFunctionPtr create_aggregate_function_replace(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable) { - return AggregateFunctionPtr( - create_function_single_value<WindowFunctionData, WindowFunctionFirstData, false, true>( - name, argument_types, parameters)); -} - -AggregateFunctionPtr create_aggregate_function_replace_nullable(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable) { - return AggregateFunctionPtr( - create_function_single_value<WindowFunctionData, WindowFunctionFirstData, true, true>( - name, argument_types, parameters)); -} - void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory& factory) { factory.register_function("dense_rank", create_aggregate_function_dense_rank); factory.register_function("rank", create_aggregate_function_rank); @@ -169,9 +84,10 @@ void register_aggregate_function_window_lead_lag(AggregateFunctionSimpleFactory& factory.register_function("lead", create_aggregate_function_lead<true>, true); factory.register_function("lag", create_aggregate_function_lag<false>); factory.register_function("lag", create_aggregate_function_lag<true>, true); - factory.register_function("first_value", create_aggregate_function_first<false>); - factory.register_function("first_value", create_aggregate_function_first<true>, true); - factory.register_function("last_value", create_aggregate_function_last<false>); - factory.register_function("last_value", create_aggregate_function_last<true>, true); + factory.register_function("first_value", create_aggregate_function_first<false, false>); + factory.register_function("first_value", create_aggregate_function_first<true, false>, true); + factory.register_function("last_value", create_aggregate_function_last<false, false>); + factory.register_function("last_value", create_aggregate_function_last<true, false>, true); } + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index 133efe7ea0..c438cd3582 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -21,11 +21,13 @@ #pragma once #include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/helpers.h" #include "vec/columns/column_vector.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" #include "vec/io/io_helper.h" +#include "factory_helpers.h" namespace doris::vectorized { @@ -405,19 +407,62 @@ private: DataTypePtr _argument_type; }; -AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable); +template <template <typename> class AggregateFunctionTemplate, template <typename> class Data, + bool is_nullable, bool is_copy = false> +static IAggregateFunction* create_function_single_value(const String& name, + const DataTypes& argument_types, + const Array& parameters) { + using StoreType = std::conditional_t<is_copy, CopiedValue, Value>; -AggregateFunctionPtr create_aggregate_function_replace(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable); + assert_arity_at_most<3>(name, argument_types); -AggregateFunctionPtr create_aggregate_function_replace_nullable(const std::string& name, - const DataTypes& argument_types, - const Array& parameters, - const bool result_is_nullable); + auto type = argument_types[0].get(); + if (type->is_nullable()) { + type = assert_cast<const DataTypeNullable*>(type)->get_nested_type().get(); + } + WhichDataType which(*type); + +#define DISPATCH(TYPE) \ + if (which.idx == TypeIndex::TYPE) \ + return new AggregateFunctionTemplate< \ + Data<LeadAndLagData<TYPE, is_nullable, false, StoreType>>>(argument_types); + FOR_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + + if (which.is_decimal()) { + return new AggregateFunctionTemplate< + Data<LeadAndLagData<Int128, is_nullable, false, StoreType>>>(argument_types); + } + if (which.is_date_or_datetime()) { + return new AggregateFunctionTemplate< + Data<LeadAndLagData<Int64, is_nullable, false, StoreType>>>(argument_types); + } + if (which.is_string_or_fixed_string()) { + return new AggregateFunctionTemplate< + Data<LeadAndLagData<StringRef, is_nullable, true, StoreType>>>(argument_types); + } + DCHECK(false) << "with unknowed type, failed in create_aggregate_function_leadlag"; + return nullptr; +} + +template <bool is_nullable, bool is_copy> +AggregateFunctionPtr create_aggregate_function_first(const std::string& name, + const DataTypes& argument_types, + const Array& parameters, + bool result_is_nullable) { + return AggregateFunctionPtr( + create_function_single_value<WindowFunctionData, WindowFunctionFirstData, is_nullable, is_copy>( + name, argument_types, parameters)); +} + +template <bool is_nullable, bool is_copy> +AggregateFunctionPtr create_aggregate_function_last(const std::string& name, + const DataTypes& argument_types, + const Array& parameters, + bool result_is_nullable) { + return AggregateFunctionPtr( + create_function_single_value<WindowFunctionData, WindowFunctionLastData, is_nullable, is_copy>( + name, argument_types, parameters)); +} } // namespace doris::vectorized diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 1959fc70fa..a24fb4ee77 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -91,7 +91,7 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { .column(read_params.origin_return_columns->at(_return_columns_loc[idx])) .aggregation(); std::string agg_name = - TabletColumn::get_string_by_aggregation_type(agg_method) + agg_reader_suffix; + TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_READER_SUFFIX; std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), [](unsigned char c) { return std::tolower(c); }); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 54a361044e..997b619bd5 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -178,18 +178,28 @@ Status VNodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token) { - auto st = none_of({_cancelled, _send_finished}); + auto st = none_of({_cancelled, _send_finished}); if (!st.ok()) { return 0; } - bool is_finished = true; - if (!_add_block_closure->is_packet_in_flight() && _pending_batches_num > 0 && - _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) { + + if (!_add_block_closure->try_set_in_flight()) { + return _send_finished ? 0 : 1; + } + + // We are sure that try_send_batch is not running + if (_pending_batches_num > 0) { auto s = thread_pool_token->submit_func( - std::bind(&VNodeChannel::try_send_block, this, state)); + std::bind(&VNodeChannel::try_send_block, this, state)); if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); + // clear in flight + _add_block_closure->clear_in_flight(); } + // in_flight is cleared in closure::Run + } else { + // clear in flight + _add_block_closure->clear_in_flight(); } return _send_finished ? 0 : 1; } @@ -221,6 +231,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { &compressed_bytes, &_column_values_buffer); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + _add_block_closure->clear_in_flight(); return; } if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { @@ -234,6 +245,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { if (remain_ms <= 0 && !request.eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); + _add_block_closure->clear_in_flight(); return; } else { remain_ms = config::min_load_rpc_timeout_ms; @@ -266,7 +278,6 @@ void VNodeChannel::try_send_block(RuntimeState* state) { _add_block_closure); _next_packet_seq++; - _last_patch_processed_finished = true; } void VNodeChannel::_close_check() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org