This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e993b6a5e1f [feature-WIP](query cache) cache tablets aggregate result, BE part (#40171) (#42375)
e993b6a5e1f is described below

commit e993b6a5e1f8935fbbc427d240751023dcbeb0ca
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Oct 24 21:15:27 2024 +0800

[feature-WIP](query cache) cache tablets aggregate result, BE part (#40171) (#42375)

Support caching per-tablet aggregate results. For example, given the following two queries:

SQL 1:
```sql
select key, sum(value) from tbl where dt between '2024-08-01' and '2024-08-10' group by key
```

SQL 2:
```sql
select key, sum(value) from tbl where dt between '2024-08-05' and '2024-08-15' group by key
```

SQL 1 populates the cache with the per-tablet aggregate results for the partitions between '2024-08-01' and '2024-08-10'. SQL 2 then reuses the cached aggregates for the partitions between '2024-08-05' and '2024-08-10' and only computes the aggregates for the partitions between '2024-08-11' and '2024-08-15'.

At present, only simple aggregations are supported; aggregations that contain joins with runtime filters are not.

Enable the feature with:
```sql
set enable_query_cache=true;
```
---
 be/src/common/config.cpp | 2 +
 be/src/common/config.h | 3 +
 be/src/pipeline/dependency.h | 7 +
 be/src/pipeline/exec/cache_sink_operator.cpp | 73 +++++++++
 be/src/pipeline/exec/cache_sink_operator.h | 73 +++++++++
 be/src/pipeline/exec/cache_source_operator.cpp | 199 +++++++++++++++++++++++++
 be/src/pipeline/exec/cache_source_operator.h | 104 +++++++++++++
 be/src/pipeline/exec/olap_scan_operator.cpp | 34 +++--
 be/src/pipeline/exec/olap_scan_operator.h | 4 +-
 be/src/pipeline/exec/operator.cpp | 7 +-
 be/src/pipeline/pipeline_fragment_context.cpp | 80 ++++++++--
 be/src/pipeline/query_cache/query_cache.cpp | 69 +++++++++
 be/src/pipeline/query_cache/query_cache.h | 151 +++++++++++++++++++
 be/src/runtime/exec_env.h | 7 +
 be/src/runtime/exec_env_init.cpp | 9 +-
 be/src/runtime/memory/cache_policy.h | 3 +
 be/src/vec/core/block.cpp | 2 -
 17 files changed, 802 insertions(+), 25 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b077deac04f..5a8c607bad7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1374,6 +1374,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144"); DEFINE_mBool(enable_pipeline_task_leakage_detect, "false"); +DEFINE_Int32(query_cache_size, "512"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 734d73f46d8..7e70e067f3a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1454,6 +1454,9 @@ DECLARE_mInt32(lz4_compression_block_size); DECLARE_mBool(enable_pipeline_task_leakage_detect); +// MB +DECLARE_Int32(query_cache_size); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index d0c16a3ff5a..9364170898d 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -35,6 +35,7 @@ #include "pipeline/exec/join/process_hash_table_probe.h" #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" +#include "vec/core/block.h" #include "vec/core/types.h" #include "vec/spill/spill_stream.h" @@ -541,6 +542,12 @@ public: const int _child_count; }; +struct CacheSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(CacheSharedState) +public: + DataQueue data_queue; +}; + class MultiCastDataStreamer; struct MultiCastSharedState : public BasicSharedState { diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp
b/be/src/pipeline/exec/cache_sink_operator.cpp new file mode 100644 index 00000000000..b8b5b534659 --- /dev/null +++ b/be/src/pipeline/exec/cache_sink_operator.cpp @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cache_sink_operator.h" + +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "pipeline/exec/data_queue.h" +#include "pipeline/exec/operator.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" + +namespace doris::pipeline { + +Status CacheSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _shared_state->data_queue.set_sink_dependency(_dependency, 0); + return Status::OK(); +} + +Status CacheSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + // auto& p = _parent->cast<Parent>(); + + _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks()); + return Status::OK(); +} + +CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int child_id) + : Base(sink_id, child_id, child_id) { + _name = "CACHE_SINK_OPERATOR"; +} + +Status CacheSinkOperatorX::open(RuntimeState* state) { + return Status::OK(); +} + +Status CacheSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + + if (in_block->rows() > 0) { + local_state._shared_state->data_queue.push_block( + vectorized::Block::create_unique(std::move(*in_block)), 0); + } + if (UNLIKELY(eos)) { + local_state._shared_state->data_queue.set_finish(0); + } + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/cache_sink_operator.h b/be/src/pipeline/exec/cache_sink_operator.h new file mode 100644 index 00000000000..9c4beb48df2 --- /dev/null +++ b/be/src/pipeline/exec/cache_sink_operator.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "vec/core/block.h" + +namespace doris { +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class CacheSinkOperatorX; +class CacheSinkLocalState final : public PipelineXSinkLocalState<CacheSharedState> { +public: + ENABLE_FACTORY_CREATOR(CacheSinkLocalState); + CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; + friend class CacheSinkOperatorX; + using Base = PipelineXSinkLocalState<CacheSharedState>; + using Parent = CacheSinkOperatorX; +}; + +class CacheSinkOperatorX final : public DataSinkOperatorX<CacheSinkLocalState> { +public: + using Base = DataSinkOperatorX<CacheSinkLocalState>; + + friend class CacheSinkLocalState; + CacheSinkOperatorX(int sink_id, int child_id); + ~CacheSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TDataSink", + DataSinkOperatorX<CacheSinkLocalState>::_name); + } + + Status open(RuntimeState* state) override; + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; + + std::shared_ptr<BasicSharedState> create_shared_state() const override { + std::shared_ptr<BasicSharedState> ss = std::make_shared<CacheSharedState>(); + ss->id = operator_id(); + for (auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; + } +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp new file mode 100644 index 00000000000..5f8c5befc6a --- /dev/null +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/cache_source_operator.h" + +#include <functional> +#include <utility> + +#include "common/status.h" +#include "pipeline/dependency.h" +#include "pipeline/exec/operator.h" +#include "vec/core/block.h" + +namespace doris { +class RuntimeState; + +namespace pipeline { + +Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + ((CacheSharedState*)_dependency->shared_state()) + ->data_queue.set_source_dependency(_shared_state->source_deps.front()); + const auto& scan_ranges = info.scan_ranges; + bool hit_cache = false; + if (scan_ranges.size() > 1) { + return Status::InternalError("CacheSourceOperator only support one scan range, plan error"); + } + + const auto& cache_param = _parent->cast<CacheSourceOperatorX>()._cache_param; + // 1. init the slot orders + const auto& tuple_descs = _parent->cast<CacheSourceOperatorX>().row_desc().tuple_descriptors(); + for (auto tuple_desc : tuple_descs) { + for (auto slot_desc : tuple_desc->slots()) { + if (cache_param.output_slot_mapping.find(slot_desc->id()) != + cache_param.output_slot_mapping.end()) { + _slot_orders.emplace_back(cache_param.output_slot_mapping.at(slot_desc->id())); + } else { + return Status::InternalError( + fmt::format("Cache can find the mapping slot id {}, node id {}", + slot_desc->id(), cache_param.node_id)); + } + } + } + + // 2. build cache key by digest_tablet_id + RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param, &_cache_key, &_version)); + _runtime_profile->add_info_string( + "CacheTabletId", std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id)); + + // 3. lookup the cache and find proper slot order + hit_cache = QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle); + _runtime_profile->add_info_string("HitCache", hit_cache ? 
"1" : "0"); + if (hit_cache && !cache_param.force_refresh_query_cache) { + _hit_cache_results = _query_cache_handle.get_cache_result(); + auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders(); + + bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size(); + if (!need_reorder) { + for (int i = 0; i < _slot_orders.size(); ++i) { + need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i]; + } + } + + if (need_reorder) { + for (auto slot_id : _slot_orders) { + auto find_res = std::find(hit_cache_slot_orders->begin(), + hit_cache_slot_orders->end(), slot_id); + if (find_res != hit_cache_slot_orders->end()) { + _hit_cache_column_orders.emplace_back(find_res - + hit_cache_slot_orders->begin()); + } else { + return Status::InternalError(fmt::format( + "Cache can find the mapping slot id {}, node id {}, " + "hit_cache_column_orders [{}]", + slot_id, cache_param.node_id, fmt::join(*hit_cache_slot_orders, ","))); + } + } + } + } + + return Status::OK(); +} + +Status CacheSourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + + return Status::OK(); +} + +std::string CacheSourceLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); + if (_shared_state) { + fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", + _shared_state->data_queue.is_all_finish(), + _shared_state->data_queue.remaining_has_data()); + } + return fmt::to_string(debug_string_buffer); +} + +Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + + block->clear_column_data(_row_descriptor.num_materialized_slots()); + bool need_clone_empty = block->columns() == 0; + + if (local_state._hit_cache_results == nullptr) { + Defer insert_cache([&] { + if (*eos && local_state._need_insert_cache) { + local_state._runtime_profile->add_info_string("InsertCache", "1"); + local_state._global_cache->insert(local_state._cache_key, local_state._version, + local_state._local_cache_blocks, + local_state._slot_orders, + local_state._current_query_cache_bytes); + local_state._local_cache_blocks.clear(); + } + }); + + std::unique_ptr<vectorized::Block> output_block; + int child_idx = 0; + RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block, + &child_idx)); + // Here, check the value of `_has_data(state)` again after `data_queue.is_all_finish()` is TRUE + // as there may be one or more blocks when `data_queue.is_all_finish()` is TRUE. 
+ *eos = !_has_data(state) && local_state._shared_state->data_queue.is_all_finish(); + + if (!output_block) { + return Status::OK(); + } + + if (local_state._need_insert_cache) { + if (need_clone_empty) { + *block = output_block->clone_empty(); + } + RETURN_IF_ERROR( + vectorized::MutableBlock::build_mutable_block(block).merge(*output_block)); + local_state._current_query_cache_rows += output_block->rows(); + auto mem_consume = output_block->allocated_bytes(); + local_state._current_query_cache_bytes += mem_consume; + local_state._mem_tracker->consume(mem_consume); + + if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes || + _cache_param.entry_max_rows < local_state._current_query_cache_rows) { + // over the max bytes, pass through the data, no need to do cache + local_state._local_cache_blocks.clear(); + local_state._need_insert_cache = false; + local_state._runtime_profile->add_info_string("InsertCache", "0"); + } else { + local_state._local_cache_blocks.emplace_back(std::move(output_block)); + } + } else { + *block = std::move(*output_block); + } + } else { + if (local_state._hit_cache_pos < local_state._hit_cache_results->size()) { + const auto& hit_cache_block = + local_state._hit_cache_results->at(local_state._hit_cache_pos++); + if (need_clone_empty) { + *block = hit_cache_block->clone_empty(); + } + RETURN_IF_ERROR( + vectorized::MutableBlock::build_mutable_block(block).merge(*hit_cache_block)); + if (!local_state._hit_cache_column_orders.empty()) { + auto datas = block->get_columns_with_type_and_name(); + block->clear(); + for (auto loc : local_state._hit_cache_column_orders) { + block->insert(datas[loc]); + } + } + } else { + *eos = true; + } + } + + local_state.reached_limit(block, eos); + return Status::OK(); +} + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/cache_source_operator.h b/be/src/pipeline/exec/cache_source_operator.h new file mode 100644 index 00000000000..e764323846b --- /dev/null +++ b/be/src/pipeline/exec/cache_source_operator.h @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <stdint.h> + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/query_cache/query_cache.h" + +namespace doris { +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class DataQueue; + +class CacheSourceOperatorX; +class CacheSourceLocalState final : public PipelineXLocalState<CacheSharedState> { +public: + ENABLE_FACTORY_CREATOR(CacheSourceLocalState); + using Base = PipelineXLocalState<CacheSharedState>; + using Parent = CacheSourceOperatorX; + CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; + + [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; + +private: + friend class CacheSourceOperatorX; + friend class OperatorX<CacheSourceLocalState>; + + QueryCache* _global_cache = QueryCache::instance(); + + std::string _cache_key {}; + int64_t _version = 0; + std::vector<vectorized::BlockUPtr> _local_cache_blocks; + std::vector<int> _slot_orders; + size_t _current_query_cache_bytes = 0; + size_t _current_query_cache_rows = 0; + bool _need_insert_cache = true; + + QueryCacheHandle _query_cache_handle; + std::vector<vectorized::BlockUPtr>* _hit_cache_results = nullptr; + std::vector<int> _hit_cache_column_orders; + int _hit_cache_pos = 0; +}; + +class CacheSourceOperatorX final : public OperatorX<CacheSourceLocalState> { +public: + using Base = OperatorX<CacheSourceLocalState>; + CacheSourceOperatorX(ObjectPool* pool, int plan_node_id, int operator_id, + const TQueryCacheParam& cache_param) + : Base(pool, plan_node_id, operator_id), _cache_param(cache_param) { + _op_name = "CACHE_SOURCE_OPERATOR"; + }; + ~CacheSourceOperatorX() override = default; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + bool is_source() const override { return true; } + + Status open(RuntimeState* state) override { + static_cast<void>(Base::open(state)); + return Status::OK(); + } + + const RowDescriptor& intermediate_row_desc() const override { + return _child->intermediate_row_desc(); + } + RowDescriptor& row_descriptor() override { return _child->row_descriptor(); } + const RowDescriptor& row_desc() const override { return _child->row_desc(); } + +private: + TQueryCacheParam _cache_param; + bool _has_data(RuntimeState* state) const { + auto& local_state = get_local_state(state); + return local_state._shared_state->data_queue.remaining_has_data(); + } + friend class CacheSourceLocalState; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index aa6f0ed49f0..09e999d4737 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -29,15 +29,13 @@ #include "olap/tablet_manager.h" #include "pipeline/common/runtime_filter_consumer.h" #include "pipeline/exec/scan_operator.h" +#include "pipeline/query_cache/query_cache.h" #include "service/backend_options.h" #include "util/to_string.h" #include "vec/exec/scan/new_olap_scanner.h" -#include "vec/exec/scan/vscan_node.h" -#include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vectorized_fn_call.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -#include "vec/exprs/vin_predicate.h" #include "vec/exprs/vslot_ref.h" #include "vec/functions/in.h" @@ -147,7 
+145,6 @@ Status OlapScanLocalState::_init_profile() { ADD_COUNTER(_segment_profile, "InvertedIndexDowngradeCount", TUnit::UNIT); _output_index_result_column_timer = ADD_TIMER(_segment_profile, "OutputIndexResultColumnTimer"); - _filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT); _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT); @@ -395,10 +392,25 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() const { void OlapScanLocalState::set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) { - for (auto& scan_range : scan_ranges) { - DCHECK(scan_range.scan_range.__isset.palo_scan_range); - _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); - COUNTER_UPDATE(_tablet_counter, 1); + const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param; + bool hit_cache = false; + if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) { + std::string cache_key; + int64_t version = 0; + auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); + if (!status.ok()) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); + } + doris::QueryCacheHandle handle; + hit_cache = QueryCache::instance()->lookup(cache_key, version, &handle); + } + + if (!hit_cache) { + for (auto& scan_range : scan_ranges) { + DCHECK(scan_range.scan_range.__isset.palo_scan_range); + _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); + COUNTER_UPDATE(_tablet_counter, 1); + } } } @@ -572,9 +584,11 @@ void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo& upda } OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs, int parallel_tasks) + const DescriptorTbl& descs, int parallel_tasks, + const TQueryCacheParam& param) : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), - _olap_scan_node(tnode.olap_scan_node) { + _olap_scan_node(tnode.olap_scan_node), + _cache_param(param) { _output_tuple_id = tnode.olap_scan_node.tuple_id; if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { _limit_per_scanner = _olap_scan_node.sort_limit; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 6a03a46e65e..4465ce5690e 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -198,11 +198,13 @@ private: class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> { public: OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs, int parallel_tasks); + const DescriptorTbl& descs, int parallel_tasks, + const TQueryCacheParam& cache_param); private: friend class OlapScanLocalState; TOlapScanNode _olap_scan_node; + TQueryCacheParam _cache_param; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index d65769254b9..4a93bac67fe 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -17,7 +17,6 @@ #include "operator.h" -#include "common/logging.h" #include "common/status.h" #include "pipeline/dependency.h" #include "pipeline/exec/aggregation_sink_operator.h" @@ -25,6 +24,8 @@ #include 
"pipeline/exec/analytic_sink_operator.h" #include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" +#include "pipeline/exec/cache_sink_operator.h" +#include "pipeline/exec/cache_source_operator.h" #include "pipeline/exec/datagen_operator.h" #include "pipeline/exec/distinct_streaming_aggregation_operator.h" #include "pipeline/exec/empty_set_operator.h" @@ -694,6 +695,7 @@ DECLARE_OPERATOR(SetSinkLocalState<true>) DECLARE_OPERATOR(SetSinkLocalState<false>) DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) +DECLARE_OPERATOR(CacheSinkLocalState) #undef DECLARE_OPERATOR @@ -725,6 +727,7 @@ DECLARE_OPERATOR(SchemaScanLocalState) DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) +DECLARE_OPERATOR(CacheSourceLocalState) #undef DECLARE_OPERATOR @@ -754,6 +757,7 @@ template class PipelineXSinkLocalState<MultiCastSharedState>; template class PipelineXSinkLocalState<SetSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; +template class PipelineXSinkLocalState<CacheSharedState>; template class PipelineXLocalState<HashJoinSharedState>; template class PipelineXLocalState<PartitionedHashJoinSharedState>; @@ -765,6 +769,7 @@ template class PipelineXLocalState<AggSharedState>; template class PipelineXLocalState<PartitionedAggSharedState>; template class PipelineXLocalState<FakeSharedState>; template class PipelineXLocalState<UnionSharedState>; +template class PipelineXLocalState<CacheSharedState>; template class PipelineXLocalState<MultiCastSharedState>; template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b1ee5933d27..7f3fa348237 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -43,6 +43,8 @@ #include "pipeline/exec/analytic_sink_operator.h" #include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" +#include "pipeline/exec/cache_sink_operator.h" +#include "pipeline/exec/cache_source_operator.h" #include "pipeline/exec/datagen_operator.h" #include "pipeline/exec/distinct_streaming_aggregation_operator.h" #include "pipeline/exec/empty_set_operator.h" @@ -1210,10 +1212,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; + bool enable_query_cache = request.fragment.__isset.query_cache_param; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { - op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); + op.reset(new OlapScanOperatorX( + pool, tnode, next_operator_id(), descs, _num_instances, + enable_query_cache ? 
request.fragment.query_cache_param : TQueryCacheParam {})); RETURN_IF_ERROR(cur_pipe->add_operator(op)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); @@ -1286,6 +1291,26 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo ": group by and output is empty"); } + auto create_query_cache_operator = [&](PipelinePtr& new_pipe) { + auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first; + auto cache_source_id = next_operator_id(); + op.reset(new CacheSourceOperatorX(pool, cache_node_id, cache_source_id, + request.fragment.query_cache_param)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + new_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(new_pipe->id()); + + DataSinkOperatorPtr cache_sink( + new CacheSinkOperatorX(next_sink_operator_id(), cache_source_id)); + cache_sink->set_dests_id({op->operator_id()}); + RETURN_IF_ERROR(new_pipe->set_sink(cache_sink)); + return Status::OK(); + }; const bool group_by_limit_opt = tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0; @@ -1298,24 +1323,59 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { - op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, - _require_bucket_distribution)); - op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); - _require_bucket_distribution = - _require_bucket_distribution || op->require_data_distribution(); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (enable_query_cache) { + PipelinePtr new_pipe; + RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); + + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); + op->set_followed_by_shuffled_operator(false); + _require_bucket_distribution = true; + RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); + cur_pipe = new_pipe; + } else { + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + } } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && !tnode.agg_node.grouping_exprs.empty()) { - op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (enable_query_cache) { + PipelinePtr new_pipe; + RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); + + op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); + RETURN_IF_ERROR(new_pipe->add_operator(op)); + cur_pipe = new_pipe; + } else { + op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + } } else { + // create new pipeline to add query cache operator + 
PipelinePtr new_pipe; + if (enable_query_cache) { + RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); + } + if (enable_spill) { op.reset(new PartitionedAggSourceOperatorX(pool, tnode, next_operator_id(), descs)); } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); } - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (enable_query_cache) { + RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); + RETURN_IF_ERROR(new_pipe->add_operator(op)); + cur_pipe = new_pipe; + } else { + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + } const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { diff --git a/be/src/pipeline/query_cache/query_cache.cpp b/be/src/pipeline/query_cache/query_cache.cpp new file mode 100644 index 00000000000..20e342e140f --- /dev/null +++ b/be/src/pipeline/query_cache/query_cache.cpp @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "query_cache.h" + +namespace doris { + +std::vector<int>* QueryCacheHandle::get_cache_slot_orders() { + DCHECK(_handle); + auto result_ptr = reinterpret_cast<LRUHandle*>(_handle)->value; + return &((QueryCache::CacheValue*)(result_ptr))->slot_orders; +} + +CacheResult* QueryCacheHandle::get_cache_result() { + DCHECK(_handle); + auto result_ptr = reinterpret_cast<LRUHandle*>(_handle)->value; + return &((QueryCache::CacheValue*)(result_ptr))->result; +} + +int64_t QueryCacheHandle::get_cache_version() { + DCHECK(_handle); + auto result_ptr = reinterpret_cast<LRUHandle*>(_handle)->value; + return ((QueryCache::CacheValue*)(result_ptr))->version; +} + +void QueryCache::insert(const CacheKey& key, int64_t version, CacheResult& res, + const std::vector<int>& slot_orders, int64_t cache_size) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker()); + CacheResult cache_result; + for (auto& block_data : res) { + cache_result.emplace_back(vectorized::Block::create_unique()) + ->swap(block_data->clone_empty()); + (void)vectorized::MutableBlock(cache_result.back().get()).merge(*block_data); + } + auto cache_value_ptr = + std::make_unique<QueryCache::CacheValue>(version, std::move(cache_result), slot_orders); + + QueryCacheHandle(this, LRUCachePolicy::insert(key, (void*)cache_value_ptr.release(), cache_size, + cache_size, CachePriority::NORMAL)); +} + +bool QueryCache::lookup(const CacheKey& key, int64_t version, doris::QueryCacheHandle* handle) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker()); + auto* lru_handle = LRUCachePolicy::lookup(key); + if (lru_handle) { + QueryCacheHandle tmp_handle(this, lru_handle); + if (tmp_handle.get_cache_version() == version) { + *handle = std::move(tmp_handle); + return true; 
+ } + } + return false; +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/query_cache/query_cache.h b/be/src/pipeline/query_cache/query_cache.h new file mode 100644 index 00000000000..a905831b530 --- /dev/null +++ b/be/src/pipeline/query_cache/query_cache.h @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <butil/macros.h> +#include <glog/logging.h> +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +#include <memory> +#include <roaring/roaring.hh> +#include <string> + +#include "common/config.h" +#include "common/status.h" +#include "io/fs/file_system.h" +#include "io/fs/path.h" +#include "olap/lru_cache.h" +#include "runtime/exec_env.h" +#include "runtime/memory/lru_cache_policy.h" +#include "runtime/memory/mem_tracker.h" +#include "util/slice.h" +#include "util/time.h" + +namespace doris { + +using CacheResult = std::vector<vectorized::BlockUPtr>; +// A handle for mid-result from query lru cache. +// The handle will automatically release the cache entry when it is destroyed. +// So the caller need to make sure the handle is valid in lifecycle. 
+class QueryCacheHandle { +public: + QueryCacheHandle() = default; + QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) + : _cache(cache), _handle(handle) {} + + ~QueryCacheHandle() { + if (_handle != nullptr) { + CHECK(_cache != nullptr); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->query_cache_mem_tracker()); + _cache->release(_handle); + } + } + } + + QueryCacheHandle(QueryCacheHandle&& other) noexcept { + std::swap(_cache, other._cache); + std::swap(_handle, other._handle); + } + + QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept { + std::swap(_cache, other._cache); + std::swap(_handle, other._handle); + return *this; + } + + std::vector<int>* get_cache_slot_orders(); + + CacheResult* get_cache_result(); + + int64_t get_cache_version(); + +private: + LRUCachePolicy* _cache = nullptr; + Cache::Handle* _handle = nullptr; + + // Don't allow copy and assign + DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle); +}; + +class QueryCache : public LRUCachePolicy { +public: + using LRUCachePolicy::insert; + + struct CacheValue : public LRUCacheValueBase { + int64_t version; + CacheResult result; + std::vector<int> slot_orders; + + CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so) + : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {} + }; + + // Create global instance of this class + static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) { + auto* res = new QueryCache(capacity, num_shards); + return res; + } + + static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges, + const TQueryCacheParam& cache_param, std::string* cache_key, + int64_t* version) { + if (scan_ranges.size() > 1) { + return Status::InternalError( + "CacheSourceOperator only support one scan range, plan error"); + } + auto& scan_range = scan_ranges[0]; + DCHECK(scan_range.scan_range.__isset.palo_scan_range); + auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id; + + std::from_chars(scan_range.scan_range.palo_scan_range.version.data(), + scan_range.scan_range.palo_scan_range.version.data() + + scan_range.scan_range.palo_scan_range.version.size(), + *version); + + auto find_tablet = cache_param.tablet_to_range.find(tablet_id); + if (find_tablet == cache_param.tablet_to_range.end()) { + return Status::InternalError("Not find tablet in partition_to_tablets, plan error"); + } + + *cache_key = cache_param.digest + + std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id)) + + find_tablet->second; + + return Status::OK(); + } + + // Return global instance. + // Client should call create_global_cache before. 
+ static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); } + + QueryCache() = delete; + + QueryCache(size_t capacity, uint32_t num_shards) + : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE, + 3600 * 24, num_shards) {} + + bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle); + + void insert(const CacheKey& key, int64_t version, CacheResult& result, + const std::vector<int>& solt_orders, int64_t cache_size); +}; +} // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index e77a1c7ae41..61cebad10b9 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -65,6 +65,7 @@ class InvertedIndexQueryCache; class TmpFileDirs; } // namespace segment_v2 +class QueryCache; class WorkloadSchedPolicyMgr; class BfdParser; class BrokerMgr; @@ -187,6 +188,9 @@ public: std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() { return _point_query_executor_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> query_cache_mem_tracker() { + return _query_cache_mem_tracker; + } std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() { return _block_compression_mem_tracker; } @@ -305,6 +309,7 @@ public: segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() { return _inverted_index_query_cache; } + QueryCache* get_query_cache() { return _query_cache; } std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; } pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { @@ -366,6 +371,7 @@ private: // Tracking memory may be shared between multiple queries. std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker; std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _query_cache_mem_tracker; // TODO, looking forward to more accurate tracking. 
std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker; @@ -437,6 +443,7 @@ private: CacheManager* _cache_manager = nullptr; segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; + QueryCache* _query_cache = nullptr; std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr; std::unique_ptr<io::FDCache> _file_cache_open_fd_cache; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index eb9fa12ea4b..c91e623b990 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -56,6 +56,7 @@ #include "olap/tablet_schema_cache.h" #include "olap/wal/wal_manager.h" #include "pipeline/pipeline_tracing.h" +#include "pipeline/query_cache/query_cache.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/broker_mgr.h" @@ -584,6 +585,9 @@ Status ExecEnv::_init_mem_env() { _orc_memory_pool = new doris::vectorized::ORCMemoryPool(); _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool(); + _query_cache = QueryCache::create_global_cache(config::query_cache_size * 1024L * 1024L); + LOG(INFO) << "query cache memory limit: " << config::query_cache_size << "MB"; + return Status::OK(); } @@ -600,7 +604,9 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); _point_query_executor_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); - _block_compression_mem_tracker = + _query_cache_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "QueryCache"); + _block_compression_mem_tracker = _block_compression_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); @@ -690,6 +696,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_schema_cache); SAFE_DELETE(_segment_loader); SAFE_DELETE(_row_cache); + SAFE_DELETE(_query_cache); // Free resource after threads are stopped. // Some threads are still running, like threads created by _new_load_stream_mgr ... 
diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index c43ca0b2fb7..5241efb9c29 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -48,6 +48,7 @@ public: CLOUD_TXN_DELETE_BITMAP_CACHE = 17, NONE = 18, // not be used FOR_UT_CACHE_NUMBER = 19, + QUERY_CACHE = 20 }; static std::string type_string(CacheType type) { @@ -90,6 +91,8 @@ public: return "CloudTxnDeleteBitmapCache"; case CacheType::FOR_UT_CACHE_NUMBER: return "ForUTCacheNumber"; + case CacheType::QUERY_CACHE: + return "QUERY_CACHE"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 69231804336..95cedf4c362 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -45,12 +45,10 @@ #include "util/runtime_profile.h" #include "util/simd/bits.h" #include "util/slice.h" -#include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" -#include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_nullable.h"
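The mechanism described in the commit message — caching per-tablet aggregate results keyed by plan digest and tablet, and reusing an entry only while the tablet version is unchanged — can be illustrated with a minimal standalone sketch. This is not code from the patch: `TabletAggCache`, `CachedAggResult`, and the string-based key below are hypothetical stand-ins for the patch's `QueryCache`, `CacheValue`, and `build_cache_key`.

```cpp
// Standalone illustration only -- not code from this patch. TabletAggCache,
// CachedAggResult, and the string-based key are hypothetical stand-ins for the
// patch's QueryCache / CacheValue / build_cache_key.
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct CachedAggResult {
    int64_t version = 0;            // tablet version when the entry was inserted
    std::vector<std::string> rows;  // stand-in for the cached result blocks
};

class TabletAggCache {
public:
    // Cache the aggregate result produced for one tablet under a given plan digest.
    void insert(const std::string& digest, int64_t tablet_id, int64_t version,
                std::vector<std::string> rows) {
        _entries[key(digest, tablet_id)] = CachedAggResult {version, std::move(rows)};
    }

    // Return the cached rows only if the tablet version is unchanged; otherwise the
    // caller re-scans the tablet and re-inserts, mirroring the version check that
    // QueryCache::lookup performs in this patch.
    std::optional<std::vector<std::string>> lookup(const std::string& digest,
                                                   int64_t tablet_id,
                                                   int64_t version) const {
        auto it = _entries.find(key(digest, tablet_id));
        if (it == _entries.end() || it->second.version != version) {
            return std::nullopt;  // miss or stale entry
        }
        return it->second.rows;
    }

private:
    static std::string key(const std::string& digest, int64_t tablet_id) {
        return digest + "/" + std::to_string(tablet_id);
    }

    std::map<std::string, CachedAggResult> _entries;
};
```

In the patch itself, `QueryCache::build_cache_key` concatenates `cache_param.digest`, the raw tablet id bytes, and the tablet's entry from `tablet_to_range`, while the version is parsed from the scan range and compared in `QueryCache::lookup`; on a miss or a stale version the scan and aggregation run as usual and the new result is inserted, subject to the `entry_max_bytes` / `entry_max_rows` limits.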