This is an automated email from the ASF dual-hosted git repository. weixiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5b39fa9843 [Feature](vec)(quantile_state): support quantile state in vectorized engine (#16562) 5b39fa9843 is described below commit 5b39fa9843a23eee197dad93c94eaae7b3862fdc Author: spaces-x <weixiao5...@gmail.com> AuthorDate: Tue Mar 14 10:54:04 2023 +0800 [Feature](vec)(quantile_state): support quantile state in vectorized engine (#16562) * [Feature](vectorized)(quantile_state): support vectorized quantile state functions 1. now quantile column only support not nullable 2. add up some regression test cases 3. set default enable_quantile_state_type = true --------- Co-authored-by: spaces-x <weixian...@meituan.com> --- be/src/exprs/rpc_fn_comm.h | 25 ++ be/src/util/quantile_state.cpp | 20 +- be/src/util/quantile_state.h | 9 +- be/src/vec/CMakeLists.txt | 3 + .../aggregate_function_quantile_state.cpp | 41 +++ .../aggregate_function_quantile_state.h | 153 ++++++++++++ .../aggregate_function_reader.cpp | 1 + .../aggregate_function_reader.h | 1 + .../aggregate_function_simple_factory.cpp | 2 + be/src/vec/columns/column.h | 2 + be/src/vec/columns/column_complex.h | 15 ++ be/src/vec/core/types.h | 11 + be/src/vec/data_types/data_type.cpp | 2 + be/src/vec/data_types/data_type_factory.cpp | 10 + be/src/vec/data_types/data_type_factory.hpp | 2 + be/src/vec/data_types/data_type_quantilestate.cpp | 126 ++++++++++ be/src/vec/data_types/data_type_quantilestate.h | 84 +++++++ be/src/vec/functions/function_quantile_state.cpp | 277 +++++++++++++++++++++ be/src/vec/functions/function_rpc.cpp | 25 ++ be/src/vec/functions/simple_function_factory.h | 2 + be/src/vec/olap/olap_data_convertor.cpp | 82 ++++++ be/src/vec/olap/olap_data_convertor.h | 5 + be/src/vec/sink/vmysql_result_writer.cpp | 14 +- be/test/vec/core/column_complex_test.cpp | 65 ++++- .../main/java/org/apache/doris/common/Config.java | 2 +- .../apache/doris/analysis/CreateFunctionStmt.java | 3 + .../apache/doris/analysis/FunctionCallExpr.java | 1 - 
gensrc/proto/types.proto | 1 + gensrc/script/doris_builtins_functions.py | 9 +- .../common/load/quantile_state_basic_agg.sql | 4 + .../common/table/quantile_state_basic_agg.sql | 6 + .../datatype_p0/complex_types/basic_agg_test.out | 9 + .../test_aggregate_all_functions.out | 15 ++ .../test_aggregate_all_functions.out | 14 ++ .../data/types/complex_types/basic_agg_test.out | 9 + .../complex_types/basic_agg_test.groovy | 6 +- .../test_aggregate_all_functions.groovy | 39 +++ .../test_aggregate_all_functions.groovy | 41 +++ .../types/complex_types/basic_agg_test.groovy | 6 +- 39 files changed, 1119 insertions(+), 23 deletions(-) diff --git a/be/src/exprs/rpc_fn_comm.h b/be/src/exprs/rpc_fn_comm.h index 1e7fcce5f7..1849c0a2d2 100644 --- a/be/src/exprs/rpc_fn_comm.h +++ b/be/src/exprs/rpc_fn_comm.h @@ -286,6 +286,24 @@ void convert_col_to_pvalue(const vectorized::ColumnPtr& column, } break; } + case vectorized::TypeIndex::QuantileState: { + ptype->set_id(PGenericType::QUANTILE_STATE); + arg->mutable_bytes_value()->Reserve(row_count); + for (size_t row_num = start; row_num < end; ++row_num) { + if constexpr (nullable) { + if (column->is_null_at(row_num)) { + arg->add_bytes_value(""); // never nullptr: add_bytes_value(const char*) needs a valid C string; null rows are presumably flagged via the null map - confirm + } else { + StringRef data = column->get_data_at(row_num); + arg->add_bytes_value(data.data, data.size); + } + } else { + StringRef data = column->get_data_at(row_num); + arg->add_bytes_value(data.data, data.size); + } + } + break; + } default: LOG(INFO) << "unknown type: " << data_type->get_name(); ptype->set_id(PGenericType::UNKNOWN); @@ -438,6 +456,13 @@ void convert_to_column(vectorized::MutableColumnPtr& column, const PValues& resu } break; } + case PGenericType::QUANTILE_STATE: { + column->reserve(result.bytes_value_size()); + for (int i = 0; i < result.bytes_value_size(); ++i) { + column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size()); + } + break; + } default: { LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString(); break; diff --git
a/be/src/util/quantile_state.cpp b/be/src/util/quantile_state.cpp index 4e91b400a9..d991f77953 100644 --- a/be/src/util/quantile_state.cpp +++ b/be/src/util/quantile_state.cpp @@ -113,22 +113,22 @@ bool QuantileState<T>::is_valid(const Slice& slice) { } template <typename T> -T QuantileState<T>::get_explicit_value_by_percentile(float percentile) { +T QuantileState<T>::get_explicit_value_by_percentile(float percentile) const { DCHECK(_type == EXPLICIT); int n = _explicit_data.size(); - std::sort(_explicit_data.begin(), _explicit_data.end()); + std::vector<T> sorted_data(_explicit_data.begin(), _explicit_data.end()); + std::sort(sorted_data.begin(), sorted_data.end()); double index = (n - 1) * percentile; int intIdx = (int)index; if (intIdx == n - 1) { - return _explicit_data[intIdx]; + return sorted_data[intIdx]; } - return _explicit_data[intIdx + 1] * (index - intIdx) + - _explicit_data[intIdx] * (intIdx + 1 - index); + return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index); } template <typename T> -T QuantileState<T>::get_value_by_percentile(float percentile) { +T QuantileState<T>::get_value_by_percentile(float percentile) const { DCHECK(percentile >= 0 && percentile <= 1); switch (_type) { case EMPTY: { @@ -191,7 +191,7 @@ bool QuantileState<T>::deserialize(const Slice& slice) { } case TDIGEST: { // 4: Tdigest object value - _tdigest_ptr = std::make_unique<TDigest>(0); + _tdigest_ptr = std::make_shared<TDigest>(0); _tdigest_ptr->unserialize(ptr); break; } @@ -241,7 +241,7 @@ size_t QuantileState<T>::serialize(uint8_t* dst) const { } template <typename T> -void QuantileState<T>::merge(QuantileState<T>& other) { +void QuantileState<T>::merge(const QuantileState<T>& other) { switch (other._type) { case EMPTY: break; @@ -263,7 +263,7 @@ void QuantileState<T>::merge(QuantileState<T>& other) { case EXPLICIT: if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) { _type = TDIGEST; - _tdigest_ptr = 
std::make_unique<TDigest>(_compression); + _tdigest_ptr = std::make_shared<TDigest>(_compression); for (int i = 0; i < _explicit_data.size(); i++) { _tdigest_ptr->add(_explicit_data[i]); } @@ -330,7 +330,7 @@ void QuantileState<T>::add_value(const T& value) { break; case EXPLICIT: if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) { - _tdigest_ptr = std::make_unique<TDigest>(_compression); + _tdigest_ptr = std::make_shared<TDigest>(_compression); for (int i = 0; i < _explicit_data.size(); i++) { _tdigest_ptr->add(_explicit_data[i]); } diff --git a/be/src/util/quantile_state.h b/be/src/util/quantile_state.h index 30cc7f48fa..c3af66c495 100644 --- a/be/src/util/quantile_state.h +++ b/be/src/util/quantile_state.h @@ -49,21 +49,22 @@ public: void set_compression(float compression); bool deserialize(const Slice& slice); size_t serialize(uint8_t* dst) const; - void merge(QuantileState<T>& other); + void merge(const QuantileState<T>& other); void add_value(const T& value); void clear(); bool is_valid(const Slice& slice); size_t get_serialized_size(); - T get_value_by_percentile(float percentile); - T get_explicit_value_by_percentile(float percentile); + T get_value_by_percentile(float percentile) const; + T get_explicit_value_by_percentile(float percentile) const; ~QuantileState() = default; private: QuantileStateType _type = EMPTY; - std::unique_ptr<TDigest> _tdigest_ptr; + std::shared_ptr<TDigest> _tdigest_ptr; // NOTE(review): copies of a QuantileState now share one TDigest; an in-place digest update after a copy is visible through both - confirm merge()/add_value() never mutate a shared digest T _single_data; std::vector<T> _explicit_data; float _compression; }; +using QuantileStateDouble = QuantileState<double>; } // namespace doris diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 758e6fd978..289bf3ca10 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -46,6 +46,7 @@ set(VEC_FILES 
aggregate_functions/aggregate_function_orthogonal_bitmap.cpp aggregate_functions/aggregate_function_avg_weighted.cpp aggregate_functions/aggregate_function_histogram.cpp +
aggregate_functions/aggregate_function_quantile_state.cpp columns/column.cpp columns/column_array.cpp columns/column_struct.cpp @@ -95,6 +96,7 @@ set(VEC_FILES data_types/data_type_string.cpp data_types/data_type_decimal.cpp data_types/data_type_map.cpp + data_types/data_type_quantilestate.cpp data_types/get_least_supertype.cpp data_types/convert_field_to_type.cpp data_types/nested_utils.cpp @@ -249,6 +251,7 @@ set(VEC_FILES functions/function_running_difference.cpp functions/function_width_bucket.cpp functions/match.cpp + functions/function_quantile_state.cpp jsonb/serialize.cpp olap/vgeneric_iterators.cpp diff --git a/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp new file mode 100644 index 0000000000..4c8ec27296 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.cpp @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "vec/aggregate_functions/aggregate_function_quantile_state.h" + +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" + +namespace doris::vectorized { + +AggregateFunctionPtr create_aggregate_function_quantile_state_union(const std::string& name, + const DataTypes& argument_types, + const bool result_is_nullable) { + const bool arg_is_nullable = argument_types[0]->is_nullable(); + if (arg_is_nullable) { + return std::make_shared<AggregateFunctionQuantileStateOp< + true, AggregateFunctionQuantileStateUnionOp, double>>(argument_types); + } else { + return std::make_shared<AggregateFunctionQuantileStateOp< + false, AggregateFunctionQuantileStateUnionOp, double>>(argument_types); + } +} + +void register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory& factory) { + factory.register_function("quantile_union", create_aggregate_function_quantile_state_union); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h new file mode 100644 index 0000000000..6b07f79648 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_quantile_state.h @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "util/quantile_state.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column_complex.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/assert_cast.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_quantilestate.h" + +namespace doris::vectorized { + +struct AggregateFunctionQuantileStateUnionOp { + static constexpr auto name = "quantile_union"; + + template <typename T> + static void add(QuantileState<T>& res, const T& data, bool& is_first) { + res.add_value(data); + } + + template <typename T> + static void add(QuantileState<T>& res, const QuantileState<T>& data, bool& is_first) { + if (UNLIKELY(is_first)) { + res = data; // NOTE(review): the copy shares data's TDigest (shared_ptr), so a later res.merge() may also change the input row's digest - confirm + is_first = false; + } else { + res.merge(data); + } + } + + template <typename T> + static void merge(QuantileState<T>& res, const QuantileState<T>& data, bool& is_first) { + if (UNLIKELY(is_first)) { + res = data; // NOTE(review): same TDigest-sharing caveat as in add() above + is_first = false; + } else { + res.merge(data); + } + } +}; + +template <typename Op, typename InternalType> +struct AggregateFunctionQuantileStateData { + using DataType = QuantileState<InternalType>; + DataType value; + bool is_first = true; + + template <typename T> + void add(const T& data) { + Op::add(value, data, is_first); + } + + void merge(const DataType& data) { Op::merge(value, data, is_first); } + + void write(BufferWritable& buf) const { + DataTypeQuantileState<InternalType>::serialize_as_stream(value, buf); + } + + void read(BufferReadable& buf) { + DataTypeQuantileState<InternalType>::deserialize_as_stream(value, buf); is_first = false; // a restored state must be merged into, not 
overwritten by the next merge() + } + + void reset() { value = DataType {}; is_first = true; } // assign a fresh state instead of mutating: a row copied out by insert_result_into may share this state's TDigest + + DataType& get() { return value; } +}; + +template <bool arg_is_nullable, typename Op, typename InternalType> +class AggregateFunctionQuantileStateOp final + : public
IAggregateFunctionDataHelper< + AggregateFunctionQuantileStateData<Op, InternalType>, + AggregateFunctionQuantileStateOp<arg_is_nullable, Op, InternalType>> { +public: + using ResultDataType = QuantileState<InternalType>; + using ColVecType = ColumnQuantileState<InternalType>; + using ColVecResult = ColumnQuantileState<InternalType>; + + String get_name() const override { return Op::name; } + + AggregateFunctionQuantileStateOp(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper< + AggregateFunctionQuantileStateData<Op, InternalType>, + AggregateFunctionQuantileStateOp<arg_is_nullable, Op, InternalType>>( + argument_types_) {} + + DataTypePtr get_return_type() const override { + return std::make_shared<DataTypeQuantileState<InternalType>>(); + } + + void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num, + Arena*) const override { + if constexpr (arg_is_nullable) { + auto& nullable_column = assert_cast<const ColumnNullable&>(*columns[0]); + if (!nullable_column.is_null_at(row_num)) { + const auto& column = + assert_cast<const ColVecType&>(nullable_column.get_nested_column()); + this->data(place).add(column.get_data()[row_num]); + } + } else { + const auto& column = assert_cast<const ColVecType&>(*columns[0]); + this->data(place).add(column.get_data()[row_num]); + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, + Arena*) const override { + this->data(place).merge( + const_cast<AggregateFunctionQuantileStateData<Op, InternalType>&>(this->data(rhs)) + .get()); + } + + void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { + this->data(place).write(buf); + } + + void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, + Arena*) const override { + this->data(place).read(buf); + } + + void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { + auto& column = assert_cast<ColVecResult&>(to); +
column.get_data().push_back( + const_cast<AggregateFunctionQuantileStateData<Op, InternalType>&>(this->data(place)) + .get()); + } + + void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } +}; + +AggregateFunctionPtr create_aggregate_function_quantile_state_union(const std::string& name, + const DataTypes& argument_types, + const bool result_is_nullable); + +} // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp index 0d4231e8e7..46384cf48a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp @@ -34,6 +34,7 @@ void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& fac register_function_both("bitmap_union", create_aggregate_function_bitmap_union); register_function_both("hll_union", create_aggregate_function_HLL<AggregateFunctionHLLUnionImpl>); + register_function_both("quantile_union", create_aggregate_function_quantile_state_union); } // only replace function in load/reader do different agg operation.
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h b/be/src/vec/aggregate_functions/aggregate_function_reader.h index 626c06571b..a062a7f496 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h @@ -20,6 +20,7 @@ #include "vec/aggregate_functions/aggregate_function_bitmap.h" #include "vec/aggregate_functions/aggregate_function_hll_union_agg.h" #include "vec/aggregate_functions/aggregate_function_min_max.h" +#include "vec/aggregate_functions/aggregate_function_quantile_state.h" #include "vec/aggregate_functions/aggregate_function_reader_first_last.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/aggregate_functions/aggregate_function_sum.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp index 6a29f581a4..2ce9fcd820 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp @@ -38,6 +38,7 @@ void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& f void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_bitmap(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_window_lead_lag_first_last( AggregateFunctionSimpleFactory& factory); @@ -69,6 +70,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_bit(instance); register_aggregate_function_bitmap(instance); register_aggregate_function_group_concat(instance); +
register_aggregate_function_quantile_state(instance); register_aggregate_function_combinator_distinct(instance); register_aggregate_function_reader_load( instance); // register aggregate function for agg reader diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 5faecbdeac..b18258d805 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -578,6 +578,8 @@ public: virtual bool is_hll() const { return false; } + virtual bool is_quantile_state() const { return false; } + // true if column has null element virtual bool has_null() const { return false; } diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index d6c47d0dca..0b6dd71731 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -24,6 +24,7 @@ #include "olap/hll.h" #include "util/bitmap_value.h" +#include "util/quantile_state.h" #include "vec/columns/column.h" #include "vec/columns/column_impl.h" #include "vec/columns/column_string.h" @@ -48,6 +49,7 @@ public: bool is_bitmap() const override { return std::is_same_v<T, BitmapValue>; } bool is_hll() const override { return std::is_same_v<T, HyperLogLog>; } + bool is_quantile_state() const override { return std::is_same_v<T, QuantileState<double>>; } size_t size() const override { return data.size(); } @@ -75,6 +77,8 @@ public: pvalue->deserialize(pos); } else if constexpr (std::is_same_v<T, HyperLogLog>) { pvalue->deserialize(Slice(pos, length)); + } else if constexpr (std::is_same_v<T, QuantileStateDouble>) { + pvalue->deserialize(Slice(pos, length)); } else { LOG(FATAL) << "Unexpected type in column complex"; } @@ -426,6 +430,13 @@ void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size, using ColumnBitmap = ColumnComplexType<BitmapValue>; using ColumnHLL = ColumnComplexType<HyperLogLog>; +template <typename T> +using ColumnQuantileState = ColumnComplexType<QuantileState<T>>; + +using ColumnQuantileStateDouble =
ColumnQuantileState<double>; + +// ColumnQuantileStateDouble is the column type used by DataTypeQuantileStateDouble. + template <typename T> struct is_complex : std::false_type {}; @@ -437,6 +448,10 @@ template <> struct is_complex<HyperLogLog> : std::true_type {}; //DataTypeHLL::FieldType = HyperLogLog +template <> +struct is_complex<QuantileState<double>> : std::true_type {}; +//DataTypeQuantileState::FieldType = QuantileState<double> + template <class T> constexpr bool is_complex_v = is_complex<T>::value; diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h index 177d2166ed..dcb41ffabd 100644 --- a/be/src/vec/core/types.h +++ b/be/src/vec/core/types.h @@ -36,6 +36,9 @@ class HyperLogLog; struct decimal12_t; struct uint24_t; +template <typename T> +class QuantileState; + namespace vectorized { /// Data types for representing elementary values from a database in RAM. @@ -85,6 +88,7 @@ enum class TypeIndex { Map, Struct, VARIANT, + QuantileState, }; struct Consted { @@ -206,6 +210,11 @@ struct TypeName<HyperLogLog> { static const char* get() { return "HLL"; } }; +template <> +struct TypeName<QuantileState<double>> { + static const char* get() { return "QuantileState"; } +}; + template <typename T> struct TypeId; template <> @@ -604,6 +613,8 @@ inline const char* getTypeName(TypeIndex idx) { return "JSONB"; case TypeIndex::Struct: return "Struct"; + case TypeIndex::QuantileState: + return TypeName<QuantileState<double>>::get(); } __builtin_unreachable(); diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp index 74c1211800..6d70850564 100644 --- a/be/src/vec/data_types/data_type.cpp +++ b/be/src/vec/data_types/data_type.cpp @@ -145,6 +145,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const IDataType* data_type) { return PGenericType::BITMAP; case TypeIndex::HLL: return PGenericType::HLL; + case TypeIndex::QuantileState: + return PGenericType::QUANTILE_STATE; case TypeIndex::Array: return PGenericType::LIST; case TypeIndex::Struct: diff --git
a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index 80495fa593..ec1eea5637 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -155,6 +155,9 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo case TYPE_DECIMALV2: nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9); break; + case TYPE_QUANTILE_STATE: + nested = std::make_shared<vectorized::DataTypeQuantileStateDouble>(); + break; case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128I: @@ -263,6 +266,9 @@ DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& type, case OLAP_FIELD_TYPE_DECIMAL: result = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9); break; + case OLAP_FIELD_TYPE_QUANTILE_STATE: + result = std::make_shared<vectorized::DataTypeQuantileStateDouble>(); + break; case OLAP_FIELD_TYPE_DECIMAL32: case OLAP_FIELD_TYPE_DECIMAL64: case OLAP_FIELD_TYPE_DECIMAL128I: @@ -386,6 +392,10 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { nested = std::make_shared<DataTypeObject>("object", true); break; } + case PGenericType::QUANTILE_STATE: { + nested = std::make_shared<DataTypeQuantileStateDouble>(); + break; + } default: { LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type()); return nullptr; diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp index ec229980af..869bb75e29 100644 --- a/be/src/vec/data_types/data_type_factory.hpp +++ b/be/src/vec/data_types/data_type_factory.hpp @@ -42,6 +42,7 @@ #include "vec/data_types/data_type_nothing.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_quantilestate.h" #include "vec/data_types/data_type_string.h" #include "vec/data_types/data_type_struct.h" @@ -85,6 +86,7 @@ public:
{"Jsonb", std::make_shared<DataTypeJsonb>()}, {"BitMap", std::make_shared<DataTypeBitMap>()}, {"Hll", std::make_shared<DataTypeHLL>()}, + {"QuantileState", std::make_shared<DataTypeQuantileStateDouble>()}, }; for (auto const& [key, val] : base_type_map) { instance.register_data_type(key, val); diff --git a/be/src/vec/data_types/data_type_quantilestate.cpp b/be/src/vec/data_types/data_type_quantilestate.cpp new file mode 100644 index 0000000000..59f7665d67 --- /dev/null +++ b/be/src/vec/data_types/data_type_quantilestate.cpp @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/data_types/data_type_quantilestate.h" + +#include "vec/columns/column_complex.h" +#include "vec/common/assert_cast.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { +// binary: <size array> | <quantilestate array> +// <size array>: row num | quantilestate1 size | quantilestate2 size | ... +// <quantilestate array>: quantilestate1 | quantilestate2 | ...
+template <typename T> +int64_t DataTypeQuantileState<T>::get_uncompressed_serialized_bytes(const IColumn& column, + int be_exec_version) const { + auto ptr = column.convert_to_full_column_if_const(); + auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr); + + auto allocate_len_size = sizeof(size_t) * (column.size() + 1); + size_t allocate_content_size = 0; // not `auto`: a literal 0 deduces int, which the size_t sum below can overflow + for (size_t i = 0; i < column.size(); ++i) { + auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i)); + allocate_content_size += quantile_state.get_serialized_size(); + } + + return allocate_len_size + allocate_content_size; +} + +template <typename T> +char* DataTypeQuantileState<T>::serialize(const IColumn& column, char* buf, + int be_exec_version) const { + auto ptr = column.convert_to_full_column_if_const(); + auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr); + + // serialize the quantile_state size array, row num saves at index 0 + size_t* meta_ptr = (size_t*)buf; + meta_ptr[0] = column.size(); + for (size_t i = 0; i < meta_ptr[0]; ++i) { + auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i)); + meta_ptr[i + 1] = quantile_state.get_serialized_size(); + } + + // serialize each quantile_state + char* data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1); + for (size_t i = 0; i < meta_ptr[0]; ++i) { + auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i)); + quantile_state.serialize((uint8_t*)data_ptr); + data_ptr += meta_ptr[i + 1]; + } + + return data_ptr; +} + +template <typename T> +const char* DataTypeQuantileState<T>::deserialize(const char* buf, IColumn* column, + int be_exec_version) const { + auto& data_column = assert_cast<ColumnQuantileState<T>&>(*column); + auto& data = data_column.get_data(); + + // deserialize the quantile_state size array + const size_t* meta_ptr = reinterpret_cast<const size_t*>(buf); + + // 
deserialize each quantile_state + data.resize(meta_ptr[0]); + const char*
data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1); + for (size_t i = 0; i < meta_ptr[0]; ++i) { + Slice slice(data_ptr, meta_ptr[i + 1]); + data[i].deserialize(slice); + data_ptr += meta_ptr[i + 1]; + } + + return data_ptr; +} + +template <typename T> +MutableColumnPtr DataTypeQuantileState<T>::create_column() const { + return ColumnQuantileState<T>::create(); +} + +template <typename T> +void DataTypeQuantileState<T>::serialize_as_stream(const QuantileState<T>& cvalue, + BufferWritable& buf) { + auto& value = const_cast<QuantileState<T>&>(cvalue); + std::string memory_buffer; + size_t bytesize = value.get_serialized_size(); + memory_buffer.resize(bytesize); + value.serialize(reinterpret_cast<uint8_t*>(memory_buffer.data())); + write_string_binary(memory_buffer, buf); +} + +template <typename T> +void DataTypeQuantileState<T>::deserialize_as_stream(QuantileState<T>& value, BufferReadable& buf) { + StringRef ref; + read_string_binary(ref, buf); + value.deserialize(ref.to_slice()); +} + +template <typename T> +void DataTypeQuantileState<T>::to_string(const class doris::vectorized::IColumn& column, + size_t row_num, + doris::vectorized::BufferWritable& ostr) const { + auto& data = const_cast<QuantileState<T>&>( + assert_cast<const ColumnQuantileState<T>&>(column).get_element(row_num)); + std::string result(data.get_serialized_size(), '0'); + data.serialize((uint8_t*)result.data()); + ostr.write(result.data(), result.size()); +} + +template class DataTypeQuantileState<double>; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/data_types/data_type_quantilestate.h b/be/src/vec/data_types/data_type_quantilestate.h new file mode 100644 index 0000000000..b6a48f803f --- /dev/null +++ b/be/src/vec/data_types/data_type_quantilestate.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include "util/quantile_state.h" +#include "vec/columns/column.h" +#include "vec/columns/column_complex.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +namespace doris::vectorized { +template <typename T> +class DataTypeQuantileState : public IDataType { +public: + DataTypeQuantileState() = default; + ~DataTypeQuantileState() override = default; + using ColumnType = ColumnQuantileState<T>; + using FieldType = QuantileState<T>; + + std::string do_get_name() const override { return get_family_name(); } + const char* get_family_name() const override { return "QuantileState"; } + + TypeIndex get_type_id() const override { return TypeIndex::QuantileState; } + int64_t get_uncompressed_serialized_bytes(const IColumn& column, + int be_exec_version) const override; + char* serialize(const IColumn& column, char* buf, int be_exec_version) const override; + const char* deserialize(const char* buf, IColumn* column, int be_exec_version) const override; + + MutableColumnPtr create_column() const override; + + bool get_is_parametric() const override { return false; } + bool have_subtypes() const override { return false; } + bool should_align_right_in_pretty_formats() const override { return false; } + bool text_can_contain_only_valid_utf8() const
override { return true; } + bool is_comparable() const override { return false; } + bool is_value_represented_by_number() const override { return false; } + bool is_value_represented_by_integer() const override { return false; } + bool is_value_represented_by_unsigned_integer() const override { return false; } + // TODO(review): quantile states are variable-length serialized objects; confirm that returning true here is intended + bool is_value_unambiguously_represented_in_contiguous_memory_region() const override { + return true; + } + bool have_maximum_size_of_value() const override { return false; } + + bool can_be_used_as_version() const override { return false; } + + bool can_be_inside_nullable() const override { return true; } + + bool equals(const IDataType& rhs) const override { return typeid(rhs) == typeid(*this); } + + bool is_categorial() const override { return is_value_represented_by_integer(); } + + bool can_be_inside_low_cardinality() const override { return false; } + + std::string to_string(const IColumn& column, size_t row_num) const override { + return "QuantileState()"; + } + void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + + [[noreturn]] Field get_default() const override { + LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name(); + __builtin_unreachable(); + } + + static void serialize_as_stream(const QuantileState<T>& value, BufferWritable& buf); + + static void deserialize_as_stream(QuantileState<T>& value, BufferReadable& buf); +}; +using DataTypeQuantileStateDouble = DataTypeQuantileState<double>; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/functions/function_quantile_state.cpp b/be/src/vec/functions/function_quantile_state.cpp new file mode 100644 index 0000000000..3cc3166ec8 --- /dev/null +++ b/be/src/vec/functions/function_quantile_state.cpp @@ -0,0 +1,277 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
+// and modified by Doris
+
+#include "util/string_parser.hpp"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/functions/function_always_not_nullable.h"
+#include "vec/functions/function_const.h"
+#include "vec/functions/function_string.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+// quantile_state_empty(): constant function that yields an empty QuantileState.
+template <typename InternalType>
+struct QuantileStateEmpty {
+    static constexpr auto name = "quantile_state_empty";
+    using ReturnColVec = ColumnQuantileState<InternalType>;
+    static DataTypePtr get_return_type() {
+        return std::make_shared<DataTypeQuantileState<InternalType>>();
+    }
+    static auto init_value() { return QuantileState<InternalType> {}; }
+};
+
+// to_quantile_state(value, compression): wraps each string/BIGINT/FLOAT/DOUBLE value in a
+// one-value QuantileState built with `compression`; NULL input rows stay empty states.
+template <typename InternalType>
+class FunctionToQuantileState : public IFunction {
+public:
+    static_assert(std::is_same_v<InternalType, float> || std::is_same_v<InternalType, double>,
+                  "The InternalType of quantile_state must be float or double");
+    static constexpr auto name = "to_quantile_state";
+    String get_name() const override { return name; }
+
+    static FunctionPtr create() {
+        return std::make_shared<FunctionToQuantileState<InternalType>>();
+    }
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
+        return std::make_shared<DataTypeQuantileState<InternalType>>();
+    }
+
+    size_t get_number_of_arguments() const override { return 2; }
+
+    bool use_default_implementation_for_nulls() const override { return false; }
+
+    bool use_default_implementation_for_constants() const override { return true; }
+
+    // Fills `column_result` row by row from `column` (a ColumnType, wrapped in ColumnNullable
+    // when is_nullable). Returns an error Status for mismatched columns or unparsable strings.
+    template <typename ColumnType, bool is_nullable>
+    Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
+                            MutableColumnPtr& column_result) {
+        auto type_error = [&]() {
+            return Status::RuntimeError("Illegal column {} of argument of function {}",
+                                        column->get_name(), get_name());
+        };
+        const ColumnNullable* col_nullable = nullptr;
+        const ColumnUInt8* col_nullmap = nullptr;
+        const ColumnType* col = nullptr;
+        const NullMap* nullmap = nullptr;
+        if constexpr (is_nullable) {
+            col_nullable = check_and_get_column<ColumnNullable>(column.get());
+            col_nullmap = check_and_get_column<ColumnUInt8>(
+                    col_nullable->get_null_map_column_ptr().get());
+            col = check_and_get_column<ColumnType>(col_nullable->get_nested_column_ptr().get());
+            nullmap = col_nullmap == nullptr ? nullptr : &col_nullmap->get_data();
+        } else {
+            col = check_and_get_column<ColumnType>(column.get());
+        }
+        // Validate both paths before any dereference; a mismatched column is a type error.
+        if (col == nullptr || (is_nullable && nullmap == nullptr)) {
+            return type_error();
+        }
+        auto* res_column =
+                reinterpret_cast<ColumnQuantileState<InternalType>*>(column_result.get());
+        auto& res_data = res_column->get_data();
+
+        size_t size = col->size();
+        for (size_t i = 0; i < size; ++i) {
+            if constexpr (is_nullable) {
+                if ((*nullmap)[i]) {
+                    continue;
+                }
+            }
+            // Every non-null row gets the requested compression, string input included.
+            res_data[i].set_compression(compression);
+            if constexpr (std::is_same_v<ColumnType, ColumnString>) {
+                const ColumnString::Chars& data = col->get_chars();
+                const ColumnString::Offsets& offsets = col->get_offsets();
+
+                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
+                size_t str_size = offsets[i] - offsets[i - 1];
+                StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+                InternalType value = StringParser::string_to_float<InternalType>(raw_str, str_size,
+                                                                                 &parse_result);
+                if (LIKELY(parse_result == StringParser::PARSE_SUCCESS)) {
+                    res_data[i].add_value(value);
+                } else {
+                    std::stringstream ss;
+                    ss << "The input column content: " << std::string(raw_str, str_size)
+                       << " is not valid in function: " << get_name();
+                    LOG(WARNING) << ss.str();
+                    return Status::InternalError(ss.str());
+                }
+            } else if constexpr (std::is_same_v<ColumnType, ColumnInt64> ||
+                                 std::is_same_v<ColumnType, ColumnFloat32> ||
+                                 std::is_same_v<ColumnType, ColumnFloat64>) {
+                // InternalType only can be double or float, so we can cast directly
+                InternalType value = (InternalType)col->get_data()[i];
+                res_data[i].add_value(value);
+            } else {
+                return type_error();
+            }
+        }
+        return Status::OK();
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                        size_t result, size_t input_rows_count) override {
+        const ColumnPtr& column = block.get_by_position(arguments[0]).column;
+        const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
+        auto compression_arg = check_and_get_column_const<ColumnFloat32>(
+                block.get_by_position(arguments.back()).column);
+        if (compression_arg) {
+            auto compression_arg_val = compression_arg->get_value<Float32>();
+            if (compression_arg_val && compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
+                compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
+                this->compression = compression_arg_val;
+            }
+        }
+        WhichDataType which(data_type);
+        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
+        column_result->resize(input_rows_count);
+
+        auto type_error = [&]() {
+            return Status::RuntimeError("Illegal column {} of argument of function {}",
+                                        block.get_by_position(arguments[0]).column->get_name(),
+                                        get_name());
+        };
+        Status status = Status::OK();
+        if (which.is_nullable()) {
+            const DataTypePtr& nested_data_type =
+                    static_cast<const DataTypeNullable*>(data_type.get())->get_nested_type();
+            WhichDataType nested_which(nested_data_type);
+            if (nested_which.is_string_or_fixed_string()) {
+                status = execute_internal<ColumnString, true>(column, data_type, column_result);
+            } else if (nested_which.is_int64()) {
+                status = execute_internal<ColumnInt64, true>(column, data_type, column_result);
+            } else if (nested_which.is_float32()) {
+                status = execute_internal<ColumnFloat32, true>(column, data_type, column_result);
+            } else if (nested_which.is_float64()) {
+                status = execute_internal<ColumnFloat64, true>(column, data_type, column_result);
+            } else {
+                return type_error();
+            }
+        } else {
+            if (which.is_string_or_fixed_string()) {
+                status = execute_internal<ColumnString, false>(column, data_type, column_result);
+            } else if (which.is_int64()) {
+                status = execute_internal<ColumnInt64, false>(column, data_type, column_result);
+            } else if (which.is_float32()) {
+                status = execute_internal<ColumnFloat32, false>(column, data_type, column_result);
+            } else if (which.is_float64()) {
+                status = execute_internal<ColumnFloat64, false>(column, data_type, column_result);
+            } else {
+                return type_error();
+            }
+        }
+        if (status.ok()) {
+            block.replace_by_position(result, std::move(column_result));
+        }
+        return status;
+    }
+
+private:
+    float compression = 2048;
+};
+
+// quantile_percent(state, p): value at constant percentile p (in [0, 1]) of each state.
+template <typename InternalType>
+class FunctionQuantileStatePercent : public IFunction {
+public:
+    static constexpr auto name = "quantile_percent";
+    String get_name() const override { return name; }
+
+    static FunctionPtr create() {
+        return std::make_shared<FunctionQuantileStatePercent<InternalType>>();
+    }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
+        return std::make_shared<DataTypeFloat64>();
+    }
+
+    size_t get_number_of_arguments() const override { return 2; }
+
+    bool use_default_implementation_for_nulls() const override { return false; }
+
+    bool use_default_implementation_for_constants() const override { return true; }
+
+    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                        size_t result, size_t input_rows_count) override {
+        auto res_data_column = ColumnFloat64::create();
+        auto& res = res_data_column->get_data();
+        auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
+        auto& null_map = data_null_map->get_data();
+
+        auto column = block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        if (auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
+            VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
+            column = nullable->get_nested_column_ptr();
+        }
+        auto quantile_col = assert_cast<const ColumnQuantileState<InternalType>*>(column.get());
+        auto& col_data = quantile_col->get_data();
+        auto percent_arg = check_and_get_column_const<ColumnFloat32>(
+                block.get_by_position(arguments.back()).column);
+
+        if (!percent_arg) {
+            return Status::InvalidArgument(
+                    "Second argument to {} must be a constant FLOAT percentile", get_name());
+        }
+        float percent_arg_value = percent_arg->get_value<Float32>();
+        if (percent_arg_value < 0 || percent_arg_value > 1) {
+            std::stringstream ss;
+            ss << "the input argument of percentage: " << percent_arg_value
+               << " is not valid, must be in range [0,1] ";
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        res.reserve(input_rows_count);
+        for (size_t i = 0; i < input_rows_count; ++i) {
+            if (null_map[i]) {
+                // if null push_back meaningless result to make sure idxs can be matched
+                res.push_back(0);
+                continue;
+            }
+
+            res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
+        }
+
+        block.replace_by_position(result, std::move(res_data_column));
+        return Status::OK();
+    }
+};
+
+using FunctionQuantileStateEmpty = FunctionConst<QuantileStateEmpty<double>, false>;
+using FunctionQuantileStatePercentDouble = FunctionQuantileStatePercent<double>;
+using FunctionToQuantileStateDouble = FunctionToQuantileState<double>;
+
+void register_function_quantile_state(SimpleFunctionFactory& factory) {
+    factory.register_function<FunctionQuantileStateEmpty>();
+    factory.register_function<FunctionQuantileStatePercentDouble>();
+    factory.register_function<FunctionToQuantileStateDouble>();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp
index 8495b5493b..c85c16d5c6 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -291,6 +291,27 @@ void RPCFnImpl::_convert_col_to_pvalue(const ColumnPtr& column, const DataTypePt
         }
         break;
     }
+    case TypeIndex::QuantileState: {
+        ptype->set_id(PGenericType::QUANTILE_STATE);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        // NOTE(review): confirm get_data_at() yields QuantileState's serialized bytes rather
+        // than its in-memory object layout before they are shipped over RPC.
+        for (size_t row_num = start; row_num < end; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    // Never pass nullptr: building a std::string from a null char* is UB.
+                    arg->add_bytes_value("");
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
     default:
         LOG(INFO) << "unknown type: " << data_type->get_name();
         ptype->set_id(PGenericType::UNKNOWN);
@@ -443,6 +464,13 @@ void RPCFnImpl::_convert_to_column(MutableColumnPtr& column, const PValues& resu
         }
         break;
     }
+    case PGenericType::QUANTILE_STATE: {
+        column->reserve(result.bytes_value_size());
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size());
+        }
+        break;
+    }
     default: {
         LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString();
break;
diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h
index 2b20c8ff5c..9ab060d11a 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -49,6 +49,7 @@ void register_function_math(SimpleFunctionFactory& factory);
 void register_function_modulo(SimpleFunctionFactory& factory);
 void register_function_bitmap(SimpleFunctionFactory& factory);
 void register_function_bitmap_variadic(SimpleFunctionFactory& factory);
+void register_function_quantile_state(SimpleFunctionFactory& factory);
 void register_function_is_null(SimpleFunctionFactory& factory);
 void register_function_is_not_null(SimpleFunctionFactory& factory);
 void register_function_non_nullable(SimpleFunctionFactory& factory);
@@ -176,6 +177,7 @@ public:
         static SimpleFunctionFactory instance;
         std::call_once(oc, []() {
             register_function_bitmap(instance);
+            register_function_quantile_state(instance);
             register_function_bitmap_variadic(instance);
             register_function_hll_cardinality(instance);
             register_function_hll_empty(instance);
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 78c5d0abd9..ae7dac0b41 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -54,6 +54,9 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
     case FieldType::OLAP_FIELD_TYPE_OBJECT: {
         return std::make_unique<OlapColumnDataConvertorBitMap>();
     }
+    case FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE: {
+        return std::make_unique<OlapColumnDataConvertorQuantileState>();
+    }
     case FieldType::OLAP_FIELD_TYPE_HLL: {
         return std::make_unique<OlapColumnDataConvertorHLL>();
     }
@@ -298,6 +301,85 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorBitMap::convert_to_olap()
     return Status::OK();
 }
 
+Status OlapBlockDataConvertor::OlapColumnDataConvertorQuantileState::convert_to_olap() {
+    assert(_typed_column.column);
+
+    const vectorized::ColumnQuantileStateDouble* column_quantile_state = nullptr;
+    if (_nullmap) {
+        auto nullable_column =
+                assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
+        column_quantile_state = assert_cast<const vectorized::ColumnQuantileStateDouble*>(
+                nullable_column->get_nested_column_ptr().get());
+    } else {
+        column_quantile_state = assert_cast<const vectorized::ColumnQuantileStateDouble*>(
+                _typed_column.column.get());
+    }
+
+    assert(column_quantile_state);
+    QuantileStateDouble* quantile_state =
+            const_cast<QuantileStateDouble*>(column_quantile_state->get_data().data() + _row_pos);
+    QuantileStateDouble* quantile_state_cur = quantile_state;
+    QuantileStateDouble* quantile_state_end = quantile_state_cur + _num_rows;
+
+    size_t total_size = 0;
+    if (_nullmap) {
+        const UInt8* nullmap_cur = _nullmap + _row_pos;
+        while (quantile_state_cur != quantile_state_end) {
+            if (!*nullmap_cur) {
+                total_size += quantile_state_cur->get_serialized_size();
+            }
+            ++nullmap_cur;
+            ++quantile_state_cur;
+        }
+    } else {
+        while (quantile_state_cur != quantile_state_end) {
+            total_size += quantile_state_cur->get_serialized_size();
+            ++quantile_state_cur;
+        }
+    }
+    _raw_data.resize(total_size);
+
+    quantile_state_cur = quantile_state;
+    size_t slice_size;
+    char* raw_data = _raw_data.data();
+    Slice* slice = _slice.data();
+    if (_nullmap) {
+        const UInt8* nullmap_cur = _nullmap + _row_pos;
+        while (quantile_state_cur != quantile_state_end) {
+            if (!*nullmap_cur) {
+                slice_size = quantile_state_cur->get_serialized_size();
+                quantile_state_cur->serialize((uint8_t*)raw_data);
+
+                slice->data = raw_data;
+                slice->size = slice_size;
+                raw_data += slice_size;
+            } else {
+                // TODO: this may not be necessary, check and remove later
+                slice->data = nullptr;
+                slice->size = 0;
+            }
+            ++slice;
+            ++nullmap_cur;
+            ++quantile_state_cur;
+        }
+        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && slice == _slice.get_end_ptr());
+    } else {
+        while (quantile_state_cur != quantile_state_end) {
+            slice_size = quantile_state_cur->get_serialized_size();
+            quantile_state_cur->serialize((uint8_t*)raw_data);
+
+            slice->data = raw_data;
+            slice->size = slice_size;
+            raw_data += slice_size;
+
+            ++slice;
+            ++quantile_state_cur;
+        }
+        assert(slice == _slice.get_end_ptr());
+    }
+    return Status::OK();
+}
+
 Status OlapBlockDataConvertor::OlapColumnDataConvertorHLL::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnHLL* column_hll = nullptr;
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index f3456bc3af..1c6a74c5fd 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -112,6 +112,11 @@ private:
         Status convert_to_olap() override;
     };
 
+    class OlapColumnDataConvertorQuantileState final : public OlapColumnDataConvertorObject {
+    public:
+        Status convert_to_olap() override;
+    };
+
     class OlapColumnDataConvertorChar : public OlapColumnDataConvertorBase {
     public:
         OlapColumnDataConvertorChar(size_t length);
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index a3aa1d97f4..4723c26701 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -85,7 +85,8 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
 
     int buf_ret = 0;
 
-    if constexpr (type == TYPE_OBJECT || type == TYPE_VARCHAR || type == TYPE_JSONB) {
+    if constexpr (type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_VARCHAR ||
+                  type == TYPE_JSONB) {
         for (int i = 0; i < row_size; ++i) {
             if (0 != buf_ret) {
                 return Status::InternalError("pack mysql buffer failed.");
@@ -117,6 +118,18 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
                 std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
                 hyperLogLog.serialize((uint8*)buf.get());
                 buf_ret = rows_buffer[i].push_string(buf.get(), size);
+            } else if (column->is_quantile_state() && output_object_data()) {
+                const vectorized::ColumnComplexType<QuantileStateDouble>* pColumnComplexType =
+                        assert_cast<const vectorized::ColumnComplexType<QuantileStateDouble>*>(
+                                column.get());
+                // Serialize the column element in place instead of deep-copying the state per
+                // row; DataTypeQuantileState::to_string uses the same const_cast pattern.
+                auto& quantileValue =
+                        const_cast<QuantileStateDouble&>(pColumnComplexType->get_element(i));
+                size_t size = quantileValue.get_serialized_size();
+                std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+                quantileValue.serialize((uint8_t*)buf.get());
+                buf_ret = rows_buffer[i].push_string(buf.get(), size);
             } else {
                 buf_ret = rows_buffer[i].push_null();
             }
@@ -728,6 +741,7 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
             break;
         }
         case TYPE_HLL:
+        case TYPE_QUANTILE_STATE:
         case TYPE_OBJECT: {
             if (type_ptr->is_nullable()) {
                 status = _add_one_column<PrimitiveType::TYPE_OBJECT, true>(column_ptr, result,
diff --git a/be/test/vec/core/column_complex_test.cpp b/be/test/vec/core/column_complex_test.cpp
index 87e5998fa3..26a96986a3 100644
--- a/be/test/vec/core/column_complex_test.cpp
+++ b/be/test/vec/core/column_complex_test.cpp
@@ -26,6 +26,7 @@
 #include "agent/heartbeat_server.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_quantilestate.h"
 namespace doris::vectorized {
 TEST(ColumnComplexTest, BasicTest) {
     using ColumnSTLString = ColumnComplexType<std::string>;
@@ -83,7 +84,42 @@ private:
     DataTypeBitMap _bitmap_type;
 };
 
-TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
+class ColumnQuantileStateTest : public testing::Test {
+public:
+    virtual void SetUp() override {}
+    virtual void TearDown() override {}
+
+    void check_quantile_state_column(const IColumn& l, const IColumn& r) {
+        ASSERT_EQ(l.size(), r.size());
+        const auto& l_col = assert_cast<const ColumnQuantileStateDouble&>(l);
+        const auto& r_col = assert_cast<const ColumnQuantileStateDouble&>(r);
+        for (size_t i = 0; i < l_col.size(); ++i) {
+            auto& l_value = const_cast<QuantileStateDouble&>(l_col.get_element(i));
+            auto& r_value = const_cast<QuantileStateDouble&>(r_col.get_element(i));
+            ASSERT_EQ(l_value.get_serialized_size(), r_value.get_serialized_size());
+        }
+    }
+
+    void check_serialize_and_deserialize(MutableColumnPtr& col) {
+        auto column = assert_cast<ColumnQuantileStateDouble*>(col.get());
+        auto size = _quantile_state_type.get_uncompressed_serialized_bytes(
+                *column, BeExecVersionManager::get_newest_version());
+        std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+        auto result = _quantile_state_type.serialize(*column, buf.get(),
+                                                     BeExecVersionManager::get_newest_version());
+        ASSERT_EQ(result, buf.get() + size);
+
+        auto column2 = _quantile_state_type.create_column();
+        _quantile_state_type.deserialize(buf.get(), column2.get(),
+                                         BeExecVersionManager::get_newest_version());
+        check_quantile_state_column(*column, *column2.get());
+    }
+
+private:
+    DataTypeQuantileStateDouble _quantile_state_type;
+};
+
+TEST_F(ColumnBitmapTest, ColumnBitmapReadWrite) {
     auto column = _bitmap_type.create_column();
 
     // empty column
@@ -106,4 +142,32 @@ TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
     check_serialize_and_deserialize(column);
 }
 
+TEST_F(ColumnQuantileStateTest, ColumnQuantileStateReadWrite) {
+    auto column = _quantile_state_type.create_column();
+    // empty column
+    check_serialize_and_deserialize(column);
+
+    // quantile column with lots of rows
+    const size_t row_size = 20000;
+    auto& data = assert_cast<ColumnQuantileStateDouble&>(*column.get()).get_data();
+    data.resize(row_size);
+    // EMPTY type
+    check_serialize_and_deserialize(column);
+    // SINGLE type
+    for (size_t i = 0; i < row_size; ++i) {
+        data[i].add_value(i);
+    }
+    check_serialize_and_deserialize(column);
+    // EXPLICIT type
+    for (size_t i = 0; i < row_size; ++i) {
+        data[i].add_value(i + 1);
+    }
+    check_serialize_and_deserialize(column);
+    // TDIGEST type
+    for (size_t i = 0; i < QUANTILE_STATE_EXPLICIT_NUM; ++i) {
+        data[0].add_value(i);
+    }
+    check_serialize_and_deserialize(column);
+}
+
 } // namespace doris::vectorized
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 4da40c9938..1f7561c8cb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1697,7 +1697,7 @@ public class Config extends ConfigBase {
-     * Default is false.
+     * Default is true.
      * */
     @ConfField(mutable = true, masterOnly = true)
-    public static boolean enable_quantile_state_type = false;
+    public static boolean enable_quantile_state_type = true;
 
     @ConfField
     public static boolean enable_vectorized_load = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
index b51a07b01d..af1abe1197 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
@@ -638,6 +638,9 @@ public class CreateFunctionStmt extends DdlStmt {
             case BITMAP:
                 typeBuilder.setId(Types.PGenericType.TypeId.BITMAP);
                 break;
+            case QUANTILE_STATE:
+                typeBuilder.setId(Types.PGenericType.TypeId.QUANTILE_STATE);
+                break;
             case DATE:
                 typeBuilder.setId(Types.PGenericType.TypeId.DATE);
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index ef577b06c1..aa14e31109 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -872,7 +872,6 @@ public class FunctionCallExpr extends Expr {
             if (!getChild(1).isConstant()) {
                 throw new AnalysisException(fnName + "function's second argument should be constant");
             }
-            throw new AnalysisException(fnName + "not support on vectorized engine now.");
         }
 
         if
((fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG") diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index c97dc64b4c..12b1e34df9 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -106,6 +106,7 @@ message PGenericType { JSONB = 31; DECIMAL128I = 32; VARIANT = 33; + QUANTILE_STATE = 34; UNKNOWN = 999; } required TypeId id = 2; diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 3954d92693..f39f9f065e 100644 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1542,10 +1542,13 @@ visible_functions = [ [['bitmap_or_count'], 'BIGINT', ['BITMAP','BITMAP'], ''], [['sub_bitmap'], 'BITMAP', ['BITMAP', 'BIGINT', 'BIGINT'], 'ALWAYS_NULLABLE'], [['bitmap_to_array'], 'ARRAY_BIGINT', ['BITMAP'], ''], - # quantile_function - [['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], ''], - [['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], ''], + # quantile_function + [['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'], + [['to_quantile_state'], 'QUANTILE_STATE', ['DOUBLE', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'], + [['to_quantile_state'], 'QUANTILE_STATE', ['FLOAT', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'], + [['to_quantile_state'], 'QUANTILE_STATE', ['BIGINT', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'], + [['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'], # hash functions diff --git a/regression-test/common/load/quantile_state_basic_agg.sql b/regression-test/common/load/quantile_state_basic_agg.sql new file mode 100644 index 0000000000..e201e2ac98 --- /dev/null +++ b/regression-test/common/load/quantile_state_basic_agg.sql @@ -0,0 +1,4 @@ +insert into quantile_state_basic_agg values +(1,to_quantile_state(-1, 2048)), +(2,to_quantile_state(0, 2048)),(2,to_quantile_state(1, 2048)), +(3,to_quantile_state(0, 2048)),(3,to_quantile_state(1, 2048)),(3,to_quantile_state(2, 2048)); diff 
--git a/regression-test/common/table/quantile_state_basic_agg.sql b/regression-test/common/table/quantile_state_basic_agg.sql new file mode 100644 index 0000000000..7e71b37e4e --- /dev/null +++ b/regression-test/common/table/quantile_state_basic_agg.sql @@ -0,0 +1,6 @@ +create TABLE if not exists `quantile_state_basic_agg` ( + `k1` int(11) NULL, + `k2` QUANTILE_STATE QUANTILE_UNION NOT NULL + )AGGREGATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); diff --git a/regression-test/data/datatype_p0/complex_types/basic_agg_test.out b/regression-test/data/datatype_p0/complex_types/basic_agg_test.out index 767a1dc038..b1dfeeca40 100644 --- a/regression-test/data/datatype_p0/complex_types/basic_agg_test.out +++ b/regression-test/data/datatype_p0/complex_types/basic_agg_test.out @@ -14,3 +14,12 @@ 2 1 3 2 +-- !sql_quantile_state -- +1 \N +2 \N +3 \N + +-- !sql_quantile_state_percent -- +1 -1.0 +2 0.5 +3 1.0 diff --git a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out index 2fc6021067..78282bd470 100644 --- a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out +++ b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out @@ -242,3 +242,18 @@ beijing chengdu shanghai -- !select47 -- 6 +-- !select48 -- +20220201 0 1.0 +20220201 1 -1.0 +20220202 2 0.0 + +-- !select49 -- +20220201 0 1.0 +20220201 1 1.0 +20220202 2 2500.0 + +-- !select50 -- +20220201 0 1.0 +20220201 1 3.0 +20220202 2 4999.0 + diff --git a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out index 98260049ca..6e76e30c55 100644 --- 
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out +++ b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.out @@ -255,3 +255,17 @@ beijing chengdu shanghai -- !select47 -- 6 +-- !select48 -- +20220201 0 1.0 +20220201 1 -1.0 +20220202 2 0.0 + +-- !select49 -- +20220201 0 1.0 +20220201 1 1.0 +20220202 2 2500.0 + +-- !select50 -- +20220201 0 1.0 +20220201 1 3.0 +20220202 2 4999.0 diff --git a/regression-test/data/types/complex_types/basic_agg_test.out b/regression-test/data/types/complex_types/basic_agg_test.out index 767a1dc038..b1dfeeca40 100644 --- a/regression-test/data/types/complex_types/basic_agg_test.out +++ b/regression-test/data/types/complex_types/basic_agg_test.out @@ -14,3 +14,12 @@ 2 1 3 2 +-- !sql_quantile_state -- +1 \N +2 \N +3 \N + +-- !sql_quantile_state_percent -- +1 -1.0 +2 0.5 +3 1.0 diff --git a/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy b/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy index 6314f76dab..d8a676398f 100644 --- a/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy +++ b/regression-test/suites/datatype_p0/complex_types/basic_agg_test.groovy @@ -16,7 +16,7 @@ // under the License. 
suite("basic_agg_test") { - def tables=["bitmap_basic_agg","hll_basic_agg"] + def tables=["bitmap_basic_agg","hll_basic_agg","quantile_state_basic_agg"] for (String table in tables) { sql """drop table if exists ${table};""" @@ -29,4 +29,8 @@ suite("basic_agg_test") { qt_sql_hll """select * from hll_basic_agg;""" qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;""" + + qt_sql_quantile_state """select * from quantile_state_basic_agg;""" + + qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;""" } diff --git a/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy b/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy index 01d8a0faf6..e3cb44e2c0 100644 --- a/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy +++ b/regression-test/suites/nereids_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy @@ -498,4 +498,43 @@ suite("test_aggregate_all_functions") { qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;""" qt_select47 """select count(*) from ${tableName_12}""" + + def tableName_21 = "quantile_state_agg_test" + + sql "DROP TABLE IF EXISTS ${tableName_21}" + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName_21} ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "" + ) ENGINE=OLAP + AGGREGATE KEY(`dt`, `id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`dt`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """INSERT INTO ${tableName_21} values(20220201,0, to_quantile_state(1, 2048))""" + sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)), + (20220201,1, to_quantile_state(0, 
2048)),(20220201,1, to_quantile_state(1, 2048)), + (20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048)) + """ + + List rows = new ArrayList() + for (int i = 0; i < 5000; ++i) { + rows.add([20220202, 2 , i]) + } + streamLoad { + table "${tableName_21}" + set 'label', UUID.randomUUID().toString() + set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)' + inputIterator rows.iterator() + } + + qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id""" + + qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id""" + qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id""" } diff --git a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy index 94e18db0c1..973ea22dff 100644 --- a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy +++ b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy @@ -495,4 +495,45 @@ suite("test_aggregate_all_functions") { qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;""" qt_select47 """select count(*) from ${tableName_12}""" + + def tableName_21 = "quantile_state_agg_test" + + sql "DROP TABLE IF EXISTS ${tableName_21}" + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName_21} ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "" + ) ENGINE=OLAP + AGGREGATE KEY(`dt`, `id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`dt`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """INSERT INTO ${tableName_21} 
values(20220201,0, to_quantile_state(1, 2048))""" + sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)), + (20220201,1, to_quantile_state(0, 2048)),(20220201,1, to_quantile_state(1, 2048)), + (20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048)) + """ + + List rows = new ArrayList() + for (int i = 0; i < 5000; ++i) { + rows.add([20220202, 2 , i]) + } + streamLoad { + table "${tableName_21}" + set 'label', UUID.randomUUID().toString() + set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)' + inputIterator rows.iterator() + } + + qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id""" + + qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id""" + qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id""" + + } diff --git a/regression-test/suites/types/complex_types/basic_agg_test.groovy b/regression-test/suites/types/complex_types/basic_agg_test.groovy index 06ddf9a383..051c555152 100644 --- a/regression-test/suites/types/complex_types/basic_agg_test.groovy +++ b/regression-test/suites/types/complex_types/basic_agg_test.groovy @@ -16,7 +16,7 @@ // under the License. 
suite("basic_agg_test", "types") { - def tables=["bitmap_basic_agg","hll_basic_agg"] + def tables=["bitmap_basic_agg","hll_basic_agg", "quantile_state_basic_agg"] for (String table in tables) { sql """drop table if exists ${table};""" @@ -29,4 +29,8 @@ suite("basic_agg_test", "types") { qt_sql_hll """select * from hll_basic_agg;""" qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;""" + + qt_sql_quantile_state """select * from quantile_state_basic_agg;""" + + qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;""" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org