This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e998ce8293d6bf856e862a6b39871e7e33cbd841 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Apr 8 17:01:11 2024 +0800 [Improvement](runtime-filter) support sync join node build side's size to init bloom runtime filter (#32180) support sync join node build side's size to init bloom runtime filter --- be/src/exprs/bloom_filter_func.h | 4 +- be/src/exprs/runtime_filter.cpp | 457 ++++++++++++--------- be/src/exprs/runtime_filter.h | 25 +- be/src/exprs/runtime_filter_slots.h | 167 +++----- be/src/pipeline/exec/hashjoin_build_sink.cpp | 86 ++-- be/src/pipeline/exec/hashjoin_build_sink.h | 9 +- be/src/pipeline/exec/join_build_sink_operator.cpp | 3 +- be/src/pipeline/exec/join_build_sink_operator.h | 1 + be/src/pipeline/exec/olap_scan_operator.cpp | 1 - be/src/pipeline/pipeline_x/dependency.cpp | 10 + be/src/pipeline/pipeline_x/dependency.h | 31 +- be/src/runtime/fragment_mgr.cpp | 159 ++++--- be/src/runtime/fragment_mgr.h | 4 + be/src/runtime/runtime_filter_mgr.cpp | 95 ++++- be/src/runtime/runtime_filter_mgr.h | 32 +- be/src/service/backend_options.cpp | 1 + be/src/service/internal_service.cpp | 33 +- be/src/service/internal_service.h | 10 + be/src/util/brpc_client_cache.h | 4 + be/src/vec/exec/join/vhash_join_node.cpp | 6 +- be/src/vec/exec/join/vhash_join_node.h | 12 +- be/src/vec/exec/join/vjoin_node_base.cpp | 1 + be/src/vec/exec/join/vjoin_node_base.h | 1 + .../vec/runtime/shared_hash_table_controller.cpp | 11 +- be/src/vec/runtime/shared_hash_table_controller.h | 20 +- .../org/apache/doris/planner/RuntimeFilter.java | 4 + .../java/org/apache/doris/qe/SessionVariable.java | 9 + gensrc/proto/internal_service.proto | 26 ++ gensrc/proto/types.proto | 5 + gensrc/thrift/PlanNodes.thrift | 2 + 30 files changed, 782 insertions(+), 447 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 1473d4a4288..10d30212ff8 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -108,7 +108,9 @@ public: 
Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); } - Status init_with_cardinality(const size_t build_bf_cardinality, int id = 0) { + bool get_build_bf_cardinality() const { return _build_bf_exactly; } + + Status init_with_cardinality(const size_t build_bf_cardinality) { if (_build_bf_exactly) { // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize constexpr double fpp = 0.05; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1c5c3f7d4a2..67349008dac 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -290,6 +290,7 @@ public: : _pool(pool), _column_return_type(column_type), _filter_type(type), + _context(new RuntimeFilterContext()), _filter_id(filter_id), _build_bf_exactly(build_bf_exactly) {} @@ -299,37 +300,36 @@ public: _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - _context.hybrid_set.reset(create_set(_column_return_type)); - _context.hybrid_set->set_null_aware(params->null_aware); + _context->hybrid_set.reset(create_set(_column_return_type)); + _context->hybrid_set->set_null_aware(params->null_aware); break; } // Only use in nested loop join not need set null aware case RuntimeFilterType::MIN_FILTER: case RuntimeFilterType::MAX_FILTER: { - _context.minmax_func.reset(create_minmax_filter(_column_return_type)); + _context->minmax_func.reset(create_minmax_filter(_column_return_type)); break; } case RuntimeFilterType::MINMAX_FILTER: { - _context.minmax_func.reset(create_minmax_filter(_column_return_type)); - _context.minmax_func->set_null_aware(params->null_aware); + _context->minmax_func.reset(create_minmax_filter(_column_return_type)); + _context->minmax_func->set_null_aware(params->null_aware); break; } case RuntimeFilterType::BLOOM_FILTER: { - _is_bloomfilter = true; - _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); - 
_context.bloom_filter_func->init_params(params); + _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context->bloom_filter_func->init_params(params); return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - _context.hybrid_set.reset(create_set(_column_return_type)); - _context.hybrid_set->set_null_aware(params->null_aware); - _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); - _context.bloom_filter_func->init_params(params); + _context->hybrid_set.reset(create_set(_column_return_type)); + _context->hybrid_set->set_null_aware(params->null_aware); + _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context->bloom_filter_func->init_params(params); return Status::OK(); } case RuntimeFilterType::BITMAP_FILTER: { - _context.bitmap_filter_func.reset(create_bitmap_filter(_column_return_type)); - _context.bitmap_filter_func->set_not_in(params->bitmap_filter_not_in); + _context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type)); + _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in); return Status::OK(); } default: @@ -342,61 +342,66 @@ public: CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) << "Can not change to bloom filter because of runtime filter type is " << IRuntimeFilter::to_string(_filter_type); - _is_bloomfilter = true; - BloomFilterFuncBase* bf = _context.bloom_filter_func.get(); + BloomFilterFuncBase* bf = _context->bloom_filter_func.get(); if (need_init_bf) { // BloomFilter may be not init RETURN_IF_ERROR(bf->init_with_fixed_length()); insert_to_bloom_filter(bf); } // release in filter - _context.hybrid_set.reset(); + _context->hybrid_set.reset(); return Status::OK(); } Status init_bloom_filter(const size_t build_bf_cardinality) { DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER || _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); - return 
_context.bloom_filter_func->init_with_cardinality(build_bf_cardinality); + return _context->bloom_filter_func->init_with_cardinality(build_bf_cardinality); + } + + bool get_build_bf_cardinality() const { + DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER || + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + return _context->bloom_filter_func->get_build_bf_cardinality(); } void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { - if (_context.hybrid_set->size() > 0) { - auto* it = _context.hybrid_set->begin(); + if (_context->hybrid_set->size() > 0) { + auto* it = _context->hybrid_set->begin(); while (it->has_next()) { bloom_filter->insert(it->get_value()); it->next(); } } - if (_context.hybrid_set->contain_null()) { + if (_context->hybrid_set->contain_null()) { bloom_filter->set_contain_null_and_null_aware(); } } - BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); } + BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { DCHECK(!is_ignored()); switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - _context.hybrid_set->insert_fixed_len(column, start); + _context->hybrid_set->insert_fixed_len(column, start); break; } case RuntimeFilterType::MIN_FILTER: case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { - _context.minmax_func->insert_fixed_len(column, start); + _context->minmax_func->insert_fixed_len(column, start); break; } case RuntimeFilterType::BLOOM_FILTER: { - _context.bloom_filter_func->insert_fixed_len(column, start); + _context->bloom_filter_func->insert_fixed_len(column, start); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - if (_is_bloomfilter) { - _context.bloom_filter_func->insert_fixed_len(column, start); + if (is_bloomfilter()) { + _context->bloom_filter_func->insert_fixed_len(column, start); } else { - 
_context.hybrid_set->insert_fixed_len(column, start); + _context->hybrid_set->insert_fixed_len(column, start); } break; } @@ -434,23 +439,21 @@ public: bitmaps.push_back(&(col->get_data()[i])); } } - _context.bitmap_filter_func->insert_many(bitmaps); + _context->bitmap_filter_func->insert_many(bitmaps); } RuntimeFilterType get_real_type() const { - auto real_filter_type = _filter_type; - if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + if (_context->hybrid_set) { + return RuntimeFilterType::IN_FILTER; + } + return RuntimeFilterType::BLOOM_FILTER; } - return real_filter_type; + return _filter_type; } size_t get_bloom_filter_size() const { - if (_is_bloomfilter) { - return _context.bloom_filter_func->get_size(); - } - return 0; + return _context->bloom_filter_func ? _context->bloom_filter_func->get_size() : 0; } Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, @@ -458,6 +461,11 @@ public: const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { + if (is_ignored() || wrapper->is_ignored()) { + _context->ignored = true; + return Status::OK(); + } + bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && @@ -475,70 +483,56 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { // try insert set - _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); - if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { - _ignored_msg = fmt::format( - " ignore merge runtime filter(in filter id {})" - "because: in_num({}) >= max_in_num({})", - _filter_id, _context.hybrid_set->size(), _max_in_num); - _ignored = true; + _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); + if (_max_in_num >= 0 && 
_context->hybrid_set->size() >= _max_in_num) { + _context->ignored = true; // release in filter - _context.hybrid_set.reset(); + _context->hybrid_set.reset(); } break; } case RuntimeFilterType::MIN_FILTER: case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { - RETURN_IF_ERROR(_context.minmax_func->merge(wrapper->_context.minmax_func.get())); + RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get())); break; } case RuntimeFilterType::BLOOM_FILTER: { RETURN_IF_ERROR( - _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get())); + _context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get())); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - auto real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + auto real_filter_type = get_real_type(); auto other_filter_type = wrapper->_filter_type; if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - other_filter_type = wrapper->_is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + other_filter_type = wrapper->get_real_type(); } if (real_filter_type == RuntimeFilterType::IN_FILTER) { if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in - CHECK(!wrapper->_ignored) - << " can not ignore merge runtime filter(in filter id " - << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << wrapper->ignored_msg(); - _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); - if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { + _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); + if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: in_num(" << _context.hybrid_set->size() + << ") because: in_num(" << _context->hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; RETURN_IF_ERROR(change_to_bloom_filter(true)); } } else { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; - RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly)); - RETURN_IF_ERROR(_context.bloom_filter_func->merge( - wrapper->_context.bloom_filter_func.get())); + RETURN_IF_ERROR(change_to_bloom_filter(false)); + RETURN_IF_ERROR(_context->bloom_filter_func->merge( + wrapper->_context->bloom_filter_func.get())); } } else { if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in - CHECK(!wrapper->_ignored) - << " can not ignore merge runtime filter(in filter id " - << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " - << wrapper->ignored_msg(); - wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get()); + wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get()); // bloom filter merge bloom filter } else { - RETURN_IF_ERROR(_context.bloom_filter_func->merge( - 
wrapper->_context.bloom_filter_func.get())); + RETURN_IF_ERROR(_context->bloom_filter_func->merge( + wrapper->_context->bloom_filter_func.get())); } } break; @@ -550,19 +544,11 @@ public: } Status assign(const PInFilter* in_filter, bool contain_null) { - if (in_filter->has_ignored_msg()) { - VLOG_DEBUG << "Ignore in filter(id=" << _filter_id - << ") because: " << in_filter->ignored_msg(); - _ignored = true; - _ignored_msg = in_filter->ignored_msg(); - return Status::OK(); - } - PrimitiveType type = to_primitive_type(in_filter->column_type()); - _context.hybrid_set.reset(create_set(type)); + _context->hybrid_set.reset(create_set(type)); if (contain_null) { - _context.hybrid_set->set_null_aware(true); - _context.hybrid_set->insert((const void*)nullptr); + _context->hybrid_set->set_null_aware(true); + _context->hybrid_set->insert((const void*)nullptr); } switch (type) { @@ -735,14 +721,13 @@ public: // assign this filter by protobuf Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data, bool contain_null) { - _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok - _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE - ? PrimitiveType::TYPE_INT - : _column_return_type)); - RETURN_IF_ERROR(_context.bloom_filter_func->assign(data, bloom_filter->filter_length(), - contain_null)); + _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE + ? 
PrimitiveType::TYPE_INT + : _column_return_type)); + RETURN_IF_ERROR(_context->bloom_filter_func->assign(data, bloom_filter->filter_length(), + contain_null)); return Status::OK(); } @@ -750,38 +735,38 @@ public: // assign this filter by protobuf Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) { PrimitiveType type = to_primitive_type(minmax_filter->column_type()); - _context.minmax_func.reset(create_minmax_filter(type)); + _context->minmax_func.reset(create_minmax_filter(type)); if (contain_null) { - _context.minmax_func->set_null_aware(true); - _context.minmax_func->set_contain_null(); + _context->minmax_func->set_null_aware(true); + _context->minmax_func->set_contain_null(); } switch (type) { case TYPE_BOOLEAN: { bool min_val = minmax_filter->min_val().boolval(); bool max_val = minmax_filter->max_val().boolval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_TINYINT: { int8_t min_val = static_cast<int8_t>(minmax_filter->min_val().intval()); int8_t max_val = static_cast<int8_t>(minmax_filter->max_val().intval()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_SMALLINT: { int16_t min_val = static_cast<int16_t>(minmax_filter->min_val().intval()); int16_t max_val = static_cast<int16_t>(minmax_filter->max_val().intval()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_INT: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_BIGINT: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _context.minmax_func->assign(&min_val, &max_val); + return 
_context->minmax_func->assign(&min_val, &max_val); } case TYPE_LARGEINT: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -793,27 +778,27 @@ public: int128_t max_val = StringParser::string_to_int<int128_t>( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_FLOAT: { float min_val = static_cast<float>(minmax_filter->min_val().doubleval()); float max_val = static_cast<float>(minmax_filter->max_val().doubleval()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DOUBLE: { double min_val = static_cast<double>(minmax_filter->min_val().doubleval()); double max_val = static_cast<double>(minmax_filter->max_val().doubleval()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DATEV2: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIMEV2: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIME: case TYPE_DATE: { @@ -823,24 +808,24 @@ public: VecDateTimeValue max_val; min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length()); max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMALV2: { auto& min_val_ref = minmax_filter->min_val().stringval(); auto& max_val_ref = 
minmax_filter->max_val().stringval(); DecimalV2Value min_val(min_val_ref); DecimalV2Value max_val(max_val_ref); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL32: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL64: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL128I: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -852,7 +837,7 @@ public: int128_t max_val = StringParser::string_to_int<int128_t>( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL256: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -864,7 +849,7 @@ public: auto max_val = StringParser::string_to_int<wide::Int256>( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } case TYPE_VARCHAR: case TYPE_CHAR: @@ -875,7 +860,7 @@ public: auto max_val_ptr = _pool->add(new std::string(max_val_ref)); StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length()); StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length()); - return _context.minmax_func->assign(&min_val, &max_val); + return _context->minmax_func->assign(&min_val, &max_val); } default: DCHECK(false) << "unknown type"; @@ -884,65 +869,67 @@ public: return 
Status::InvalidArgument("not support!"); } - HybridSetBase::IteratorBase* get_in_filter_iterator() { return _context.hybrid_set->begin(); } + HybridSetBase::IteratorBase* get_in_filter_iterator() { return _context->hybrid_set->begin(); } void get_bloom_filter_desc(char** data, int* filter_length) { - _context.bloom_filter_func->get_data(data, filter_length); + _context->bloom_filter_func->get_data(data, filter_length); } void get_minmax_filter_desc(void** min_data, void** max_data) { - *min_data = _context.minmax_func->get_min(); - *max_data = _context.minmax_func->get_max(); + *min_data = _context->minmax_func->get_min(); + *max_data = _context->minmax_func->get_max(); } PrimitiveType column_type() { return _column_return_type; } - bool is_bloomfilter() const { return _is_bloomfilter; } + bool is_bloomfilter() const { return get_real_type() == RuntimeFilterType::BLOOM_FILTER; } bool contain_null() const { - if (_is_bloomfilter) { - return _context.bloom_filter_func->contain_null(); + if (is_bloomfilter()) { + return _context->bloom_filter_func->contain_null(); } - if (_context.hybrid_set) { + if (_context->hybrid_set) { DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER); - return _context.hybrid_set->contain_null(); + return _context->hybrid_set->contain_null(); } - if (_context.minmax_func) { - return _context.minmax_func->contain_null(); + if (_context->minmax_func) { + return _context->minmax_func->contain_null(); } return false; } - bool is_ignored() const { return _ignored; } + bool is_ignored() const { return _context->ignored; } - const std::string& ignored_msg() const { return _ignored_msg; } + void set_ignored() { _context->ignored = true; } void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&, ObjectPool*)) { for (int i = 0; i < filter->values_size(); ++i) { PColumnValue column = filter->values(i); - assign_func(_context.hybrid_set, column, _pool); + 
assign_func(_context->hybrid_set, column, _pool); } } - size_t get_in_filter_size() const { return _context.hybrid_set->size(); } + size_t get_in_filter_size() const { + return _context->hybrid_set ? _context->hybrid_set->size() : 0; + } std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const { - return _context.bitmap_filter_func; + return _context->bitmap_filter_func; } friend class IRuntimeFilter; void set_filter_id(int id) { - if (_context.bloom_filter_func) { - _context.bloom_filter_func->set_filter_id(id); + if (_context->bloom_filter_func) { + _context->bloom_filter_func->set_filter_id(id); } - if (_context.bitmap_filter_func) { - _context.bitmap_filter_func->set_filter_id(id); + if (_context->bitmap_filter_func) { + _context->bitmap_filter_func->set_filter_id(id); } - if (_context.hybrid_set) { - _context.hybrid_set->set_filter_id(id); + if (_context->hybrid_set) { + _context->hybrid_set->set_filter_id(id); } } @@ -954,10 +941,7 @@ private: RuntimeFilterType _filter_type; int32_t _max_in_num = -1; - vectorized::SharedRuntimeFilterContext _context; - bool _is_bloomfilter = false; - bool _ignored = false; - std::string _ignored_msg; + SharedRuntimeFilterContext _context; uint32_t _filter_id; bool _build_bf_exactly; }; @@ -968,11 +952,10 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo bool build_bf_exactly, bool need_local_merge) { *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge)); (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, - need_local_merge ? 
false : build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); } -vectorized::SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() { +SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() { return _wrapper->_context; } @@ -983,6 +966,7 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta Status IRuntimeFilter::publish(bool publish_local) { DCHECK(is_producer()); + auto send_to_remote = [&](IRuntimeFilter* filter) { TNetworkAddress addr; DCHECK(_state != nullptr); @@ -994,7 +978,7 @@ Status IRuntimeFilter::publish(bool publish_local) { RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); DCHECK(!filters.empty()); // push down - for (auto filter : filters) { + for (auto* filter : filters) { filter->_wrapper = wrapper; filter->update_runtime_filter_type_to_profile(); filter->signal(); @@ -1036,16 +1020,77 @@ Status IRuntimeFilter::publish(bool publish_local) { return Status::OK(); } -Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf) { +Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { DCHECK(is_producer()); + + if (_need_local_merge) { + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters( + _filter_id, &local_merge_filters)); + std::lock_guard l(*local_merge_filters->lock); + local_merge_filters->merge_size_times--; + local_merge_filters->local_merged_size += local_filter_size; + if (local_merge_filters->merge_size_times) { + return Status::OK(); + } else { + if (_has_local_target) { + for (auto* filter : local_merge_filters->filters) { + filter->set_synced_size(local_merge_filters->local_merged_size); + } + return Status::OK(); + } else { + local_filter_size = local_merge_filters->local_merged_size; + } + } + } else if (_has_local_target) { + set_synced_size(local_filter_size); + return 
Status::OK(); + } + + TNetworkAddress addr; + DCHECK(_state != nullptr); + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); std::shared_ptr<PBackendService_Stub> stub( - _state->exec_env->brpc_internal_client_cache()->get_client(*addr)); + _state->exec_env->brpc_internal_client_cache()->get_client(addr)); if (!stub) { std::string msg = - fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port); + fmt::format("Get rpc stub failed, host={}, port=", addr.hostname, addr.port); return Status::InternalError(msg); } + auto request = std::make_shared<PSendFilterSizeRequest>(); + auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared(); + auto closure = + AutoReleaseClosure<PSendFilterSizeRequest, + DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request, + callback); + auto* pquery_id = request->mutable_query_id(); + pquery_id->set_hi(_state->query_id.hi()); + pquery_id->set_lo(_state->query_id.lo()); + + auto* source_addr = request->mutable_source_addr(); + source_addr->set_hostname(BackendOptions::get_local_backend().host); + source_addr->set_port(BackendOptions::get_local_backend().brpc_port); + + request->set_filter_size(local_filter_size); + request->set_filter_id(_filter_id); + callback->cntl_->set_timeout_ms(wait_time_ms()); + + stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), + closure.get()); + closure.release(); + return Status::OK(); +} + +Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf) { + DCHECK(is_producer()); + std::shared_ptr<PBackendService_Stub> stub( + _state->exec_env->brpc_internal_client_cache()->get_client(*addr)); + if (!stub) { + return Status::InternalError( + fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port)); + } + auto merge_filter_request = std::make_shared<PMergeFilterRequest>(); auto merge_filter_callback = 
DummyBrpcCallback<PMergeFilterResponse>::create_shared(); auto merge_filter_closure = @@ -1054,11 +1099,11 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo void* data = nullptr; int len = 0; - auto pquery_id = merge_filter_request->mutable_query_id(); + auto* pquery_id = merge_filter_request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); - auto pfragment_instance_id = merge_filter_request->mutable_fragment_instance_id(); + auto* pfragment_instance_id = merge_filter_request->mutable_fragment_instance_id(); pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id); pfragment_instance_id->set_lo((int64_t)this); @@ -1069,21 +1114,23 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo merge_filter_request->set_column_type(to_proto(column_type)); merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); - Status serialize_status = serialize(merge_filter_request.get(), &data, &len); - if (serialize_status.ok()) { - VLOG_NOTICE << "Producer:" << merge_filter_request->ShortDebugString() << addr->hostname - << ":" << addr->port; - if (len > 0) { - DCHECK(data != nullptr); - merge_filter_callback->cntl_->request_attachment().append(data, len); - } + if (get_ignored()) { + merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); + merge_filter_request->set_ignored(true); + } else { + RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len)); + } - stub->merge_filter(merge_filter_closure->cntl_.get(), merge_filter_closure->request_.get(), - merge_filter_closure->response_.get(), merge_filter_closure.get()); - // the closure will be released by brpc during closure->Run. 
- merge_filter_closure.release(); + if (len > 0) { + DCHECK(data != nullptr); + merge_filter_callback->cntl_->request_attachment().append(data, len); } - return serialize_status; + + stub->merge_filter(merge_filter_closure->cntl_.get(), merge_filter_closure->request_.get(), + merge_filter_closure->response_.get(), merge_filter_closure.get()); + // the closure will be released by brpc during closure->Run. + merge_filter_closure.release(); + return Status::OK(); } Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, @@ -1248,17 +1295,31 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim _filter_timer.push_back(timer); } -void IRuntimeFilter::set_ignored(const std::string& msg) { - _wrapper->_ignored = true; - _wrapper->_ignored_msg = msg; +void IRuntimeFilter::set_dependency(pipeline::CountedFinishDependency* dependency) { + _dependency = dependency; + _dependency->add(); + CHECK(_dependency); +} + +void IRuntimeFilter::set_synced_size(uint64_t global_size) { + CHECK(_dependency); + _synced_size = global_size; + _dependency->sub(); +} + +void IRuntimeFilter::set_ignored() { + _wrapper->_context->ignored = true; +} + +bool IRuntimeFilter::get_ignored() { + return _wrapper->is_ignored(); } std::string IRuntimeFilter::formatted_state() const { return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, " + "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " "HasLocalTarget = {}]", - _is_push_down, _get_explain_state_string(), _wrapper->ignored_msg(), _has_remote_target, - _has_local_target); + _is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target); } BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { @@ -1288,11 +1349,16 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue // 1. Only 1 join key // 2. Do not have remote target (e.g. 
do not need to merge), or broadcast join // 3. Bloom filter - params.build_bf_exactly = build_bf_exactly && (!_has_remote_target || _is_broadcast_join) && - (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || - _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + params.build_bf_exactly = + build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || + _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; + if (!desc->__isset.sync_filter_size || !desc->sync_filter_size) { + params.build_bf_exactly &= (!_has_remote_target || _is_broadcast_join); + } + if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } @@ -1358,6 +1424,12 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, PrimitiveType column_type = param->column_type; *wrapper = param->pool->add(new RuntimePredicateWrapper( param->pool, column_type, get_type(filter_type), param->request->filter_id())); + + if (param->request->has_ignored() && param->request->ignored()) { + (*wrapper)->set_ignored(); + return Status::OK(); + } + switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(param->request->has_in_filter()); @@ -1399,8 +1471,14 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, if (param->request->has_column_type()) { column_type = to_primitive_type(param->request->column_type()); } - wrapper->reset(new RuntimePredicateWrapper(pool, column_type, get_type(filter_type), - param->request->filter_id())); + *wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, get_type(filter_type), + param->request->filter_id()); + + if (param->request->has_ignored() && param->request->ignored()) { + (*wrapper)->set_ignored(); + return Status::OK(); + } + switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(param->request->has_in_filter()); @@ -1437,12 
+1515,7 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { - if (wrapper->is_ignored()) { - set_ignored(wrapper->ignored_msg()); - } else if (!_wrapper->is_ignored()) { - return _wrapper->merge(wrapper); - } - return Status::OK(); + return _wrapper->merge(wrapper); } template <typename T> @@ -1458,11 +1531,7 @@ void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it, template <class T> Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { - auto real_runtime_filter_type = _runtime_filter_type; - if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - real_runtime_filter_type = _wrapper->is_bloomfilter() ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; - } + auto real_runtime_filter_type = _wrapper->get_real_type(); request->set_filter_type(get_type(real_runtime_filter_type)); request->set_contain_null(_wrapper->contain_null()); @@ -1490,11 +1559,6 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { auto column_type = _wrapper->column_type(); filter->set_column_type(to_proto(column_type)); - if (_wrapper->is_ignored()) { - filter->set_ignored_msg(_wrapper->ignored_msg()); - return; - } - auto it = _wrapper->get_in_filter_iterator(); DCHECK(it != nullptr); @@ -1736,16 +1800,21 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) { } } -bool IRuntimeFilter::is_bloomfilter() { - return _wrapper->is_bloomfilter(); +RuntimeFilterType IRuntimeFilter::get_real_type() { + return _wrapper->get_real_type(); +} + +bool IRuntimeFilter::need_sync_filter_size() { + return (type() == RuntimeFilterType::IN_OR_BLOOM_FILTER || + type() == RuntimeFilterType::BLOOM_FILTER) && + _wrapper->get_build_bf_cardinality() && !_is_broadcast_join; } Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); - if 
(param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - const PInFilter in_filter = param->request->in_filter(); - set_ignored(in_filter.ignored_msg()); + if (param->request->has_ignored() && param->request->ignored()) { + set_ignored(); } else { std::unique_ptr<RuntimePredicateWrapper> wrapper; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper)); @@ -1769,7 +1838,7 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer } _wrapper = wrapper; update_runtime_filter_type_to_profile(); - this->signal(); + signal(); } Status RuntimePredicateWrapper::get_push_exprs( @@ -1800,7 +1869,7 @@ Status RuntimePredicateWrapper::get_push_exprs( node.in_predicate.__set_is_not_in(false); node.__set_opcode(TExprOpcode::FILTER_IN); node.__set_is_nullable(false); - auto in_pred = vectorized::VDirectInPredicate::create_shared(node, _context.hybrid_set); + auto in_pred = vectorized::VDirectInPredicate::create_shared(node, _context->hybrid_set); in_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred, null_aware); container.push_back(wrapper); @@ -1813,7 +1882,7 @@ Status RuntimePredicateWrapper::get_push_exprs( RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred, &min_pred_node)); vectorized::VExprSPtr min_literal; - RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_min(), + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context->minmax_func->get_min(), min_literal)); min_pred->add_child(probe_ctx->root()); min_pred->add_child(min_literal); @@ -1828,7 +1897,7 @@ Status RuntimePredicateWrapper::get_push_exprs( RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, &max_pred_node)); vectorized::VExprSPtr max_literal; - RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(), + 
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context->minmax_func->get_max(), max_literal)); max_pred->add_child(probe_ctx->root()); max_pred->add_child(max_literal); @@ -1843,7 +1912,7 @@ Status RuntimePredicateWrapper::get_push_exprs( RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, &max_pred_node, null_aware)); vectorized::VExprSPtr max_literal; - RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(), + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context->minmax_func->get_max(), max_literal)); max_pred->add_child(probe_ctx->root()); max_pred->add_child(max_literal); @@ -1861,7 +1930,7 @@ Status RuntimePredicateWrapper::get_push_exprs( min_pred, &min_pred_node, null_aware)); vectorized::VExprSPtr min_literal; RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(), - _context.minmax_func->get_min(), min_literal)); + _context->minmax_func->get_min(), min_literal)); min_pred->add_child(new_probe_ctx->root()); min_pred->add_child(min_literal); container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, @@ -1878,7 +1947,7 @@ Status RuntimePredicateWrapper::get_push_exprs( node.__set_opcode(TExprOpcode::RT_FILTER); node.__set_is_nullable(false); auto bloom_pred = vectorized::VBloomPredicate::create_shared(node); - bloom_pred->set_filter(_context.bloom_filter_func); + bloom_pred->set_filter(_context->bloom_filter_func); bloom_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bloom_pred); container.push_back(wrapper); @@ -1894,7 +1963,7 @@ Status RuntimePredicateWrapper::get_push_exprs( node.__set_opcode(TExprOpcode::RT_FILTER); node.__set_is_nullable(false); auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node); - bitmap_pred->set_filter(_context.bitmap_filter_func); + bitmap_pred->set_filter(_context->bitmap_filter_func); bitmap_pred->add_child(probe_ctx->root()); 
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bitmap_pred); container.push_back(wrapper); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index ff825523ae1..a65561d7cb8 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -70,6 +70,7 @@ struct SharedRuntimeFilterContext; namespace pipeline { class RuntimeFilterTimer; +struct CountedFinishDependency; } // namespace pipeline enum class RuntimeFilterType { @@ -221,7 +222,7 @@ public: const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, bool build_bf_exactly = false, bool need_local_merge = false); - vectorized::SharedRuntimeFilterContext& get_shared_context_ref(); + SharedRuntimeFilterContext& get_shared_context_ref(); // insert data to build filter void insert_batch(vectorized::ColumnPtr column, size_t start); @@ -230,6 +231,8 @@ public: // push filter to remote node or push down it to scan_node Status publish(bool publish_local = false); + Status send_filter_size(uint64_t local_filter_size); + RuntimeFilterType type() const { return _runtime_filter_type; } PrimitiveType column_type() const; @@ -294,10 +297,13 @@ public: void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t merge_time, int64_t start_apply); - void set_ignored(const std::string& msg); + void set_ignored(); + + bool get_ignored(); - // for ut - bool is_bloomfilter(); + RuntimeFilterType get_real_type(); + + bool need_sync_filter_size(); // async push runtimefilter to remote node Status push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf); @@ -358,6 +364,14 @@ public: void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>); std::string formatted_state() const; + void set_synced_size(uint64_t global_size); + + void set_dependency(pipeline::CountedFinishDependency* dependency); + + int64_t get_synced_size() const { return _synced_size; } + + bool isset_synced_size() const { return _synced_size != -1; } + protected: // serialize 
_wrapper to protobuf void to_protobuf(PInFilter* filter); @@ -437,6 +451,9 @@ protected: bool _need_local_merge = false; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; + + int64_t _synced_size = -1; + pipeline::CountedFinishDependency* _dependency = nullptr; }; // avoid expose RuntimePredicateWrapper diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 1fadf81e809..2a44c6a3745 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -34,130 +34,77 @@ class VRuntimeFilterSlots { public: VRuntimeFilterSlots( const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs, - const std::vector<IRuntimeFilter*>& runtime_filters, bool need_local_merge = false) - : _build_expr_context(build_expr_ctxs), - _runtime_filters(runtime_filters), - _need_local_merge(need_local_merge) {} - - Status init(RuntimeState* state, int64_t hash_table_size) { - // runtime filter effect strategy - // 1. we will ignore IN filter when hash_table_size is too big - // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size - // is too small and IN filter has effect - std::map<int, bool> has_in_filter; - - auto ignore_local_filter = [&](int filter_id) { - auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr() - : state->local_runtime_filter_mgr(); - - std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); - if (filters.empty()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}", - filter_id); - } - for (auto* filter : filters) { - filter->set_ignored(""); - filter->signal(); - } - return Status::OK(); - }; + const std::vector<IRuntimeFilter*>& runtime_filters) + : _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters) { + for (auto* runtime_filter : _runtime_filters) { + _runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter); + } + } - auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) { - runtime_filter->set_ignored(msg); - RETURN_IF_ERROR(runtime_filter->publish()); + Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local, + pipeline::CountedFinishDependency* dependency) { + if (_runtime_filters.empty() || publish_local) { return Status::OK(); - }; - - // ordered vector: IN, IN_OR_BLOOM, others. - // so we can ignore other filter if IN Predicate exists. 
- auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) { - if (d1->type() == d2->type()) { - return false; - } else if (d1->type() == RuntimeFilterType::IN_FILTER) { - return true; - } else if (d2->type() == RuntimeFilterType::IN_FILTER) { - return false; - } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - return true; - } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - return false; - } else { - return d1->type() < d2->type(); + } + for (auto* runtime_filter : _runtime_filters) { + if (runtime_filter->need_sync_filter_size()) { + runtime_filter->set_dependency(dependency); } - }; - std::sort(_runtime_filters.begin(), _runtime_filters.end(), compare_desc); - - // do not create 'in filter' when hash_table size over limit - const auto max_in_num = state->runtime_filter_max_in_num(); - const bool over_max_in_num = (hash_table_size >= max_in_num); + } + // send_filter_size may call dependency->sub(), so we call set_dependency firstly for all rf to avoid dependency set_ready repeatedly for (auto* runtime_filter : _runtime_filters) { - if (runtime_filter->expr_order() < 0 || - runtime_filter->expr_order() >= _build_expr_context.size()) { - return Status::InternalError( - "runtime_filter meet invalid expr_order, expr_order={}, " - "_build_expr_context.size={}", - runtime_filter->expr_order(), _build_expr_context.size()); + if (runtime_filter->need_sync_filter_size()) { + RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size)); } + } + return Status::OK(); + } - bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER); + // use synced size when this rf has global merged + static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t hash_table_size) { + return runtime_filter->isset_synced_size() ? 
runtime_filter->get_synced_size() + : hash_table_size; + } - if (over_max_in_num && - runtime_filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter()); + Status ignore_filters(RuntimeState* state) { + // process ignore duplicate IN_FILTER + std::unordered_set<int> has_in_filter; + for (auto* filter : _runtime_filters) { + if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { + continue; } - - if (runtime_filter->is_bloomfilter()) { - RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size)); + if (has_in_filter.contains(filter->expr_order())) { + filter->set_ignored(); + continue; } + has_in_filter.insert(filter->expr_order()); + } - // Note: - // In the case that exist *remote target* and in filter and other filter, - // we must merge other filter whatever in filter is over the max num in current node, - // because: - // case 1: (in filter >= max num) in current node, so in filter will be ignored, - // and then other filter can be used - // case 2: (in filter < max num) in current node, we don't know whether the in filter - // will be ignored in merge node, so we must transfer other filter to merge node - if (!runtime_filter->has_remote_target()) { - bool exists_in_filter = has_in_filter[runtime_filter->expr_order()]; - if (is_in_filter && over_max_in_num) { - VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id()) - << " ignore runtime filter(in filter id " - << runtime_filter->filter_id() << ") because: in_num(" - << hash_table_size << ") >= max_in_num(" << max_in_num << ")"; - RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id())); - continue; - } else if (!is_in_filter && exists_in_filter) { - // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created - // because in filter is exactly filter, so it is enough to filter data - VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id()) - << " ignore runtime 
filter(" - << IRuntimeFilter::to_string(runtime_filter->type()) << " id " - << runtime_filter->filter_id() - << ") because: already exists in filter"; - RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id())); - continue; - } - } else if (is_in_filter && over_max_in_num) { - std::string msg = fmt::format( - "fragment instance {} ignore runtime filter(in filter id {}) because: " - "in_num({}) >= max_in_num({})", - print_id(state->fragment_instance_id()), runtime_filter->filter_id(), - hash_table_size, max_in_num); - RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg)); + // process ignore filter when it has IN_FILTER on same expr, and init bloom filter size + for (auto* filter : _runtime_filters) { + if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || + !has_in_filter.contains(filter->expr_order())) { continue; } + filter->set_ignored(); + } + return Status::OK(); + } - if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER) || - (runtime_filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && - !over_max_in_num)) { - has_in_filter[runtime_filter->expr_order()] = true; + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { + // process IN_OR_BLOOM_FILTER's real type + for (auto* filter : _runtime_filters) { + if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && + get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) { + RETURN_IF_ERROR(filter->change_to_bloom_filter()); } - _runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter); - } + if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { + RETURN_IF_ERROR( + filter->init_bloom_filter(get_real_size(filter, local_hash_table_size))); + } + } return Status::OK(); } @@ -171,6 +118,9 @@ public: int result_column_id = _build_expr_context[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; for (auto* filter : iter->second) { + if 
(filter->get_ignored()) { + continue; + } filter->insert_batch(column, 1); } } @@ -213,7 +163,6 @@ public: private: const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context; std::vector<IRuntimeFilter*> _runtime_filters; - const bool _need_local_merge = false; // prob_contition index -> [IRuntimeFilter] std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map; }; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index dab127c2c50..d4dc1956400 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -39,7 +39,11 @@ Overload(Callables&&... callables) -> Overload<Callables...>; HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : JoinBuildSinkLocalState(parent, state) {} + : JoinBuildSinkLocalState(parent, state) { + _finish_dependency = std::make_shared<CountedFinishDependency>( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); +} Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); @@ -72,8 +76,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo } if (!_should_build_hash_table) { _dependency->block(); + _finish_dependency->block(); p._shared_hashtable_controller->append_dependency(p.node_id(), - _dependency->shared_from_this()); + _dependency->shared_from_this(), + _finish_dependency->shared_from_this()); } _build_blocks_memory_usage = @@ -102,6 +108,9 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _build_expr_ctxs.size() == 1)); } + _runtime_filter_slots = + std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, runtime_filters()); + return Status::OK(); } @@ -112,6 +121,34 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) 
{ return Status::OK(); } +Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { + auto p = _parent->cast<HashJoinBuildSinkOperatorX>(); + Defer defer {[&]() { + if (_should_build_hash_table && p._shared_hashtable_controller) { + p._shared_hashtable_controller->signal_finish(p.node_id()); + } + }}; + + if (!_runtime_filter_slots || _runtime_filters.empty()) { + return Status::OK(); + } + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (_should_build_hash_table && hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); + } + + SCOPED_TIMER(_publish_runtime_filter_timer); + RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); + return Status::OK(); +} + bool HashJoinBuildSinkLocalState::build_unique() const { return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique; } @@ -444,6 +481,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st is_null_safe_equal || (_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null)); } + return Status::OK(); } @@ -501,9 +539,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* const bool need_local_merge = local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge; - RETURN_IF_ERROR(vectorized::process_runtime_filter_build( - state, local_state._shared_state->build_block.get(), &local_state, - need_local_merge)); + RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( + state, local_state._shared_state->build_block->rows(), need_local_merge, + (CountedFinishDependency*)(local_state._finish_dependency.get()))); RETURN_IF_ERROR( local_state.process_build_block(state, 
(*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { @@ -514,13 +552,10 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = local_state._shared_state->_has_null_in_build_side; - if (local_state._runtime_filter_slots) { - local_state._runtime_filter_slots->copy_to_shared_context( - _shared_hash_table_context); - } _shared_hash_table_context->block = local_state._shared_state->build_block; _shared_hash_table_context->build_indexes_null = local_state._shared_state->build_indexes_null; + local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); _shared_hashtable_controller->signal(node_id()); } } else if (!local_state._should_build_hash_table) { @@ -532,6 +567,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* return _shared_hash_table_context->status; } + RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context( + _shared_hash_table_context)); + local_state.profile()->add_info_string( "SharedHashTableFrom", print_id( @@ -553,36 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = _shared_hash_table_context->block; local_state._shared_state->build_indexes_null = _shared_hash_table_context->build_indexes_null; - const bool need_local_merge = - local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge; - - if (!_shared_hash_table_context->runtime_filters.empty()) { - auto ret = std::visit( - Overload { - [&](std::monostate&) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - if (local_state._runtime_filters.empty()) { - return Status::OK(); - } - local_state._runtime_filter_slots = - std::make_shared<VRuntimeFilterSlots>( - _build_expr_ctxs, 
local_state._runtime_filters, - need_local_merge); - - RETURN_IF_ERROR(local_state._runtime_filter_slots->init( - state, arg.hash_table->size())); - RETURN_IF_ERROR( - local_state._runtime_filter_slots->copy_from_shared_context( - _shared_hash_table_context)); - RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true)); - return Status::OK(); - }}, - *local_state._shared_state->hash_table_variants); - RETURN_IF_ERROR(ret); - } } if (eos) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 2712edc838d..0998884c99b 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -70,6 +70,10 @@ public: _profile->add_info_string("HashTableFilledBuckets", info); } + Dependency* finishdependency() override { return _finish_dependency.get(); } + + Status close(RuntimeState* state, Status exec_status) override; + protected: void _hash_table_init(RuntimeState* state); void _set_build_ignore_flag(vectorized::Block& block, const std::vector<int>& res_col_ids); @@ -84,10 +88,6 @@ protected: friend class PartitionedHashJoinSinkLocalState; template <class HashTableContext, typename Parent> friend struct vectorized::ProcessHashTableBuild; - template <typename Parent> - friend Status vectorized::process_runtime_filter_build(RuntimeState* state, - vectorized::Block* block, Parent* parent, - bool is_global); // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; @@ -108,6 +108,7 @@ protected: */ bool _build_side_ignore_null = false; std::vector<int> _build_col_ids; + std::shared_ptr<Dependency> _finish_dependency; RuntimeProfile::Counter* _build_table_timer = nullptr; RuntimeProfile::Counter* _build_expr_call_timer = nullptr; diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index 44c34a3c057..60b0970781d 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ 
b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -40,7 +40,8 @@ Status JoinBuildSinkLocalState<SharedStateArg, Derived>::init(RuntimeState* stat "PublishRuntimeFilterTime"); _runtime_filter_compute_timer = ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "RuntimeFilterComputeTime"); - + _runtime_filter_init_timer = + ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "RuntimeFilterInitTime"); return Status::OK(); } diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 9dbea12e021..2a204a75a5a 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -44,6 +44,7 @@ protected: RuntimeProfile::Counter* _build_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; + RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; std::vector<IRuntimeFilter*> _runtime_filters; }; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 6ec6734243c..5fbc657c78a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -317,7 +317,6 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts)); olap_scanner->set_compound_filters(_compound_filters); } - LOG(INFO) << "parallel scanners count: " << scanners->size(); return Status::OK(); } } diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 3b415892c93..6dccf15dbd8 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -98,6 +98,16 @@ std::string Dependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } +std::string 
CountedFinishDependency::debug_string(int indentation_level) { + fmt::memory_buffer debug_string_buffer; + fmt::format_to( + debug_string_buffer, + "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}, count={}", + std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready, + _always_ready, _is_cancelled(), _counter); + return fmt::to_string(debug_string_buffer); +} + std::string RuntimeFilterDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, runtime filter: {}", diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index a2c858e4c5c..ebd44bec4d2 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -199,7 +199,7 @@ public: [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { return nullptr; } }; -struct FinishDependency final : public Dependency { +struct FinishDependency : public Dependency { public: using SharedState = FakeSharedState; FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx) @@ -208,6 +208,35 @@ public: [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override; }; +struct CountedFinishDependency final : public FinishDependency { +public: + using SharedState = FakeSharedState; + CountedFinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx) + : FinishDependency(id, node_id, name, query_ctx) {} + + void add() { + std::unique_lock<std::mutex> l(_mtx); + if (!_counter) { + block(); + } + _counter++; + } + + void sub() { + std::unique_lock<std::mutex> l(_mtx); + _counter--; + if (!_counter) { + set_ready(); + } + } + + std::string debug_string(int indentation_level = 0) override; + +private: + std::mutex _mtx; + uint32_t _counter = 0; +}; + class RuntimeFilterDependency; struct RuntimeFilterTimerQueue; class RuntimeFilterTimer { diff --git 
a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7e15477a641..9fff093b3bf 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1348,78 +1348,112 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); int64_t start_apply = MonotonicMillis(); + std::shared_ptr<PlanFragmentExecutor> fragment_executor; + std::shared_ptr<pipeline::PipelineFragmentContext> pip_context; + QueryThreadContext query_thread_context; + + RuntimeFilterMgr* runtime_filter_mgr = nullptr; + ObjectPool* pool = nullptr; + const auto& fragment_instance_ids = request->fragment_instance_ids(); - if (!fragment_instance_ids.empty()) { - UniqueId fragment_instance_id = fragment_instance_ids[0]; - TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); - - std::shared_ptr<PlanFragmentExecutor> fragment_executor; - std::shared_ptr<pipeline::PipelineFragmentContext> pip_context; - QueryThreadContext query_thread_context; - - RuntimeFilterMgr* runtime_filter_mgr = nullptr; - ObjectPool* pool = nullptr; - if (is_pipeline) { - std::unique_lock<std::mutex> lock(_lock); - auto iter = _pipeline_map.find(tfragment_instance_id); - if (iter == _pipeline_map.end()) { - VLOG_CRITICAL << "unknown.... 
fragment-id:" << fragment_instance_id; - return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); - } - pip_context = iter->second; + { + std::unique_lock<std::mutex> lock(_lock); + for (UniqueId fragment_instance_id : fragment_instance_ids) { + TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); - DCHECK(pip_context != nullptr); - runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); - pool = &pip_context->get_query_ctx()->obj_pool; - query_thread_context = {pip_context->get_query_ctx()->query_id(), - pip_context->get_query_ctx()->query_mem_tracker}; - } else { - std::unique_lock<std::mutex> lock(_lock); - auto iter = _fragment_instance_map.find(tfragment_instance_id); - if (iter == _fragment_instance_map.end()) { - VLOG_CRITICAL << "unknown.... fragment instance id:" - << print_id(tfragment_instance_id); - return Status::InvalidArgument("fragment instance id: {}", - print_id(tfragment_instance_id)); - } - fragment_executor = iter->second; + if (is_pipeline) { + auto iter = _pipeline_map.find(tfragment_instance_id); + if (iter == _pipeline_map.end()) { + continue; + } + pip_context = iter->second; - DCHECK(fragment_executor != nullptr); - runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr(); - pool = &fragment_executor->get_query_ctx()->obj_pool; - query_thread_context = {fragment_executor->get_query_ctx()->query_id(), - fragment_executor->get_query_ctx()->query_mem_tracker}; - } + DCHECK(pip_context != nullptr); + runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); + pool = &pip_context->get_query_ctx()->obj_pool; + query_thread_context = {pip_context->get_query_ctx()->query_id(), + pip_context->get_query_ctx()->query_mem_tracker}; + } else { + auto iter = _fragment_instance_map.find(tfragment_instance_id); + if (iter == _fragment_instance_map.end()) { + continue; + } + fragment_executor = iter->second; - SCOPED_ATTACH_TASK(query_thread_context); - - // 1. 
get the target filters - std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters)); - - // 2. create the filter wrapper to replace or ignore the target filters - if (request->has_in_filter() && request->in_filter().has_ignored_msg()) { - const auto& in_filter = request->in_filter(); - - std::ranges::for_each(filters, [&in_filter](auto& filter) { - filter->set_ignored(in_filter.ignored_msg()); - filter->signal(); - }); - } else if (!filters.empty()) { - UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, - filters[0]->column_type()}; - RuntimePredicateWrapper* filter_wrapper = nullptr; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(¶ms, &filter_wrapper)); - - std::ranges::for_each(filters, [&](auto& filter) { - filter->update_filter(filter_wrapper, request->merge_time(), start_apply); - }); + DCHECK(fragment_executor != nullptr); + runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr(); + pool = &fragment_executor->get_query_ctx()->obj_pool; + query_thread_context = {fragment_executor->get_query_ctx()->query_id(), + fragment_executor->get_query_ctx()->query_mem_tracker}; + } + break; } } + if (runtime_filter_mgr == nullptr) { + // all instance finished + return Status::OK(); + } + + SCOPED_ATTACH_TASK(query_thread_context); + // 1. get the target filters + std::vector<IRuntimeFilter*> filters; + RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters)); + + // 2. 
create the filter wrapper to replace or ignore the target filters + if (!filters.empty()) { + UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, filters[0]->column_type()}; + RuntimePredicateWrapper* filter_wrapper = nullptr; + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, &filter_wrapper)); + + std::ranges::for_each(filters, [&](auto& filter) { + filter->update_filter(filter_wrapper, request->merge_time(), start_apply); + }); + } + return Status::OK(); } +Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { + UniqueId queryid = request->query_id(); + std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; + RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); + + std::shared_ptr<QueryContext> query_ctx; + { + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + std::lock_guard<std::mutex> lock(_lock); + auto iter = _query_ctx_map.find(query_id); + if (iter == _query_ctx_map.end()) { + return Status::InvalidArgument("query-id: {}", queryid.to_string()); + } + + query_ctx = iter->second; + } + auto merge_status = filter_controller->send_filter_size(request); + return merge_status; +} + +Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { + UniqueId queryid = request->query_id(); + std::shared_ptr<QueryContext> query_ctx; + { + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + std::lock_guard<std::mutex> lock(_lock); + auto iter = _query_ctx_map.find(query_id); + if (iter == _query_ctx_map.end()) { + return Status::InvalidArgument("query-id: {}", queryid.to_string()); + } + + query_ctx = iter->second; + } + return query_ctx->runtime_filter_mgr()->sync_filter_size(request); +} + Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId queryid = request->query_id(); @@ -1444,7 +1478,6 @@ Status 
FragmentMgr::merge_filter(const PMergeFilterRequest* request, } SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_ctx->query_id()); auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf); - DCHECK(merge_status.ok()); return merge_status; } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 76e9a50eb58..16da4826165 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -128,6 +128,10 @@ public: Status merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); + Status send_filter_size(const PSendFilterSizeRequest* request); + + Status sync_filter_size(const PSyncFilterSizeRequest* request); + std::string to_http_path(const std::string& file_name); void coordinator_callback(const ReportStatusRequest& req); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 42fbfb418fc..f0407e42ce2 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -40,6 +40,7 @@ #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" namespace doris { @@ -141,6 +142,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( iter->second.filters.emplace_back(merge_filter); } iter->second.merge_time++; + iter->second.merge_size_times++; iter->second.filters.emplace_back(*producer_filter); } return Status::OK(); @@ -151,7 +153,8 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( std::lock_guard<std::mutex> l(_lock); auto iter = _local_merge_producer_map.find(filter_id); if (iter == _local_merge_producer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", filter_id); + return Status::InvalidArgument("unknown filter: {}, role: LOCAL_MERGE_PRODUCER.", + filter_id); } *local_merge_filters = &iter->second; DCHECK(!iter->second.filters.empty()); 
@@ -241,7 +244,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1, false)); - _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); + _filter_map.emplace(filter_id, cnt_val); return Status::OK(); } @@ -262,7 +265,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); - _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); + _filter_map.emplace(filter_id, cnt_val); return Status::OK(); } @@ -311,6 +314,67 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, return Status::OK(); } +Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSizeRequest* request) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + std::shared_ptr<RuntimeFilterCntlVal> cnt_val; + + auto filter_id = request->filter_id(); + std::map<int, CntlValwithLock>::iterator iter; + { + std::shared_lock<std::shared_mutex> guard(_filter_map_mutex); + iter = _filter_map.find(filter_id); + if (iter == _filter_map.end()) { + return Status::InvalidArgument("unknown filter id {}", + std::to_string(request->filter_id())); + } + } + cnt_val = iter->second.cnt_val; + std::unique_lock<std::mutex> l(*iter->second.mutex); + cnt_val->global_size += request->filter_size(); + cnt_val->source_addrs.push_back(request->source_addr()); + + if (cnt_val->source_addrs.size() == cnt_val->producer_size) { + for (auto addr : cnt_val->source_addrs) { + std::shared_ptr<PBackendService_Stub> stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); + AsyncRPCContext<PSyncFilterSizeRequest, PSyncFilterSizeResponse> ctx; + auto* pquery_id = ctx.request.mutable_query_id(); + 
pquery_id->set_hi(_state->query_id.hi()); + pquery_id->set_lo(_state->query_id.lo()); + + ctx.request.set_filter_id(filter_id); + ctx.request.set_filter_size(cnt_val->global_size); + + stub->sync_filter_size(&ctx.cntl, &ctx.request, &ctx.response, brpc::DoNothing()); + brpc::Join(ctx.cntl.call_id()); + if (auto status = Status::create(ctx.response.status()); !status) { + return status; + } + if (ctx.cntl.Failed()) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(ctx.cntl.remote_side()); + return Status::InternalError(ctx.cntl.ErrorText()); + } + } + } + return Status::OK(); +} + +Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { + auto* filter = try_get_product_filter(request->filter_id()); + if (filter) { + filter->set_synced_size(request->filter_size()); + return Status::OK(); + } + + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters)); + // first filter size merged filter + for (size_t i = 1; i < local_merge_filters->filters.size(); i++) { + local_merge_filters->filters[i]->set_synced_size(request->filter_size()); + } + return Status::OK(); +} + // merge data Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data, @@ -331,9 +395,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ std::to_string(request->filter_id())); } } - cnt_val = iter->second.first; + cnt_val = iter->second.cnt_val; { - std::lock_guard<std::mutex> l(*iter->second.second); + std::lock_guard<std::mutex> l(*iter->second.mutex); // Skip the other broadcast join runtime filter if (cnt_val->arrive_id.size() == 1 && cnt_val->runtime_filter_desc.is_broadcast_join) { return Status::OK(); @@ -372,7 +436,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - 
RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); + if (!cnt_val->filter->get_ignored()) { + RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); + } else { + apply_request.set_ignored(true); + apply_request.set_filter_type(PFilterType::UNKNOW_FILTER); + } + if (data != nullptr && len > 0) { request_attachment.append(data, len); has_attachment = true; @@ -392,7 +462,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ rpc_contexts[cur]->cntl.request_attachment().append(request_attachment); } rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id(); - // set fragment-id for (size_t fid = 0; fid < targets[cur].target_fragment_instance_ids.size(); fid++) { @@ -417,6 +486,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ } for (auto& rpc_context : rpc_contexts) { brpc::Join(rpc_context->cid); + if (auto status = Status::create(rpc_context->response.status()); !status) { + return status; + } if (rpc_context->cntl.Failed()) { LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText(); ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( @@ -437,7 +509,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); + if (!cnt_val->filter->get_ignored()) { + RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); + } else { + apply_request.set_ignored(true); + apply_request.set_filter_type(PFilterType::UNKNOW_FILTER); + } + if (data != nullptr && len > 0) { request_attachment.append(data, len); has_attachment = true; @@ -457,7 +535,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ rpc_contexts[cur]->cntl.request_attachment().append(request_attachment); } rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id(); - // set 
fragment_instance_id auto request_fragment_instance_id = rpc_contexts[cur]->request.mutable_fragment_instance_id(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index bd543e782a2..dbf68460606 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -20,8 +20,10 @@ #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Types_types.h> +#include <gen_cpp/internal_service.pb.h> #include <stdint.h> +#include <condition_variable> #include <functional> #include <map> #include <memory> @@ -30,6 +32,7 @@ #include <string> #include <unordered_map> #include <unordered_set> +#include <utility> #include <vector> #include "common/object_pool.h" @@ -57,6 +60,8 @@ class ExecEnv; struct LocalMergeFilters { std::unique_ptr<std::mutex> lock = std::make_unique<std::mutex>(); int merge_time = 0; + int merge_size_times = 0; + uint64_t local_merged_size = 0; std::vector<IRuntimeFilter*> filters; }; @@ -81,6 +86,15 @@ public: Status get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters); + IRuntimeFilter* try_get_product_filter(const int filter_id) { + std::lock_guard<std::mutex> l(_lock); + auto iter = _producer_map.find(filter_id); + if (iter == _producer_map.end()) { + return nullptr; + } + return iter->second; + } + // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, IRuntimeFilter** consumer_filter, @@ -105,6 +119,8 @@ public: Status get_merge_addr(TNetworkAddress* addr); + Status sync_filter_size(const PSyncFilterSizeRequest* request); + private: struct ConsumerFilterHolder { int node_id; @@ -146,16 +162,20 @@ public: Status merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data, bool opt_remote_rf); + Status send_filter_size(const PSendFilterSizeRequest* request); + UniqueId query_id() const { return _query_id; } 
struct RuntimeFilterCntlVal { int64_t merge_time; int producer_size; + uint64_t global_size; TRuntimeFilterDesc runtime_filter_desc; std::vector<doris::TRuntimeFilterTargetParams> target_info; std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info; IRuntimeFilter* filter = nullptr; - std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ? + std::unordered_set<UniqueId> arrive_id; + std::vector<PNetworkAddress> source_addrs; std::shared_ptr<ObjectPool> pool; }; @@ -174,8 +194,14 @@ private: // protect _filter_map std::shared_mutex _filter_map_mutex; std::shared_ptr<MemTracker> _mem_tracker; - using CntlValwithLock = - std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<std::mutex>>; + + struct CntlValwithLock { + std::shared_ptr<RuntimeFilterCntlVal> cnt_val; + std::unique_ptr<std::mutex> mutex; + CntlValwithLock(std::shared_ptr<RuntimeFilterCntlVal> input_cnt_val) + : cnt_val(std::move(input_cnt_val)), mutex(std::make_unique<std::mutex>()) {} + }; + std::map<int, CntlValwithLock> _filter_map; RuntimeFilterParamsContext* _state = nullptr; }; diff --git a/be/src/service/backend_options.cpp b/be/src/service/backend_options.cpp index a8c48fd710e..19167d7d9b1 100644 --- a/be/src/service/backend_options.cpp +++ b/be/src/service/backend_options.cpp @@ -71,6 +71,7 @@ TBackend BackendOptions::get_local_backend() { _backend.__set_host(_s_localhost); _backend.__set_be_port(config::be_port); _backend.__set_http_port(config::webserver_port); + _backend.__set_brpc_port(config::brpc_port); return _backend; } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 20ab10022d1..ef0521b5219 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1195,9 +1195,36 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); butil::IOBufAsZeroCopyInputStream 
zero_copy_input_stream(attachment); Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); - } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::send_filter_size(::google::protobuf::RpcController* controller, + const ::doris::PSendFilterSizeRequest* request, + ::doris::PSendFilterSizeResponse* response, + ::google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = _exec_env->fragment_mgr()->send_filter_size(request); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::sync_filter_size(::google::protobuf::RpcController* controller, + const ::doris::PSyncFilterSizeRequest* request, + ::doris::PSyncFilterSizeResponse* response, + ::google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = _exec_env->fragment_mgr()->sync_filter_size(request); st.to_protobuf(response->mutable_status()); }); if (!ret) { diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 4fffdc33875..b8b12ecbcb8 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -130,6 +130,16 @@ public: ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) override; + void send_filter_size(::google::protobuf::RpcController* controller, + const ::doris::PSendFilterSizeRequest* request, + ::doris::PSendFilterSizeResponse* response, + ::google::protobuf::Closure* done) override; + + void sync_filter_size(::google::protobuf::RpcController* controller, + const ::doris::PSyncFilterSizeRequest* 
request, + ::doris::PSyncFilterSizeResponse* response, + ::google::protobuf::Closure* done) override; + void apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 7b313d6ae02..2e1d0508ba3 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -77,6 +77,10 @@ public: } #endif + std::shared_ptr<T> get_client(const PNetworkAddress& paddr) { + return get_client(paddr.hostname(), paddr.port()); + } + std::shared_ptr<T> get_client(const std::string& host, int port) { std::string realhost; realhost = host; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 7f820eafea5..a95236b3664 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -758,8 +758,10 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc DCHECK(!_build_side_mutable_block.empty()); _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block()); COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes()); - RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this)); + _runtime_filter_slots = + std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, runtime_filters()); RETURN_IF_ERROR(_process_build_block(state, *_build_block)); + RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this)); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. 
@@ -811,8 +813,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( _build_expr_ctxs, _runtime_filters); - RETURN_IF_ERROR(_runtime_filter_slots->init( - state, arg.hash_table->size())); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); RETURN_IF_ERROR(_runtime_filter_slots->publish(true)); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 1559a5dfa5d..20245be06c2 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -78,12 +78,14 @@ Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* p if (parent->runtime_filters().empty()) { return Status::OK(); } - parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( - parent->_build_expr_ctxs, parent->runtime_filters(), is_global); - - RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows())); + uint64_t rows = block->rows(); + { + SCOPED_TIMER(parent->_runtime_filter_init_timer); + RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, rows)); + RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state)); + } - if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) { + if (!parent->_runtime_filter_slots->empty() && rows > 1) { SCOPED_TIMER(parent->_runtime_filter_compute_timer); parent->_runtime_filter_slots->insert(block); } diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index a9e25e7626b..e7b7a6b96b9 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -144,6 +144,7 @@ Status VJoinNodeBase::prepare(RuntimeState* state) { _publish_runtime_filter_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime"); _runtime_filter_compute_timer = ADD_TIMER(runtime_profile(), "RunmtimeFilterComputeTime"); + 
_runtime_filter_init_timer = ADD_TIMER(runtime_profile(), "RunmtimeFilterInitTime"); return Status::OK(); } diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 63a8bb20ba6..099b4d37e56 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -152,6 +152,7 @@ protected: RuntimeProfile::Counter* _probe_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; + RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; RuntimeProfile::Counter* _join_filter_timer = nullptr; RuntimeProfile::Counter* _build_output_block_timer = nullptr; diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 53e24df183a..4b1203d4822 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -35,6 +35,7 @@ void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend()); _builder_fragment_ids.insert({node_id, builder}); _dependencies.insert({node_id, {}}); + _finish_dependencies.insert({node_id, {}}); } bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, @@ -57,7 +58,7 @@ bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragmen SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { std::lock_guard<std::mutex> lock(_mutex); - if (!_shared_contexts.count(my_node_id)) { + if (!_shared_contexts.contains(my_node_id)) { _shared_contexts.insert({my_node_id, std::make_shared<SharedHashTableContext>()}); } return _shared_contexts[my_node_id]; @@ -90,6 +91,14 @@ void SharedHashTableController::signal(int my_node_id) { _cv.notify_all(); } +void SharedHashTableController::signal_finish(int 
my_node_id) { + std::lock_guard<std::mutex> lock(_mutex); + for (auto& dep : _finish_dependencies[my_node_id]) { + dep->set_ready(); + } + _cv.notify_all(); +} + TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) { std::lock_guard<std::mutex> lock(_mutex); auto it = _builder_fragment_ids.find(my_node_id); diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 1f48ffa3439..8fe46b97b85 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -40,17 +40,20 @@ namespace pipeline { class Dependency; } -namespace vectorized { - -class Arena; - -struct SharedRuntimeFilterContext { +struct RuntimeFilterContext { std::shared_ptr<MinMaxFuncBase> minmax_func; std::shared_ptr<HybridSetBase> hybrid_set; std::shared_ptr<BloomFilterFuncBase> bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func; + bool ignored = false; }; +using SharedRuntimeFilterContext = std::shared_ptr<RuntimeFilterContext>; + +namespace vectorized { + +class Arena; + struct SharedHashTableContext { SharedHashTableContext() : hash_table_variants(nullptr), block(std::make_shared<vectorized::Block>()) {} @@ -74,13 +77,16 @@ public: TUniqueId get_builder_fragment_instance_id(int my_node_id); SharedHashTableContextPtr get_context(int my_node_id); void signal(int my_node_id); + void signal_finish(int my_node_id); void signal(int my_node_id, Status status); Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; } - void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep) { + void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> finish_dep) { 
std::lock_guard<std::mutex> lock(_mutex); _dependencies[node_id].push_back(dep); + _finish_dependencies[node_id].push_back(finish_dep); } private: @@ -88,6 +94,8 @@ private: std::mutex _mutex; // For pipelineX, we update all dependencies once hash table is built; std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies; + std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> + _finish_dependencies; std::condition_variable _cv; std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids; std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index a049079ccd4..7decc5c2baa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -239,6 +239,10 @@ public final class RuntimeFilter { tFilter.setNullAware(false); } } + tFilter.setSyncFilterSize( + ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getEnablePipelineXEngine() + && ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize()); return tFilter; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 72df816bf62..d1ff6550b39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -185,6 +185,8 @@ public class SessionVariable implements Serializable, Writable { // if the right table is greater than this value in the hash join, we will ignore IN filter public static final String RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num"; + public static final String 
ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size"; + public static final String BE_NUMBER_FOR_TEST = "be_number_for_test"; // max ms to wait transaction publish finish when exec insert stmt. @@ -984,6 +986,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM, needForward = true) private int runtimeFilterMaxInNum = 1024; + @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true) + private boolean enableSyncRuntimeFilterSize = true; + @VariableMgr.VarAttr(name = USE_RF_DEFAULT) public boolean useRuntimeFilterDefaultSize = false; @@ -3433,6 +3438,10 @@ public class SessionVariable implements Serializable, Writable { return enablePipelineXEngine; } + public boolean enableSyncRuntimeFilterSize() { + return enableSyncRuntimeFilterSize; + } + public static boolean enablePipelineEngine() { ConnectContext connectContext = ConnectContext.get(); if (connectContext == null) { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 0230180796a..bd15c8d2017 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -509,6 +509,27 @@ enum PFilterType { MAX_FILTER = 6; }; +message PSendFilterSizeRequest { + required int32 filter_id = 1; + required PUniqueId query_id = 2; + required PNetworkAddress source_addr = 3; + required uint64 filter_size = 4; +}; + +message PSendFilterSizeResponse { + required PStatus status = 1; +}; + +message PSyncFilterSizeRequest { + required int32 filter_id = 1; + required PUniqueId query_id = 2; + required uint64 filter_size = 3; +}; + +message PSyncFilterSizeResponse { + required PStatus status = 1; +}; + message PMergeFilterRequest { required int32 filter_id = 1; required PUniqueId query_id = 2; @@ -521,6 +542,7 @@ message PMergeFilterRequest { optional bool opt_remote_rf = 9; optional PColumnType column_type = 10; optional bool contain_null = 11; + optional bool 
ignored = 12; }; message PMergeFilterResponse { @@ -540,6 +562,7 @@ message PPublishFilterRequest { optional int64 merge_time = 9; optional PColumnType column_type = 10; optional bool contain_null = 11; + optional bool ignored = 12; }; message PPublishFilterRequestV2 { @@ -553,6 +576,7 @@ message PPublishFilterRequestV2 { optional bool is_pipeline = 8; optional int64 merge_time = 9; optional bool contain_null = 10; + optional bool ignored = 11; }; message PPublishFilterResponse { @@ -874,6 +898,8 @@ service PBackendService { rpc commit(PCommitRequest) returns (PCommitResult); rpc rollback(PRollbackRequest) returns (PRollbackResult); rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse); + rpc send_filter_size(PSendFilterSizeRequest) returns (PSendFilterSizeResponse); + rpc sync_filter_size(PSyncFilterSizeRequest) returns (PSyncFilterSizeResponse); rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse); rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index b7a4b6ee53a..41bb3373f23 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -234,3 +234,8 @@ enum PPlanFragmentCancelReason { CALL_RPC_ERROR = 5; MEMORY_LIMIT_EXCEED = 6; } + +message PNetworkAddress { + required string hostname = 1; + required int32 port = 2; +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 80042fb0c57..5f6457682e5 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1197,6 +1197,8 @@ struct TRuntimeFilterDesc { // true, if join type is null aware like <=>. 
rf should dispose the case 15: optional bool null_aware; + + 16: optional bool sync_filter_size; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org