This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fad4a207aa [refactor](pipeline) Refactor non-pipeline code structure (#35900)
1fad4a207aa is described below

commit 1fad4a207aa7d608dbd6ef51b3d551ebe3538e7e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Jun 5 19:48:17 2024 +0800

    [refactor](pipeline) Refactor non-pipeline code structure (#35900)
---
 be/src/pipeline/common/agg_utils.h                 | 340 +++++++++++++++++++
 .../data_gen_functions/vdata_gen_function_inf.h    |   4 +-
 .../common}/data_gen_functions/vnumbers_tvf.cpp    |  14 +-
 .../common}/data_gen_functions/vnumbers_tvf.h      |   6 +-
 be/src/pipeline/common/join_utils.h                |  68 ++++
 .../common}/runtime_filter_consumer.cpp            |  10 +-
 .../common}/runtime_filter_consumer.h              |   9 +-
 be/src/pipeline/dependency.cpp                     |  20 +-
 be/src/pipeline/dependency.h                       |  85 ++---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  38 +--
 be/src/pipeline/exec/aggregation_sink_operator.h   |   4 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |   1 +
 be/src/pipeline/exec/analytic_sink_operator.cpp    |   1 +
 be/src/pipeline/exec/analytic_source_operator.cpp  |   1 +
 be/src/pipeline/exec/datagen_operator.cpp          |   7 +-
 be/src/pipeline/exec/datagen_operator.h            |   4 +-
 .../distinct_streaming_aggregation_operator.cpp    |   3 +-
 .../exec/distinct_streaming_aggregation_operator.h |   2 +-
 be/src/pipeline/exec/es_scan_operator.cpp          |  14 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  29 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  60 +++-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   5 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  24 +-
 .../exec/join/cross_join_impl.cpp                  |   2 +-
 .../exec/join/full_outer_join_impl.cpp             |   2 +-
 .../exec/join/inner_join_impl.cpp                  |   2 +-
 be/src/{vec => pipeline}/exec/join/join_op.h       |  14 +-
 .../exec/join/left_anti_join_impl.cpp              |   2 +-
 .../exec/join/left_outer_join_impl.cpp             |   2 +-
 .../exec/join/left_semi_join_impl.cpp              |   2 +-
 .../exec/join/null_aware_left_anti_join_impl.cpp   |   2 +-
 .../exec/join/null_aware_left_semi_join_impl.cpp   |   2 +-
 .../exec/join/process_hash_table_probe.h           |  56 ++--
 .../exec/join/process_hash_table_probe_impl.h      | 192 +++++------
 .../exec/join/right_anti_join_impl.cpp             |   2 +-
 .../exec/join/right_outer_join_impl.cpp            |   2 +-
 .../exec/join/right_semi_join_impl.cpp             |   2 +-
 .../exec/multi_cast_data_stream_source.cpp         |   7 +-
 .../pipeline/exec/multi_cast_data_stream_source.h  |   5 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   2 +-
 .../exec/partitioned_aggregation_sink_operator.h   |   1 +
 be/src/pipeline/exec/scan_operator.cpp             |   2 +-
 be/src/pipeline/exec/scan_operator.h               |  11 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   2 +-
 be/src/pipeline/exec/set_source_operator.h         |   2 +-
 .../exec/streaming_aggregation_operator.cpp        |  16 +-
 .../pipeline/exec/streaming_aggregation_operator.h |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |  20 --
 be/src/runtime/query_context.cpp                   |  21 +-
 be/src/runtime/query_context.h                     |   2 +-
 be/src/vec/common/hash_table/hash_map_context.h    |  12 +-
 .../vec/common/hash_table/hash_table_set_build.h   |   1 +
 be/src/vec/exec/join/vacquire_list.hpp             |  54 ---
 be/src/vec/exec/join/vhash_join_node.h             | 180 ----------
 be/src/vec/exec/scan/vscan_node.h                  |  52 ---
 be/src/vec/exec/vaggregation_node.h                | 366 ---------------------
 56 files changed, 791 insertions(+), 1002 deletions(-)

diff --git a/be/src/pipeline/common/agg_utils.h b/be/src/pipeline/common/agg_utils.h
new file mode 100644
index 00000000000..e0435954b8b
--- /dev/null
+++ b/be/src/pipeline/common/agg_utils.h
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <variant>
+#include <vector>
+
+#include "vec/common/arena.h"
+#include "vec/common/hash_table/hash_map_context.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "vec/common/hash_table/hash_map_util.h"
+#include "vec/common/hash_table/ph_hash_map.h"
+#include "vec/common/hash_table/string_hash_map.h"
+
+namespace doris {
+namespace pipeline {
+
+// Hash tables used by the aggregation operators. Every table maps a group-by key
+// to a vectorized::AggregateDataPtr; the key-less case is a single such pointer.
+using AggregatedDataWithoutKey = vectorized::AggregateDataPtr;
+using AggregatedDataWithStringKey =
+        PHHashMap<StringRef, vectorized::AggregateDataPtr>;
+using AggregatedDataWithShortStringKey =
+        StringHashMap<vectorized::AggregateDataPtr>;
+using AggregatedDataWithUInt8Key =
+        PHHashMap<vectorized::UInt8, vectorized::AggregateDataPtr>;
+using AggregatedDataWithUInt16Key =
+        PHHashMap<vectorized::UInt16, vectorized::AggregateDataPtr>;
+
+// 32-bit and wider integer keys are hashed with HashCRC32.
+using AggregatedDataWithUInt32Key =
+        PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr,
+                  HashCRC32<vectorized::UInt32>>;
+using AggregatedDataWithUInt64Key =
+        PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr,
+                  HashCRC32<vectorized::UInt64>>;
+using AggregatedDataWithUInt128Key =
+        PHHashMap<vectorized::UInt128, vectorized::AggregateDataPtr,
+                  HashCRC32<vectorized::UInt128>>;
+using AggregatedDataWithUInt256Key =
+        PHHashMap<vectorized::UInt256, vectorized::AggregateDataPtr,
+                  HashCRC32<vectorized::UInt256>>;
+using AggregatedDataWithUInt136Key =
+        PHHashMap<vectorized::UInt136, vectorized::AggregateDataPtr,
+                  HashCRC32<vectorized::UInt136>>;
+
+// "Phase2" tables: the same key types, hashed with HashMixWrapper instead of HashCRC32.
+// NOTE(review): presumably intended for the merge phase of two-phase aggregation — confirm.
+using AggregatedDataWithUInt32KeyPhase2 =
+        PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr,
+                  HashMixWrapper<vectorized::UInt32>>;
+using AggregatedDataWithUInt64KeyPhase2 =
+        PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr,
+                  HashMixWrapper<vectorized::UInt64>>;
+using AggregatedDataWithUInt128KeyPhase2 =
+        PHHashMap<vectorized::UInt128, vectorized::AggregateDataPtr,
+                  HashMixWrapper<vectorized::UInt128>>;
+using AggregatedDataWithUInt256KeyPhase2 =
+        PHHashMap<vectorized::UInt256, vectorized::AggregateDataPtr,
+                  HashMixWrapper<vectorized::UInt256>>;
+using AggregatedDataWithUInt136KeyPhase2 =
+        PHHashMap<vectorized::UInt136, vectorized::AggregateDataPtr,
+                  HashMixWrapper<vectorized::UInt136>>;
+
+// Nullable-key flavours: the tables above wrapped in vectorized::DataWithNullKey.
+using AggregatedDataWithNullableUInt8Key =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt8Key>;
+using AggregatedDataWithNullableUInt16Key =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt16Key>;
+using AggregatedDataWithNullableUInt32Key =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt32Key>;
+using AggregatedDataWithNullableUInt64Key =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt64Key>;
+using AggregatedDataWithNullableUInt32KeyPhase2 =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>;
+using AggregatedDataWithNullableUInt64KeyPhase2 =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
+using AggregatedDataWithNullableShortStringKey =
+        vectorized::DataWithNullKey<AggregatedDataWithShortStringKey>;
+using AggregatedDataWithNullableUInt128Key =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt128Key>;
+using AggregatedDataWithNullableUInt128KeyPhase2 =
+        vectorized::DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;
+
+// Every hashing strategy the aggregation operators can use. std::monostate is the
+// default alternative; it is also what remains for Type::without_key, whose init()
+// emplaces nothing. Keep the order of alternatives stable: std::variant::index()
+// is derived from it.
+using AggregatedMethodVariants = std::variant<
+        std::monostate,
+        vectorized::MethodSerialized<AggregatedDataWithStringKey>,
+        // Single non-nullable keys.
+        vectorized::MethodOneNumber<vectorized::UInt8, AggregatedDataWithUInt8Key>,
+        vectorized::MethodOneNumber<vectorized::UInt16, AggregatedDataWithUInt16Key>,
+        vectorized::MethodOneNumber<vectorized::UInt32, AggregatedDataWithUInt32Key>,
+        vectorized::MethodOneNumber<vectorized::UInt64, AggregatedDataWithUInt64Key>,
+        vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>,
+        vectorized::MethodOneNumber<vectorized::UInt128, AggregatedDataWithUInt128Key>,
+        vectorized::MethodOneNumber<vectorized::UInt32, AggregatedDataWithUInt32KeyPhase2>,
+        vectorized::MethodOneNumber<vectorized::UInt64, AggregatedDataWithUInt64KeyPhase2>,
+        vectorized::MethodOneNumber<vectorized::UInt128, AggregatedDataWithUInt128KeyPhase2>,
+        // Single nullable keys.
+        vectorized::MethodSingleNullableColumn<
+                vectorized::MethodOneNumber<vectorized::UInt8, AggregatedDataWithNullableUInt8Key>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt16, AggregatedDataWithNullableUInt16Key>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt32, AggregatedDataWithNullableUInt32Key>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt64, AggregatedDataWithNullableUInt64Key>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt128, AggregatedDataWithNullableUInt128Key>>,
+        vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+                vectorized::UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>,
+        vectorized::MethodSingleNullableColumn<
+                vectorized::MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
+        // Fixed-width composite keys; the bool template argument is passed through
+        // to MethodKeysFixed unchanged.
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>,
+        vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>;
+
+// Runtime-selected aggregation hash method. init() stores the key Type in `_type`
+// and emplaces the matching alternative into `method_variant`.
+struct AggregatedDataVariants
+        : public vectorized::DataVariants<AggregatedMethodVariants,
+                                          vectorized::MethodSingleNullableColumn,
+                                          vectorized::MethodOneNumber, vectorized::MethodKeysFixed,
+                                          vectorized::DataWithNullKey> {
+    // Aggregate-data pointer for the key-less case; starts out null.
+    AggregatedDataWithoutKey without_key = nullptr;
+
+    // Compile-time-nullability form of init(). `nullable` selects the nullable wrapper
+    // for the single-integer and short-string keys; Type::serialized and
+    // Type::without_key ignore it.
+    // Throws Exception(ErrorCode::INTERNAL_ERROR) for any Type not handled below.
+    template <bool nullable>
+    void init(Type type) {
+        _type = type;
+        switch (_type) {
+        // Single integer keys.
+        case Type::int8_key:
+            emplace_single<vectorized::UInt8, AggregatedDataWithUInt8Key, nullable>();
+            break;
+        case Type::int16_key:
+            emplace_single<vectorized::UInt16, AggregatedDataWithUInt16Key, nullable>();
+            break;
+        case Type::int32_key:
+            emplace_single<vectorized::UInt32, AggregatedDataWithUInt32Key, nullable>();
+            break;
+        case Type::int64_key:
+            emplace_single<vectorized::UInt64, AggregatedDataWithUInt64Key, nullable>();
+            break;
+        case Type::int128_key:
+            emplace_single<vectorized::UInt128, AggregatedDataWithUInt128Key, nullable>();
+            break;
+        // Same widths, backed by the HashMixWrapper ("Phase2") tables.
+        case Type::int32_key_phase2:
+            emplace_single<vectorized::UInt32, AggregatedDataWithUInt32KeyPhase2, nullable>();
+            break;
+        case Type::int64_key_phase2:
+            emplace_single<vectorized::UInt64, AggregatedDataWithUInt64KeyPhase2, nullable>();
+            break;
+        case Type::int128_key_phase2:
+            emplace_single<vectorized::UInt128, AggregatedDataWithUInt128KeyPhase2, nullable>();
+            break;
+        case Type::string_key:
+            if constexpr (nullable) {
+                method_variant.emplace<vectorized::MethodSingleNullableColumn<
+                        vectorized::MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>>();
+            } else {
+                method_variant.emplace<
+                        vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>>();
+            }
+            break;
+        case Type::serialized:
+            method_variant.emplace<vectorized::MethodSerialized<AggregatedDataWithStringKey>>();
+            break;
+        case Type::without_key:
+            // No hash table to build: method_variant is left as it is.
+            break;
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type);
+        }
+    }
+
+    // Runtime-flag form: dispatches to the init<nullable>() instantiation for `is_nullable`.
+    void init(Type type, bool is_nullable = false) {
+        if (!is_nullable) {
+            init<false>(type);
+            return;
+        }
+        init<true>(type);
+    }
+};
+
+// Owning handles for the aggregation hash-method holder and for an arena.
+using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
+using ArenaUPtr = std::unique_ptr<vectorized::Arena>;
+
+// Append-only storage for (key, aggregate-state) entries, carved out of an Arena in
+// sub-containers of SUB_CONTAINER_CAPACITY entries each. Keys and aggregate states live
+// in parallel buffers: _key_containers[i] and _value_containers[i] describe the same
+// sub-container, and the iterators rely on the two vectors staying index-aligned.
+struct AggregateDataContainer {
+public:
+    // size_of_key: must equal sizeof the KeyType later passed to append_data()/get_key().
+    // size_of_aggregate_states: bytes reserved per entry for its aggregate states.
+    AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states)
+            : _size_of_key(size_of_key), _size_of_aggregate_states(size_of_aggregate_states) {}
+
+    // Returns the size reported by the backing arena.
+    int64_t memory_usage() const { return _arena_pool.size(); }
+
+    // Copies `key` into the container and returns the aggregate-state slot reserved for
+    // it (the slot's bytes are not initialised here). Propagates any exception thrown
+    // while opening a new sub-container; the container stays consistent in that case.
+    template <typename KeyType>
+    vectorized::AggregateDataPtr append_data(const KeyType& key) {
+        DCHECK_EQ(sizeof(KeyType), _size_of_key);
+        // The modulo is 0 both before the very first append (index 0) and once the
+        // current sub-container is full (index == SUB_CONTAINER_CAPACITY).
+        if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) {
+            _expand();
+        }
+
+        *reinterpret_cast<KeyType*>(_current_keys) = key;
+        auto aggregate_data = _current_agg_data;
+        ++_total_count;
+        ++_index_in_sub_container;
+        _current_agg_data += _size_of_aggregate_states;
+        _current_keys += _size_of_key;
+        return aggregate_data;
+    }
+
+    // Forward iterator over entries in insertion order. Derived is the concrete iterator
+    // type returned by operator++; IsConst selects const access to the container.
+    template <typename Derived, bool IsConst>
+    class IteratorBase {
+        using Container =
+                std::conditional_t<IsConst, const AggregateDataContainer, AggregateDataContainer>;
+
+        // Initialised so that a default-constructed iterator (such as the `iterator`
+        // member before init_once() runs) has a well-defined state.
+        Container* container = nullptr;
+        uint32_t index = 0;
+        uint32_t sub_container_index = 0;
+        uint32_t index_in_sub_container = 0;
+
+        friend class HashTable;
+
+    public:
+        IteratorBase() = default;
+        IteratorBase(Container* container_, uint32_t index_)
+                : container(container_), index(index_) {
+            sub_container_index = index / SUB_CONTAINER_CAPACITY;
+            index_in_sub_container = index - sub_container_index * SUB_CONTAINER_CAPACITY;
+        }
+
+        bool operator==(const IteratorBase& rhs) const { return index == rhs.index; }
+        bool operator!=(const IteratorBase& rhs) const { return index != rhs.index; }
+
+        Derived& operator++() {
+            index++;
+            index_in_sub_container++;
+            if (index_in_sub_container == SUB_CONTAINER_CAPACITY) {
+                index_in_sub_container = 0;
+                sub_container_index++;
+            }
+            return static_cast<Derived&>(*this);
+        }
+
+        // Reads the key of the current entry; KeyType must match the container's key size.
+        template <typename KeyType>
+        KeyType get_key() {
+            DCHECK_EQ(sizeof(KeyType), container->_size_of_key);
+            return reinterpret_cast<KeyType*>(
+                    container->_key_containers[sub_container_index])[index_in_sub_container];
+        }
+
+        // Returns the aggregate-state slot of the current entry.
+        vectorized::AggregateDataPtr get_aggregate_data() {
+            return &(container->_value_containers[sub_container_index]
+                                                 [container->_size_of_aggregate_states *
+                                                  index_in_sub_container]);
+        }
+    };
+
+    class Iterator : public IteratorBase<Iterator, false> {
+    public:
+        using IteratorBase<Iterator, false>::IteratorBase;
+    };
+
+    class ConstIterator : public IteratorBase<ConstIterator, true> {
+    public:
+        using IteratorBase<ConstIterator, true>::IteratorBase;
+    };
+
+    ConstIterator begin() const { return ConstIterator(this, 0); }
+
+    ConstIterator cbegin() const { return begin(); }
+
+    Iterator begin() { return Iterator(this, 0); }
+
+    ConstIterator end() const { return ConstIterator(this, _total_count); }
+    ConstIterator cend() const { return end(); }
+    Iterator end() { return Iterator(this, _total_count); }
+
+    // Positions `iterator` at begin() the first time it is called; later calls are no-ops.
+    void init_once() {
+        if (_inited) {
+            return;
+        }
+        _inited = true;
+        iterator = begin();
+    }
+    Iterator iterator;
+
+private:
+    // Opens a fresh sub-container (keys + aggregate states) from the arena and points the
+    // write cursors at it. On failure the cursors are null, both vectors are exactly as
+    // they were before the call (so they stay index-aligned), and the exception is
+    // rethrown; the next append_data() retries the expansion.
+    void _expand() {
+        _index_in_sub_container = 0;
+        _current_keys = nullptr;
+        _current_agg_data = nullptr;
+        bool key_container_added = false;
+        try {
+            char* keys = _arena_pool.alloc(_size_of_key * SUB_CONTAINER_CAPACITY);
+            _key_containers.emplace_back(keys);
+            key_container_added = true;
+
+            auto agg_data = reinterpret_cast<vectorized::AggregateDataPtr>(
+                    _arena_pool.alloc(_size_of_aggregate_states * SUB_CONTAINER_CAPACITY));
+            _value_containers.emplace_back(agg_data);
+
+            // Publish the cursors only once both vectors have been extended.
+            _current_keys = keys;
+            _current_agg_data = agg_data;
+        } catch (...) {
+            // Roll back only what was actually appended. A throwing emplace_back leaves
+            // its vector unchanged (strong guarantee), so popping based on whether an
+            // allocation succeeded could drop an older, still-live sub-container.
+            if (key_container_added) {
+                _key_containers.pop_back();
+            }
+            throw;
+        }
+    }
+
+    static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192;
+    vectorized::Arena _arena_pool;
+    std::vector<char*> _key_containers;
+    std::vector<vectorized::AggregateDataPtr> _value_containers;
+    vectorized::AggregateDataPtr _current_agg_data = nullptr;
+    char* _current_keys = nullptr;
+    size_t _size_of_key {};
+    size_t _size_of_aggregate_states {};
+    uint32_t _index_in_sub_container {};
+    uint32_t _total_count {};
+    bool _inited = false;
+};
+
+} // namespace pipeline
+
+// Shorthand for init_hash_method with the aggregation variants and AggregateDataPtr as
+// template arguments. `inline` (C++17) gives this header-defined constant a single
+// definition shared by all translation units instead of one internal-linkage copy per
+// includer.
+inline constexpr auto init_agg_hash_method =
+        init_hash_method<pipeline::AggregatedDataVariants, vectorized::AggregateDataPtr>;
+
+} // namespace doris
diff --git a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
similarity index 97%
rename from be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h
rename to be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
index 515be45ad13..bb9ffdb74b7 100644
--- a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h
+++ b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
@@ -29,7 +29,7 @@ class RuntimeState;
 class Status;
 class TScanRangeParams;
 
-namespace vectorized {
+namespace pipeline {
 
 class VDataGenFunctionInf {
 public:
@@ -51,6 +51,6 @@ protected:
     const TupleDescriptor* _tuple_desc = nullptr;
 };
 
-} // namespace vectorized
+} // namespace pipeline
 
 } // namespace doris
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
similarity index 87%
rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
index d33d02aa953..3bbcb544abc 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
+++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
+#include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
 
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
@@ -35,7 +35,7 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 VNumbersTVF::VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc)
         : VDataGenFunctionInf(tuple_id, tuple_desc) {}
@@ -59,7 +59,7 @@ Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool
             *eos = true;
             continue;
         }
-        auto* column_res = assert_cast<ColumnInt64*>(columns[i].get()); 
//BIGINT
+        auto* column_res = 
assert_cast<vectorized::ColumnInt64*>(columns[i].get()); //BIGINT
         int64_t end_value = std::min((int64_t)(_next_number + batch_size), 
_total_numbers);
         if (_use_const) {
             column_res->insert_many_vals(_const_value, end_value - 
_next_number);
@@ -78,9 +78,9 @@ Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool
     } else {
         size_t n_columns = 0;
         for (const auto* slot_desc : _tuple_desc->slots()) {
-            
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                slot_desc->get_data_type_ptr(),
-                                                slot_desc->col_name()));
+            
block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
+                                                            
slot_desc->get_data_type_ptr(),
+                                                            
slot_desc->col_name()));
         }
     }
     return Status::OK();
@@ -97,4 +97,4 @@ Status VNumbersTVF::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ra
     return Status::OK();
 }
 
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
similarity index 93%
rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.h
rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
index 1968637fd36..bf8b117a378 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h
+++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
@@ -21,7 +21,7 @@
 #include <vector>
 
 #include "common/global_types.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
 
 namespace doris {
 
@@ -30,7 +30,7 @@ class RuntimeState;
 class Status;
 class TScanRangeParams;
 
-namespace vectorized {
+namespace pipeline {
 class Block;
 
 class VNumbersTVF : public VDataGenFunctionInf {
@@ -51,6 +51,6 @@ private:
     int64_t _next_number = 0;
 };
 
-} // namespace vectorized
+} // namespace pipeline
 
 } // namespace doris
diff --git a/be/src/pipeline/common/join_utils.h b/be/src/pipeline/common/join_utils.h
new file mode 100644
index 00000000000..cd3374995f7
--- /dev/null
+++ b/be/src/pipeline/common/join_utils.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <variant>
+#include <vector>
+
+#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "vec/common/hash_table/hash_map_util.h"
+
+namespace doris::pipeline {
+// Compile-time tag for each supported join type: every alternative is a
+// std::integral_constant carrying one TJoinOp::type value. The order of the
+// alternatives is unchanged, since std::variant::index() derives from it.
+using JoinOpVariants = std::variant<
+        std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::LEFT_ANTI_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::LEFT_OUTER_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::FULL_OUTER_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_OUTER_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::CROSS_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
+        std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
+
+// Join hash-table contexts keyed by a single primitive value.
+// NOTE(review): I8/I16/I32 spell the key type as vectorized::UIntN while the wider
+// aliases (and the fixed-key ones below) use unqualified names; presumably both
+// resolve to the same types — confirm before unifying the spelling.
+using I8HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt8>;
+using I16HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt16>;
+using I32HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt32>;
+using I64HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt64>;
+using I128HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt128>;
+using I256HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt256>;
+
+// Fixed-width key contexts; `has_null` is forwarded unchanged to
+// vectorized::FixedKeyHashTableContext.
+template <bool has_null>
+using I64FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt64, has_null>;
+template <bool has_null>
+using I128FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt128, has_null>;
+template <bool has_null>
+using I256FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt256, has_null>;
+template <bool has_null>
+using I136FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt136, has_null>;
+
+// Every join hash-table layout. std::monostate, the first alternative, is what a
+// default-constructed variant holds.
+using HashTableVariants = std::variant<
+        std::monostate, vectorized::SerializedHashTableContext,
+        I8HashTableContext, I16HashTableContext, I32HashTableContext, I64HashTableContext,
+        I128HashTableContext, I256HashTableContext,
+        I64FixedKeyHashTableContext<true>, I64FixedKeyHashTableContext<false>,
+        I128FixedKeyHashTableContext<true>, I128FixedKeyHashTableContext<false>,
+        I256FixedKeyHashTableContext<true>, I256FixedKeyHashTableContext<false>,
+        I136FixedKeyHashTableContext<true>, I136FixedKeyHashTableContext<false>>;
+
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer.cpp
similarity index 96%
rename from be/src/vec/exec/runtime_filter_consumer.cpp
rename to be/src/pipeline/common/runtime_filter_consumer.cpp
index 8f3a0e9fac1..0e9c2d0f304 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/runtime_filter_consumer.h"
+#include "pipeline/common/runtime_filter_consumer.h"
 
 #include "pipeline/pipeline_task.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
                                              const 
std::vector<TRuntimeFilterDesc>& runtime_filters,
                                              const RowDescriptor& 
row_descriptor,
-                                             VExprContextSPtrs& conjuncts)
+                                             vectorized::VExprContextSPtrs& 
conjuncts)
         : _filter_id(filter_id),
           _runtime_filter_descs(runtime_filters),
           _row_descriptor_ref(row_descriptor),
@@ -148,7 +148,7 @@ Status RuntimeFilterConsumer::_append_rf_into_conjuncts(
     }
 
     for (const auto& expr : vexprs) {
-        VExprContextSPtr conjunct = VExprContext::create_shared(expr);
+        vectorized::VExprContextSPtr conjunct = 
vectorized::VExprContext::create_shared(expr);
         RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
         RETURN_IF_ERROR(conjunct->open(_state));
         _rf_vexpr_set.insert(expr);
@@ -202,4 +202,4 @@ void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
     _acquire_runtime_filter_timer = ADD_TIMER(profile, 
"AcquireRuntimeFilterTime");
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer.h
similarity index 92%
rename from be/src/vec/exec/runtime_filter_consumer.h
rename to be/src/pipeline/common/runtime_filter_consumer.h
index 7f06edcbf09..9bee6053f6f 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/pipeline/common/runtime_filter_consumer.h
@@ -20,13 +20,14 @@
 #include "exprs/runtime_filter.h"
 #include "pipeline/dependency.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 class RuntimeFilterConsumer {
 public:
     RuntimeFilterConsumer(const int32_t filter_id,
                           const std::vector<TRuntimeFilterDesc>& 
runtime_filters,
-                          const RowDescriptor& row_descriptor, 
VExprContextSPtrs& conjuncts);
+                          const RowDescriptor& row_descriptor,
+                          vectorized::VExprContextSPtrs& conjuncts);
     ~RuntimeFilterConsumer() = default;
 
     Status init(RuntimeState* state, bool need_local_merge = false);
@@ -65,7 +66,7 @@ protected:
     // Set to true if the runtime filter is ready.
     std::vector<bool> _runtime_filter_ready_flag;
     std::mutex _rf_locks;
-    phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+    phmap::flat_hash_set<vectorized::VExprSPtr> _rf_vexpr_set;
     RuntimeState* _state = nullptr;
 
 private:
@@ -85,4 +86,4 @@ private:
     RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
 };
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5ce5e0a56a3..68c00af409d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -21,11 +21,14 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "exprs/runtime_filter.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -297,7 +300,7 @@ Status AggSharedState::reset_hash_table() {
                             RETURN_IF_ERROR(st);
                         }
 
-                        aggregate_data_container.reset(new 
vectorized::AggregateDataContainer(
+                        aggregate_data_container.reset(new 
AggregateDataContainer(
                                 sizeof(typename HashTableType::key_type),
                                 ((total_size_of_aggregate_states + 
align_aggregate_states - 1) /
                                  align_aggregate_states) *
@@ -376,4 +379,19 @@ MultiCastSharedState::MultiCastSharedState(const 
RowDescriptor& row_desc, Object
         : 
multi_cast_data_streamer(std::make_unique<pipeline::MultiCastDataStreamer>(
                   row_desc, pool, cast_sender_count, true)) {}
 
+int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator* 
evaluator) {
+    auto ctxs = evaluator->input_exprs_ctxs();
+    CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref())
+            << "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
+            << ctxs[0]->root()->debug_string();
+    return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id();
+}
+
+Status AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
+    for (int i = 0; i < aggregate_evaluators.size(); ++i) {
+        aggregate_evaluators[i]->function()->destroy(data + 
offsets_of_aggregate_states[i]);
+    }
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index b7b4258f53a..46335caade5 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -29,16 +29,20 @@
 #include "common/logging.h"
 #include "concurrentqueue.h"
 #include "gutil/integral_types.h"
+#include "pipeline/common/agg_utils.h"
+#include "pipeline/common/join_utils.h"
 #include "pipeline/exec/data_queue.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "pipeline/exec/join/process_hash_table_probe.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/core/types.h"
-#include "vec/exec/join/process_hash_table_probe.h"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/vaggregation_node.h"
 #include "vec/spill/spill_stream.h"
 
+namespace doris::vectorized {
+class AggFnEvaluator;
+class VSlotRef;
+} // namespace doris::vectorized
+
 namespace doris::pipeline {
 
 class Dependency;
@@ -294,7 +298,7 @@ struct AggSharedState : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(AggSharedState)
 public:
     AggSharedState() {
-        agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
+        agg_data = std::make_unique<AggregatedDataVariants>();
         agg_arena_pool = std::make_unique<vectorized::Arena>();
     }
     ~AggSharedState() override {
@@ -313,17 +317,11 @@ public:
     // We should call this function only at 1st phase.
     // 1st phase: is_merge=true, only have one SlotRef.
     // 2nd phase: is_merge=false, maybe have multiple exprs.
-    static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) 
{
-        auto ctxs = evaluator->input_exprs_ctxs();
-        CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref())
-                << "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
-                << ctxs[0]->root()->debug_string();
-        return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id();
-    }
+    static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
 
-    vectorized::AggregatedDataVariantsUPtr agg_data = nullptr;
-    std::unique_ptr<vectorized::AggregateDataContainer> 
aggregate_data_container;
-    vectorized::ArenaUPtr agg_arena_pool;
+    AggregatedDataVariantsUPtr agg_data = nullptr;
+    std::unique_ptr<AggregateDataContainer> aggregate_data_container;
+    ArenaUPtr agg_arena_pool;
     std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators;
     // group by k1,k2
     vectorized::VExprContextSPtrs probe_expr_ctxs;
@@ -445,12 +443,7 @@ private:
             agg_data_created_without_key = false;
         }
     }
-    Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
-        for (int i = 0; i < aggregate_evaluators.size(); ++i) {
-            aggregate_evaluators[i]->function()->destroy(data + 
offsets_of_aggregate_states[i]);
-        }
-        return Status::OK();
-    }
+    Status _destroy_agg_status(vectorized::AggregateDataPtr data);
 };
 
 struct AggSpillPartition;
@@ -613,19 +606,6 @@ public:
     std::vector<int64_t> ordey_by_column_idxs;
 };
 
-using JoinOpVariants =
-        std::variant<std::integral_constant<TJoinOp::type, 
TJoinOp::INNER_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::LEFT_SEMI_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::LEFT_ANTI_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::LEFT_OUTER_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::FULL_OUTER_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::RIGHT_OUTER_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::CROSS_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::RIGHT_SEMI_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::RIGHT_ANTI_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
-                     std::integral_constant<TJoinOp::type, 
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
-
 struct JoinSharedState : public BasicSharedState {
     // For some join case, we can apply a short circuit strategy
     // 1. _has_null_in_build_side = true
@@ -646,8 +626,7 @@ struct HashJoinSharedState : public JoinSharedState {
     std::shared_ptr<vectorized::Arena> arena = 
std::make_shared<vectorized::Arena>();
 
     // maybe share hash table with other fragment instances
-    std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
-            std::make_shared<vectorized::HashTableVariants>();
+    std::shared_ptr<HashTableVariants> hash_table_variants = 
std::make_shared<HashTableVariants>();
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
     std::shared_ptr<vectorized::Block> build_block;
@@ -696,23 +675,23 @@ public:
     ~AsyncWriterDependency() override = default;
 };
 
-using SetHashTableVariants = std::variant<
-        std::monostate,
-        vectorized::MethodSerialized<HashMap<StringRef, 
vectorized::RowRefListWithFlags>>,
-        vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>,
-        vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>,
-        vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>,
-        vectorized::SetPrimaryTypeHashTableContext<UInt64>,
-        vectorized::SetPrimaryTypeHashTableContext<UInt128>,
-        vectorized::SetPrimaryTypeHashTableContext<UInt256>,
-        vectorized::SetFixedKeyHashTableContext<UInt64, true>,
-        vectorized::SetFixedKeyHashTableContext<UInt64, false>,
-        vectorized::SetFixedKeyHashTableContext<UInt128, true>,
-        vectorized::SetFixedKeyHashTableContext<UInt128, false>,
-        vectorized::SetFixedKeyHashTableContext<UInt256, true>,
-        vectorized::SetFixedKeyHashTableContext<UInt256, false>,
-        vectorized::SetFixedKeyHashTableContext<UInt136, true>,
-        vectorized::SetFixedKeyHashTableContext<UInt136, false>>;
+using SetHashTableVariants =
+        std::variant<std::monostate,
+                     vectorized::MethodSerialized<HashMap<StringRef, 
RowRefListWithFlags>>,
+                     
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>,
+                     
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>,
+                     
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>,
+                     vectorized::SetPrimaryTypeHashTableContext<UInt64>,
+                     vectorized::SetPrimaryTypeHashTableContext<UInt128>,
+                     vectorized::SetPrimaryTypeHashTableContext<UInt256>,
+                     vectorized::SetFixedKeyHashTableContext<UInt64, true>,
+                     vectorized::SetFixedKeyHashTableContext<UInt64, false>,
+                     vectorized::SetFixedKeyHashTableContext<UInt128, true>,
+                     vectorized::SetFixedKeyHashTableContext<UInt128, false>,
+                     vectorized::SetFixedKeyHashTableContext<UInt256, true>,
+                     vectorized::SetFixedKeyHashTableContext<UInt256, false>,
+                     vectorized::SetFixedKeyHashTableContext<UInt136, true>,
+                     vectorized::SetFixedKeyHashTableContext<UInt136, false>>;
 
 struct SetSharedState : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(SetSharedState)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9870571fa5e..41498fd94fa 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -24,6 +24,7 @@
 #include "pipeline/exec/operator.h"
 #include "runtime/primitive_type.h"
 #include "vec/common/hash_table/hash.h"
+#include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris::pipeline {
 
@@ -110,25 +111,24 @@ Status AggSinkLocalState::open(RuntimeState* state) {
     } else {
         
RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs));
 
-        std::visit(vectorized::Overload {
-                           [&](std::monostate& arg) {
-                               throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                                      "uninited hash table");
-                           },
-                           [&](auto& agg_method) {
-                               using HashTableType = 
std::decay_t<decltype(agg_method)>;
-                               using KeyType = typename HashTableType::Key;
-
-                               /// some aggregate functions (like AVG for 
decimal) have align issues.
-                               Base::_shared_state->aggregate_data_container =
-                                       
std::make_unique<vectorized::AggregateDataContainer>(
-
-                                               sizeof(KeyType),
-                                               
((p._total_size_of_aggregate_states +
-                                                 p._align_aggregate_states - 
1) /
-                                                p._align_aggregate_states) *
-                                                       
p._align_aggregate_states);
-                           }},
+        std::visit(vectorized::Overload {[&](std::monostate& arg) {
+                                             throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                                    "uninited 
hash table");
+                                         },
+                                         [&](auto& agg_method) {
+                                             using HashTableType =
+                                                     
std::decay_t<decltype(agg_method)>;
+                                             using KeyType = typename 
HashTableType::Key;
+
+                                             /// some aggregate functions 
(like AVG for decimal) have align issues.
+                                             
Base::_shared_state->aggregate_data_container =
+                                                     
std::make_unique<AggregateDataContainer>(
+                                                             sizeof(KeyType),
+                                                             
((p._total_size_of_aggregate_states +
+                                                               
p._align_aggregate_states - 1) /
+                                                              
p._align_aggregate_states) *
+                                                                     
p._align_aggregate_states);
+                                         }},
                    _agg_data->method_variant);
         if (p._is_merge) {
             _executor = std::make_unique<Executor<false, true>>();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2964cba9a1d..add6712453f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -119,7 +119,7 @@ protected:
 
     vectorized::Block _preagg_block = vectorized::Block();
 
-    vectorized::AggregatedDataVariants* _agg_data = nullptr;
+    AggregatedDataVariants* _agg_data = nullptr;
     vectorized::Arena* _agg_arena_pool = nullptr;
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
@@ -156,7 +156,7 @@ public:
     bool require_data_distribution() const override { return _is_colocate; }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
-    vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
+    AggregatedDataVariants* get_agg_data(RuntimeState* state) {
         auto& local_state = get_local_state(state);
         return local_state._agg_data;
     }
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index cca9fefbdb2..5b371877f36 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -22,6 +22,7 @@
 
 #include "common/exception.h"
 #include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris::pipeline {
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 5358cffcff5..43859c7cebd 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -21,6 +21,7 @@
 #include <string>
 
 #include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris::pipeline {
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 6d30d02ff53..bc8f3279f92 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -21,6 +21,7 @@
 
 #include "pipeline/exec/operator.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris::pipeline {
 
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 95b284c94b4..39e35ed8836 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -19,10 +19,11 @@
 
 #include <memory>
 
+#include "exprs/runtime_filter.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
+#include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
 #include "pipeline/exec/operator.h"
 #include "util/runtime_profile.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
-#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
 
 namespace doris {
 class RuntimeState;
@@ -76,7 +77,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block*
 Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
     auto& p = _parent->cast<DataGenSourceOperatorX>();
-    _table_func = std::make_shared<vectorized::VNumbersTVF>(p._tuple_id, 
p._tuple_desc);
+    _table_func = std::make_shared<VNumbersTVF>(p._tuple_id, p._tuple_desc);
     _table_func->set_tuple_desc(p._tuple_desc);
     RETURN_IF_ERROR(_table_func->set_scan_ranges(info.scan_ranges));
 
diff --git a/be/src/pipeline/exec/datagen_operator.h 
b/be/src/pipeline/exec/datagen_operator.h
index d4d649a6c06..8aeeea2a699 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -20,8 +20,8 @@
 #include <stdint.h>
 
 #include "common/status.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
 #include "pipeline/exec/operator.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
 
 namespace doris {
 class RuntimeState;
@@ -43,7 +43,7 @@ public:
 
 private:
     friend class DataGenSourceOperatorX;
-    std::shared_ptr<vectorized::VDataGenFunctionInf> _table_func;
+    std::shared_ptr<VDataGenFunctionInf> _table_func;
 };
 
 class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index f0cb738a303..73ce8ce5fb4 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris {
 class ExecNode;
@@ -61,7 +62,7 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
           dummy_mapped_data(std::make_shared<char>('A')),
           batch_size(state->batch_size()),
           _agg_arena_pool(std::make_unique<vectorized::Arena>()),
-          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_data(std::make_unique<AggregatedDataVariants>()),
           _agg_profile_arena(std::make_unique<vectorized::Arena>()),
           _child_block(vectorized::Block::create_unique()),
           _aggregated_block(vectorized::Block::create_unique()) {}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index e8166446678..d6ff5fde0c5 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -73,7 +73,7 @@ private:
     bool _stop_emplace_flag = false;
     const int batch_size;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
-    vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
+    AggregatedDataVariantsUPtr _agg_data = nullptr;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index aab16ff3bff..9fcfdd999d4 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -81,12 +81,11 @@ Status 
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
         }
         properties[ESScanReader::KEY_SHARD] = 
std::to_string(es_scan_range->shard_id);
         properties[ESScanReader::KEY_BATCH_SIZE] =
-                
std::to_string(vectorized::RuntimeFilterConsumer::_state->batch_size());
+                std::to_string(RuntimeFilterConsumer::_state->batch_size());
         properties[ESScanReader::KEY_HOST_PORT] = 
get_host_and_port(es_scan_range->es_hosts);
         // push down limit to Elasticsearch
         // if predicate in _conjunct_ctxs can not be processed by 
Elasticsearch, we can not push down limit operator to Elasticsearch
-        if (p.limit() != -1 &&
-            p.limit() <= 
vectorized::RuntimeFilterConsumer::_state->batch_size()) {
+        if (p.limit() != -1 && p.limit() <= 
RuntimeFilterConsumer::_state->batch_size()) {
             properties[ESScanReader::KEY_TERMINATE_AFTER] = 
std::to_string(p.limit());
         }
 
@@ -95,12 +94,11 @@ Status 
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
                 properties, p._column_names, p._docvalue_context, 
&doc_value_mode);
 
         std::shared_ptr<vectorized::NewEsScanner> scanner = 
vectorized::NewEsScanner::create_shared(
-                vectorized::RuntimeFilterConsumer::_state, this, 
p._limit_per_scanner, p._tuple_id,
-                properties, p._docvalue_context, doc_value_mode,
-                vectorized::RuntimeFilterConsumer::_state->runtime_profile());
+                RuntimeFilterConsumer::_state, this, p._limit_per_scanner, 
p._tuple_id, properties,
+                p._docvalue_context, doc_value_mode,
+                RuntimeFilterConsumer::_state->runtime_profile());
 
-        RETURN_IF_ERROR(
-                scanner->prepare(vectorized::RuntimeFilterConsumer::_state, 
Base::_conjuncts));
+        RETURN_IF_ERROR(scanner->prepare(RuntimeFilterConsumer::_state, 
Base::_conjuncts));
         scanners->push_back(scanner);
     }
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c77761b01bf..3ea10110084 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -289,10 +289,8 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                         auto with_other_conjuncts) -> Status {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         using JoinOpType = std::decay_t<decltype(join_op)>;
-                        vectorized::ProcessHashTableBuild<HashTableCtxType,
-                                                          
HashJoinBuildSinkLocalState>
-                                hash_table_build_process(rows, raw_ptrs, this, 
state->batch_size(),
-                                                         state);
+                        ProcessHashTableBuild<HashTableCtxType> 
hash_table_build_process(
+                                rows, raw_ptrs, this, state->batch_size(), 
state);
                         auto old_hash_table_size = 
arg.hash_table->get_byte_size();
                         auto old_key_size = arg.serialized_keys_size(true);
                         auto st = hash_table_build_process.template run<
@@ -338,26 +336,22 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
                     case TYPE_BOOLEAN:
                     case TYPE_TINYINT:
-                        _shared_state->hash_table_variants
-                                ->emplace<vectorized::I8HashTableContext>();
+                        
_shared_state->hash_table_variants->emplace<I8HashTableContext>();
                         break;
                     case TYPE_SMALLINT:
-                        _shared_state->hash_table_variants
-                                ->emplace<vectorized::I16HashTableContext>();
+                        
_shared_state->hash_table_variants->emplace<I16HashTableContext>();
                         break;
                     case TYPE_INT:
                     case TYPE_FLOAT:
                     case TYPE_DATEV2:
-                        _shared_state->hash_table_variants
-                                ->emplace<vectorized::I32HashTableContext>();
+                        
_shared_state->hash_table_variants->emplace<I32HashTableContext>();
                         break;
                     case TYPE_BIGINT:
                     case TYPE_DOUBLE:
                     case TYPE_DATETIME:
                     case TYPE_DATE:
                     case TYPE_DATETIMEV2:
-                        _shared_state->hash_table_variants
-                                ->emplace<vectorized::I64HashTableContext>();
+                        
_shared_state->hash_table_variants->emplace<I64HashTableContext>();
                         break;
                     case TYPE_LARGEINT:
                     case TYPE_DECIMALV2:
@@ -375,14 +369,11 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                                         : type_ptr->get_type_id();
                         vectorized::WhichDataType which(idx);
                         if (which.is_decimal32()) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I32HashTableContext>();
+                            
_shared_state->hash_table_variants->emplace<I32HashTableContext>();
                         } else if (which.is_decimal64()) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I64HashTableContext>();
+                            
_shared_state->hash_table_variants->emplace<I64HashTableContext>();
                         } else {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I128HashTableContext>();
+                            
_shared_state->hash_table_variants->emplace<I128HashTableContext>();
                         }
                         break;
                     }
@@ -606,7 +597,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                     }
                 },
                 *local_state._shared_state->hash_table_variants,
-                *std::static_pointer_cast<vectorized::HashTableVariants>(
+                *std::static_pointer_cast<HashTableVariants>(
                         _shared_hash_table_context->hash_table_variants));
 
         local_state._shared_state->build_block = 
_shared_hash_table_context->block;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index fb66c5222ab..d785c20ee7f 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -17,8 +17,7 @@
 
 #pragma once
 
-#include <stdint.h>
-
+#include "exprs/runtime_filter_slots.h"
 #include "join_build_sink_operator.h"
 #include "operator.h"
 
@@ -67,8 +66,8 @@ protected:
                                 const std::vector<int>& res_col_ids);
     friend class HashJoinBuildSinkOperatorX;
     friend class PartitionedHashJoinSinkLocalState;
-    template <class HashTableContext, typename Parent>
-    friend struct vectorized::ProcessHashTableBuild;
+    template <class HashTableContext>
+    friend struct ProcessHashTableBuild;
 
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -176,4 +175,57 @@ private:
     const bool _need_local_merge;
 };
 
+template <class HashTableContext>
+struct ProcessHashTableBuild {
+    ProcessHashTableBuild(int rows, vectorized::ColumnRawPtrs& build_raw_ptrs,
+                          HashJoinBuildSinkLocalState* parent, int batch_size, 
RuntimeState* state)
+            : _rows(rows),
+              _build_raw_ptrs(build_raw_ptrs),
+              _parent(parent),
+              _batch_size(batch_size),
+              _state(state) {}
+
+    template <int JoinOpType, bool ignore_null, bool short_circuit_for_null,
+              bool with_other_conjuncts>
+    Status run(HashTableContext& hash_table_ctx, vectorized::ConstNullMapPtr 
null_map,
+               bool* has_null_key) {
+        if (short_circuit_for_null || ignore_null) {
+            // first row is mocked and is null
+            for (uint32_t i = 1; i < _rows; i++) {
+                if ((*null_map)[i]) {
+                    *has_null_key = true;
+                }
+            }
+            if (short_circuit_for_null && *has_null_key) {
+                return Status::OK();
+            }
+        }
+
+        SCOPED_TIMER(_parent->_build_table_insert_timer);
+        hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, 
_batch_size,
+                                                                      
*has_null_key);
+
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
+                                            null_map ? null_map->data() : 
nullptr, true, true,
+                                            
hash_table_ctx.hash_table->get_bucket_size());
+        hash_table_ctx.hash_table->template build<JoinOpType, 
with_other_conjuncts>(
+                hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
+        hash_table_ctx.bucket_nums.resize(_batch_size);
+        hash_table_ctx.bucket_nums.shrink_to_fit();
+
+        COUNTER_SET(_parent->_hash_table_memory_usage,
+                    (int64_t)hash_table_ctx.hash_table->get_byte_size());
+        COUNTER_SET(_parent->_build_arena_memory_usage,
+                    (int64_t)hash_table_ctx.serialized_keys_size(true));
+        return Status::OK();
+    }
+
+private:
+    const uint32_t _rows;
+    vectorized::ColumnRawPtrs& _build_raw_ptrs;
+    HashJoinBuildSinkLocalState* _parent = nullptr;
+    int _batch_size;
+    RuntimeState* _state = nullptr;
+};
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 56630efba3a..002ead551f6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -73,9 +73,8 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
     std::visit(
             [&](auto&& join_op_variants, auto have_other_join_conjunct) {
                 using JoinOpType = std::decay_t<decltype(join_op_variants)>;
-                _process_hashtable_ctx_variants
-                        
->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>(
-                                this, state->batch_size());
+                
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
+                        this, state->batch_size());
             },
             _shared_state->join_op_variants,
             vectorized::make_bool_variant(p._have_other_join_conjunct));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 310cf52fec6..028b0583167 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -30,17 +30,17 @@ namespace pipeline {
 class HashJoinProbeLocalState;
 
 using HashTableCtxVariants =
-        std::variant<std::monostate, 
vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
-                     vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
-                     
vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
+        std::variant<std::monostate, 
ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
+                     ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
+                     
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
 
 class HashJoinProbeOperatorX;
 class HashJoinProbeLocalState final
@@ -85,7 +85,7 @@ private:
     Status _extract_join_column(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
     friend class HashJoinProbeOperatorX;
     template <int JoinOpType>
-    friend struct vectorized::ProcessHashTableProbe;
+    friend struct ProcessHashTableProbe;
 
     int _probe_index = -1;
     uint32_t _build_index = 0;
diff --git a/be/src/vec/exec/join/cross_join_impl.cpp b/be/src/pipeline/exec/join/cross_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/cross_join_impl.cpp
rename to be/src/pipeline/exec/join/cross_join_impl.cpp
index 74fbba20277..16c4bf7f583 100644
--- a/be/src/vec/exec/join/cross_join_impl.cpp
+++ b/be/src/pipeline/exec/join/cross_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::CROSS_JOIN);
 
diff --git a/be/src/vec/exec/join/full_outer_join_impl.cpp b/be/src/pipeline/exec/join/full_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/full_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/full_outer_join_impl.cpp
index 27868390970..0e1ddb58642 100644
--- a/be/src/vec/exec/join/full_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/full_outer_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::FULL_OUTER_JOIN);
 
diff --git a/be/src/vec/exec/join/inner_join_impl.cpp b/be/src/pipeline/exec/join/inner_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/inner_join_impl.cpp
rename to be/src/pipeline/exec/join/inner_join_impl.cpp
index 6c1e0d399a5..21767ecafa0 100644
--- a/be/src/vec/exec/join/inner_join_impl.cpp
+++ b/be/src/pipeline/exec/join/inner_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::INNER_JOIN);
 
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/pipeline/exec/join/join_op.h
similarity index 92%
rename from be/src/vec/exec/join/join_op.h
rename to be/src/pipeline/exec/join/join_op.h
index 62569270d9e..616753b72de 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/pipeline/exec/join/join_op.h
@@ -20,7 +20,7 @@
 #include "vec/common/columns_hashing.h"
 #include "vec/core/block.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 /**
  * Now we have different kinds of RowRef for join operation. Overall, RowRef 
is the base class and
  * the class inheritance is below:
@@ -72,7 +72,7 @@ struct Batch {
 
     bool full() const { return size == MAX_SIZE; }
 
-    Batch<RowRefType>* insert(RowRefType&& row_ref, Arena& pool) {
+    Batch<RowRefType>* insert(RowRefType&& row_ref, vectorized::Arena& pool) {
         if (full()) {
             auto batch = pool.alloc<Batch<RowRefType>>();
             *batch = Batch<RowRefType>(this);
@@ -132,7 +132,9 @@ struct RowRefList : RowRef {
     ForwardIterator<RowRefList> begin() { return 
ForwardIterator<RowRefList>(this); }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+    void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
+        next.emplace_back(std::move(row_ref));
+    }
 
     void clear() { next.clear(); }
 
@@ -152,7 +154,7 @@ struct RowRefListWithFlag : RowRef {
     }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+    void insert(RowRefType&& row_ref, vectorized::Arena& pool) { 
next.emplace_back(row_ref); }
 
     void clear() { next.clear(); }
 
@@ -174,7 +176,7 @@ struct RowRefListWithFlags : RowRefWithFlag {
     }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+    void insert(RowRefType&& row_ref, vectorized::Arena& pool) { 
next.emplace_back(row_ref); }
 
     void clear() { next.clear(); }
 
@@ -183,4 +185,4 @@ private:
     std::vector<RowRefType> next;
 };
 
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/join/left_anti_join_impl.cpp b/be/src/pipeline/exec/join/left_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/left_anti_join_impl.cpp
index e1e1037eeab..ab6a45442b2 100644
--- a/be/src/vec/exec/join/left_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_anti_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::LEFT_ANTI_JOIN);
 
diff --git a/be/src/vec/exec/join/left_outer_join_impl.cpp b/be/src/pipeline/exec/join/left_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/left_outer_join_impl.cpp
index 681bd2b8164..4c8e2c5ac9f 100644
--- a/be/src/vec/exec/join/left_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_outer_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::LEFT_OUTER_JOIN);
 
diff --git a/be/src/vec/exec/join/left_semi_join_impl.cpp b/be/src/pipeline/exec/join/left_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/left_semi_join_impl.cpp
index d06fe1783f6..8365403abfe 100644
--- a/be/src/vec/exec/join/left_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_semi_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::LEFT_SEMI_JOIN);
 
diff --git a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
index 8b541d37495..834335f282f 100644
--- a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
 
diff --git a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
index 98d39e61479..b079a0ec95f 100644
--- a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h
similarity index 73%
rename from be/src/vec/exec/join/process_hash_table_probe.h
rename to be/src/pipeline/exec/join/process_hash_table_probe.h
index 11df66d4aaa..965d62192b2 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -25,38 +25,40 @@
 #include "vec/common/arena.h"
 
 namespace doris {
-namespace pipeline {
-class HashJoinProbeLocalState;
-}
 namespace vectorized {
-
 class Block;
 class MutableBlock;
 struct HashJoinProbeContext;
+} // namespace vectorized
+namespace pipeline {
+
+class HashJoinProbeLocalState;
 
-using MutableColumnPtr = IColumn::MutablePtr;
-using MutableColumns = std::vector<MutableColumnPtr>;
+using MutableColumnPtr = vectorized::IColumn::MutablePtr;
+using MutableColumns = std::vector<vectorized::MutableColumnPtr>;
 
-using NullMap = ColumnUInt8::Container;
-using ConstNullMapPtr = const NullMap*;
+using NullMap = vectorized::ColumnUInt8::Container;
+using ConstNullMapPtr = const vectorized::NullMap*;
 
 template <int JoinOpType>
 struct ProcessHashTableProbe {
-    ProcessHashTableProbe(pipeline::HashJoinProbeLocalState* parent, int 
batch_size);
+    ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size);
     ~ProcessHashTableProbe() = default;
 
     // output build side result column
-    void build_side_output_column(MutableColumns& mcol, const 
std::vector<bool>& output_slot_flags,
-                                  int size, bool have_other_join_conjunct, 
bool is_mark_join);
+    void build_side_output_column(vectorized::MutableColumns& mcol,
+                                  const std::vector<bool>& output_slot_flags, 
int size,
+                                  bool have_other_join_conjunct, bool 
is_mark_join);
 
-    void probe_side_output_column(MutableColumns& mcol, const 
std::vector<bool>& output_slot_flags,
-                                  int size, int last_probe_index, bool 
all_match_one,
+    void probe_side_output_column(vectorized::MutableColumns& mcol,
+                                  const std::vector<bool>& output_slot_flags, 
int size,
+                                  int last_probe_index, bool all_match_one,
                                   bool have_other_join_conjunct);
 
     template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType>
     Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
-                   MutableBlock& mutable_block, Block* output_block, size_t 
probe_rows,
-                   bool is_mark_join, bool have_other_join_conjunct);
+                   vectorized::MutableBlock& mutable_block, vectorized::Block* 
output_block,
+                   size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);
 
     // Only process the join with no other join conjunct, because of no other 
join conjunt
     // the output block struct is same with mutable block. we can do more opt 
on it and simplify
@@ -65,16 +67,17 @@ struct ProcessHashTableProbe {
     template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType,
               bool with_other_conjuncts, bool is_mark_join>
     Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
-                      MutableBlock& mutable_block, Block* output_block, size_t 
probe_rows);
+                      vectorized::MutableBlock& mutable_block, 
vectorized::Block* output_block,
+                      size_t probe_rows);
     // In the presence of other join conjunct, the process of join become more 
complicated.
     // each matching join column need to be processed by other join conjunct. 
so the struct of mutable block
     // and output block may be different
     // The output result is determined by the other join conjunct result and 
same_to_prev struct
-    Status do_other_join_conjuncts(Block* output_block, std::vector<uint8_t>& 
visited,
+    Status do_other_join_conjuncts(vectorized::Block* output_block, 
std::vector<uint8_t>& visited,
                                    bool has_null_in_build_side);
 
     template <bool with_other_conjuncts>
-    Status do_mark_join_conjuncts(Block* output_block, size_t 
hash_table_bucket_size);
+    Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t 
hash_table_bucket_size);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
@@ -84,8 +87,9 @@ struct ProcessHashTableProbe {
     // Process full outer join/ right join / right semi/anti join to output 
the join result
     // in hash table
     template <typename HashTableType>
-    Status process_data_in_hashtable(HashTableType& hash_table_ctx, 
MutableBlock& mutable_block,
-                                     Block* output_block, bool* eos, bool 
is_mark_join);
+    Status process_data_in_hashtable(HashTableType& hash_table_ctx,
+                                     vectorized::MutableBlock& mutable_block,
+                                     vectorized::Block* output_block, bool* 
eos, bool is_mark_join);
 
     /// For null aware join with other conjuncts, if the probe key of one row 
on left side is null,
     /// we should make this row match with all rows in build side.
@@ -93,8 +97,8 @@ struct ProcessHashTableProbe {
 
     pipeline::HashJoinProbeLocalState* _parent = nullptr;
     const int _batch_size;
-    const std::shared_ptr<Block>& _build_block;
-    std::unique_ptr<Arena> _arena;
+    const std::shared_ptr<vectorized::Block>& _build_block;
+    std::unique_ptr<vectorized::Arena> _arena;
     std::vector<StringRef> _probe_keys;
 
     std::vector<uint32_t> _probe_indexs;
@@ -110,13 +114,13 @@ struct ProcessHashTableProbe {
 
     std::vector<int> _build_blocks_locs;
     // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
-    ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
+    vectorized::ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
     // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
-    ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr;
+    vectorized::ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr;
 
     size_t _serialized_key_buffer_size {0};
     uint8_t* _serialized_key_buffer = nullptr;
-    std::unique_ptr<Arena> _serialize_key_arena;
+    std::unique_ptr<vectorized::Arena> _serialize_key_arena;
     std::vector<char> _probe_side_find_result;
 
     bool _have_other_join_conjunct;
@@ -139,5 +143,5 @@ struct ProcessHashTableProbe {
     int _right_col_len;
 };
 
-} // namespace vectorized
+} // namespace pipeline
 } // namespace doris
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
similarity index 83%
rename from be/src/vec/exec/join/process_hash_table_probe_impl.h
rename to be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 8654f17591c..5e023f2c861 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -27,23 +27,22 @@
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vhash_join_node.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 template <int JoinOpType>
-ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbeLocalState*
 parent,
+ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
 parent,
                                                          int batch_size)
         : _parent(parent),
           _batch_size(batch_size),
           _build_block(parent->build_block()),
           _tuple_is_null_left_flags(parent->is_outer_join()
-                                            ? &(reinterpret_cast<ColumnUInt8&>(
+                                            ? 
&(reinterpret_cast<vectorized::ColumnUInt8&>(
                                                         
*parent->_tuple_is_null_left_flag_column)
                                                         .get_data())
                                             : nullptr),
           _tuple_is_null_right_flags(parent->is_outer_join()
-                                             ? 
&(reinterpret_cast<ColumnUInt8&>(
+                                             ? 
&(reinterpret_cast<vectorized::ColumnUInt8&>(
                                                          
*parent->_tuple_is_null_right_flag_column)
                                                          .get_data())
                                              : nullptr),
@@ -65,7 +64,7 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbe
 
 template <int JoinOpType>
 void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
-        MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int 
size,
+        vectorized::MutableColumns& mcol, const std::vector<bool>& 
output_slot_flags, int size,
         bool have_other_join_conjunct, bool is_mark_join) {
     SCOPED_TIMER(_build_side_output_timer);
     constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN 
||
@@ -104,7 +103,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
                 const auto& column = 
*_build_block->safe_get_by_position(i).column;
                 _build_column_has_null[i] = false;
                 if (output_slot_flags[i] && column.is_nullable()) {
-                    const auto& nullable = assert_cast<const 
ColumnNullable&>(column);
+                    const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(column);
                     _build_column_has_null[i] = !simd::contain_byte(
                             nullable.get_null_map_data().data() + 1, 
nullable.size() - 1, 1);
                     _need_calculate_build_index_has_zero |= 
_build_column_has_null[i];
@@ -116,7 +115,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
             const auto& column = *_build_block->safe_get_by_position(i).column;
             if (output_slot_flags[i]) {
                 if (!build_index_has_zero && _build_column_has_null[i]) {
-                    assert_cast<ColumnNullable*>(mcol[i + 
_right_col_idx].get())
+                    assert_cast<vectorized::ColumnNullable*>(mcol[i + 
_right_col_idx].get())
                             ->insert_indices_from_not_has_null(column, 
_build_indexs.data(),
                                                                
_build_indexs.data() + size);
                 } else {
@@ -132,7 +131,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
 
 template <int JoinOpType>
 void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
-        MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int 
size,
+        vectorized::MutableColumns& mcol, const std::vector<bool>& 
output_slot_flags, int size,
         int last_probe_index, bool all_match_one, bool 
have_other_join_conjunct) {
     SCOPED_TIMER(_probe_side_output_timer);
     auto& probe_block = _parent->_probe_block;
@@ -189,9 +188,10 @@ template <int JoinOpType>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType,
           bool with_other_conjuncts, bool is_mark_join>
 Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& 
hash_table_ctx,
-                                                     ConstNullMapPtr null_map,
-                                                     MutableBlock& 
mutable_block,
-                                                     Block* output_block, 
size_t probe_rows) {
+                                                     
vectorized::ConstNullMapPtr null_map,
+                                                     vectorized::MutableBlock& 
mutable_block,
+                                                     vectorized::Block* 
output_block,
+                                                     size_t probe_rows) {
     if (_right_col_len && !_build_block) {
         return Status::InternalError("build block is nullptr");
     }
@@ -361,7 +361,7 @@ size_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t probe
      */
 template <int JoinOpType>
 template <bool with_other_conjuncts>
-Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* 
output_block,
+Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* 
output_block,
                                                                  size_t 
hash_table_bucket_size) {
     DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
            JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
@@ -380,8 +380,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
 
     auto mark_column_mutable =
             
output_block->get_by_position(_parent->_mark_column_id).column->assume_mutable();
-    auto& mark_column = assert_cast<ColumnNullable&>(*mark_column_mutable);
-    IColumn::Filter& filter = 
assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data();
+    auto& mark_column = 
assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable);
+    vectorized::IColumn::Filter& filter =
+            
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data();
 
     if (_parent->_mark_join_conjuncts.empty()) {
         // For null aware anti/semi join, if the equal conjuncts was not 
matched and the build side has null value,
@@ -391,8 +392,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
         const bool should_be_null_if_build_side_has_null = 
*_has_null_in_build_side;
 
         mark_column.resize(row_count);
-        auto* filter_data =
-                
assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data().data();
+        auto* filter_data = 
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column())
+                                    .get_data()
+                                    .data();
         auto* mark_null_map = mark_column.get_null_map_data().data();
         int last_probe_matched = -1;
         for (size_t i = 0; i != row_count; ++i) {
@@ -417,20 +419,21 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
             memset(mark_null_map, 0, row_count);
         }
     } else {
-        
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts, 
output_block,
-                                                        
mark_column.get_null_map_column(), filter));
+        RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+                _parent->_mark_join_conjuncts, output_block, 
mark_column.get_null_map_column(),
+                filter));
     }
     auto* mark_null_map = mark_column.get_null_map_data().data();
 
     auto* mark_filter_data = filter.data();
 
     if constexpr (with_other_conjuncts) {
-        IColumn::Filter other_conjunct_filter(row_count, 1);
+        vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
         {
             bool can_be_filter_all = false;
-            
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, 
nullptr,
-                                                            output_block, 
&other_conjunct_filter,
-                                                            
&can_be_filter_all));
+            RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+                    _parent->_other_join_conjuncts, nullptr, output_block, 
&other_conjunct_filter,
+                    &can_be_filter_all));
         }
         DCHECK_EQ(filter.size(), other_conjunct_filter.size());
         const auto* other_filter_data = other_conjunct_filter.data();
@@ -444,7 +447,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
         }
     }
 
-    auto filter_column = ColumnUInt8::create(row_count, 0);
+    auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
     auto* __restrict filter_map = filter_column->get_data().data();
 
     /**
@@ -483,12 +486,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
     }
 
     auto result_column_id = output_block->columns();
-    output_block->insert({std::move(filter_column), 
std::make_shared<DataTypeUInt8>(), ""});
-    return Block::filter_block(output_block, result_column_id, 
result_column_id);
+    output_block->insert(
+            {std::move(filter_column), 
std::make_shared<vectorized::DataTypeUInt8>(), ""});
+    return vectorized::Block::filter_block(output_block, result_column_id, 
result_column_id);
 }
 
 template <int JoinOpType>
-Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* 
output_block,
+Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* 
output_block,
                                                                   
std::vector<uint8_t>& visited,
                                                                   bool 
has_null_in_build_side) {
     // dispose the other join conjunct exec
@@ -499,27 +503,28 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
 
     SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
     int orig_columns = output_block->columns();
-    IColumn::Filter other_conjunct_filter(row_count, 1);
+    vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
     {
         bool can_be_filter_all = false;
-        
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, 
nullptr,
-                                                        output_block, 
&other_conjunct_filter,
-                                                        &can_be_filter_all));
+        RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+                _parent->_other_join_conjuncts, nullptr, output_block, 
&other_conjunct_filter,
+                &can_be_filter_all));
     }
 
-    auto filter_column = ColumnUInt8::create();
+    auto filter_column = vectorized::ColumnUInt8::create();
     filter_column->get_data() = std::move(other_conjunct_filter);
     auto result_column_id = output_block->columns();
-    output_block->insert({std::move(filter_column), 
std::make_shared<DataTypeUInt8>(), ""});
+    output_block->insert(
+            {std::move(filter_column), 
std::make_shared<vectorized::DataTypeUInt8>(), ""});
     uint8_t* __restrict filter_column_ptr =
-            assert_cast<ColumnUInt8&>(
+            assert_cast<vectorized::ColumnUInt8&>(
                     
output_block->get_by_position(result_column_id).column->assume_mutable_ref())
                     .get_data()
                     .data();
 
     if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
                   JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
-        auto new_filter_column = ColumnUInt8::create(row_count);
+        auto new_filter_column = vectorized::ColumnUInt8::create(row_count);
         auto* __restrict filter_map = new_filter_column->get_data().data();
 
         // process equal-conjuncts-matched tuples that are newly generated
@@ -551,7 +556,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
     } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
                          JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
                          JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
-        auto new_filter_column = ColumnUInt8::create(row_count);
+        auto new_filter_column = vectorized::ColumnUInt8::create(row_count);
         auto* __restrict filter_map = new_filter_column->get_data().data();
 
         for (size_t i = 0; i < row_count; ++i) {
@@ -602,7 +607,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
                       JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             orig_columns = _right_col_idx;
         }
-        RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id, 
orig_columns));
+        RETURN_IF_ERROR(
+                vectorized::Block::filter_block(output_block, 
result_column_id, orig_columns));
     }
 
     return Status::OK();
@@ -610,15 +616,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
 
 template <int JoinOpType>
 template <typename HashTableType>
-Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& 
hash_table_ctx,
-                                                                    
MutableBlock& mutable_block,
-                                                                    Block* 
output_block, bool* eos,
-                                                                    bool 
is_mark_join) {
+Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
+        HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
+        vectorized::Block* output_block, bool* eos, bool is_mark_join) {
     SCOPED_TIMER(_probe_process_hashtable_timer);
     auto& mcol = mutable_block.mutable_columns();
     if (is_mark_join) {
-        std::unique_ptr<ColumnFilterHelper> mark_column =
-                std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]);
+        std::unique_ptr<vectorized::ColumnFilterHelper> mark_column =
+                
std::make_unique<vectorized::ColumnFilterHelper>(*mcol[mcol.size() - 1]);
         *eos = hash_table_ctx.hash_table->template iterate_map<JoinOpType, 
true>(_build_indexs,
                                                                                
  mark_column.get());
     } else {
@@ -651,7 +656,8 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
         if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN ||
                       JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
             for (int i = 0; i < _right_col_idx; ++i) {
-                
assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
+                assert_cast<vectorized::ColumnNullable*>(mcol[i].get())
+                        ->insert_many_defaults(block_size);
             }
             _tuple_is_null_left_flags->resize_fill(block_size, 1);
         }
@@ -664,8 +670,9 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
 template <int JoinOpType>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType>
 Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& 
hash_table_ctx,
-                                                  ConstNullMapPtr null_map,
-                                                  MutableBlock& mutable_block, 
Block* output_block,
+                                                  vectorized::ConstNullMapPtr 
null_map,
+                                                  vectorized::MutableBlock& 
mutable_block,
+                                                  vectorized::Block* 
output_block,
                                                   size_t probe_rows, bool 
is_mark_join,
                                                   bool 
have_other_join_conjunct) {
     Status res;
@@ -675,7 +682,8 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
                                  have_other_join_conjunct, is_mark_join>(
                         hash_table_ctx, null_map, mutable_block, output_block, 
probe_rows);
             },
-            make_bool_variant(is_mark_join), 
make_bool_variant(have_other_join_conjunct));
+            vectorized::make_bool_variant(is_mark_join),
+            vectorized::make_bool_variant(have_other_join_conjunct));
     return res;
 }
 
@@ -687,50 +695,50 @@ struct ExtractType<T(U)> {
     using Type = U;
 };
 
-#define INSTANTIATION(JoinOpType, T)                                           
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<false, false, 
ExtractType<void(T)>::Type>(     \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<false, true, 
ExtractType<void(T)>::Type>(      \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<true, false, 
ExtractType<void(T)>::Type>(      \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<true, true, 
ExtractType<void(T)>::Type>(       \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-                                                                               
               \
-    template Status                                                            
               \
-    
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
 \
-            ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock & 
mutable_block,        \
-            Block * output_block, bool* eos, bool is_mark_join);
-
-#define INSTANTIATION_FOR(JoinOpType)                                 \
-    template struct ProcessHashTableProbe<JoinOpType>;                \
-                                                                      \
-    INSTANTIATION(JoinOpType, (SerializedHashTableContext));          \
-    INSTANTIATION(JoinOpType, (I8HashTableContext));                  \
-    INSTANTIATION(JoinOpType, (I16HashTableContext));                 \
-    INSTANTIATION(JoinOpType, (I32HashTableContext));                 \
-    INSTANTIATION(JoinOpType, (I64HashTableContext));                 \
-    INSTANTIATION(JoinOpType, (I128HashTableContext));                \
-    INSTANTIATION(JoinOpType, (I256HashTableContext));                \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>));   \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>));  \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>));  \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>)); \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>));  \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>)); \
-    INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>));  \
+#define INSTANTIATION(JoinOpType, T)                                           
                    \
+    template Status                                                            
                    \
+    ProcessHashTableProbe<JoinOpType>::process<false, false, 
ExtractType<void(T)>::Type>(          \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
+            vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
+            size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                  \
+    template Status                                                            
                    \
+    ProcessHashTableProbe<JoinOpType>::process<false, true, 
ExtractType<void(T)>::Type>(           \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
+            vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
+            size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                  \
+    template Status                                                            
                    \
+    ProcessHashTableProbe<JoinOpType>::process<true, false, 
ExtractType<void(T)>::Type>(           \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
+            vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
+            size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                  \
+    template Status                                                            
                    \
+    ProcessHashTableProbe<JoinOpType>::process<true, true, 
ExtractType<void(T)>::Type>(            \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
+            vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
+            size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                  \
+                                                                               
                    \
+    template Status                                                            
                    \
+    
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
      \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::MutableBlock & mutable_block, \
+            vectorized::Block * output_block, bool* eos, bool is_mark_join);
+
+#define INSTANTIATION_FOR(JoinOpType)                                    \
+    template struct ProcessHashTableProbe<JoinOpType>;                   \
+                                                                         \
+    INSTANTIATION(JoinOpType, (vectorized::SerializedHashTableContext)); \
+    INSTANTIATION(JoinOpType, (I8HashTableContext));                     \
+    INSTANTIATION(JoinOpType, (I16HashTableContext));                    \
+    INSTANTIATION(JoinOpType, (I32HashTableContext));                    \
+    INSTANTIATION(JoinOpType, (I64HashTableContext));                    \
+    INSTANTIATION(JoinOpType, (I128HashTableContext));                   \
+    INSTANTIATION(JoinOpType, (I256HashTableContext));                   \
+    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>));      \
+    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>));     \
+    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>));     \
+    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>));    \
+    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>));     \
+    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>));    \
+    INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>));     \
     INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<false>));
 
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/join/right_anti_join_impl.cpp 
b/be/src/pipeline/exec/join/right_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/right_anti_join_impl.cpp
index 1c4eec5cf76..e3cc943bf3b 100644
--- a/be/src/vec/exec/join/right_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_anti_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::RIGHT_ANTI_JOIN);
 
diff --git a/be/src/vec/exec/join/right_outer_join_impl.cpp 
b/be/src/pipeline/exec/join/right_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/right_outer_join_impl.cpp
index e732639610e..65c6eaaab18 100644
--- a/be/src/vec/exec/join/right_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_outer_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::RIGHT_OUTER_JOIN);
 
diff --git a/be/src/vec/exec/join/right_semi_join_impl.cpp 
b/be/src/pipeline/exec/join/right_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/right_semi_join_impl.cpp
index 0ee93734439..f89826d8845 100644
--- a/be/src/vec/exec/join/right_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_semi_join_impl.cpp
@@ -19,7 +19,7 @@
 
 #include "process_hash_table_probe_impl.h"
 
-namespace doris::vectorized {
+namespace doris::pipeline {
 
 INSTANTIATION_FOR(TJoinOp::RIGHT_SEMI_JOIN);
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 03d0d380fcb..25bc28b5d43 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -28,10 +28,9 @@ namespace doris::pipeline {
 
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
 state,
                                                                          
OperatorXBase* parent)
         : Base(state, parent),
-          
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
-                                            parent->runtime_filter_descs(),
-                                            
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
-}
+          
RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
+                                parent->runtime_filter_descs(),
+                                static_cast<Parent*>(parent)->_row_desc(), 
_conjuncts) {}
 
 Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index a772f1ca70f..8ecbd23764d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -23,7 +23,7 @@
 
 #include "common/status.h"
 #include "operator.h"
-#include "vec/exec/runtime_filter_consumer.h"
+#include "pipeline/common/runtime_filter_consumer.h"
 
 namespace doris {
 class RuntimeState;
@@ -37,7 +37,7 @@ class MultiCastDataStreamer;
 class MultiCastDataStreamerSourceOperatorX;
 
 class MultiCastDataStreamSourceLocalState final : public 
PipelineXLocalState<MultiCastSharedState>,
-                                                  public 
vectorized::RuntimeFilterConsumer {
+                                                  public RuntimeFilterConsumer 
{
 public:
     ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
     using Base = PipelineXLocalState<MultiCastSharedState>;
@@ -85,7 +85,6 @@ public:
 
     Status prepare(RuntimeState* state) override {
         RETURN_IF_ERROR(Base::prepare(state));
-        // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
         // init profile for runtime filter
         // 
RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile());
         if (_t_data_stream_sink.__isset.output_exprs) {
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 5b458314d25..346540e03ec 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -27,10 +27,10 @@
 #include "olap/parallel_scanner_builder.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
+#include "pipeline/common/runtime_filter_consumer.h"
 #include "pipeline/exec/scan_operator.h"
 #include "service/backend_options.h"
 #include "util/to_string.h"
-#include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exec/scan/new_olap_scanner.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vcompound_pred.h"
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index af3b0fa4077..e5e44498ec0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -18,6 +18,7 @@
 #pragma once
 #include "aggregation_sink_operator.h"
 #include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/spill/spill_stream_manager.h"
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 39e4b8dac26..3f1ce8e7bbb 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <memory>
 
+#include "pipeline/common/runtime_filter_consumer.h"
 #include "pipeline/exec/es_scan_operator.h"
 #include "pipeline/exec/file_scan_operator.h"
 #include "pipeline/exec/group_commit_scan_operator.h"
@@ -30,7 +31,6 @@
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/operator.h"
 #include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exprs/vcast_expr.h"
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vectorized_fn_call.h"
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 120c39bfa43..84db26da051 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -23,10 +23,15 @@
 #include <string>
 
 #include "common/status.h"
+#include "exprs/function_filter.h"
 #include "operator.h"
+#include "pipeline/common/runtime_filter_consumer.h"
 #include "pipeline/dependency.h"
 #include "runtime/descriptors.h"
 #include "vec/exec/scan/vscan_node.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/utils/util.hpp"
 
 namespace doris::vectorized {
 class ScannerDelegate;
@@ -55,12 +60,12 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> 
in_filters;
 };
 
-class ScanLocalStateBase : public PipelineXLocalState<>, public 
vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<>, public 
RuntimeFilterConsumer {
 public:
     ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<>(state, parent),
-              vectorized::RuntimeFilterConsumer(parent->node_id(), 
parent->runtime_filter_descs(),
-                                                parent->row_descriptor(), 
_conjuncts) {}
+              RuntimeFilterConsumer(parent->node_id(), 
parent->runtime_filter_descs(),
+                                    parent->row_descriptor(), _conjuncts) {}
     ~ScanLocalStateBase() override = default;
 
     virtual bool ready_to_read() = 0;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 0994350430b..c6a80f8d06c 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -144,7 +144,7 @@ Status 
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
 
 template <bool is_intersect>
 void SetSourceOperatorX<is_intersect>::_add_result_columns(
-        SetSourceLocalState<is_intersect>& local_state, 
vectorized::RowRefListWithFlags& value,
+        SetSourceLocalState<is_intersect>& local_state, RowRefListWithFlags& 
value,
         int& block_size) {
     auto& build_col_idx = local_state._shared_state->build_col_idx;
     auto& build_block = local_state._shared_state->build_block;
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index 2bbc4257193..5157a2f9c97 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -81,7 +81,7 @@ private:
                                   const int batch_size, bool* eos);
 
     void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
-                             vectorized::RowRefListWithFlags& value, int& 
block_size);
+                             RowRefListWithFlags& value, int& block_size);
     const int _child_quantity;
 };
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index bea67b3050a..837a33dc437 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -24,6 +24,8 @@
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
 
 namespace doris {
 class RuntimeState;
@@ -76,7 +78,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_arena_pool(std::make_unique<vectorized::Arena>()),
-          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_data(std::make_unique<AggregatedDataVariants>()),
           _agg_profile_arena(std::make_unique<vectorized::Arena>()),
           _child_block(vectorized::Block::create_unique()),
           _pre_aggregated_block(vectorized::Block::create_unique()) {}
@@ -164,13 +166,11 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
                                using KeyType = typename HashTableType::Key;
 
                                /// some aggregate functions (like AVG for 
decimal) have align issues.
-                               _aggregate_data_container =
-                                       
std::make_unique<vectorized::AggregateDataContainer>(
-                                               sizeof(KeyType),
-                                               
((p._total_size_of_aggregate_states +
-                                                 p._align_aggregate_states - 
1) /
-                                                p._align_aggregate_states) *
-                                                       
p._align_aggregate_states);
+                               _aggregate_data_container = 
std::make_unique<AggregateDataContainer>(
+                                       sizeof(KeyType), 
((p._total_size_of_aggregate_states +
+                                                          
p._align_aggregate_states - 1) /
+                                                         
p._align_aggregate_states) *
+                                                                
p._align_aggregate_states);
                            }},
                    _agg_data->method_variant);
         if (p._is_merge) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 227536170ea..bf856435982 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -108,12 +108,12 @@ private:
     bool _should_expand_hash_table = true;
     int64_t _cur_num_rows_returned = 0;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
-    vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
+    AggregatedDataVariantsUPtr _agg_data = nullptr;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
-    std::unique_ptr<vectorized::AggregateDataContainer> 
_aggregate_data_container = nullptr;
+    std::unique_ptr<AggregateDataContainer> _aggregate_data_container = 
nullptr;
     bool _should_limit_output = false;
     bool _reach_limit = false;
     size_t _input_num_rows = 0;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bb3ef4ff0f8..41ba68e8cbb 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -817,14 +817,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, 
const Status reason) {
             return;
         }
     }
-    if (query_ctx->enable_pipeline_x_exec()) {
-        query_ctx->cancel_all_pipeline_context(reason);
-    } else {
-        for (auto it : all_instance_ids) {
-            cancel_instance(it, reason);
-        }
-    }
-
     query_ctx->cancel(reason);
     {
         std::lock_guard<std::mutex> state_lock(_lock);
@@ -862,7 +854,6 @@ void FragmentMgr::cancel_instance(const TUniqueId 
instance_id, const Status reas
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
-        std::vector<TUniqueId> to_cancel;
         std::vector<TUniqueId> queries_lost_coordinator;
         std::vector<TUniqueId> queries_timeout;
 
@@ -937,17 +928,6 @@ void FragmentMgr::cancel_worker() {
             }
         }
 
-        // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
-        // designed to count canceled fragment of non-pipeline query.
-        timeout_canceled_fragment_count->increment(to_cancel.size());
-        for (auto& id : to_cancel) {
-            cancel_instance(id,
-                            Status::Error<ErrorCode::TIMEOUT>(
-                                    "FragmentMgr cancel worker going to cancel 
timeout instance "));
-            LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout 
instance "
-                      << print_id(id);
-        }
-
         if (!queries_lost_coordinator.empty()) {
             LOG(INFO) << "There are " << queries_lost_coordinator.size()
                       << " queries need to be cancelled, coordinator dead or 
restarted.";
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a8efd4d9392..44bdaa5971a 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -213,6 +213,10 @@ void QueryContext::cancel(Status new_status, int 
fragment_id) {
     }
 
     set_ready_to_execute(new_status);
+    cancel_all_pipeline_context(new_status, fragment_id);
+}
+
+void QueryContext::cancel_all_pipeline_context(const Status& reason, int 
fragment_id) {
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> 
ctx_to_cancel;
     {
         std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
@@ -223,23 +227,6 @@ void QueryContext::cancel(Status new_status, int 
fragment_id) {
             ctx_to_cancel.push_back(f_context);
         }
     }
-    // Must not add lock here. There maybe dead lock because it will call 
fragment
-    // ctx cancel and fragment ctx will call query ctx cancel.
-    for (auto& f_context : ctx_to_cancel) {
-        if (auto pipeline_ctx = f_context.lock()) {
-            pipeline_ctx->cancel(new_status);
-        }
-    }
-}
-
-void QueryContext::cancel_all_pipeline_context(const Status& reason) {
-    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> 
ctx_to_cancel;
-    {
-        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
-            ctx_to_cancel.push_back(f_context);
-        }
-    }
     for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
             pipeline_ctx->cancel(reason);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dc7ea7e29bf..ee744a89466 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -99,7 +99,7 @@ public:
 
     [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
 
-    void cancel_all_pipeline_context(const Status& reason);
+    void cancel_all_pipeline_context(const Status& reason, int fragment_id = 
-1);
     std::string print_all_pipeline_context();
     Status cancel_pipeline_context(const int fragment_id, const Status& 
reason);
     void set_pipeline_context(const int fragment_id,
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 8795d90553a..6ca0653f7a9 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -31,9 +31,12 @@
 #include "vec/common/hash_table/string_hash_map.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/types.h"
-#include "vec/exec/join/join_op.h"
 #include "vec/utils/util.hpp"
 
+namespace doris::pipeline {
+struct RowRefListWithFlags;
+}
+
 namespace doris::vectorized {
 
 constexpr auto BITSIZE = 8;
@@ -595,12 +598,13 @@ using FixedKeyHashTableContext = 
MethodKeysFixed<JoinHashMap<Key, HashCRC32<Key>
 
 template <class Key, bool has_null>
 using SetFixedKeyHashTableContext =
-        MethodKeysFixed<HashMap<Key, RowRefListWithFlags, HashCRC32<Key>>, 
has_null>;
+        MethodKeysFixed<HashMap<Key, pipeline::RowRefListWithFlags, 
HashCRC32<Key>>, has_null>;
 
 template <class T>
 using SetPrimaryTypeHashTableContext =
-        MethodOneNumber<T, HashMap<T, RowRefListWithFlags, HashCRC32<T>>>;
+        MethodOneNumber<T, HashMap<T, pipeline::RowRefListWithFlags, 
HashCRC32<T>>>;
 
-using SetSerializedHashTableContext = MethodSerialized<HashMap<StringRef, 
RowRefListWithFlags>>;
+using SetSerializedHashTableContext =
+        MethodSerialized<HashMap<StringRef, pipeline::RowRefListWithFlags>>;
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h 
b/be/src/vec/common/hash_table/hash_table_set_build.h
index 8101360a9f9..f9aeeeef14c 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -20,6 +20,7 @@
 #include "vec/columns/column.h"
 
 namespace doris::vectorized {
+constexpr size_t CHECK_FRECUENCY = 65536;
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
     template <typename Parent>
diff --git a/be/src/vec/exec/join/vacquire_list.hpp 
b/be/src/vec/exec/join/vacquire_list.hpp
deleted file mode 100644
index 6a3157cc384..00000000000
--- a/be/src/vec/exec/join/vacquire_list.hpp
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-namespace doris::vectorized {
-
-template <typename Element, int batch_size = 8>
-struct AcquireList {
-    using Batch = Element[batch_size];
-
-    Element& acquire(Element&& element) {
-        if (_current_batch == nullptr) {
-            _current_batch.reset(new Element[batch_size]);
-        }
-        if (current_full()) {
-            _lst.emplace_back(std::move(_current_batch));
-            _current_batch.reset(new Element[batch_size]);
-            _current_offset = 0;
-        }
-
-        auto base_addr = _current_batch.get();
-        base_addr[_current_offset] = std::move(element);
-        auto& ref = base_addr[_current_offset];
-        _current_offset++;
-        return ref;
-    }
-
-    void remove_last_element() { _current_offset--; }
-
-private:
-    bool current_full() { return _current_offset == batch_size; }
-    std::vector<std::unique_ptr<Element[]>> _lst;
-    std::unique_ptr<Element[]> _current_batch;
-    int _current_offset = 0;
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
deleted file mode 100644
index 80d278a220f..00000000000
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/PlanNodes_types.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
-#include <iosfwd>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <variant>
-#include <vector>
-
-#include "common/global_types.h"
-#include "common/status.h"
-#include "exprs/runtime_filter_slots.h"
-#include "util/runtime_profile.h"
-#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/column.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/arena.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map_context.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
-#include "vec/common/hash_table/partitioned_hash_map.h"
-#include "vec/common/string_ref.h"
-#include "vec/core/block.h"
-#include "vec/core/types.h"
-#include "vec/exec/join/join_op.h" // IWYU pragma: keep
-#include "vec/exprs/vexpr_fwd.h"
-
-template <typename T>
-struct HashCRC32;
-
-namespace doris {
-class ObjectPool;
-class DescriptorTbl;
-class RuntimeState;
-
-namespace pipeline {
-class HashJoinProbeLocalState;
-class HashJoinBuildSinkLocalState;
-} // namespace pipeline
-
-namespace vectorized {
-
-constexpr size_t CHECK_FRECUENCY = 65536;
-
-struct UInt128;
-struct UInt256;
-template <int JoinOpType>
-struct ProcessHashTableProbe;
-
-template <typename Parent>
-Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* 
parent,
-                                    bool is_global = false) {
-    if (parent->runtime_filters().empty()) {
-        return Status::OK();
-    }
-    uint64_t rows = block->rows();
-    {
-        SCOPED_TIMER(parent->_runtime_filter_init_timer);
-        RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, 
rows));
-        RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state));
-    }
-
-    if (!parent->_runtime_filter_slots->empty() && rows > 1) {
-        SCOPED_TIMER(parent->_runtime_filter_compute_timer);
-        parent->_runtime_filter_slots->insert(block);
-    }
-    {
-        SCOPED_TIMER(parent->_publish_runtime_filter_timer);
-        RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
-    }
-
-    return Status::OK();
-}
-
-using ProfileCounter = RuntimeProfile::Counter;
-
-template <class HashTableContext, typename Parent>
-struct ProcessHashTableBuild {
-    ProcessHashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, Parent* 
parent, int batch_size,
-                          RuntimeState* state)
-            : _rows(rows),
-              _build_raw_ptrs(build_raw_ptrs),
-              _parent(parent),
-              _batch_size(batch_size),
-              _state(state) {}
-
-    template <int JoinOpType, bool ignore_null, bool short_circuit_for_null,
-              bool with_other_conjuncts>
-    Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, 
bool* has_null_key) {
-        if (short_circuit_for_null || ignore_null) {
-            // first row is mocked and is null
-            for (uint32_t i = 1; i < _rows; i++) {
-                if ((*null_map)[i]) {
-                    *has_null_key = true;
-                }
-            }
-            if (short_circuit_for_null && *has_null_key) {
-                return Status::OK();
-            }
-        }
-
-        SCOPED_TIMER(_parent->_build_table_insert_timer);
-        hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, 
_batch_size,
-                                                                      
*has_null_key);
-
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
-                                            null_map ? null_map->data() : 
nullptr, true, true,
-                                            
hash_table_ctx.hash_table->get_bucket_size());
-        hash_table_ctx.hash_table->template build<JoinOpType, 
with_other_conjuncts>(
-                hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
-        hash_table_ctx.bucket_nums.resize(_batch_size);
-        hash_table_ctx.bucket_nums.shrink_to_fit();
-
-        COUNTER_SET(_parent->_hash_table_memory_usage,
-                    (int64_t)hash_table_ctx.hash_table->get_byte_size());
-        COUNTER_SET(_parent->_build_arena_memory_usage,
-                    (int64_t)hash_table_ctx.serialized_keys_size(true));
-        return Status::OK();
-    }
-
-private:
-    const uint32_t _rows;
-    ColumnRawPtrs& _build_raw_ptrs;
-    Parent* _parent = nullptr;
-    int _batch_size;
-    RuntimeState* _state = nullptr;
-};
-
-using I8HashTableContext = PrimaryTypeHashTableContext<UInt8>;
-using I16HashTableContext = PrimaryTypeHashTableContext<UInt16>;
-using I32HashTableContext = PrimaryTypeHashTableContext<UInt32>;
-using I64HashTableContext = PrimaryTypeHashTableContext<UInt64>;
-using I128HashTableContext = PrimaryTypeHashTableContext<UInt128>;
-using I256HashTableContext = PrimaryTypeHashTableContext<UInt256>;
-
-template <bool has_null>
-using I64FixedKeyHashTableContext = FixedKeyHashTableContext<UInt64, has_null>;
-
-template <bool has_null>
-using I128FixedKeyHashTableContext = FixedKeyHashTableContext<UInt128, 
has_null>;
-
-template <bool has_null>
-using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256, 
has_null>;
-
-template <bool has_null>
-using I136FixedKeyHashTableContext = FixedKeyHashTableContext<UInt136, 
has_null>;
-
-using HashTableVariants =
-        std::variant<std::monostate, SerializedHashTableContext, 
I8HashTableContext,
-                     I16HashTableContext, I32HashTableContext, 
I64HashTableContext,
-                     I128HashTableContext, I256HashTableContext, 
I64FixedKeyHashTableContext<true>,
-                     I64FixedKeyHashTableContext<false>, 
I128FixedKeyHashTableContext<true>,
-                     I128FixedKeyHashTableContext<false>, 
I256FixedKeyHashTableContext<true>,
-                     I256FixedKeyHashTableContext<false>, 
I136FixedKeyHashTableContext<true>,
-                     I136FixedKeyHashTableContext<false>>;
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index cd869cae5d2..70ac5970274 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -17,59 +17,7 @@
 
 #pragma once
 
-#include <fmt/format.h>
-#include <gen_cpp/Exprs_types.h>
-#include <gen_cpp/PlanNodes_types.h>
-#include <parallel_hashmap/phmap.h>
-#include <stdint.h>
-
-#include <functional>
-#include <list>
-#include <map>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "common/global_types.h"
-#include "common/object_pool.h"
-#include "common/status.h"
-#include "exec/olap_common.h"
-#include "exprs/function_filter.h"
-#include "runtime/define_primitive_type.h"
-#include "runtime/query_context.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer.h"
-#include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscanner.h"
-#include "vec/runtime/shared_scanner_controller.h"
-
-namespace doris {
-class BitmapFilterFuncBase;
-class BloomFilterFuncBase;
-class DescriptorTbl;
-class FunctionContext;
-class HybridSetBase;
-class IRuntimeFilter;
-class SlotDescriptor;
-class TScanRangeParams;
-class TupleDescriptor;
-
-namespace vectorized {
-class Block;
-class VExpr;
-class VExprContext;
-class VInPredicate;
-class VectorizedFnCall;
-} // namespace vectorized
-struct StringRef;
-} // namespace doris
-
-namespace doris::pipeline {
-class ScanOperator;
-}
 
 namespace doris::vectorized {
 
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
deleted file mode 100644
index fd369d80278..00000000000
--- a/be/src/vec/exec/vaggregation_node.h
+++ /dev/null
@@ -1,366 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <assert.h>
-#include <glog/logging.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <algorithm>
-#include <functional>
-#include <memory>
-#include <ostream>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <variant>
-#include <vector>
-
-#include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/global_types.h"
-#include "common/status.h"
-#include "util/runtime_profile.h"
-#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/allocator.h"
-#include "vec/common/arena.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash.h"
-#include "vec/common/hash_table/hash_map_context.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
-#include "vec/common/hash_table/hash_map_util.h"
-#include "vec/common/hash_table/partitioned_hash_map.h"
-#include "vec/common/hash_table/ph_hash_map.h"
-#include "vec/common/hash_table/string_hash_map.h"
-#include "vec/common/pod_array.h"
-#include "vec/common/string_ref.h"
-#include "vec/common/uint128.h"
-#include "vec/core/block.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
-#include "vec/exprs/vectorized_agg_fn.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vslot_ref.h"
-
-namespace doris {
-class TPlanNode;
-class DescriptorTbl;
-class ObjectPool;
-class RuntimeState;
-class TupleDescriptor;
-
-namespace pipeline {
-class AggSinkOperator;
-class AggSourceOperator;
-class StreamingAggSinkOperator;
-class StreamingAggSourceOperator;
-} // namespace pipeline
-
-namespace vectorized {
-
-using AggregatedDataWithoutKey = AggregateDataPtr;
-using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>;
-using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
-using AggregatedDataWithUInt8Key = PHHashMap<UInt8, AggregateDataPtr>;
-using AggregatedDataWithUInt16Key = PHHashMap<UInt16, AggregateDataPtr>;
-using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr, 
HashCRC32<UInt32>>;
-using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr, 
HashCRC32<UInt64>>;
-using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr, 
HashCRC32<UInt128>>;
-using AggregatedDataWithUInt256Key = PHHashMap<UInt256, AggregateDataPtr, 
HashCRC32<UInt256>>;
-using AggregatedDataWithUInt136Key = PHHashMap<UInt136, AggregateDataPtr, 
HashCRC32<UInt136>>;
-
-using AggregatedDataWithUInt32KeyPhase2 =
-        PHHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>;
-using AggregatedDataWithUInt64KeyPhase2 =
-        PHHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>;
-using AggregatedDataWithUInt128KeyPhase2 =
-        PHHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>;
-using AggregatedDataWithUInt256KeyPhase2 =
-        PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>;
-
-using AggregatedDataWithUInt136KeyPhase2 =
-        PHHashMap<UInt136, AggregateDataPtr, HashMixWrapper<UInt136>>;
-
-using AggregatedDataWithNullableUInt8Key = 
DataWithNullKey<AggregatedDataWithUInt8Key>;
-using AggregatedDataWithNullableUInt16Key = 
DataWithNullKey<AggregatedDataWithUInt16Key>;
-using AggregatedDataWithNullableUInt32Key = 
DataWithNullKey<AggregatedDataWithUInt32Key>;
-using AggregatedDataWithNullableUInt64Key = 
DataWithNullKey<AggregatedDataWithUInt64Key>;
-using AggregatedDataWithNullableUInt32KeyPhase2 =
-        DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>;
-using AggregatedDataWithNullableUInt64KeyPhase2 =
-        DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
-using AggregatedDataWithNullableShortStringKey = 
DataWithNullKey<AggregatedDataWithShortStringKey>;
-using AggregatedDataWithNullableUInt128Key = 
DataWithNullKey<AggregatedDataWithUInt128Key>;
-using AggregatedDataWithNullableUInt128KeyPhase2 =
-        DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;
-
-using AggregatedMethodVariants = std::variant<
-        std::monostate, MethodSerialized<AggregatedDataWithStringKey>,
-        MethodOneNumber<UInt8, AggregatedDataWithUInt8Key>,
-        MethodOneNumber<UInt16, AggregatedDataWithUInt16Key>,
-        MethodOneNumber<UInt32, AggregatedDataWithUInt32Key>,
-        MethodOneNumber<UInt64, AggregatedDataWithUInt64Key>,
-        MethodStringNoCache<AggregatedDataWithShortStringKey>,
-        MethodOneNumber<UInt128, AggregatedDataWithUInt128Key>,
-        MethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>,
-        MethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>,
-        MethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>,
-        MethodSingleNullableColumn<MethodOneNumber<UInt8, 
AggregatedDataWithNullableUInt8Key>>,
-        MethodSingleNullableColumn<MethodOneNumber<UInt16, 
AggregatedDataWithNullableUInt16Key>>,
-        MethodSingleNullableColumn<MethodOneNumber<UInt32, 
AggregatedDataWithNullableUInt32Key>>,
-        MethodSingleNullableColumn<MethodOneNumber<UInt64, 
AggregatedDataWithNullableUInt64Key>>,
-        MethodSingleNullableColumn<
-                MethodOneNumber<UInt32, 
AggregatedDataWithNullableUInt32KeyPhase2>>,
-        MethodSingleNullableColumn<
-                MethodOneNumber<UInt64, 
AggregatedDataWithNullableUInt64KeyPhase2>>,
-        MethodSingleNullableColumn<MethodOneNumber<UInt128, 
AggregatedDataWithNullableUInt128Key>>,
-        MethodSingleNullableColumn<
-                MethodOneNumber<UInt128, 
AggregatedDataWithNullableUInt128KeyPhase2>>,
-        
MethodSingleNullableColumn<MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
-        MethodKeysFixed<AggregatedDataWithUInt64Key, false>,
-        MethodKeysFixed<AggregatedDataWithUInt64Key, true>,
-        MethodKeysFixed<AggregatedDataWithUInt128Key, false>,
-        MethodKeysFixed<AggregatedDataWithUInt128Key, true>,
-        MethodKeysFixed<AggregatedDataWithUInt256Key, false>,
-        MethodKeysFixed<AggregatedDataWithUInt256Key, true>,
-        MethodKeysFixed<AggregatedDataWithUInt136Key, false>,
-        MethodKeysFixed<AggregatedDataWithUInt136Key, true>,
-        MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>,
-        MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>,
-        MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
-        MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
-        MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
-        MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>,
-        MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>,
-        MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>;
-
-struct AggregatedDataVariants
-        : public DataVariants<AggregatedMethodVariants, 
MethodSingleNullableColumn, MethodOneNumber,
-                              MethodKeysFixed, DataWithNullKey> {
-    AggregatedDataWithoutKey without_key = nullptr;
-
-    template <bool nullable>
-    void init(Type type) {
-        _type = type;
-        switch (_type) {
-        case Type::without_key:
-            break;
-        case Type::serialized:
-            
method_variant.emplace<MethodSerialized<AggregatedDataWithStringKey>>();
-            break;
-        case Type::int8_key:
-            emplace_single<UInt8, AggregatedDataWithUInt8Key, nullable>();
-            break;
-        case Type::int16_key:
-            emplace_single<UInt16, AggregatedDataWithUInt16Key, nullable>();
-            break;
-        case Type::int32_key:
-            emplace_single<UInt32, AggregatedDataWithUInt32Key, nullable>();
-            break;
-        case Type::int32_key_phase2:
-            emplace_single<UInt32, AggregatedDataWithUInt32KeyPhase2, 
nullable>();
-            break;
-        case Type::int64_key:
-            emplace_single<UInt64, AggregatedDataWithUInt64Key, nullable>();
-            break;
-        case Type::int64_key_phase2:
-            emplace_single<UInt64, AggregatedDataWithUInt64KeyPhase2, 
nullable>();
-            break;
-        case Type::int128_key:
-            emplace_single<UInt128, AggregatedDataWithUInt128Key, nullable>();
-            break;
-        case Type::int128_key_phase2:
-            emplace_single<UInt128, AggregatedDataWithUInt128KeyPhase2, 
nullable>();
-            break;
-        case Type::string_key:
-            if (nullable) {
-                method_variant.emplace<MethodSingleNullableColumn<
-                        
MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>>();
-            } else {
-                
method_variant.emplace<MethodStringNoCache<AggregatedDataWithShortStringKey>>();
-            }
-            break;
-        default:
-            throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, 
type={}", type);
-        }
-    }
-
-    void init(Type type, bool is_nullable = false) {
-        if (is_nullable) {
-            init<true>(type);
-        } else {
-            init<false>(type);
-        }
-    }
-};
-
-using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
-using ArenaUPtr = std::unique_ptr<Arena>;
-
-struct AggregateDataContainer {
-public:
-    AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states)
-            : _size_of_key(size_of_key), 
_size_of_aggregate_states(size_of_aggregate_states) {}
-
-    int64_t memory_usage() const { return _arena_pool.size(); }
-
-    template <typename KeyType>
-    AggregateDataPtr append_data(const KeyType& key) {
-        DCHECK_EQ(sizeof(KeyType), _size_of_key);
-        // SUB_CONTAINER_CAPACITY should add a new sub container, and also 
expand when it is zero
-        if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) {
-            _expand();
-        }
-
-        *reinterpret_cast<KeyType*>(_current_keys) = key;
-        auto aggregate_data = _current_agg_data;
-        ++_total_count;
-        ++_index_in_sub_container;
-        _current_agg_data += _size_of_aggregate_states;
-        _current_keys += _size_of_key;
-        return aggregate_data;
-    }
-
-    template <typename Derived, bool IsConst>
-    class IteratorBase {
-        using Container =
-                std::conditional_t<IsConst, const AggregateDataContainer, 
AggregateDataContainer>;
-
-        Container* container = nullptr;
-        uint32_t index;
-        uint32_t sub_container_index;
-        uint32_t index_in_sub_container;
-
-        friend class HashTable;
-
-    public:
-        IteratorBase() = default;
-        IteratorBase(Container* container_, uint32_t index_)
-                : container(container_), index(index_) {
-            sub_container_index = index / SUB_CONTAINER_CAPACITY;
-            index_in_sub_container = index - sub_container_index * 
SUB_CONTAINER_CAPACITY;
-        }
-
-        bool operator==(const IteratorBase& rhs) const { return index == 
rhs.index; }
-        bool operator!=(const IteratorBase& rhs) const { return index != 
rhs.index; }
-
-        Derived& operator++() {
-            index++;
-            index_in_sub_container++;
-            if (index_in_sub_container == SUB_CONTAINER_CAPACITY) {
-                index_in_sub_container = 0;
-                sub_container_index++;
-            }
-            return static_cast<Derived&>(*this);
-        }
-
-        template <typename KeyType>
-        KeyType get_key() {
-            DCHECK_EQ(sizeof(KeyType), container->_size_of_key);
-            return 
((KeyType*)(container->_key_containers[sub_container_index]))
-                    [index_in_sub_container];
-        }
-
-        AggregateDataPtr get_aggregate_data() {
-            return &(container->_value_containers[sub_container_index]
-                                                 
[container->_size_of_aggregate_states *
-                                                  index_in_sub_container]);
-        }
-    };
-
-    class Iterator : public IteratorBase<Iterator, false> {
-    public:
-        using IteratorBase<Iterator, false>::IteratorBase;
-    };
-
-    class ConstIterator : public IteratorBase<ConstIterator, true> {
-    public:
-        using IteratorBase<ConstIterator, true>::IteratorBase;
-    };
-
-    ConstIterator begin() const { return ConstIterator(this, 0); }
-
-    ConstIterator cbegin() const { return begin(); }
-
-    Iterator begin() { return Iterator(this, 0); }
-
-    ConstIterator end() const { return ConstIterator(this, _total_count); }
-    ConstIterator cend() const { return end(); }
-    Iterator end() { return Iterator(this, _total_count); }
-
-    void init_once() {
-        if (_inited) {
-            return;
-        }
-        _inited = true;
-        iterator = begin();
-    }
-    Iterator iterator;
-
-private:
-    void _expand() {
-        _index_in_sub_container = 0;
-        _current_keys = nullptr;
-        _current_agg_data = nullptr;
-        try {
-            _current_keys = _arena_pool.alloc(_size_of_key * 
SUB_CONTAINER_CAPACITY);
-            _key_containers.emplace_back(_current_keys);
-
-            _current_agg_data = 
(AggregateDataPtr)_arena_pool.alloc(_size_of_aggregate_states *
-                                                                    
SUB_CONTAINER_CAPACITY);
-            _value_containers.emplace_back(_current_agg_data);
-        } catch (...) {
-            if (_current_keys) {
-                _key_containers.pop_back();
-                _current_keys = nullptr;
-            }
-            if (_current_agg_data) {
-                _value_containers.pop_back();
-                _current_agg_data = nullptr;
-            }
-            throw;
-        }
-    }
-
-    static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192;
-    Arena _arena_pool;
-    std::vector<char*> _key_containers;
-    std::vector<AggregateDataPtr> _value_containers;
-    AggregateDataPtr _current_agg_data = nullptr;
-    char* _current_keys = nullptr;
-    size_t _size_of_key {};
-    size_t _size_of_aggregate_states {};
-    uint32_t _index_in_sub_container {};
-    uint32_t _total_count {};
-    bool _inited = false;
-};
-
-} // namespace vectorized
-
-constexpr auto init_agg_hash_method =
-        init_hash_method<vectorized::AggregatedDataVariants, 
vectorized::AggregateDataPtr>;
-
-} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to