This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1d1b2f98c3 [refactor](function) let agg functions exception safety (#19109) 1d1b2f98c3 is described below commit 1d1b2f98c3ed4a38a2c03b5916806a953187050e Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Thu May 11 10:17:11 2023 +0800 [refactor](function) let agg functions exception safety (#19109) --- be/src/util/tdigest.h | 2 + .../aggregate_function_bitmap.cpp | 11 +- .../aggregate_function_java_udaf.h | 114 +++++++++++++-------- .../aggregate_function_min_max.h | 18 ++-- .../aggregate_function_min_max_by.h | 10 +- .../aggregate_function_percentile_approx.h | 8 +- .../aggregate_function_reader_first_last.h | 16 +-- .../aggregate_function_window.cpp | 12 +-- 8 files changed, 113 insertions(+), 78 deletions(-) diff --git a/be/src/util/tdigest.h b/be/src/util/tdigest.h index 079c0996ba..0a8168fe8e 100644 --- a/be/src/util/tdigest.h +++ b/be/src/util/tdigest.h @@ -51,6 +51,7 @@ #include <utility> #include <vector> +#include "common/factory_creator.h" #include "common/logging.h" #include "udf/udf.h" #include "util/debug_util.h" @@ -119,6 +120,7 @@ struct CentroidComparator { }; class TDigest { + ENABLE_FACTORY_CREATOR(TDigest); struct TDigestRadixSortTraits { using Element = Centroid; using Key = Value; diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp index 35e4636dfa..0676fd5bc2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp @@ -25,15 +25,18 @@ namespace doris::vectorized { template <bool nullable, template <bool, typename> class AggregateFunctionTemplate> -IAggregateFunction* create_with_int_data_type(const DataTypes& argument_type) { +AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_type) { auto type = remove_nullable(argument_type[0]); WhichDataType which(type); -#define DISPATCH(TYPE) \ - if (which.idx == TypeIndex::TYPE) { \ - return new AggregateFunctionTemplate<nullable, ColumnVector<TYPE>>(argument_type); \ +#define DISPATCH(TYPE) \ + if (which.idx == TypeIndex::TYPE) { \ + return std::make_shared<AggregateFunctionTemplate<nullable, ColumnVector<TYPE>>>( \ + argument_type); \ } FOR_INTEGER_TYPES(DISPATCH) #undef DISPATCH + LOG(WARNING) << "with unknowed type, failed in create_with_int_data_type bitmap_union_int" + << " and type is: " << argument_type[0]->get_name(); return nullptr; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 4ab11049d3..d913c8b32d 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -23,6 +23,8 @@ #include <cstdint> #include <memory> +#include "common/compiler_util.h" +#include "common/exception.h" #include "common/status.h" #include "gutil/strings/substitute.h" #include "runtime/user_function_cache.h" @@ -53,18 +55,18 @@ public: AggregateJavaUdafData() = default; AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; - input_values_buffer_ptr.reset(new int64_t[num_args]); - input_nulls_buffer_ptr.reset(new int64_t[num_args]); - input_offsets_ptrs.reset(new int64_t[num_args]); - input_array_nulls_buffer_ptr.reset(new int64_t[num_args]); - input_array_string_offsets_ptrs.reset(new int64_t[num_args]); - input_place_ptrs.reset(new int64_t); - output_value_buffer.reset(new int64_t); - output_null_value.reset(new int64_t); - output_offsets_ptr.reset(new int64_t); - output_intermediate_state_ptr.reset(new int64_t); - output_array_null_ptr.reset(new int64_t); - output_array_string_offsets_ptr.reset(new int64_t); + input_values_buffer_ptr = std::make_unique<int64_t[]>(num_args); + input_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args); + input_offsets_ptrs = std::make_unique<int64_t[]>(num_args); + input_array_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args); + input_array_string_offsets_ptrs = std::make_unique<int64_t[]>(num_args); + input_place_ptrs = std::make_unique<int64_t>(0); + output_value_buffer = std::make_unique<int64_t>(0); + output_null_value = std::make_unique<int64_t>(0); + output_offsets_ptr = std::make_unique<int64_t>(0); + output_intermediate_state_ptr = std::make_unique<int64_t>(0); + output_array_null_ptr = std::make_unique<int64_t>(0); + output_array_string_offsets_ptr = std::make_unique<int64_t>(0); } ~AggregateJavaUdafData() { @@ -206,6 +208,7 @@ public: // save it in BE, Because i'm not sure there is a way to use the byte[] not allocate again. jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod( executor_obj, executor_cl, executor_serialize_id, place)); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); int len = env->GetArrayLength(arr); serialize_data.resize(len); env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data())); @@ -251,17 +254,17 @@ public: jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, \ executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ - /*Add this check is now, the agg function can't deal with the return status, */ \ - /*even we return a bad status, nobody could deal with it,*/ \ - /*so add this limit avoid std::bad_alloc, (1024<<10) is enough*/ \ - /*but this maybe get a mistake of result,when could handle exception need removethis*/ \ - if (increase_buffer_size == 10) { \ - return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ - increase_buffer_size, buffer_size); \ - } \ + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - chars.resize(buffer_size); \ + try { \ + chars.resize(buffer_size); \ + } catch (std::bad_alloc const& e) { \ + throw doris::Exception( \ + ErrorCode::INTERNAL_ERROR, \ + "memory allocate failed in column string, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ *output_intermediate_state_ptr = chars.size(); \ res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \ @@ -298,15 +301,19 @@ public: jboolean res = env->CallNonvirtualBooleanMethod( \ executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ - if (increase_buffer_size == 10) { \ - return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ - increase_buffer_size, buffer_size); \ - } \ + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - null_map_data.resize(buffer_size); \ - chars.resize(buffer_size); \ - offsets.resize(buffer_size); \ + try { \ + null_map_data.resize(buffer_size); \ + chars.resize(buffer_size); \ + offsets.resize(buffer_size); \ + } catch (std::bad_alloc const& e) { \ + throw doris::Exception( \ + ErrorCode::INTERNAL_ERROR, \ + "memory allocate failed in array column string, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ *output_array_null_ptr = reinterpret_cast<int64_t>(null_map_data.data()); \ *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ *output_array_string_offsets_ptr = reinterpret_cast<int64_t>(offsets.data()); \ @@ -320,14 +327,18 @@ public: jboolean res = env->CallNonvirtualBooleanMethod( \ executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ - if (increase_buffer_size == 10) { \ - return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ - increase_buffer_size, buffer_size); \ - } \ + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ - null_map_data.resize(buffer_size); \ - data_column->resize(buffer_size); \ + try { \ + null_map_data.resize(buffer_size); \ + data_column->resize(buffer_size); \ + } catch (std::bad_alloc const& e) { \ + throw doris::Exception( \ + ErrorCode::INTERNAL_ERROR, \ + "memory allocate failed in array number column, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ *output_array_null_ptr = reinterpret_cast<int64_t>(null_map_data.data()); \ *output_value_buffer = \ reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ @@ -409,6 +420,7 @@ private: class AggregateJavaUdaf final : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> { public: + ENABLE_FACTORY_CREATOR(AggregateJavaUdaf); AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const DataTypePtr& return_type) : IAggregateFunctionDataHelper(argument_types), @@ -416,7 +428,7 @@ public: _return_type(return_type), _first_created(true), _exec_place(nullptr) {} - ~AggregateJavaUdaf() = default; + ~AggregateJavaUdaf() override = default; static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types, const DataTypePtr& return_type) { @@ -461,6 +473,7 @@ public: Arena*) const override { LOG(WARNING) << " shouldn't going add function, there maybe some error about function " << _fn.name.function_name; + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "shouldn't going add function"); } void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, @@ -469,7 +482,11 @@ public: for (size_t i = 0; i < batch_size; ++i) { places_address[i] = reinterpret_cast<int64_t>(places[i] + place_offset); } - this->data(_exec_place).add(places_address, false, columns, 0, batch_size, argument_types); + Status st = this->data(_exec_place) + .add(places_address, false, columns, 0, batch_size, argument_types); + if (UNLIKELY(st != Status::OK())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string()); + } } // TODO: Here we calling method by jni, And if we get a thrown from FE, @@ -478,23 +495,35 @@ public: Arena* /*arena*/) const override { int64_t places_address[1]; places_address[0] = reinterpret_cast<int64_t>(place); - this->data(_exec_place).add(places_address, true, columns, 0, batch_size, argument_types); + Status st = this->data(_exec_place) + .add(places_address, true, columns, 0, batch_size, argument_types); + if (UNLIKELY(st != Status::OK())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string()); + } } // TODO: reset function should be implement also in struct data void reset(AggregateDataPtr /*place*/) const override { LOG(WARNING) << " shouldn't going reset function, there maybe some error about function " << _fn.name.function_name; + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "shouldn't going reset function"); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena*) const override { - this->data(_exec_place).merge(this->data(rhs), reinterpret_cast<int64_t>(place)); + Status st = + this->data(_exec_place).merge(this->data(rhs), reinterpret_cast<int64_t>(place)); + if (UNLIKELY(st != Status::OK())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string()); + } } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - this->data(const_cast<AggregateDataPtr&>(_exec_place)) - .write(buf, reinterpret_cast<int64_t>(place)); + Status st = this->data(const_cast<AggregateDataPtr&>(_exec_place)) + .write(buf, reinterpret_cast<int64_t>(place)); + if (UNLIKELY(st != Status::OK())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string()); + } } // during merge-finalized phase, for deserialize and merge firstly, @@ -509,7 +538,10 @@ public: } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - this->data(_exec_place).get(to, _return_type, reinterpret_cast<int64_t>(place)); + Status st = this->data(_exec_place).get(to, _return_type, reinterpret_cast<int64_t>(place)); + if (UNLIKELY(st != Status::OK())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string()); + } } private: diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h index 4d8a2ae598..2d1f7b7c00 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h +++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h @@ -91,7 +91,7 @@ public: } } - void read(BufferReadable& buf) { + void read(BufferReadable& buf, Arena* arena) { read_binary(has_value, buf); if (has()) { read_binary(value, buf); @@ -209,7 +209,7 @@ public: } } - void read(BufferReadable& buf) { + void read(BufferReadable& buf, Arena* arena) { read_binary(has_value, buf); if (has()) { read_binary(value, buf); @@ -308,7 +308,7 @@ private: char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero. public: - ~SingleValueDataString() { delete[] large_data; } + ~SingleValueDataString() = default; constexpr static bool IsFixedLength = false; @@ -340,7 +340,7 @@ public: } } - void read(BufferReadable& buf) { + void read(BufferReadable& buf, Arena* arena) { Int32 rhs_size; read_binary(rhs_size, buf); @@ -356,8 +356,7 @@ public: } else { if (capacity < rhs_size) { capacity = static_cast<UInt32>(round_up_to_power_of_two_or_zero(rhs_size)); - delete[] large_data; - large_data = new char[capacity]; + large_data = arena->alloc(capacity); } size = rhs_size; @@ -386,8 +385,7 @@ public: if (capacity < value_size) { /// Don't free large_data here. capacity = round_up_to_power_of_two_or_zero(value_size); - delete[] large_data; - large_data = new char[capacity]; + large_data = arena->alloc(capacity); } size = value_size; @@ -546,8 +544,8 @@ public: } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena*) const override { - this->data(place).read(buf); + Arena* arena) const override { + this->data(place).read(buf, arena); } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h index 69945facfc..6fa82fa2a1 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h +++ b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h @@ -45,9 +45,9 @@ public: key.write(buf); } - void read(BufferReadable& buf) { - value.read(buf); - key.read(buf); + void read(BufferReadable& buf, Arena* arena) { + value.read(buf, arena); + key.read(buf, arena); } }; @@ -129,8 +129,8 @@ public: } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena*) const override { - this->data(place).read(buf); + Arena* arena) const override { + this->data(place).read(buf, arena); } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h index b94b922840..13884e92ba 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h +++ b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h @@ -69,7 +69,7 @@ struct PercentileApproxState { if (compression < 2048 || compression > 10000) { compression = 10000; } - digest.reset(new TDigest(compression)); + digest = TDigest::create_unique(compression); compressions = compression; init_flag = true; } @@ -101,7 +101,7 @@ struct PercentileApproxState { read_binary(compressions, buf); std::string str; read_binary(str, buf); - digest.reset(new TDigest(compressions)); + digest = TDigest::create_unique(compressions); digest->unserialize((uint8_t*)str.c_str()); } @@ -121,7 +121,7 @@ struct PercentileApproxState { DCHECK(digest.get() != nullptr); digest->merge(rhs.digest.get()); } else { - digest.reset(new TDigest(compressions)); + digest = TDigest::create_unique(compressions); digest->merge(rhs.digest.get()); init_flag = true; } @@ -138,7 +138,7 @@ struct PercentileApproxState { void reset() { target_quantile = INIT_QUANTILE; init_flag = false; - digest.reset(new TDigest(compressions)); + digest = TDigest::create_unique(compressions); } bool init_flag = false; diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h index 9ec64b248b..54d146ca37 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h @@ -232,20 +232,20 @@ private: template <template <typename> class AggregateFunctionTemplate, template <typename> class Impl, bool result_is_nullable, bool arg_is_nullable, bool is_copy = false> -IAggregateFunction* create_function_single_value(const String& name, - const DataTypes& argument_types) { +AggregateFunctionPtr create_function_single_value(const String& name, + const DataTypes& argument_types) { auto type = remove_nullable(argument_types[0]); WhichDataType which(*type); -#define DISPATCH(TYPE, COLUMN_TYPE) \ - if (which.idx == TypeIndex::TYPE) \ - return new AggregateFunctionTemplate<Impl<ReaderFirstAndLastData< \ - COLUMN_TYPE, result_is_nullable, arg_is_nullable, is_copy>>>(argument_types); +#define DISPATCH(TYPE, COLUMN_TYPE) \ + if (which.idx == TypeIndex::TYPE) \ + return std::make_shared<AggregateFunctionTemplate<Impl<ReaderFirstAndLastData< \ + COLUMN_TYPE, result_is_nullable, arg_is_nullable, is_copy>>>>(argument_types); TYPE_TO_COLUMN_TYPE(DISPATCH) #undef DISPATCH - LOG(FATAL) << "with unknowed type, failed in create_aggregate_function_" << name - << " and type is: " << argument_types[0]->get_name(); + LOG(WARNING) << "with unknowed type, failed in create_aggregate_function_" << name + << " and type is: " << argument_types[0]->get_name(); return nullptr; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp b/be/src/vec/aggregate_functions/aggregate_function_window.cpp index f18668690d..bacdd45131 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp @@ -34,15 +34,15 @@ namespace doris::vectorized { template <template <typename> class AggregateFunctionTemplate, template <typename ColVecType, bool, bool> class Data, template <typename> class Impl, bool result_is_nullable, bool arg_is_nullable> -IAggregateFunction* create_function_lead_lag_first_last(const String& name, - const DataTypes& argument_types) { +AggregateFunctionPtr create_function_lead_lag_first_last(const String& name, + const DataTypes& argument_types) { auto type = remove_nullable(argument_types[0]); WhichDataType which(*type); -#define DISPATCH(TYPE, COLUMN_TYPE) \ - if (which.idx == TypeIndex::TYPE) \ - return new AggregateFunctionTemplate< \ - Impl<Data<COLUMN_TYPE, result_is_nullable, arg_is_nullable>>>(argument_types); +#define DISPATCH(TYPE, COLUMN_TYPE) \ + if (which.idx == TypeIndex::TYPE) \ + return std::make_shared<AggregateFunctionTemplate< \ + Impl<Data<COLUMN_TYPE, result_is_nullable, arg_is_nullable>>>>(argument_types); TYPE_TO_BASIC_COLUMN_TYPE(DISPATCH) #undef DISPATCH --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org