This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 41b5629e912 [bug](rf) fix only min/max rf return error when has remote target (#25588) (#25763) 41b5629e912 is described below commit 41b5629e912a47f983513c93f33998995079d151 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Mon Oct 23 23:25:51 2023 +0800 [bug](rf) fix only min/max rf return error when has remote target (#25588) (#25763) --- be/src/exprs/runtime_filter.cpp | 19 ++++++++++++++++--- be/src/exprs/runtime_filter.h | 4 ++-- be/src/exprs/runtime_filter_slots_cross.h | 5 +++-- be/src/vec/exec/join/vnested_loop_join_node.cpp | 10 +++++----- gensrc/proto/internal_service.proto | 2 ++ 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 747785a730c..9450e1c5cda 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -171,6 +171,10 @@ RuntimeFilterType get_type(int filter_type) { } case PFilterType::MINMAX_FILTER: return RuntimeFilterType::MINMAX_FILTER; + case PFilterType::MIN_FILTER: + return RuntimeFilterType::MIN_FILTER; + case PFilterType::MAX_FILTER: + return RuntimeFilterType::MAX_FILTER; default: return RuntimeFilterType::UNKNOWN_FILTER; } @@ -183,6 +187,10 @@ PFilterType get_type(RuntimeFilterType type) { return PFilterType::IN_FILTER; case RuntimeFilterType::BLOOM_FILTER: return PFilterType::BLOOM_FILTER; + case RuntimeFilterType::MIN_FILTER: + return PFilterType::MIN_FILTER; + case RuntimeFilterType::MAX_FILTER: + return PFilterType::MAX_FILTER; case RuntimeFilterType::MINMAX_FILTER: return PFilterType::MINMAX_FILTER; case RuntimeFilterType::IN_OR_BLOOM_FILTER: @@ -1437,6 +1445,8 @@ Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx, DCHECK(param->request->has_bloom_filter()); return (*wrapper)->assign(¶m->request->bloom_filter(), param->data); } + case PFilterType::MIN_FILTER: + case PFilterType::MAX_FILTER: case PFilterType::MINMAX_FILTER: { DCHECK(param->request->has_minmax_filter()); return (*wrapper)->assign(¶m->request->minmax_filter()); @@ -1478,6 +1488,8 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje DCHECK(param->request->has_bloom_filter()); return (*wrapper)->assign(¶m->request->bloom_filter(), param->data); } + case PFilterType::MIN_FILTER: + case PFilterType::MAX_FILTER: case PFilterType::MINMAX_FILTER: { DCHECK(param->request->has_minmax_filter()); return (*wrapper)->assign(¶m->request->minmax_filter()); @@ -1563,7 +1575,9 @@ Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { DCHECK(data != nullptr); request->mutable_bloom_filter()->set_filter_length(*len); request->mutable_bloom_filter()->set_always_true(false); - } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) { + } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER || + real_runtime_filter_type == RuntimeFilterType::MIN_FILTER || + real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) { auto minmax_filter = request->mutable_minmax_filter(); to_protobuf(minmax_filter); } else { @@ -1706,8 +1720,7 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) { void* min_data = nullptr; void* max_data = nullptr; _wrapper->get_minmax_filter_desc(&min_data, &max_data); - DCHECK(min_data != nullptr); - DCHECK(max_data != nullptr); + DCHECK(min_data != nullptr && max_data != nullptr); filter->set_column_type(to_proto(_wrapper->column_type())); switch (_wrapper->column_type()) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b92bb4aabd7..1272ac2e7b0 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -76,8 +76,8 @@ enum class RuntimeFilterType { BLOOM_FILTER = 2, IN_OR_BLOOM_FILTER = 3, BITMAP_FILTER = 4, - MIN_FILTER = 5, // only min // now only support at local - MAX_FILTER = 6 // only max // now only support at local + MIN_FILTER = 5, // only min + MAX_FILTER = 6 // only max }; inline std::string to_string(RuntimeFilterType type) { diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 8711ba181cf..4868b27a4ea 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -48,8 +48,9 @@ public: if (runtime_filter == nullptr) { return Status::InternalError("runtime filter is nullptr"); } - // cross join has not remote filter - if (runtime_filter->has_remote_target()) { + // cross join has not remote filter for bitmap filter(non shuffle join) + if (runtime_filter->type() == RuntimeFilterType::BITMAP_FILTER && + runtime_filter->has_remote_target()) { return Status::InternalError("cross join runtime filter has remote target"); } _runtime_filters.push_back(runtime_filter); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 6265f6c609c..eee179f7837 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -79,7 +79,7 @@ struct RuntimeFilterBuild { if (!runtime_filter_slots.empty() && !_join_node->_build_blocks.empty()) { SCOPED_TIMER(_join_node->_push_compute_timer); for (auto& build_block : _join_node->_build_blocks) { - runtime_filter_slots.insert(&build_block); + RETURN_IF_ERROR(runtime_filter_slots.insert(&build_block)); } } { @@ -187,7 +187,7 @@ Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) { std::placeholders::_3))); } - sink(state, &block, eos); + RETURN_IF_ERROR(sink(state, &block, eos)); if (eos) { break; @@ -213,7 +213,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block* if (eos) { COUNTER_UPDATE(_build_rows_counter, _build_rows); - RuntimeFilterBuild(this)(state); + RETURN_IF_ERROR(RuntimeFilterBuild(this)(state)); // optimize `in bitmap`, see https://github.com/apache/doris/issues/14338 if (_is_output_left_side_only && @@ -268,7 +268,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo RETURN_IF_CANCELLED(state); while (need_more_input_data()) { RETURN_IF_ERROR(_fresh_left_block(state)); - push(state, &_left_block, _left_side_eos); + RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos)); } return pull(state, block, eos); @@ -637,7 +637,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); // We can close the right child to release its resources because its input has been // fully consumed. - child(1)->close(state); + RETURN_IF_ERROR(child(1)->close(state)); return Status::OK(); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8af26a317e3..681bf071dab 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -475,6 +475,8 @@ enum PFilterType { MINMAX_FILTER = 2; IN_FILTER = 3; IN_OR_BLOOM_FILTER = 4; + MIN_FILTER = 5; + MAX_FILTER = 6; }; message PMergeFilterRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org