This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit e40eebfa79d4fa4c433cc2e1574aa243b443a03d Author: Pxl <pxl...@qq.com> AuthorDate: Thu Apr 25 17:30:22 2024 +0800 [Bug](runtime-filter) fix bloom filter size error on rf merge (#34082) fix bloom filter size error on rf merge W20240424 11:28:56.826277 3494287 ref_count_closure.h:80] RPC meet error status: [INVALID_ARGUMENT]PStatus: (172.21.0.15)[INVALID_ARGUMENT]bloom filter size not the same: already allocated bytes 65536, expected allocated bytes 32768 --- be/src/exprs/bloom_filter_func.h | 2 +- be/src/exprs/runtime_filter.cpp | 88 +++++++++++++++------------- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 10d30212ff8..af9337f61ba 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -167,7 +167,7 @@ public: DCHECK(bloomfilter_func != nullptr); auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - return Status::InvalidArgument( + return Status::InternalError( "bloom filter size not the same: already allocated bytes {}, expected " "allocated bytes {}", _bloom_filter_alloced, other_func->_bloom_filter_alloced); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 50105d56068..1cbf86c95c4 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -246,7 +246,7 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod fn_name.__set_function_name("ge"); break; default: - return Status::InvalidArgument( + return Status::InternalError( strings::Substitute("Invalid opcode for max_min_runtimefilter: '$0'", opcode)); } fn.__set_name(fn_name); @@ -282,17 +282,16 @@ class RuntimePredicateWrapper { public: RuntimePredicateWrapper(ObjectPool* pool, const RuntimeFilterParams* params) : RuntimePredicateWrapper(pool, 
params->column_return_type, params->filter_type, - params->filter_id, params->build_bf_exactly) {}; + params->filter_id) {}; // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(ObjectPool* pool, PrimitiveType column_type, RuntimeFilterType type, - uint32_t filter_id, bool build_bf_exactly = false) + uint32_t filter_id) : _pool(pool), _column_return_type(column_type), _filter_type(type), _context(new RuntimeFilterContext()), - _filter_id(filter_id), - _build_bf_exactly(build_bf_exactly) {} + _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function @@ -333,24 +332,26 @@ public: return Status::OK(); } default: - return Status::InvalidArgument("Unknown Filter type"); + return Status::InternalError("Unknown Filter type"); } return Status::OK(); } - Status change_to_bloom_filter(bool need_init_bf = false) { - CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) - << "Can not change to bloom filter because of runtime filter type is " - << IRuntimeFilter::to_string(_filter_type); + Status change_to_bloom_filter() { + if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { + return Status::InternalError( + "Can not change to bloom filter because of runtime filter type is {}", + IRuntimeFilter::to_string(_filter_type)); + } BloomFilterFuncBase* bf = _context->bloom_filter_func.get(); - if (need_init_bf) { - // BloomFilter may be not init - RETURN_IF_ERROR(bf->init_with_fixed_length()); + + if (bf != nullptr) { insert_to_bloom_filter(bf); - } else { - DCHECK(_context->hybrid_set == nullptr || _context->hybrid_set->size() == 0) - << "set size: " << (_context->hybrid_set ? 
_context->hybrid_set->size() : 0); + } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size() != 0) { + return Status::InternalError("change to bloom filter need empty set ", + IRuntimeFilter::to_string(_filter_type)); } + // release in filter _context->hybrid_set.reset(); return Status::OK(); @@ -514,26 +515,27 @@ public: } if (real_filter_type == RuntimeFilterType::IN_FILTER) { - if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in + // when we meet base rf is in-filter, threre only have two case: + // case1: all input-filter's build_bf_exactly is true, inited by synced global size + // case2: all input-filter's build_bf_exactly is false, inited by default size + if (other_filter_type == RuntimeFilterType::IN_FILTER) { _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { - VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: in_num(" << _context->hybrid_set->size() - << ") >= max_in_num(" << _max_in_num << ")"; - RETURN_IF_ERROR(change_to_bloom_filter(true)); + // case2: use default size to init bf + RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length()); + RETURN_IF_ERROR(change_to_bloom_filter()); } } else { - VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: already exist a bloom filter"; - RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly)); - RETURN_IF_ERROR(_context->bloom_filter_func->merge( - wrapper->_context->bloom_filter_func.get())); + // case1&case2: use input bf directly and insert hybrid set data into bf + _context->bloom_filter_func = wrapper->_context->bloom_filter_func; + RETURN_IF_ERROR(change_to_bloom_filter()); } } else { - if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in + if (other_filter_type == RuntimeFilterType::IN_FILTER) { + // case2: insert data to global filter 
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get()); - // bloom filter merge bloom filter } else { + // case1&case2: all input bf must has same size RETURN_IF_ERROR(_context->bloom_filter_func->merge( wrapper->_context->bloom_filter_func.get())); } @@ -716,9 +718,8 @@ public: break; } default: { - DCHECK(false) << "unknown type: " << type_to_string(type); - return Status::InvalidArgument("not support assign to in filter, type: " + - type_to_string(type)); + return Status::InternalError("not support assign to in filter, type: " + + type_to_string(type)); } } return Status::OK(); @@ -870,10 +871,9 @@ public: return _context->minmax_func->assign(&min_val, &max_val); } default: - DCHECK(false) << "unknown type"; break; } - return Status::InvalidArgument("not support!"); + return Status::InternalError("not support!"); } HybridSetBase::IteratorBase* get_in_filter_iterator() { return _context->hybrid_set->begin(); } @@ -950,7 +950,6 @@ private: SharedRuntimeFilterContext _context; uint32_t _filter_id; - bool _build_bf_exactly; }; Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool, @@ -1374,11 +1373,11 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue } if (_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER) { if (!build_ctx->root()->type().is_bitmap_type()) { - return Status::InvalidArgument("Unexpected src expr type:{} for bitmap filter.", - build_ctx->root()->type().debug_string()); + return Status::InternalError("Unexpected src expr type:{} for bitmap filter.", + build_ctx->root()->type().debug_string()); } if (!desc->__isset.bitmap_target_expr) { - return Status::InvalidArgument("Unknown bitmap filter target expr."); + return Status::InternalError("Unknown bitmap filter target expr."); } vectorized::VExprContextSPtr bitmap_target_ctx; RETURN_IF_ERROR( @@ -1454,7 +1453,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, return 
(*wrapper)->assign(&param->request->minmax_filter(), param->request->contain_null()); } default: - return Status::InvalidArgument("unknown filter type"); + return Status::InternalError("unknown filter type"); } } @@ -1503,7 +1502,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, return (*wrapper)->assign(&param->request->minmax_filter(), param->request->contain_null()); } default: - return Status::InvalidArgument("unknown filter type"); + return Status::InternalError("unknown filter type"); } } @@ -1522,7 +1521,14 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { - return _wrapper->merge(wrapper); + auto status = _wrapper->merge(wrapper); + if (!status) { + LOG(WARNING) << "runtime filter merge failed: " << _name + << " ,need_local_merge: " << _need_local_merge + << " ,is_broadcast: " << _is_broadcast_join; + DCHECK(false); // rpc response is often ignored, so let it crash directly here + } + return status; } template <typename T> @@ -1557,7 +1563,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { auto minmax_filter = request->mutable_minmax_filter(); to_protobuf(minmax_filter); } else { - return Status::InvalidArgument("not implemented !"); + return Status::InternalError("not implemented !"); } return Status::OK(); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 9a9af079ab3..e331a7d6751 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -136,7 +136,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu uint64_t hash_table_size = block ? 
block->rows() : 0; { SCOPED_TIMER(_runtime_filter_init_timer); - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + if (_should_build_hash_table) { + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + } RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); } if (_should_build_hash_table && hash_table_size > 1) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org