This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 41b5629e912 [bug](rf) fix only min/max rf return error when has remote target (#25588) (#25763)
41b5629e912 is described below

commit 41b5629e912a47f983513c93f33998995079d151
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Mon Oct 23 23:25:51 2023 +0800

    [bug](rf) fix only min/max rf return error when has remote target (#25588) (#25763)
---
 be/src/exprs/runtime_filter.cpp                 | 19 ++++++++++++++++---
 be/src/exprs/runtime_filter.h                   |  4 ++--
 be/src/exprs/runtime_filter_slots_cross.h       |  5 +++--
 be/src/vec/exec/join/vnested_loop_join_node.cpp | 10 +++++-----
 gensrc/proto/internal_service.proto             |  2 ++
 5 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 747785a730c..9450e1c5cda 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -171,6 +171,10 @@ RuntimeFilterType get_type(int filter_type) {
     }
     case PFilterType::MINMAX_FILTER:
         return RuntimeFilterType::MINMAX_FILTER;
+    case PFilterType::MIN_FILTER:
+        return RuntimeFilterType::MIN_FILTER;
+    case PFilterType::MAX_FILTER:
+        return RuntimeFilterType::MAX_FILTER;
     default:
         return RuntimeFilterType::UNKNOWN_FILTER;
     }
@@ -183,6 +187,10 @@ PFilterType get_type(RuntimeFilterType type) {
         return PFilterType::IN_FILTER;
     case RuntimeFilterType::BLOOM_FILTER:
         return PFilterType::BLOOM_FILTER;
+    case RuntimeFilterType::MIN_FILTER:
+        return PFilterType::MIN_FILTER;
+    case RuntimeFilterType::MAX_FILTER:
+        return PFilterType::MAX_FILTER;
     case RuntimeFilterType::MINMAX_FILTER:
         return PFilterType::MINMAX_FILTER;
     case RuntimeFilterType::IN_OR_BLOOM_FILTER:
@@ -1437,6 +1445,8 @@ Status IRuntimeFilter::create_wrapper(QueryContext* 
query_ctx,
         DCHECK(param->request->has_bloom_filter());
         return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
     }
+    case PFilterType::MIN_FILTER:
+    case PFilterType::MAX_FILTER:
     case PFilterType::MINMAX_FILTER: {
         DCHECK(param->request->has_minmax_filter());
         return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1478,6 +1488,8 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* 
state, const T* param, Obje
         DCHECK(param->request->has_bloom_filter());
         return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
     }
+    case PFilterType::MIN_FILTER:
+    case PFilterType::MAX_FILTER:
     case PFilterType::MINMAX_FILTER: {
         DCHECK(param->request->has_minmax_filter());
         return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1563,7 +1575,9 @@ Status IRuntimeFilter::serialize_impl(T* request, void** 
data, int* len) {
         DCHECK(data != nullptr);
         request->mutable_bloom_filter()->set_filter_length(*len);
         request->mutable_bloom_filter()->set_always_true(false);
-    } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+    } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
+               real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
+               real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
         auto minmax_filter = request->mutable_minmax_filter();
         to_protobuf(minmax_filter);
     } else {
@@ -1706,8 +1720,7 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
     void* min_data = nullptr;
     void* max_data = nullptr;
     _wrapper->get_minmax_filter_desc(&min_data, &max_data);
-    DCHECK(min_data != nullptr);
-    DCHECK(max_data != nullptr);
+    DCHECK(min_data != nullptr && max_data != nullptr);
     filter->set_column_type(to_proto(_wrapper->column_type()));
 
     switch (_wrapper->column_type()) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b92bb4aabd7..1272ac2e7b0 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -76,8 +76,8 @@ enum class RuntimeFilterType {
     BLOOM_FILTER = 2,
     IN_OR_BLOOM_FILTER = 3,
     BITMAP_FILTER = 4,
-    MIN_FILTER = 5, // only min // now only support at local
-    MAX_FILTER = 6  // only max // now only support at local
+    MIN_FILTER = 5, // only min
+    MAX_FILTER = 6  // only max
 };
 
 inline std::string to_string(RuntimeFilterType type) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h 
b/be/src/exprs/runtime_filter_slots_cross.h
index 8711ba181cf..4868b27a4ea 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -48,8 +48,9 @@ public:
             if (runtime_filter == nullptr) {
                 return Status::InternalError("runtime filter is nullptr");
             }
-            // cross join has not remote filter
-            if (runtime_filter->has_remote_target()) {
+            // a bitmap filter on a cross join (non-shuffle join) must not have a remote target
+            if (runtime_filter->type() == RuntimeFilterType::BITMAP_FILTER &&
+                runtime_filter->has_remote_target()) {
                 return Status::InternalError("cross join runtime filter has 
remote target");
             }
             _runtime_filters.push_back(runtime_filter);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 6265f6c609c..eee179f7837 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -79,7 +79,7 @@ struct RuntimeFilterBuild {
         if (!runtime_filter_slots.empty() && 
!_join_node->_build_blocks.empty()) {
             SCOPED_TIMER(_join_node->_push_compute_timer);
             for (auto& build_block : _join_node->_build_blocks) {
-                runtime_filter_slots.insert(&build_block);
+                RETURN_IF_ERROR(runtime_filter_slots.insert(&build_block));
             }
         }
         {
@@ -187,7 +187,7 @@ Status 
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
                               std::placeholders::_3)));
         }
 
-        sink(state, &block, eos);
+        RETURN_IF_ERROR(sink(state, &block, eos));
 
         if (eos) {
             break;
@@ -213,7 +213,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* 
state, vectorized::Block*
 
     if (eos) {
         COUNTER_UPDATE(_build_rows_counter, _build_rows);
-        RuntimeFilterBuild(this)(state);
+        RETURN_IF_ERROR(RuntimeFilterBuild(this)(state));
 
         // optimize `in bitmap`, see 
https://github.com/apache/doris/issues/14338
         if (_is_output_left_side_only &&
@@ -268,7 +268,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
     RETURN_IF_CANCELLED(state);
     while (need_more_input_data()) {
         RETURN_IF_ERROR(_fresh_left_block(state));
-        push(state, &_left_block, _left_side_eos);
+        RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
     }
 
     return pull(state, block, eos);
@@ -637,7 +637,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
     // We can close the right child to release its resources because its input 
has been
     // fully consumed.
-    child(1)->close(state);
+    RETURN_IF_ERROR(child(1)->close(state));
     return Status::OK();
 }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 8af26a317e3..681bf071dab 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -475,6 +475,8 @@ enum PFilterType {
     MINMAX_FILTER = 2;
     IN_FILTER = 3;
     IN_OR_BLOOM_FILTER = 4;
+    MIN_FILTER = 5;
+    MAX_FILTER = 6;
 };
 
 message PMergeFilterRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to