This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4728e75079 [feature](bitmap) Support in bitmap syntax and bitmap 
runtime filter (#14340)
4728e75079 is described below

commit 4728e750795c59b6c1af96fc490ed073adc7aa64
Author: luozenglin <37725793+luozeng...@users.noreply.github.com>
AuthorDate: Fri Nov 25 15:22:44 2022 +0800

    [feature](bitmap) Support in bitmap syntax and bitmap runtime filter 
(#14340)
    
    1.Support in bitmap syntax, like 'where k1 in (select bitmap_column from 
tbl)';
    2.Support bitmap runtime filter. Generate a bitmap filter using the right 
table bitmap and push it down to the left table storage layer for filtering.
---
 be/src/exprs/bitmapfilter_predicate.h              | 143 +++++++++++++++++++++
 be/src/exprs/create_predicate_function.h           |  35 +++++
 be/src/exprs/runtime_filter.cpp                    |  69 +++++++++-
 be/src/exprs/runtime_filter.h                      |  10 +-
 be/src/exprs/runtime_filter_slots_cross.h          | 110 ++++++++++++++++
 be/src/olap/bitmap_filter_predicate.h              | 121 +++++++++++++++++
 be/src/olap/column_predicate.h                     |   3 +-
 be/src/olap/predicate_creator.h                    |  23 +++-
 be/src/olap/reader.cpp                             |  16 +++
 be/src/olap/reader.h                               |   5 +
 be/src/runtime/types.h                             |   2 +
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/join/vjoin_node_base.h             |   2 +
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 101 +++++++++++++--
 be/src/vec/exec/join/vnested_loop_join_node.h      |  13 ++
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   4 +-
 be/src/vec/exec/scan/new_olap_scan_node.h          |   2 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  26 ++--
 be/src/vec/exec/scan/new_olap_scanner.h            |  16 +--
 be/src/vec/exec/scan/vscan_node.cpp                |  22 +++-
 be/src/vec/exec/scan/vscan_node.h                  |  22 +++-
 be/src/vec/exprs/vbitmap_predicate.cpp             | 107 +++++++++++++++
 be/src/vec/exprs/vbitmap_predicate.h               |  63 +++++++++
 be/src/vec/exprs/vexpr.h                           |   8 ++
 be/src/vec/runtime/shared_hash_table_controller.h  |   2 +
 .../advanced/join-optimization/runtime-filter.md   |   8 +-
 .../docs/sql-manual/sql-reference/Operators/in.md  |  78 +++++++++++
 docs/sidebars.json                                 |   7 +
 .../advanced/join-optimization/runtime-filter.md   |   7 +-
 .../docs/sql-manual/sql-reference/Operators/in.md  |  78 +++++++++++
 .../doris/analysis/BitmapFilterPredicate.java      | 100 ++++++++++++++
 .../org/apache/doris/analysis/InPredicate.java     |  14 +-
 .../org/apache/doris/analysis/StmtRewriter.java    |   7 +-
 .../apache/doris/planner/NestedLoopJoinNode.java   |  33 +++++
 .../java/org/apache/doris/planner/PlanNode.java    |  11 +-
 .../org/apache/doris/planner/RuntimeFilter.java    |  62 ++++++++-
 .../doris/planner/RuntimeFilterGenerator.java      |  43 ++++++-
 .../apache/doris/qe/RuntimeFilterTypeHelper.java   |   4 +-
 .../doris/planner/RuntimeFilterGeneratorTest.java  |   4 +-
 .../doris/qe/RuntimeFilterTypeHelperTest.java      |   2 +-
 gensrc/thrift/Exprs.thrift                         |   3 +
 gensrc/thrift/PlanNodes.thrift                     |  10 ++
 .../data/query_p0/join/test_bitmap_filter.out      |  50 +++++++
 .../suites/query_p0/join/test_bitmap_filter.groovy |  52 ++++++++
 44 files changed, 1430 insertions(+), 69 deletions(-)

diff --git a/be/src/exprs/bitmapfilter_predicate.h 
b/be/src/exprs/bitmapfilter_predicate.h
new file mode 100644
index 0000000000..c9a2efc621
--- /dev/null
+++ b/be/src/exprs/bitmapfilter_predicate.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/object_pool.h"
+#include "gutil/integral_types.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/primitive_type.h"
+#include "util/bitmap_value.h"
+
+namespace doris {
+
+// only used in Runtime Filter
+class BitmapFilterFuncBase {
+public:
+    virtual void insert(const void* data) = 0;
+    virtual void insert_many(const std::vector<const BitmapValue*> bitmaps) = 
0;
+    virtual bool find(uint64 data) const = 0;
+    virtual bool is_empty() = 0;
+    virtual Status assign(BitmapValue* bitmap_value) = 0;
+    virtual void light_copy(BitmapFilterFuncBase* other) { _not_in = 
other->_not_in; };
+    virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* 
nullmap,
+                                                uint16_t* offsets, int number) 
= 0;
+    virtual void find_batch(const char* data, const uint8* nullmap, int number,
+                            uint8* results) const = 0;
+    void set_not_in(bool not_in) { _not_in = not_in; }
+    virtual ~BitmapFilterFuncBase() = default;
+
+protected:
+    // true -> not in bitmap, false -> in bitmap
+    bool _not_in {false};
+};
+
+template <PrimitiveType type>
+class BitmapFilterFunc : public BitmapFilterFuncBase {
+public:
+    using CppType = typename PrimitiveTypeTraits<type>::CppType;
+
+    BitmapFilterFunc() : _bitmap_value(std::make_shared<BitmapValue>()), 
_empty(true) {}
+
+    ~BitmapFilterFunc() override = default;
+
+    void insert(const void* data) override;
+
+    void insert_many(const std::vector<const BitmapValue*> bitmaps) override;
+
+    bool find(uint64 data) const override { return _not_in ^ 
_bitmap_value->contains(data); }
+
+    uint16_t find_fixed_len_olap_engine(const char* data, const uint8* 
nullmap, uint16_t* offsets,
+                                        int number) override;
+
+    void find_batch(const char* data, const uint8* nullmap, int number,
+                    uint8* results) const override;
+
+    bool is_empty() override { return _empty; }
+
+    Status assign(BitmapValue* bitmap_value) override {
+        *_bitmap_value = *bitmap_value;
+        return Status::OK();
+    }
+
+    void light_copy(BitmapFilterFuncBase* bloomfilter_func) override;
+
+private:
+    std::shared_ptr<BitmapValue> _bitmap_value;
+    bool _empty;
+};
+
+template <PrimitiveType type>
+void BitmapFilterFunc<type>::insert(const void* data) {
+    if (data == nullptr) {
+        return;
+    }
+
+    *_bitmap_value |= *reinterpret_cast<const BitmapValue*>(data);
+    _empty = false;
+}
+
+template <PrimitiveType type>
+void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*> 
bitmaps) {
+    if (bitmaps.empty()) {
+        return;
+    }
+    _bitmap_value->fastunion(bitmaps);
+    _empty = false;
+}
+
+template <PrimitiveType type>
+uint16_t BitmapFilterFunc<type>::find_fixed_len_olap_engine(const char* data, 
const uint8* nullmap,
+                                                            uint16_t* offsets, 
int number) {
+    uint16_t new_size = 0;
+    for (int i = 0; i < number; i++) {
+        uint16_t idx = offsets[i];
+        if (nullmap != nullptr && nullmap[idx]) {
+            continue;
+        }
+        if (!find(*((CppType*)data + idx))) {
+            continue;
+        }
+        offsets[new_size++] = idx;
+    }
+    return new_size;
+}
+
+template <PrimitiveType type>
+void BitmapFilterFunc<type>::find_batch(const char* data, const uint8* 
nullmap, int number,
+                                        uint8* results) const {
+    for (int i = 0; i < number; i++) {
+        results[i] = false;
+        if (nullmap != nullptr && nullmap[i]) {
+            continue;
+        }
+        if (!find(*((CppType*)data + i))) {
+            continue;
+        }
+        results[i] = true;
+    }
+}
+
+template <PrimitiveType type>
+void BitmapFilterFunc<type>::light_copy(BitmapFilterFuncBase* 
bitmapfilter_func) {
+    BitmapFilterFuncBase::light_copy(bitmapfilter_func);
+    auto other_func = reinterpret_cast<BitmapFilterFunc*>(bitmapfilter_func);
+    _empty = other_func->_empty;
+    _bitmap_value = other_func->_bitmap_value;
+}
+
+} // namespace doris
diff --git a/be/src/exprs/create_predicate_function.h 
b/be/src/exprs/create_predicate_function.h
index 1f249044fb..20bd665206 100644
--- a/be/src/exprs/create_predicate_function.h
+++ b/be/src/exprs/create_predicate_function.h
@@ -20,6 +20,8 @@
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/hybrid_set.h"
 #include "exprs/minmax_predicate.h"
+#include "runtime/define_primitive_type.h"
+#include "vec/exprs/vbitmap_predicate.h"
 
 namespace doris {
 
@@ -54,6 +56,15 @@ public:
     };
 };
 
+class BitmapFilterTraits {
+public:
+    using BasePtr = BitmapFilterFuncBase*;
+    template <PrimitiveType type>
+    static BasePtr get_function() {
+        return new BitmapFilterFunc<type>();
+    };
+};
+
 template <class Traits>
 class PredicateFunctionCreator {
 public:
@@ -118,6 +129,26 @@ typename Traits::BasePtr 
create_predicate_function(PrimitiveType type) {
     return nullptr;
 }
 
+template <class Traits>
+typename Traits::BasePtr create_bitmap_predicate_function(PrimitiveType type) {
+    using Creator = PredicateFunctionCreator<Traits>;
+
+    switch (type) {
+    case TYPE_TINYINT:
+        return Creator::template create<TYPE_TINYINT>();
+    case TYPE_SMALLINT:
+        return Creator::template create<TYPE_SMALLINT>();
+    case TYPE_INT:
+        return Creator::template create<TYPE_INT>();
+    case TYPE_BIGINT:
+        return Creator::template create<TYPE_BIGINT>();
+    default:
+        DCHECK(false) << "Invalid type.";
+    }
+
+    return nullptr;
+}
+
 inline auto create_minmax_filter(PrimitiveType type) {
     return create_predicate_function<MinmaxFunctionTraits>(type);
 }
@@ -134,4 +165,8 @@ inline auto create_bloom_filter(PrimitiveType type) {
     return create_predicate_function<BloomFilterTraits>(type);
 }
 
+inline auto create_bitmap_filter(PrimitiveType type) {
+    return create_bitmap_predicate_function<BitmapFilterTraits>(type);
+}
+
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c9b3a3e2a1..e56b6ed73e 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -20,6 +20,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "exprs/binary_predicate.h"
+#include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/create_predicate_function.h"
 #include "exprs/expr.h"
@@ -37,6 +38,7 @@
 #include "util/runtime_profile.h"
 #include "util/string_parser.hpp"
 #include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vdirect_in_predicate.h"
 #include "vec/exprs/vexpr.h"
@@ -453,6 +455,11 @@ public:
             _context.bloom_filter_func->set_length(params->bloom_filter_size);
             return Status::OK();
         }
+        case RuntimeFilterType::BITMAP_FILTER: {
+            
_context._bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
+            
_context._bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
+            return Status::OK();
+        }
         default:
             return Status::InvalidArgument("Unknown Filter type");
         }
@@ -515,6 +522,10 @@ public:
             }
             break;
         }
+        case RuntimeFilterType::BITMAP_FILTER: {
+            _context._bitmap_filter_func->insert(data);
+            break;
+        }
         default:
             DCHECK(false);
             break;
@@ -557,7 +568,6 @@ public:
         case TYPE_CHAR:
         case TYPE_VARCHAR:
         case TYPE_HLL:
-        case TYPE_OBJECT:
         case TYPE_STRING: {
             // StringRef->StringValue
             StringValue data = StringValue(const_cast<char*>(value.data), 
value.size);
@@ -572,7 +582,10 @@ public:
     }
 
     void insert_batch(const vectorized::ColumnPtr column, const 
std::vector<int>& rows) {
-        if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(), 
_column_return_type)) {
+        if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
+            bitmap_filter_insert_batch(column, rows);
+        } else if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
+                                                    _column_return_type)) {
             insert_fixed_len(column->get_raw_data().data, rows.data(), 
rows.size());
         } else {
             for (int index : rows) {
@@ -581,6 +594,16 @@ public:
         }
     }
 
+    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column,
+                                    const std::vector<int>& rows) {
+        std::vector<const BitmapValue*> bitmaps;
+        auto* col = assert_cast<const 
vectorized::ColumnComplexType<BitmapValue>*>(column.get());
+        for (int index : rows) {
+            bitmaps.push_back(&(col->get_data()[index]));
+        }
+        _context._bitmap_filter_func->insert_many(bitmaps);
+    }
+
     RuntimeFilterType get_real_type() {
         auto real_filter_type = _filter_type;
         if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
@@ -1252,6 +1275,8 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
         _runtime_filter_type = RuntimeFilterType::IN_FILTER;
     } else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
         _runtime_filter_type = RuntimeFilterType::IN_OR_BLOOM_FILTER;
+    } else if (desc->type == TRuntimeFilterType::BITMAP) {
+        _runtime_filter_type = RuntimeFilterType::BITMAP_FILTER;
     } else {
         return Status::InvalidArgument("unknown filter type");
     }
@@ -1274,6 +1299,24 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     if (desc->__isset.bloom_filter_size_bytes) {
         params.bloom_filter_size = desc->bloom_filter_size_bytes;
     }
+    if (_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER) {
+        if (!build_ctx->root()->type().is_bitmap_type()) {
+            return Status::InvalidArgument("Unexpected src expr type:{} for 
bitmap filter.",
+                                           
build_ctx->root()->type().debug_string());
+        }
+        if (!desc->__isset.bitmap_target_expr) {
+            return Status::InvalidArgument("Unknown bitmap filter target 
expr.");
+        }
+        doris::vectorized::VExprContext* bitmap_target_ctx = nullptr;
+        RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, 
desc->bitmap_target_expr,
+                                                                   
&bitmap_target_ctx));
+        auto* target_expr = 
doris::vectorized::VExpr::expr_without_cast(bitmap_target_ctx->root());
+        params.column_return_type = 
const_cast<doris::vectorized::VExpr*>(target_expr)->type().type;
+
+        if (desc->__isset.bitmap_filter_not_in) {
+            params.bitmap_filter_not_in = desc->bitmap_filter_not_in;
+        }
+    }
 
     if (node_id >= 0) {
         DCHECK(is_consumer());
@@ -1798,7 +1841,8 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
     DCHECK(_pool != nullptr);
     DCHECK(vprob_expr->root()->type().type == _column_return_type ||
            (is_string_type(vprob_expr->root()->type().type) &&
-            is_string_type(_column_return_type)));
+            is_string_type(_column_return_type)) ||
+           _filter_type == RuntimeFilterType::BITMAP_FILTER);
 
     auto real_filter_type = get_real_type();
     switch (real_filter_type) {
@@ -1875,6 +1919,25 @@ Status 
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
         container->push_back(wrapper);
         break;
     }
+    case RuntimeFilterType::BITMAP_FILTER: {
+        // create a bitmap filter
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(false);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BITMAP_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__isset.vector_opcode = true;
+        node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        node.__set_is_nullable(false);
+        auto bitmap_pred = _pool->add(new 
doris::vectorized::VBitmapPredicate(node));
+        bitmap_pred->set_filter(_context._bitmap_filter_func);
+        auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+        bitmap_pred->add_child(cloned_vexpr);
+        auto wrapper = _pool->add(new 
doris::vectorized::VRuntimeFilterWrapper(node, bitmap_pred));
+        container->push_back(wrapper);
+        break;
+    }
     default:
         DCHECK(false);
         break;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 89c82abfef..09afe6c640 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -55,7 +55,8 @@ enum class RuntimeFilterType {
     IN_FILTER = 0,
     MINMAX_FILTER = 1,
     BLOOM_FILTER = 2,
-    IN_OR_BLOOM_FILTER = 3
+    IN_OR_BLOOM_FILTER = 3,
+    BITMAP_FILTER = 4
 };
 
 inline std::string to_string(RuntimeFilterType type) {
@@ -72,6 +73,9 @@ inline std::string to_string(RuntimeFilterType type) {
     case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
         return std::string("in_or_bloomfilter");
     }
+    case RuntimeFilterType::BITMAP_FILTER: {
+        return std::string("bitmapfilter");
+    }
     default:
         return std::string("UNKNOWN");
     }
@@ -85,7 +89,8 @@ struct RuntimeFilterParams {
               bloom_filter_size(-1),
               max_in_num(0),
               filter_id(0),
-              fragment_instance_id(0, 0) {}
+              fragment_instance_id(0, 0),
+              bitmap_filter_not_in(false) {}
 
     RuntimeFilterType filter_type;
     PrimitiveType column_return_type;
@@ -94,6 +99,7 @@ struct RuntimeFilterParams {
     int32_t max_in_num;
     int32_t filter_id;
     UniqueId fragment_instance_id;
+    bool bitmap_filter_not_in;
 };
 
 struct UpdateRuntimeFilterParams {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h 
b/be/src/exprs/runtime_filter_slots_cross.h
new file mode 100644
index 0000000000..a7f3151136
--- /dev/null
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "exprs/runtime_filter.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/columns_number.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+// this class used in cross join node
+template <typename ExprCtxType = ExprContext>
+class RuntimeFilterSlotsCross {
+public:
+    RuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs,
+                            const std::vector<vectorized::VExprContext*>& 
src_expr_ctxs)
+            : _runtime_filter_descs(runtime_filter_descs), 
filter_src_expr_ctxs(src_expr_ctxs) {}
+
+    ~RuntimeFilterSlotsCross() = default;
+
+    Status init(RuntimeState* state) {
+        for (auto& filter_desc : _runtime_filter_descs) {
+            IRuntimeFilter* runtime_filter = nullptr;
+            
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
+                                                                             
&runtime_filter));
+            DCHECK(runtime_filter != nullptr);
+            // cross join has not remote filter
+            DCHECK(!runtime_filter->has_remote_target());
+            _runtime_filters.push_back(runtime_filter);
+        }
+        return Status::OK();
+    }
+
+    Status insert(vectorized::Block* block) {
+        for (int i = 0; i < _runtime_filters.size(); ++i) {
+            auto* filter = _runtime_filters[i];
+            auto* vexpr_ctx = filter_src_expr_ctxs[i];
+
+            int result_column_id = -1;
+            RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
+            DCHECK(result_column_id != -1);
+            block->get_by_position(result_column_id).column =
+                    block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+
+            auto& column = block->get_by_position(result_column_id).column;
+            if (auto* nullable =
+                        
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
+                auto& column_nested = nullable->get_nested_column_ptr();
+                auto& column_nullmap = nullable->get_null_map_column_ptr();
+                std::vector<int> indexs;
+                for (int row_index = 0; row_index < column->size(); 
++row_index) {
+                    if (assert_cast<const 
vectorized::ColumnUInt8*>(column_nullmap.get())
+                                ->get_bool(row_index)) {
+                        continue;
+                    }
+                    indexs.push_back(row_index);
+                }
+                filter->insert_batch(column_nested, indexs);
+            } else {
+                std::vector<int> rows(column->size());
+                std::iota(rows.begin(), rows.end(), 0);
+                filter->insert_batch(column, rows);
+            }
+        }
+        return Status::OK();
+    }
+
+    void publish() {
+        for (auto& filter : _runtime_filters) {
+            filter->publish();
+        }
+        for (auto& filter : _runtime_filters) {
+            // todo: cross join may not need publish_finally()
+            filter->publish_finally();
+        }
+    }
+
+    bool empty() { return !_runtime_filters.size(); }
+
+private:
+    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    const std::vector<vectorized::VExprContext*> filter_src_expr_ctxs;
+    std::vector<IRuntimeFilter*> _runtime_filters;
+};
+
+using VRuntimeFilterSlotsCross = 
RuntimeFilterSlotsCross<vectorized::VExprContext>;
+} // namespace doris
diff --git a/be/src/olap/bitmap_filter_predicate.h 
b/be/src/olap/bitmap_filter_predicate.h
new file mode 100644
index 0000000000..be3dc23cc9
--- /dev/null
+++ b/be/src/olap/bitmap_filter_predicate.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bitmapfilter_predicate.h"
+#include "exprs/runtime_filter.h"
+#include "olap/column_predicate.h"
+#include "vec/columns/column_dictionary.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_vector.h"
+#include "vec/columns/predicate_column.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+namespace doris {
+
+// only use in runtime filter and segment v2
+template <PrimitiveType T>
+class BitmapFilterColumnPredicate : public ColumnPredicate {
+public:
+    using CppType = typename PrimitiveTypeTraits<T>::CppType;
+    using SpecificFilter = BitmapFilterFunc<T>;
+
+    BitmapFilterColumnPredicate(uint32_t column_id,
+                                const std::shared_ptr<BitmapFilterFuncBase>& 
filter, int)
+            : ColumnPredicate(column_id),
+              _filter(filter),
+              _specific_filter(static_cast<SpecificFilter*>(_filter.get())) {}
+    ~BitmapFilterColumnPredicate() override = default;
+
+    PredicateType type() const override { return PredicateType::BITMAP_FILTER; 
}
+
+    void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const 
override;
+
+    void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                     bool* flags) const override {};
+    void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                      bool* flags) const override {};
+
+    Status evaluate(BitmapIndexIterator*, uint32_t, roaring::Roaring*) const 
override {
+        return Status::OK();
+    }
+
+    uint16_t evaluate(const vectorized::IColumn& column, uint16_t* sel,
+                      uint16_t size) const override;
+
+private:
+    template <bool is_nullable>
+    uint16_t evaluate(const vectorized::IColumn& column, const uint8_t* 
null_map, uint16_t* sel,
+                      uint16_t size) const {
+        if constexpr (is_nullable) {
+            DCHECK(null_map);
+        }
+
+        uint16_t new_size = 0;
+        new_size = _specific_filter->find_fixed_len_olap_engine(
+                (char*)reinterpret_cast<const 
vectorized::PredicateColumnType<T>*>(&column)
+                        ->get_data()
+                        .data(),
+                null_map, sel, size);
+        return new_size;
+    }
+
+    std::string _debug_string() override {
+        return "BitmapFilterColumnPredicate(" + type_to_string(T) + ")";
+    }
+
+    std::shared_ptr<BitmapFilterFuncBase> _filter;
+    SpecificFilter* _specific_filter; // owned by _filter
+};
+
+template <PrimitiveType T>
+void BitmapFilterColumnPredicate<T>::evaluate(ColumnBlock* block, uint16_t* 
sel,
+                                              uint16_t* size) const {
+    uint16_t new_size = 0;
+    if (block->is_nullable()) {
+        for (uint16_t i = 0; i < *size; ++i) {
+            uint16_t idx = sel[i];
+            sel[new_size] = idx;
+            new_size += (!block->cell(idx).is_null() &&
+                         _specific_filter->find(*block->cell(idx).cell_ptr()));
+        }
+    } else {
+        for (uint16_t i = 0; i < *size; ++i) {
+            uint16_t idx = sel[i];
+            sel[new_size] = idx;
+            new_size += _specific_filter->find(*block->cell(idx).cell_ptr());
+        }
+    }
+    *size = new_size;
+}
+
+template <PrimitiveType T>
+uint16_t BitmapFilterColumnPredicate<T>::evaluate(const vectorized::IColumn& 
column, uint16_t* sel,
+                                                  uint16_t size) const {
+    uint16_t new_size = 0;
+    if (column.is_nullable()) {
+        auto* nullable_col = reinterpret_cast<const 
vectorized::ColumnNullable*>(&column);
+        auto& null_map_data = nullable_col->get_null_map_column().get_data();
+        new_size =
+                evaluate<true>(nullable_col->get_nested_column(), 
null_map_data.data(), sel, size);
+    } else {
+        new_size = evaluate<false>(column, nullptr, sel, size);
+    }
+    return new_size;
+}
+} //namespace doris
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 350d945f7c..7e1981cc86 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -44,7 +44,8 @@ enum class PredicateType {
     NOT_IN_LIST = 8,
     IS_NULL = 9,
     IS_NOT_NULL = 10,
-    BF = 11, // BloomFilter
+    BF = 11,            // BloomFilter
+    BITMAP_FILTER = 12, // BitmapFilter
 };
 
 inline std::string type_to_string(PredicateType type) {
diff --git a/be/src/olap/predicate_creator.h b/be/src/olap/predicate_creator.h
index 2d3916529c..a4c4e73bad 100644
--- a/be/src/olap/predicate_creator.h
+++ b/be/src/olap/predicate_creator.h
@@ -18,10 +18,13 @@
 #pragma once
 
 #include <charconv>
+#include <type_traits>
 
+#include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/create_predicate_function.h"
 #include "exprs/hybrid_set.h"
+#include "olap/bitmap_filter_predicate.h"
 #include "olap/bloom_filter_predicate.h"
 #include "olap/column_predicate.h"
 #include "olap/comparison_predicate.h"
@@ -313,18 +316,32 @@ inline ColumnPredicate* 
parse_to_predicate(TabletSchemaSPtr tablet_schema,
 template <PrimitiveType PT>
 inline ColumnPredicate* create_olap_column_predicate(
         uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& 
filter, int be_exec_version,
-        const TabletColumn* column = nullptr) {
+        const TabletColumn*) {
     std::shared_ptr<BloomFilterFuncBase> filter_olap;
     filter_olap.reset(create_bloom_filter(PT));
     filter_olap->light_copy(filter.get());
     return new BloomFilterColumnPredicate<PT>(column_id, filter, 
be_exec_version);
 }
 
+template <PrimitiveType PT>
+inline ColumnPredicate* create_olap_column_predicate(
+        uint32_t column_id, const std::shared_ptr<BitmapFilterFuncBase>& 
filter,
+        int be_exec_version, const TabletColumn*) {
+    if constexpr (PT == TYPE_TINYINT || PT == TYPE_SMALLINT || PT == TYPE_INT 
||
+                  PT == TYPE_BIGINT) {
+        std::shared_ptr<BitmapFilterFuncBase> filter_olap;
+        filter_olap.reset(create_bitmap_filter(PT));
+        filter_olap->light_copy(filter.get());
+        return new BitmapFilterColumnPredicate<PT>(column_id, filter, 
be_exec_version);
+    } else {
+        return nullptr;
+    }
+}
+
 template <PrimitiveType PT>
 inline ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
                                                      const 
std::shared_ptr<HybridSetBase>& filter,
-                                                     int be_exec_version,
-                                                     const TabletColumn* 
column = nullptr) {
+                                                     int, const TabletColumn* 
column = nullptr) {
     return new InListPredicateBase<PT, PredicateType::IN_LIST>(column_id, 
filter, column->length());
 }
 
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 1b03ed7e8d..d106666eb6 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "exprs/hybrid_set.h"
+#include "olap/bitmap_filter_predicate.h"
 #include "olap/like_column_predicate.h"
 #include "olap/olap_common.h"
 #include "olap/predicate_creator.h"
@@ -451,6 +452,10 @@ void TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
         _col_predicates.emplace_back(_parse_to_predicate(filter));
     }
 
+    for (const auto& filter : read_params.bitmap_filters) {
+        _col_predicates.emplace_back(_parse_to_predicate(filter));
+    }
+
     for (const auto& filter : read_params.in_filters) {
         _col_predicates.emplace_back(_parse_to_predicate(filter));
     }
@@ -483,6 +488,17 @@ ColumnPredicate* TabletReader::_parse_to_predicate(
                                    
_reader_context.runtime_state->be_exec_version(), &column);
 }
 
+ColumnPredicate* TabletReader::_parse_to_predicate(
+        const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& 
bitmap_filter) {
+    int32_t index = _tablet_schema->field_index(bitmap_filter.first);
+    if (index < 0) {
+        return nullptr;
+    }
+    const TabletColumn& column = _tablet_schema->column(index);
+    return create_column_predicate(index, bitmap_filter.second, column.type(),
+                                   
_reader_context.runtime_state->be_exec_version(), &column);
+}
+
 ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& 
function_filter) {
     int32_t index = _tablet_schema->field_index(function_filter._col_name);
     if (index < 0) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 2395888ffc..1d65cdad39 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -19,6 +19,7 @@
 
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/function_filter.h"
 #include "exprs/hybrid_set.h"
@@ -77,6 +78,7 @@ public:
 
         std::vector<TCondition> conditions;
         std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> 
bloom_filters;
+        std::vector<std::pair<string, std::shared_ptr<BitmapFilterFuncBase>>> 
bitmap_filters;
         std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>> 
in_filters;
         std::vector<FunctionFilter> function_filters;
         std::vector<RowsetMetaSharedPtr> delete_predicates;
@@ -171,6 +173,9 @@ protected:
     ColumnPredicate* _parse_to_predicate(
             const std::pair<std::string, 
std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
 
+    ColumnPredicate* _parse_to_predicate(
+            const std::pair<std::string, 
std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter);
+
     ColumnPredicate* _parse_to_predicate(
             const std::pair<std::string, std::shared_ptr<HybridSetBase>>& 
in_filter);
 
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 5cd839b88f..d4a30b3207 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -191,6 +191,8 @@ struct TypeDescriptor {
 
     bool is_array_type() const { return type == TYPE_ARRAY; }
 
+    bool is_bitmap_type() const { return type == TYPE_OBJECT; }
+
     /// Returns the byte size of this type.  Returns 0 for variable length 
types.
     int get_byte_size() const { return ::doris::get_byte_size(type); }
 
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index c3139a0316..4155a974c9 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -134,6 +134,7 @@ set(VEC_FILES
   exprs/varray_literal.cpp
   exprs/vin_predicate.cpp
   exprs/vbloom_predicate.cpp
+  exprs/vbitmap_predicate.cpp
   exprs/vruntimefilter_wrapper.cpp
   exprs/vtuple_is_null_predicate.cpp
   exprs/vslot_ref.cpp
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index 23a8e8a4e0..81869c373d 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -101,6 +101,8 @@ protected:
     RuntimeProfile::Counter* _probe_timer;
     RuntimeProfile::Counter* _build_rows_counter;
     RuntimeProfile::Counter* _probe_rows_counter;
+    RuntimeProfile::Counter* _push_down_timer;
+    RuntimeProfile::Counter* _push_compute_timer;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index d1b65fc5d9..356a9a7a5a 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -17,10 +17,13 @@
 
 #include "vec/exec/join/vnested_loop_join_node.h"
 
+#include <glog/logging.h>
+
 #include <sstream>
 
 #include "common/status.h"
 #include "exprs/expr.h"
+#include "exprs/runtime_filter_slots_cross.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
@@ -33,6 +36,36 @@
 
 namespace doris::vectorized {
 
+struct RuntimeFilterBuild {
+    RuntimeFilterBuild(VNestedLoopJoinNode* join_node) : _join_node(join_node) 
{}
+
+    Status operator()(RuntimeState* state) {
+        if (_join_node->_runtime_filter_descs.empty()) {
+            return Status::OK();
+        }
+        VRuntimeFilterSlotsCross 
runtime_filter_slots(_join_node->_runtime_filter_descs,
+                                                      
_join_node->_filter_src_expr_ctxs);
+
+        RETURN_IF_ERROR(runtime_filter_slots.init(state));
+
+        if (!runtime_filter_slots.empty() && 
!_join_node->_build_blocks.empty()) {
+            SCOPED_TIMER(_join_node->_push_compute_timer);
+            for (auto& build_block : _join_node->_build_blocks) {
+                runtime_filter_slots.insert(&build_block);
+            }
+        }
+        {
+            SCOPED_TIMER(_join_node->_push_down_timer);
+            runtime_filter_slots.publish();
+        }
+
+        return Status::OK();
+    }
+
+private:
+    VNestedLoopJoinNode* _join_node;
+};
+
 VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& 
tnode,
                                          const DescriptorTbl& descs)
         : VJoinNodeBase(pool, tnode, descs),
@@ -40,7 +73,27 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, 
const TPlanNode& tnod
           _matched_rows_done(false),
           _left_block_pos(0),
           _left_side_eos(false),
-          _old_version_flag(!tnode.__isset.nested_loop_join_node) {}
+          _old_version_flag(!tnode.__isset.nested_loop_join_node),
+          _runtime_filter_descs(tnode.runtime_filters) {}
+
+Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
+
+    if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) {
+        _is_output_left_side_only = 
tnode.nested_loop_join_node.is_output_left_side_only;
+    }
+
+    std::vector<TExpr> filter_src_exprs;
+    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
+        filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
+                RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], 
state->query_options()));
+    }
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(_pool, filter_src_exprs, 
&_filter_src_expr_ctxs));
+    DCHECK(!filter_src_exprs.empty() == _is_output_left_side_only);
+    return Status::OK();
+}
 
 Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -51,6 +104,8 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", 
TUnit::UNIT);
     _probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", 
TUnit::UNIT);
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
+    _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
 
     // pre-compute the tuple index of build tuples in the output row
     int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
@@ -64,6 +119,8 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     _num_probe_side_columns = child(0)->row_desc().num_materialized_slots();
     _num_build_side_columns = child(1)->row_desc().num_materialized_slots();
     RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
+    RETURN_IF_ERROR(VExpr::prepare(_filter_src_expr_ctxs, state, 
child(1)->row_desc()));
+
     _construct_mutable_join_block();
     return Status::OK();
 }
@@ -74,7 +131,9 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) {
         return Status::OK();
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VNestedLoopJoinNode::close");
+    VExpr::close(_filter_src_expr_ctxs, state);
     _release_mem();
+
     return VJoinNodeBase::close(state);
 }
 
@@ -108,6 +167,24 @@ Status 
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
     }
 
     COUNTER_UPDATE(_build_rows_counter, _build_rows);
+
+    RuntimeFilterBuild processRuntimeFilterBuild {this};
+    processRuntimeFilterBuild(state);
+
+    return Status::OK();
+}
+
+Status VNestedLoopJoinNode::get_left_side(RuntimeState* state, Block* block) {
+    do {
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(0)->get_next_after_projects(state, block, 
&_left_side_eos),
+                child(0)->get_next_span(), _left_side_eos);
+
+    } while (block->rows() == 0 && !_left_side_eos);
+    COUNTER_UPDATE(_probe_rows_counter, block->rows());
+    if (block->rows() == 0) {
+        _matched_rows_done = _left_side_eos;
+    }
     return Status::OK();
 }
 
@@ -118,6 +195,14 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+
+    if (_is_output_left_side_only) {
+        RETURN_IF_ERROR(get_left_side(state, block));
+        *eos = _left_side_eos;
+        reached_limit(block, eos);
+        return Status::OK();
+    }
+
     if ((_match_all_build && _matched_rows_done &&
          _output_null_idx_build_side == _build_blocks.size()) ||
         _matched_rows_done) {
@@ -141,17 +226,8 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
                         if (_left_side_eos) {
                             _matched_rows_done = true;
                         } else {
-                            do {
-                                release_block_memory(_left_block);
-                                RETURN_IF_ERROR_AND_CHECK_SPAN(
-                                        
child(0)->get_next_after_projects(state, &_left_block,
-                                                                          
&_left_side_eos),
-                                        child(0)->get_next_span(), 
_left_side_eos);
-                            } while (_left_block.rows() == 0 && 
!_left_side_eos);
-                            COUNTER_UPDATE(_probe_rows_counter, 
_left_block.rows());
-                            if (_left_block.rows() == 0) {
-                                _matched_rows_done = _left_side_eos;
-                            }
+                            release_block_memory(_left_block);
+                            RETURN_IF_ERROR(get_left_side(state, 
&_left_block));
                         }
                     }
 
@@ -499,6 +575,7 @@ Status 
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
 Status VNestedLoopJoinNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VNestedLoopJoinNode::open")
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(VExpr::open(_filter_src_expr_ctxs, state));
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h 
b/be/src/vec/exec/join/vnested_loop_join_node.h
index 6d704548ea..7f47b7959e 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -22,6 +22,7 @@
 #include <stack>
 #include <string>
 
+#include "exprs/runtime_filter.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/descriptors.h"
 #include "vec/core/block.h"
@@ -34,6 +35,8 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
 public:
     VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
 
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
+
     Status prepare(RuntimeState* state) override;
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
@@ -78,6 +81,8 @@ private:
 
     void _release_mem();
 
+    Status get_left_side(RuntimeState* state, Block* block);
+
     // List of build blocks, constructed in prepare()
     Blocks _build_blocks;
     // Visited flags for each row in build side.
@@ -104,6 +109,14 @@ private:
     bool _left_side_eos; // if true, left child has no more rows to process
 
     bool _old_version_flag;
+
+    MutableColumns _dst_columns;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<vectorized::VExprContext*> _filter_src_expr_ctxs;
+    bool _is_output_left_side_only = false;
+
+    friend struct RuntimeFilterBuild;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 407021b891..46cb029d38 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -370,8 +370,8 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
             // so that scanner can be automatically deconstructed if prepare 
failed.
             _scanner_pool.add(scanner);
             RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, 
_vconjunct_ctx_ptr.get(),
-                                             _olap_filters, 
_bloom_filters_push_down,
-                                             _in_filters_push_down, 
_push_down_functions));
+                                             _olap_filters, _filter_predicates,
+                                             _push_down_functions));
             scanners->push_back((VScanner*)scanner);
             disk_set.insert(scanner->scan_disk());
         }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 8a56792ac0..f55e715659 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -44,6 +44,8 @@ protected:
 
     PushDownType _should_push_down_bloom_filter() override { return 
PushDownType::ACCEPTABLE; }
 
+    PushDownType _should_push_down_bitmap_filter() override { return 
PushDownType::ACCEPTABLE; }
+
     PushDownType _should_push_down_is_null_predicate() override { return 
PushDownType::ACCEPTABLE; }
 
     Status _init_scanners(std::list<VScanner*>* scanners) override;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index d587f7b6b0..3ddccdf460 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -34,12 +34,12 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, 
NewOlapScanNode* parent, int
     _tablet_schema = std::make_shared<TabletSchema>();
 }
 
-Status NewOlapScanner::prepare(
-        const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& 
key_ranges,
-        VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& 
filters,
-        const std::vector<std::pair<string, 
std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
-        const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& 
in_filters,
-        const std::vector<FunctionFilter>& function_filters) {
+Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
+                               const std::vector<OlapScanRange*>& key_ranges,
+                               VExprContext** vconjunct_ctx_ptr,
+                               const std::vector<TCondition>& filters,
+                               const FilterPredicates& filter_predicates,
+                               const std::vector<FunctionFilter>& 
function_filters) {
     if (vconjunct_ctx_ptr != nullptr) {
         // Copy vconjunct_ctx_ptr from scan node to this scanner's 
_vconjunct_ctx.
         RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
@@ -104,8 +104,8 @@ Status NewOlapScanner::prepare(
             }
 
             // Initialize tablet_reader_params
-            RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, 
bloom_filters,
-                                                       in_filters, 
function_filters));
+            RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, 
filter_predicates,
+                                                       function_filters));
         }
     }
 
@@ -130,8 +130,7 @@ Status NewOlapScanner::open(RuntimeState* state) {
 // it will be called under tablet read lock because capture rs readers need
 Status NewOlapScanner::_init_tablet_reader_params(
         const std::vector<OlapScanRange*>& key_ranges, const 
std::vector<TCondition>& filters,
-        const std::vector<std::pair<string, 
std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
-        const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& 
in_filters,
+        const FilterPredicates& filter_predicates,
         const std::vector<FunctionFilter>& function_filters) {
     // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
     bool single_version =
@@ -174,11 +173,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
     for (auto& filter : filters) {
         _tablet_reader_params.conditions.push_back(filter);
     }
-    std::copy(bloom_filters.cbegin(), bloom_filters.cend(),
+    std::copy(filter_predicates.bloom_filters.cbegin(), 
filter_predicates.bloom_filters.cend(),
               std::inserter(_tablet_reader_params.bloom_filters,
                             _tablet_reader_params.bloom_filters.begin()));
+    std::copy(filter_predicates.bitmap_filters.cbegin(), 
filter_predicates.bitmap_filters.cend(),
+              std::inserter(_tablet_reader_params.bitmap_filters,
+                            _tablet_reader_params.bitmap_filters.begin()));
 
-    std::copy(in_filters.cbegin(), in_filters.cend(),
+    std::copy(filter_predicates.in_filters.cbegin(), 
filter_predicates.in_filters.cend(),
               std::inserter(_tablet_reader_params.in_filters,
                             _tablet_reader_params.in_filters.begin()));
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index e2ac03c4e0..e07f38ea2b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "exec/olap_utils.h"
+#include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/function_filter.h"
 #include "exprs/hybrid_set.h"
@@ -32,6 +33,7 @@ struct OlapScanRange;
 namespace vectorized {
 
 class NewOlapScanNode;
+struct FilterPredicates;
 
 class NewOlapScanner : public VScanner {
 public:
@@ -46,9 +48,7 @@ public:
 public:
     Status prepare(const TPaloScanRange& scan_range, const 
std::vector<OlapScanRange*>& key_ranges,
                    VExprContext** vconjunct_ctx_ptr, const 
std::vector<TCondition>& filters,
-                   const std::vector<std::pair<string, 
std::shared_ptr<BloomFilterFuncBase>>>&
-                           bloom_filters,
-                   const std::vector<std::pair<string, 
std::shared_ptr<HybridSetBase>>>& in_filters,
+                   const FilterPredicates& filter_predicates,
                    const std::vector<FunctionFilter>& function_filters);
 
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); 
}
@@ -60,12 +60,10 @@ protected:
 private:
     void _update_realtime_counters();
 
-    Status _init_tablet_reader_params(
-            const std::vector<OlapScanRange*>& key_ranges, const 
std::vector<TCondition>& filters,
-            const std::vector<std::pair<string, 
std::shared_ptr<BloomFilterFuncBase>>>&
-                    bloom_filters,
-            const std::vector<std::pair<string, 
std::shared_ptr<HybridSetBase>>>& in_filters,
-            const std::vector<FunctionFilter>& function_filters);
+    Status _init_tablet_reader_params(const std::vector<OlapScanRange*>& 
key_ranges,
+                                      const std::vector<TCondition>& filters,
+                                      const FilterPredicates& 
filter_predicates,
+                                      const std::vector<FunctionFilter>& 
function_filters);
 
     Status _init_return_columns();
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 33c6fab3fe..cda9cb6197 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -20,6 +20,7 @@
 #include "common/status.h"
 #include "exprs/hybrid_set.h"
 #include "runtime/runtime_filter_mgr.h"
+#include "util/runtime_profile.h"
 #include "vec/columns/column_const.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/exec/scan/vscanner.h"
@@ -435,6 +436,8 @@ VExpr* VScanNode::_normalize_predicate(VExpr* 
conjunct_expr_root) {
                                     cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
                                     &pdt));
                             if (_is_key_column(slot->col_name())) {
+                                RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter(
+                                        cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, &pdt));
                                 RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
                                         cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, &pdt));
                                 if (_state->enable_function_pushdown()) {
@@ -478,7 +481,22 @@ Status VScanNode::_normalize_bloom_filter(VExpr* expr, 
VExprContext* expr_ctx, S
         DCHECK(expr->children().size() == 1);
         PushDownType temp_pdt = _should_push_down_bloom_filter();
         if (temp_pdt != PushDownType::UNACCEPTABLE) {
-            _bloom_filters_push_down.emplace_back(slot->col_name(), 
expr->get_bloom_filter_func());
+            _filter_predicates.bloom_filters.emplace_back(slot->col_name(),
+                                                          
expr->get_bloom_filter_func());
+            *pdt = temp_pdt;
+        }
+    }
+    return Status::OK();
+}
+
+Status VScanNode::_normalize_bitmap_filter(VExpr* expr, VExprContext* expr_ctx,
+                                           SlotDescriptor* slot, PushDownType* 
pdt) {
+    if (TExprNodeType::BITMAP_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 1);
+        PushDownType temp_pdt = _should_push_down_bitmap_filter();
+        if (temp_pdt != PushDownType::UNACCEPTABLE) {
+            _filter_predicates.bitmap_filters.emplace_back(slot->col_name(),
+                                                           
expr->get_bitmap_filter_func());
             *pdt = temp_pdt;
         }
     }
@@ -592,7 +610,7 @@ Status VScanNode::_normalize_in_and_eq_predicate(VExpr* 
expr, VExprContext* expr
             if (hybrid_set->size() <= _max_pushdown_conditions_per_column) {
                 iter = hybrid_set->begin();
             } else {
-                _in_filters_push_down.emplace_back(slot->col_name(), 
expr->get_set_func());
+                _filter_predicates.in_filters.emplace_back(slot->col_name(), 
expr->get_set_func());
                 *pdt = PushDownType::ACCEPTABLE;
                 return Status::OK();
             }
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index e19b4f347f..fbe800ccd6 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -31,6 +31,16 @@ namespace doris::vectorized {
 class VScanner;
 class VSlotRef;
 
+struct FilterPredicates {
+    // Save all runtime filter predicates which may be pushed down to data 
source.
+    // column name -> bloom filter function
+    std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> 
bloom_filters;
+
+    std::vector<std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>> 
bitmap_filters;
+
+    std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> 
in_filters;
+};
+
 class VScanNode : public ExecNode {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs)
@@ -133,6 +143,8 @@ protected:
 
     virtual PushDownType _should_push_down_bloom_filter() { return 
PushDownType::UNACCEPTABLE; }
 
+    virtual PushDownType _should_push_down_bitmap_filter() { return 
PushDownType::UNACCEPTABLE; }
+
     virtual PushDownType _should_push_down_is_null_predicate() {
         return PushDownType::UNACCEPTABLE;
     }
@@ -181,12 +193,7 @@ protected:
     // indicate this scan node has no more data to return
     bool _eos = false;
 
-    // Save all bloom filter predicates which may be pushed down to data 
source.
-    // column name -> bloom filter function
-    std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
-            _bloom_filters_push_down;
-
-    std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> 
_in_filters_push_down;
+    FilterPredicates _filter_predicates {};
 
     // Save all function predicates which may be pushed down to data source.
     std::vector<FunctionFilter> _push_down_functions;
@@ -261,6 +268,9 @@ private:
     Status _normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, 
SlotDescriptor* slot,
                                    PushDownType* pdt);
 
+    Status _normalize_bitmap_filter(VExpr* expr, VExprContext* expr_ctx, 
SlotDescriptor* slot,
+                                    PushDownType* pdt);
+
     Status _normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, 
SlotDescriptor* slot,
                                        PushDownType* pdt);
 
diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp 
b/be/src/vec/exprs/vbitmap_predicate.cpp
new file mode 100644
index 0000000000..2599ee925c
--- /dev/null
+++ b/be/src/vec/exprs/vbitmap_predicate.cpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vbitmap_predicate.h"
+
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+vectorized::VBitmapPredicate::VBitmapPredicate(const TExprNode& node)
+        : VExpr(node), _filter(nullptr), _expr_name("bitmap_predicate") {}
+
+doris::Status vectorized::VBitmapPredicate::prepare(doris::RuntimeState* state,
+                                                    const RowDescriptor& desc,
+                                                    vectorized::VExprContext* 
context) {
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+
+    if (_children.size() != 1) {
+        return Status::InternalError("Invalid argument for VBitmapPredicate.");
+    }
+
+    ColumnsWithTypeAndName argument_template;
+    argument_template.reserve(_children.size());
+    for (auto child : _children) {
+        auto column = child->data_type()->create_column();
+        argument_template.emplace_back(std::move(column), child->data_type(), 
child->expr_name());
+    }
+    return Status::OK();
+}
+
+doris::Status vectorized::VBitmapPredicate::open(doris::RuntimeState* state,
+                                                 vectorized::VExprContext* 
context,
+                                                 
FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    return Status::OK();
+}
+
+doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext* 
context,
+                                                    doris::vectorized::Block* 
block,
+                                                    int* result_column_id) {
+    doris::vectorized::ColumnNumbers arguments(_children.size());
+    for (int i = 0; i < _children.size(); ++i) {
+        int column_id = -1;
+        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
+        arguments[i] = column_id;
+    }
+    // call function
+    size_t num_columns_without_result = block->columns();
+    auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+
+    ColumnPtr argument_column =
+            
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    size_t sz = argument_column->size();
+    res_data_column->resize(sz);
+    auto ptr = 
((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+
+    if (argument_column->is_nullable()) {
+        auto column_nested = reinterpret_cast<const 
ColumnNullable*>(argument_column.get())
+                                     ->get_nested_column_ptr();
+        auto column_nullmap = reinterpret_cast<const 
ColumnNullable*>(argument_column.get())
+                                      ->get_null_map_column_ptr();
+        _filter->find_batch(column_nested->get_raw_data().data,
+                            (uint8*)column_nullmap->get_raw_data().data, sz, 
ptr);
+    } else {
+        _filter->find_batch(argument_column->get_raw_data().data, nullptr, sz, 
ptr);
+    }
+
+    if (_data_type->is_nullable()) {
+        auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+        block->insert({ColumnNullable::create(std::move(res_data_column), 
std::move(null_map)),
+                       _data_type, _expr_name});
+    } else {
+        block->insert({std::move(res_data_column), _data_type, _expr_name});
+    }
+    *result_column_id = num_columns_without_result;
+    return Status::OK();
+}
+
+void vectorized::VBitmapPredicate::close(doris::RuntimeState* state,
+                                         vectorized::VExprContext* context,
+                                         FunctionContext::FunctionStateScope 
scope) {
+    VExpr::close(state, context, scope);
+}
+
+const std::string& vectorized::VBitmapPredicate::expr_name() const {
+    return _expr_name;
+}
+
+void 
vectorized::VBitmapPredicate::set_filter(std::shared_ptr<BitmapFilterFuncBase>& 
filter) {
+    _filter = filter;
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbitmap_predicate.h 
b/be/src/vec/exprs/vbitmap_predicate.h
new file mode 100644
index 0000000000..f83ed7a5cf
--- /dev/null
+++ b/be/src/vec/exprs/vbitmap_predicate.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+
+// used for bitmap runtime filter
+class VBitmapPredicate final : public VExpr {
+public:
+    VBitmapPredicate(const TExprNode& node);
+
+    ~VBitmapPredicate() override = default;
+
+    doris::Status execute(VExprContext* context, doris::vectorized::Block* 
block,
+                          int* result_column_id) override;
+
+    doris::Status prepare(doris::RuntimeState* state, const 
doris::RowDescriptor& desc,
+                          VExprContext* context) override;
+
+    doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+
+    void close(doris::RuntimeState* state, VExprContext* context,
+               FunctionContext::FunctionStateScope scope) override;
+
+    VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VBitmapPredicate(*this));
+    }
+
+    const std::string& expr_name() const override;
+
+    void set_filter(std::shared_ptr<BitmapFilterFuncBase>& filter);
+
+    std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter_func() const 
override {
+        return _filter;
+    }
+
+    std::string debug_string() const override {
+        return fmt::format(" VBitmapPredicate:{}", VExpr::debug_string());
+    };
+
+private:
+    std::shared_ptr<BitmapFilterFuncBase> _filter;
+    std::string _expr_name;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index beefec98e0..3b81e7bbc8 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "exprs/bitmapfilter_predicate.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/hybrid_set.h"
 #include "gen_cpp/Exprs_types.h"
@@ -170,6 +171,13 @@ public:
 
     virtual std::shared_ptr<HybridSetBase> get_set_func() const { return 
nullptr; }
 
+    // If this expr is a BitmapPredicate, this method will return a 
BitmapFilterFunc
+    virtual std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter_func() 
const {
+        LOG(FATAL) << "Method 'get_bitmap_filter_func()' is not supported in 
expression: "
+                   << this->debug_string();
+        return nullptr;
+    }
+
 protected:
     /// Simple debug string that provides no expr subclass-specific information
     std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index d7d50570db..0a308f8aea 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -32,6 +32,7 @@ class TUniqueId;
 class MinMaxFuncBase;
 class HybridSetBase;
 class BloomFilterFuncBase;
+class BitmapFilterFuncBase;
 
 namespace vectorized {
 
@@ -41,6 +42,7 @@ struct SharedRuntimeFilterContext {
     std::shared_ptr<MinMaxFuncBase> minmax_func;
     std::shared_ptr<HybridSetBase> hybrid_set;
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
+    std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func;
 };
 
 struct SharedHashTableContext {
diff --git a/docs/en/docs/advanced/join-optimization/runtime-filter.md 
b/docs/en/docs/advanced/join-optimization/runtime-filter.md
index da657ea1f7..7b7ce343b7 100644
--- a/docs/en/docs/advanced/join-optimization/runtime-filter.md
+++ b/docs/en/docs/advanced/join-optimization/runtime-filter.md
@@ -89,7 +89,7 @@ For query options related to Runtime Filter, please refer to 
the following secti
 
 - The first query option is to adjust the type of Runtime Filter used. In most 
cases, you only need to adjust this option, and keep the other options as 
default.
 
-  - `runtime_filter_type`: Including Bloom Filter, MinMax Filter, IN predicate 
and IN_OR_BLOOM Filter. By default, only IN_OR_BLOOM Filter will be used. In 
some cases, the performance will be higher when both Bloom Filter, MinMax 
Filter and IN predicate are used at the same time.
+  - `runtime_filter_type`: Including Bloom Filter, MinMax Filter, IN 
predicate, IN_OR_BLOOM Filter and Bitmap_Filter. By default, only IN_OR_BLOOM 
Filter will be used. In some cases, the performance will be higher when both 
Bloom Filter, MinMax Filter and IN predicate are used at the same time.
 
 - Other query options usually only need to be further adjusted in certain 
specific scenarios to achieve the best results. Usually only after performance 
testing, optimize for resource-intensive, long enough running time and high 
enough frequency queries.
 
@@ -112,7 +112,7 @@ The query options are further explained below.
 #### 1.runtime_filter_type
 Type of Runtime Filter used.
 
-**Type**: Number (1, 2, 4, 8) or the corresponding mnemonic string (IN, 
BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER), the default is 8 (IN_OR_BLOOM 
FILTER), use multiple commas to separate, pay attention to the need to add 
quotation marks , Or add any number of types, for example:
+**Type**: Number (1, 2, 4, 8, 16) or the corresponding mnemonic string (IN, 
BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER, BITMAP_FILTER), the default is 8 
(IN_OR_BLOOM FILTER), use multiple commas to separate, pay attention to the 
need to add quotation marks , Or add any number of types, for example:
 ```
 set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
 ```
@@ -138,6 +138,10 @@ set runtime_filter_type=7;
     - Currently IN predicate already implement a merge method.
     - When IN predicate and other filters are specified at the same time, and 
the filtering value of IN predicate does not reach runtime_filter_max_in_num 
will try to remove other filters. The reason is that IN predicate is an 
accurate filtering condition. Even if there is no other filter, it can filter 
efficiently. If it is used at the same time, other filters will do useless 
work. Currently, only when the producer and consumer of the runtime filter are 
in the same fragment can there be [...]
 
+- **Bitmap Filter**:
+    - Currently, the bitmap filter is used only when the subquery in the [in 
subquery](../../sql-manual/sql-reference/Operators/in.md) operation returns a 
bitmap column.
+    - Currently, bitmap filter is only supported in vectorization engine.
+
 #### 2.runtime_filter_mode
 Used to control the transmission range of Runtime Filter between instances.
 
diff --git a/docs/en/docs/sql-manual/sql-reference/Operators/in.md 
b/docs/en/docs/sql-manual/sql-reference/Operators/in.md
new file mode 100644
index 0000000000..d619a0389f
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-reference/Operators/in.md
@@ -0,0 +1,78 @@
+---
+{
+    "title": "IN",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## IN
+### description
+#### Syntax
+
+`expr IN (value, ...)`
+
+`expr IN (subquery)`
+
+If expr is equal to any value in the IN list, return true; otherwise, return 
false.
+
+Subquery can only return one column, and the column types returned by subquery 
must be compatible with expr types.
+
+If subquery returns a bitmap data type column, expr must be an integer.
+
+#### notice
+
+- Currently, bitmap columns are only returned to in subqueries supported in 
the vectorized engine.
+
+### example
+
+```
+mysql> select id from cost where id in (1, 2);
++------+
+| id   |
++------+
+|    2 |
+|    1 |
++------+
+```
+```
+mysql> select id from tbl1 where id in (select id from tbl2);
++------+
+| id   |
++------+
+|    1 |
+|    4 |
+|    5 |
++------+
+```
+```
+mysql> select id from tbl1 where id in (select bitmap_col from tbl3);
++------+
+| id   |
++------+
+|    1 |
+|    3 |
++------+
+```
+
+### keywords
+
+    IN
diff --git a/docs/sidebars.json b/docs/sidebars.json
index b1f0884e75..86f3e92c24 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -930,6 +930,13 @@
                                 "sql-manual/sql-reference/Data-Types/JSONB"
                             ]
                         },
+                        {
+                            "type": "category",
+                            "label": "Operators",
+                            "items": [
+                                "sql-manual/sql-reference/Operators/in"
+                            ]
+                        },
                         {
                             "type": "category",
                             "label": "Utility",
diff --git a/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md 
b/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md
index 6377b3d391..49a6c5bc89 100644
--- a/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md
+++ b/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md
@@ -94,7 +94,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量
 与Runtime Filter相关的查询选项信息,请参阅以下部分:
 
 - 第一个查询选项是调整使用的Runtime Filter类型,大多数情况下,您只需要调整这一个选项,其他选项保持默认即可。
-  - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or 
Bloom Filter,默认会使用IN Or Bloom Filter,部分情况下同时使用Bloom Filter、MinMax Filter、IN 
predicate时性能更高。
+  - `runtime_filter_type`: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or 
Bloom Filter、Bitmap Filter,默认会使用IN Or Bloom Filter,部分情况下同时使用Bloom Filter、MinMax 
Filter、IN predicate时性能更高。
 - 其他查询选项通常仅在某些特定场景下,才需进一步调整以达到最优效果。通常只在性能测试后,针对资源密集型、运行耗时足够长且频率足够高的查询进行优化。
   - `runtime_filter_mode`: 用于调整Runtime 
Filter的下推策略,包括OFF、LOCAL、GLOBAL三种策略,默认设置为GLOBAL策略
   - `runtime_filter_wait_time_ms`: 左表的ScanNode等待每个Runtime Filter的时间,默认1000ms
@@ -110,7 +110,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量
 
 使用的Runtime Filter类型。
 
-**类型**: 数字(1, 2, 4, 8)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX, 
`IN_OR_BLOOM_FILTER`),默认8(`IN_OR_BLOOM_FILTER`),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
+**类型**: 数字(1, 2, 4, 8, 16)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX, 
`IN_OR_BLOOM_FILTER`, 
BITMAP_FILTER),默认8(`IN_OR_BLOOM_FILTER`),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
 
 ```sql
 set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
@@ -136,6 +136,9 @@ set runtime_filter_type=7;
 - **IN predicate**: 根据join on clause中Key列在右表上的所有值构建IN predicate,使用构建的IN 
predicate在左表上过滤,相比Bloom Filter构建和应用的开销更低,在右表数据量较少时往往性能更高。
   - 目前IN predicate已实现合并方法。
   - 当同时指定In 
predicate和其他filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因是In 
predicate是精确的过滤条件,即使没有其他filter也可以高效过滤,如果同时使用则其他filter会做无用功。目前仅在Runtime 
filter的生产者和消费者处于同一个fragment时才会有去除非in filter的逻辑。
+- **Bitmap Filter**:
+  - 当前仅当[in 
subquery](../../sql-manual/sql-reference/Operators/in.md)操作中的子查询返回bitmap列时会使用bitmap
 filter.
+  - 当前只在仅在向量化引擎中支持bitmap filter.
 
 #### 2.runtime_filter_mode
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Operators/in.md 
b/docs/zh-CN/docs/sql-manual/sql-reference/Operators/in.md
new file mode 100644
index 0000000000..bd9f6383ee
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Operators/in.md
@@ -0,0 +1,78 @@
+---
+{
+    "title": "IN",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## IN
+### description
+#### Syntax
+
+`expr IN (value, ...)`
+
+`expr IN (subquery)`
+
+如果 expr 等于 IN 列表中的任何值则返回true,否则返回false。
+
+subquery 只能返回一列,并且子查询返回的列类型必须 expr 类型兼容。
+
+如果 subquery 返回bitmap数据类型列,expr必须是整型。
+
+#### notice
+
+- 当前仅向量化引擎中支持 in 子查询返回bitmap列。
+
+### example
+
+```
+mysql> select id from cost where id in (1, 2);
++------+
+| id   |
++------+
+|    2 |
+|    1 |
++------+
+```
+```
+mysql> select id from tbl1 where id in (select id from tbl2);
++------+
+| id   |
++------+
+|    1 |
+|    4 |
+|    5 |
++------+
+```
+```
+mysql> select id from tbl1 where id in (select bitmap_col from tbl3);
++------+
+| id   |
++------+
+|    1 |
+|    3 |
++------+
+```
+
+### keywords
+
+    IN
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
new file mode 100644
index 0000000000..d7e0463016
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.VectorizedUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Only used to plan the in bitmap syntax into join + bitmap filter.
+ * This predicate not need to be sent to BE.
+ */
+public class BitmapFilterPredicate extends Predicate {
+
+    private static final Logger LOG = 
LogManager.getLogger(BitmapFilterPredicate.class);
+
+    private boolean notIn = false;
+
+    BitmapFilterPredicate(Expr targetExpr, Expr srcExpr, boolean notIn) {
+        super();
+        this.notIn = notIn;
+        Preconditions.checkNotNull(targetExpr);
+        children.add(targetExpr);
+        Preconditions.checkNotNull(srcExpr);
+        children.add(srcExpr);
+    }
+
+    BitmapFilterPredicate(BitmapFilterPredicate other) {
+        super(other);
+        this.notIn = other.notIn;
+    }
+
+    public boolean isNotIn() {
+        return notIn;
+    }
+
+    @Override
+    public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+        super.analyzeImpl(analyzer);
+
+        Expr targetExpr = children.get(0);
+        if (!targetExpr.getType().isIntegerType()) {
+            throw new AnalysisException("Unsupported targetExpr type: " + 
targetExpr.getType().toSql()
+                    + ". Target expr type must be integer.");
+        }
+
+        Expr srcExpr = children.get(1);
+        if (!srcExpr.getType().isBitmapType()) {
+            throw new AnalysisException("The srcExpr type must be bitmap, not 
" + srcExpr.getType().toSql() + ".");
+        }
+
+        if (ConnectContext.get() == null || 
(ConnectContext.get().getSessionVariable().getRuntimeFilterType()
+                & TRuntimeFilterType.BITMAP.getValue()) == 0) {
+            throw new AnalysisException("In bitmap syntax requires runtime 
filter of bitmap_filter to be enabled. "
+                    + "Please `set runtime_filter_type = 'xxx, bitmap_filter'` 
first.");
+        }
+
+        if (!VectorizedUtil.isVectorized()) {
+            throw new AnalysisException("In bitmap syntax is currently only 
supported in the vectorization engine.");
+        }
+    }
+
+    @Override
+    protected String toSqlImpl() {
+        return (notIn ? "not " : "") + "BitmapFilterPredicate(" + 
children.get(0).toSql() + ", " + children.get(1)
+                .toSql() + ")";
+    }
+
+    @Override
+    protected void toThrift(TExprNode msg) {
+        // Unreachable
+    }
+
+    @Override
+    public Expr clone() {
+        return new BitmapFilterPredicate(this);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/InPredicate.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InPredicate.java
index 8c1a31942f..34b8945a4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InPredicate.java
@@ -176,7 +176,19 @@ public class InPredicate extends Predicate {
             ArrayList<Expr> subqueryExprs = 
subquery.getStatement().getResultExprs();
             Expr compareExpr = children.get(0);
             Expr subqueryExpr = subqueryExprs.get(0);
-            analyzer.getCompatibleType(compareExpr.getType(), compareExpr, 
subqueryExpr);
+            if (subqueryExpr.getType().isBitmapType()) {
+                if (!compareExpr.getType().isIntegerType()) {
+                    throw new AnalysisException(
+                            String.format("Incompatible return types '%s' and 
'%s' of exprs '%s' and '%s'.",
+                                    compareExpr.getType().toSql(), 
subqueryExpr.getType().toSql(), compareExpr.toSql(),
+                                    subqueryExpr.toSql()));
+                }
+                if (!compareExpr.getType().isBigIntType()) {
+                    children.set(0, compareExpr.castTo(Type.BIGINT));
+                }
+            } else {
+                analyzer.getCompatibleType(compareExpr.getType(), compareExpr, 
subqueryExpr);
+            }
         } else {
             analyzer.castAllToCompatibleType(children);
             vectorizedAnalyze(analyzer);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index f141067038..bb572b8aca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -725,7 +725,6 @@ public class StmtRewriter {
 
         if (!hasEqJoinPred && !inlineView.isCorrelated()) {
             // Join with InPredicate is actually an equal join, so we choose 
HashJoin.
-            Preconditions.checkArgument(!(expr instanceof InPredicate));
             if (expr instanceof ExistsPredicate) {
                 joinOp = ((ExistsPredicate) expr).isNotExists() ? 
JoinOperator.LEFT_ANTI_JOIN
                         : JoinOperator.LEFT_SEMI_JOIN;
@@ -1113,6 +1112,12 @@ public class StmtRewriter {
         slotRef.analyze(analyzer);
         Expr subquerySubstitute = slotRef;
         if (exprWithSubquery instanceof InPredicate) {
+            if (slotRef.getType().isBitmapType()) {
+                Expr pred = new 
BitmapFilterPredicate(exprWithSubquery.getChild(0), slotRef,
+                        ((InPredicate) exprWithSubquery).isNotIn());
+                pred.analyze(analyzer);
+                return pred;
+            }
             BinaryPredicate pred = new 
BinaryPredicate(BinaryPredicate.Operator.EQ,
                     exprWithSubquery.getChild(0), slotRef);
             pred.analyze(analyzer);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 3443d5e244..a98d201744 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BitmapFilterPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.JoinOperator;
@@ -54,6 +55,10 @@ import java.util.stream.Collectors;
 public class NestedLoopJoinNode extends JoinNodeBase {
     private static final Logger LOG = 
LogManager.getLogger(NestedLoopJoinNode.class);
 
+    private boolean isOutputLeftSideOnly = false;
+
+    private List<Expr> runtimeFilterExpr = Lists.newArrayList();
+
     public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, 
TableRef innerRef) {
         super(id, "NESTED LOOP JOIN", StatisticalType.NESTED_LOOP_JOIN_NODE, 
outer, inner, innerRef);
         tupleIds.addAll(outer.getTupleIds());
@@ -127,6 +132,18 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, 
Collections.emptyList());
     }
 
+    public void setOutputLeftSideOnly(boolean outputLeftSideOnly) {
+        isOutputLeftSideOnly = outputLeftSideOnly;
+    }
+
+    public List<Expr> getRuntimeFilterExpr() {
+        return runtimeFilterExpr;
+    }
+
+    public void addBitmapFilterExpr(Expr runtimeFilterExpr) {
+        this.runtimeFilterExpr.add(runtimeFilterExpr);
+    }
+
     public TableRef getInnerRef() {
         return innerRef;
     }
@@ -172,12 +189,14 @@ public class NestedLoopJoinNode extends JoinNodeBase {
                 
msg.nested_loop_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt());
             }
         }
+        
msg.nested_loop_join_node.setIsOutputLeftSideOnly(isOutputLeftSideOnly);
         msg.node_type = TPlanNodeType.CROSS_JOIN_NODE;
     }
 
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
+        computeCrossRuntimeFilterExpr();
 
         // Only for Vec: create new tuple for join result
         if (VectorizedUtil.isVectorized()) {
@@ -185,6 +204,15 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         }
     }
 
+    private void computeCrossRuntimeFilterExpr() {
+        for (int i = conjuncts.size() - 1; i >= 0; --i) {
+            if (conjuncts.get(i) instanceof BitmapFilterPredicate) {
+                addBitmapFilterExpr(conjuncts.get(i));
+                conjuncts.remove(i);
+            }
+        }
+    }
+
     @Override
     public String getNodeExplainString(String detailPrefix, TExplainLevel 
detailLevel) {
         String distrModeStr = "";
@@ -200,6 +228,11 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         if (!conjuncts.isEmpty()) {
             output.append(detailPrefix).append("predicates: 
").append(getExplainString(conjuncts)).append("\n");
         }
+        if (!runtimeFilters.isEmpty()) {
+            output.append(detailPrefix).append("runtime filters: ");
+            output.append(getRuntimeFilterExplainString(true));
+            output.append("isOutputLeftSideOnly: 
").append(isOutputLeftSideOnly).append("\n");
+        }
         output.append(detailPrefix).append(String.format("cardinality=%s", 
cardinality)).append("\n");
         // todo unify in plan node
         if (vOutputTupleDesc != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 12de135994..4ae249f4cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -21,6 +21,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BitmapFilterPredicate;
 import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprId;
@@ -952,8 +953,14 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
     }
 
     public void convertToVectoriezd() {
-        if (!conjuncts.isEmpty()) {
-            vconjunct = convertConjunctsToAndCompoundPredicate(conjuncts);
+        List<Expr> conjunctsExcludeBitmapFilter = Lists.newArrayList();
+        for (Expr expr : conjuncts) {
+            if (!(expr instanceof BitmapFilterPredicate)) {
+                conjunctsExcludeBitmapFilter.add(expr);
+            }
+        }
+        if (!conjunctsExcludeBitmapFilter.isEmpty()) {
+            vconjunct = 
convertConjunctsToAndCompoundPredicate(conjunctsExcludeBitmapFilter);
             initCompoundPredicate(vconjunct);
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 6f71cf5552..e68e71f6ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BitmapFilterPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.Predicate;
 import org.apache.doris.analysis.SlotId;
@@ -62,7 +63,7 @@ public final class RuntimeFilter {
     // Identifier of the filter (unique within a query)
     private final RuntimeFilterId id;
     // Join node that builds the filter
-    private final HashJoinNode builderNode;
+    private final PlanNode builderNode;
     // Expr (rhs of join predicate) on which the filter is built
     private final Expr srcExpr;
     // The position of expr in the join condition
@@ -98,6 +99,8 @@ public final class RuntimeFilter {
     // The type of filter to build.
     private TRuntimeFilterType runtimeFilterType;
 
+    private boolean bitmapFilterNotIn = false;
+
     /**
      * Internal representation of a runtime filter target.
      */
@@ -129,7 +132,7 @@ public final class RuntimeFilter {
         }
     }
 
-    private RuntimeFilter(RuntimeFilterId filterId, HashJoinNode 
filterSrcNode, Expr srcExpr, int exprOrder,
+    private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, 
Expr srcExpr, int exprOrder,
                           Expr origTargetExpr, Map<TupleId, List<SlotId>> 
targetSlots,
                           TRuntimeFilterType type, 
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
         this.id = filterId;
@@ -171,6 +174,10 @@ public final class RuntimeFilter {
         return finalized;
     }
 
+    public void setBitmapFilterNotIn(boolean bitmapFilterNotIn) {
+        this.bitmapFilterNotIn = bitmapFilterNotIn;
+    }
+
     /**
      * Serializes a runtime filter to Thrift.
      */
@@ -187,6 +194,10 @@ public final class RuntimeFilter {
         }
         tFilter.setType(runtimeFilterType);
         tFilter.setBloomFilterSizeBytes(filterSizeBytes);
+        if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) {
+            tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
+            tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
+        }
         return tFilter;
     }
 
@@ -226,7 +237,7 @@ public final class RuntimeFilter {
         return hasRemoteTargets;
     }
 
-    public HashJoinNode getBuilderNode() {
+    public PlanNode getBuilderNode() {
         return builderNode;
     }
 
@@ -297,6 +308,44 @@ public final class RuntimeFilter {
                 targetExpr, targetSlots, type, filterSizeLimits);
     }
 
+    public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, 
Analyzer analyzer, Expr joinPredicate,
+            int exprOrder, NestedLoopJoinNode filterSrcNode, 
TRuntimeFilterType type,
+            RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+        Preconditions.checkNotNull(idGen);
+        Preconditions.checkNotNull(joinPredicate);
+        Preconditions.checkNotNull(filterSrcNode);
+
+        if (type.equals(TRuntimeFilterType.BITMAP)) {
+            if (!(joinPredicate instanceof BitmapFilterPredicate)) {
+                return null;
+            }
+
+            Expr targetExpr = Expr.getFirstBoundChild(joinPredicate, 
filterSrcNode.getChild(0).getTupleIds());
+            Expr srcExpr = Expr.getFirstBoundChild(joinPredicate, 
filterSrcNode.getChild(1).getTupleIds());
+            if (targetExpr == null || srcExpr == null) {
+                return null;
+            }
+
+            Type srcType = srcExpr.getType();
+            if (!srcType.equals(ScalarType.BITMAP)) {
+                return null;
+            }
+
+            Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, 
targetExpr);
+            Preconditions.checkNotNull(targetSlots);
+            if (targetSlots.isEmpty()) {
+                return null;
+            }
+
+            RuntimeFilter runtimeFilter =
+                    new RuntimeFilter(idGen.getNextId(), filterSrcNode, 
srcExpr, exprOrder, targetExpr, targetSlots,
+                            type, filterSizeLimits);
+            runtimeFilter.setBitmapFilterNotIn(((BitmapFilterPredicate) 
joinPredicate).isNotIn());
+            return runtimeFilter;
+        }
+        return null;
+    }
+
     /**
      * Returns the ids of base table tuple slots on which a runtime filter 
expr can be
      * applied. Due to the existence of equivalence classes, a filter expr may 
be
@@ -527,7 +576,12 @@ public final class RuntimeFilter {
     }
 
     public void registerToPlan(Analyzer analyzer) {
-        setIsBroadcast(getBuilderNode().getDistributionMode() == 
HashJoinNode.DistributionMode.BROADCAST);
+        PlanNode node = getBuilderNode();
+        if (node instanceof HashJoinNode) {
+            setIsBroadcast(((HashJoinNode) node).getDistributionMode() == 
HashJoinNode.DistributionMode.BROADCAST);
+        } else {
+            setIsBroadcast(false);
+        }
         if (LOG.isTraceEnabled()) {
             LOG.trace("Runtime filter: " + debugString());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index 9f7c2838a6..a0c66c8b93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -74,6 +74,13 @@ import java.util.Set;
 public final class RuntimeFilterGenerator {
     private static final Logger LOG = 
LogManager.getLogger(RuntimeFilterGenerator.class);
 
+    private static final List<TRuntimeFilterType> 
HASH_JOIN_RUNTIME_FILTER_TYPES =
+            Lists.newArrayList(TRuntimeFilterType.IN, 
TRuntimeFilterType.BLOOM, TRuntimeFilterType.MIN_MAX,
+                    TRuntimeFilterType.IN_OR_BLOOM);
+
+    private static final List<TRuntimeFilterType> 
NESTED_LOOP_JOIN_RUNTIME_FILTER_TYPES =
+            Lists.newArrayList(TRuntimeFilterType.BITMAP);
+
     // Map of base table tuple ids to a list of runtime filters that
     // can be applied at the corresponding scan nodes.
     private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid = new 
HashMap<>();
@@ -242,7 +249,7 @@ public final class RuntimeFilterGenerator {
             List<RuntimeFilter> filters = new ArrayList<>();
             // Actually all types of Runtime Filter objects generated by the 
same joinConjunct have the same
             // properties except ID. Maybe consider avoiding repeated 
generation
-            for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
+            for (TRuntimeFilterType type : HASH_JOIN_RUNTIME_FILTER_TYPES) {
                 if ((sessionVariable.getRuntimeFilterType() & type.getValue()) 
== 0) {
                     continue;
                 }
@@ -265,6 +272,37 @@ public final class RuntimeFilterGenerator {
                 finalizeRuntimeFilter(runtimeFilter);
             }
             generateFilters(root.getChild(1));
+        } else if (root instanceof NestedLoopJoinNode) {
+            NestedLoopJoinNode nestedLoopJoinNode = (NestedLoopJoinNode) root;
+            List<Expr> runtimeFilterConjuncts = 
nestedLoopJoinNode.getRuntimeFilterExpr();
+            List<RuntimeFilter> filters = new ArrayList<>();
+            for (TRuntimeFilterType type : 
NESTED_LOOP_JOIN_RUNTIME_FILTER_TYPES) {
+                if ((sessionVariable.getRuntimeFilterType() & type.getValue()) 
== 0) {
+                    continue;
+                }
+                if (type == TRuntimeFilterType.BITMAP) {
+                    for (int i = 0; i < runtimeFilterConjuncts.size(); ++i) {
+                        Expr conjunct = runtimeFilterConjuncts.get(i);
+                        RuntimeFilter filter =
+                                RuntimeFilter.create(filterIdGenerator, 
analyzer, conjunct, i, nestedLoopJoinNode, type,
+                                        bloomFilterSizeLimits);
+                        if (filter == null) {
+                            continue;
+                        }
+                        nestedLoopJoinNode.setOutputLeftSideOnly(true);
+                        registerRuntimeFilter(filter);
+                        filters.add(filter);
+                    }
+                }
+            }
+            generateFilters(root.getChild(0));
+            // Finalize every runtime filter of that join. This is to ensure 
that we don't
+            // assign a filter to a scan node from the right subtree of 
joinNode or ancestor
+            // join nodes in case we don't find a destination node in the left 
subtree.
+            for (RuntimeFilter runtimeFilter : filters) {
+                finalizeRuntimeFilter(runtimeFilter);
+            }
+            generateFilters(root.getChild(1));
         } else if (root instanceof ScanNode) {
             assignRuntimeFilters((ScanNode) root);
         } else {
@@ -420,6 +458,9 @@ public final class RuntimeFilterGenerator {
                 return null;
             }
         }
+        if (filter.getType().equals(TRuntimeFilterType.BITMAP)) {
+            return targetExpr;
+        }
         Type srcType = filter.getSrcExpr().getType();
         // Types of targetExpr and srcExpr must be exactly the same since 
runtime filters are
         // based on hashing.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
index 64a136fca1..1544fee9a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
@@ -42,7 +42,8 @@ public class RuntimeFilterTypeHelper {
     public static final long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue()
             | TRuntimeFilterType.BLOOM.getValue()
             | TRuntimeFilterType.MIN_MAX.getValue()
-            | TRuntimeFilterType.IN_OR_BLOOM.getValue());
+            | TRuntimeFilterType.IN_OR_BLOOM.getValue()
+            | TRuntimeFilterType.BITMAP.getValue());
 
     private static final Map<String, Long> varValueSet = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
@@ -51,6 +52,7 @@ public class RuntimeFilterTypeHelper {
         varValueSet.put("BLOOM_FILTER", (long) 
TRuntimeFilterType.BLOOM.getValue());
         varValueSet.put("MIN_MAX", (long) 
TRuntimeFilterType.MIN_MAX.getValue());
         varValueSet.put("IN_OR_BLOOM_FILTER", (long) 
TRuntimeFilterType.IN_OR_BLOOM.getValue());
+        varValueSet.put("BITMAP_FILTER", (long) 
TRuntimeFilterType.BITMAP.getValue());
     }
 
     // convert long type variable value to string type that user can read
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
index b78df91512..f8bdc7f3ac 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
@@ -213,7 +213,7 @@ public class RuntimeFilterGeneratorTest {
         new Expectations() {
             {
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
-                result = 16;
+                result = 32;
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
@@ -563,7 +563,7 @@ public class RuntimeFilterGeneratorTest {
         new Expectations() {
             {
                 
ConnectContext.get().getSessionVariable().getRuntimeFilterType();
-                result = 16;
+                result = 32;
             }
         };
         RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
index 17a8851626..f400d3f094 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
@@ -98,7 +98,7 @@ public class RuntimeFilterTypeHelperTest {
 
     @Test(expected = DdlException.class)
     public void testInvalidDecode() throws DdlException {
-        RuntimeFilterTypeHelper.decode(16L);
+        RuntimeFilterTypeHelper.decode(32L);
         Assert.fail("No exception throws");
     }
 }
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 5d20ec12ee..ddef7fa94e 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -54,6 +54,9 @@ enum TExprNodeType {
 
   // for josn
   JSON_LITERAL,
+
+  // only used in runtime filter
+  BITMAP_PRED,
 }
 
 //enum TAggregationOp {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 168e29dafd..2ba5c0c6b2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -612,6 +612,9 @@ struct TNestedLoopJoinNode {
   3: optional Types.TTupleId voutput_tuple_id
 
   4: optional list<Types.TTupleId> vintermediate_tuple_id_list
+
+  // for bitmap filer, don't need to join, but output left child tuple
+  5: optional bool is_output_left_side_only
 }
 
 struct TMergeJoinNode {
@@ -908,6 +911,7 @@ enum TRuntimeFilterType {
   BLOOM = 2
   MIN_MAX = 4
   IN_OR_BLOOM = 8
+  BITMAP = 16
 }
 
 // Specification of a runtime filter.
@@ -942,6 +946,12 @@ struct TRuntimeFilterDesc {
   // The size of the filter based on the ndv estimate and the min/max limit 
specified in
   // the query options. Should be greater than zero for bloom filters, zero 
otherwise.
   9: optional i64 bloom_filter_size_bytes
+
+  // for bitmap filter target expr
+  10: optional Exprs.TExpr bitmap_target_expr
+
+  // for bitmap filter
+  11: optional bool bitmap_filter_not_in
 }
 
 struct TDataGenScanNode {
diff --git a/regression-test/data/query_p0/join/test_bitmap_filter.out 
b/regression-test/data/query_p0/join/test_bitmap_filter.out
new file mode 100644
index 0000000000..5ca226dfa1
--- /dev/null
+++ b/regression-test/data/query_p0/join/test_bitmap_filter.out
@@ -0,0 +1,50 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql1 --
+1      1989
+3      1989
+5      1985
+7      -32767
+9      1991
+10     1991
+11     1989
+12     32767
+13     -32767
+14     255
+
+-- !sql2 --
+2      1986
+4      1991
+6      32767
+8      255
+9      1991
+10     1991
+11     1989
+12     32767
+13     -32767
+
+-- !sql3 --
+2      1986
+4      1991
+6      32767
+8      255
+10     1991
+12     32767
+14     255
+15     1992
+
+-- !sql4 --
+1      1989
+3      1989
+5      1985
+7      -32767
+9      1991
+11     1989
+13     -32767
+
+-- !sql5 --
+1      1989
+3      1989
+7      -32767
+11     1989
+13     -32767
+
diff --git a/regression-test/suites/query_p0/join/test_bitmap_filter.groovy 
b/regression-test/suites/query_p0/join/test_bitmap_filter.groovy
new file mode 100644
index 0000000000..5d98d46b3b
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_bitmap_filter.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_bitmap_filter", "query_p0") {
+    def tbl1 = "bigtable"
+    def tbl2 = "bitmap_table"
+    def tbl3 = "baseall"
+
+    sql "set runtime_filter_type = 16"
+    sql "set enable_vectorized_engine = true"
+    sql "use test_query_db"
+    sql "DROP TABLE IF EXISTS ${tbl2}"
+    sql """
+    CREATE TABLE ${tbl2} (
+      `k1` int(11) NULL,
+      `k2` bitmap BITMAP_UNION NULL,
+      `k3` bitmap BITMAP_UNION NULL
+    ) ENGINE=OLAP
+    AGGREGATE KEY(`k1`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+    sql """insert into bitmap_table values (1, bitmap_from_string('1, 3, 5, 7, 
9, 11, 13, 99'),
+    bitmap_from_string('32767, 1985, 255, 789, 1991')),(2, 
bitmap_from_string('10, 11, 12, 13, 14'), bitmap_empty());"""
+
+    qt_sql1 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) 
order by k1;"
+
+    qt_sql2 "select k1, k2 from ${tbl1} where k1 + 1 in (select k2 from 
${tbl2}) order by k1;"
+
+    qt_sql3 "select k1, k2 from ${tbl1} where k1 not in (select k2 from 
${tbl2} where k1 = 1) order by k1;"
+
+    qt_sql4 "select t1.k1, t1.k2 from ${tbl1} t1 join ${tbl3} t3 on t1.k1 = 
t3.k1 where t1.k1 in (select k2 from ${tbl2} where k1 = 1) order by t1.k1;"
+
+    qt_sql5 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) 
and k2 not in (select k3 from ${tbl2}) order by k1;"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to