This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d542818fbcb [UT](runtime filter) Add runtime filter test case (#47035) d542818fbcb is described below commit d542818fbcbdffb94e44d2f1dc20f0b34f1be95a Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Jan 16 10:12:04 2025 +0800 [UT](runtime filter) Add runtime filter test case (#47035) --- be/src/exprs/runtime_filter.cpp | 1343 ++++++++++++------------- be/src/exprs/runtime_filter.h | 111 ++ be/src/exprs/runtime_filter_slots.h | 54 +- be/src/pipeline/exec/operator.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 3 +- be/src/pipeline/pipeline_fragment_context.h | 3 +- be/src/pipeline/task_queue.h | 9 +- be/test/pipeline/dummy_task_queue.h | 50 + be/test/pipeline/pipeline_test.cpp | 276 ++++- be/test/pipeline/thrift_builder.h | 106 +- gensrc/thrift/PaloInternalService.thrift | 5 +- gensrc/thrift/PlanNodes.thrift | 2 +- 12 files changed, 1174 insertions(+), 790 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b85f370165a..33cc155fc6f 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -283,711 +283,6 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod *tnode = node; return vectorized::VExpr::create_expr(node, expr); } -// This class is a wrapper of runtime predicate function -class RuntimePredicateWrapper { -public: - RuntimePredicateWrapper(const RuntimeFilterParams* params) - : RuntimePredicateWrapper(params->column_return_type, params->filter_type, - params->filter_id) {}; - // for a 'tmp' runtime predicate wrapper - // only could called assign method or as a param for merge - RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id) - : _column_return_type(column_type), - _filter_type(type), - _context(new RuntimeFilterContext()), - _filter_id(filter_id) {} - - // init runtime filter wrapper - // alloc memory to init runtime filter function - Status init(const RuntimeFilterParams* params) { - _max_in_num = params->max_in_num; - switch (_filter_type) { - case RuntimeFilterType::IN_FILTER: { - _context->hybrid_set.reset(create_set(_column_return_type)); - _context->hybrid_set->set_null_aware(params->null_aware); - break; - } - // Only use in nested loop join not need set null aware - case RuntimeFilterType::MIN_FILTER: - case RuntimeFilterType::MAX_FILTER: { - _context->minmax_func.reset(create_minmax_filter(_column_return_type)); - break; - } - case RuntimeFilterType::MINMAX_FILTER: { - _context->minmax_func.reset(create_minmax_filter(_column_return_type)); - _context->minmax_func->set_null_aware(params->null_aware); - break; - } - case RuntimeFilterType::BLOOM_FILTER: { - _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); - _context->bloom_filter_func->init_params(params); - return Status::OK(); - } - case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - _context->hybrid_set.reset(create_set(_column_return_type)); - _context->hybrid_set->set_null_aware(params->null_aware); - _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); - _context->bloom_filter_func->init_params(params); - return Status::OK(); - } - case RuntimeFilterType::BITMAP_FILTER: { - _context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type)); - _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in); - return Status::OK(); - } - default: - return Status::InternalError("Unknown Filter type"); - } - return Status::OK(); - } - - Status change_to_bloom_filter() { - if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { - return Status::InternalError( - "Can not change to bloom filter because of runtime filter type is {}", - IRuntimeFilter::to_string(_filter_type)); - } - BloomFilterFuncBase* bf = _context->bloom_filter_func.get(); - - if (bf != nullptr) { - insert_to_bloom_filter(bf); - } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size() != 0) { - return Status::InternalError("change to bloom filter need empty set ", - IRuntimeFilter::to_string(_filter_type)); - } - - // release in filter - _context->hybrid_set.reset(); - return Status::OK(); - } - - Status init_bloom_filter(const size_t build_bf_cardinality) { - if (_filter_type != RuntimeFilterType::BLOOM_FILTER && - _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "init_bloom_filter meet invalid input type {}", int(_filter_type)); - } - return _context->bloom_filter_func->init_with_cardinality(build_bf_cardinality); - } - - bool get_build_bf_cardinality() const { - if (_filter_type == RuntimeFilterType::BLOOM_FILTER || - _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - return _context->bloom_filter_func->get_build_bf_cardinality(); - } - return false; - } - - void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { - if (_context->hybrid_set->size() > 0) { - auto* it = _context->hybrid_set->begin(); - while (it->has_next()) { - bloom_filter->insert(it->get_value()); - it->next(); - } - } - if (_context->hybrid_set->contain_null()) { - bloom_filter->set_contain_null_and_null_aware(); - } - } - - BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); } - - void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { - if (is_ignored()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf"); - } - switch (_filter_type) { - case RuntimeFilterType::IN_FILTER: { - _context->hybrid_set->insert_fixed_len(column, start); - break; - } - case RuntimeFilterType::MIN_FILTER: - case RuntimeFilterType::MAX_FILTER: - case RuntimeFilterType::MINMAX_FILTER: { - _context->minmax_func->insert_fixed_len(column, start); - break; - } - case RuntimeFilterType::BLOOM_FILTER: { - _context->bloom_filter_func->insert_fixed_len(column, start); - break; - } - case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - if (is_bloomfilter()) { - _context->bloom_filter_func->insert_fixed_len(column, start); - } else { - _context->hybrid_set->insert_fixed_len(column, start); - } - break; - } - default: - DCHECK(false); - break; - } - } - - void insert_batch(const vectorized::ColumnPtr& column, size_t start) { - if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) { - bitmap_filter_insert_batch(column, start); - } else { - insert_fixed_len(column, start); - } - } - - void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start) { - std::vector<const BitmapValue*> bitmaps; - if (column->is_nullable()) { - const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); - const auto& col = - assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column()); - const auto& nullmap = - assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) - .get_data(); - for (size_t i = start; i < column->size(); i++) { - if (!nullmap[i]) { - bitmaps.push_back(&(col.get_data()[i])); - } - } - } else { - const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get()); - for (size_t i = start; i < column->size(); i++) { - bitmaps.push_back(&(col->get_data()[i])); - } - } - _context->bitmap_filter_func->insert_many(bitmaps); - } - - RuntimeFilterType get_real_type() const { - if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - if (_context->hybrid_set) { - return RuntimeFilterType::IN_FILTER; - } - return RuntimeFilterType::BLOOM_FILTER; - } - return _filter_type; - } - - size_t get_bloom_filter_size() const { - return _context->bloom_filter_func ? _context->bloom_filter_func->get_size() : 0; - } - - Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, - const TExpr& probe_expr); - - Status merge(const RuntimePredicateWrapper* wrapper) { - if (wrapper->is_disabled()) { - set_disabled(); - return Status::OK(); - } - - if (wrapper->is_ignored() || is_disabled()) { - return Status::OK(); - } - - _context->ignored = false; - - bool can_not_merge_in_or_bloom = - _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && - wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER && - wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER); - - bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER && - _filter_type != wrapper->_filter_type; - - CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other) - << " can not merge runtime filter(id=" << _filter_id - << "), current is filter type is " << IRuntimeFilter::to_string(_filter_type) - << ", other filter type is " << IRuntimeFilter::to_string(wrapper->_filter_type); - - switch (_filter_type) { - case RuntimeFilterType::IN_FILTER: { - _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); - if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { - set_disabled(); - } - break; - } - case RuntimeFilterType::MIN_FILTER: - case RuntimeFilterType::MAX_FILTER: - case RuntimeFilterType::MINMAX_FILTER: { - RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get())); - break; - } - case RuntimeFilterType::BLOOM_FILTER: { - RETURN_IF_ERROR( - _context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get())); - break; - } - case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - auto real_filter_type = get_real_type(); - - auto other_filter_type = wrapper->_filter_type; - if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - other_filter_type = wrapper->get_real_type(); - } - - if (real_filter_type == RuntimeFilterType::IN_FILTER) { - // when we meet base rf is in-filter, threre only have two case: - // case1: all input-filter's build_bf_exactly is true, inited by synced global size - // case2: all input-filter's build_bf_exactly is false, inited by default size - if (other_filter_type == RuntimeFilterType::IN_FILTER) { - _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); - if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { - // case2: use default size to init bf - RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length()); - RETURN_IF_ERROR(change_to_bloom_filter()); - } - } else { - // case1&case2: use input bf directly and insert hybrid set data into bf - _context->bloom_filter_func = wrapper->_context->bloom_filter_func; - RETURN_IF_ERROR(change_to_bloom_filter()); - } - } else { - if (other_filter_type == RuntimeFilterType::IN_FILTER) { - // case2: insert data to global filter - wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get()); - } else { - // case1&case2: all input bf must has same size - RETURN_IF_ERROR(_context->bloom_filter_func->merge( - wrapper->_context->bloom_filter_func.get())); - } - } - break; - } - case RuntimeFilterType::BITMAP_FILTER: { - // use input bitmap directly because we assume bitmap filter join always have full data - _context->bitmap_filter_func = wrapper->_context->bitmap_filter_func; - break; - } - default: - return Status::InternalError("unknown runtime filter"); - } - return Status::OK(); - } - - Status assign(const PInFilter* in_filter, bool contain_null) { - _context->hybrid_set.reset(create_set(_column_return_type)); - if (contain_null) { - _context->hybrid_set->set_null_aware(true); - _context->hybrid_set->insert((const void*)nullptr); - } - - switch (_column_return_type) { - case TYPE_BOOLEAN: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - bool bool_val = column.boolval(); - set->insert(&bool_val); - }); - break; - } - case TYPE_TINYINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto int_val = static_cast<int8_t>(column.intval()); - set->insert(&int_val); - }); - break; - } - case TYPE_SMALLINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto int_val = static_cast<int16_t>(column.intval()); - set->insert(&int_val); - }); - break; - } - case TYPE_INT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - int32_t int_val = column.intval(); - set->insert(&int_val); - }); - break; - } - case TYPE_BIGINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - int64_t long_val = column.longval(); - set->insert(&long_val); - }); - break; - } - case TYPE_LARGEINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto string_val = column.stringval(); - StringParser::ParseResult result; - auto int128_val = StringParser::string_to_int<int128_t>( - string_val.c_str(), string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - set->insert(&int128_val); - }); - break; - } - case TYPE_FLOAT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto float_val = static_cast<float>(column.doubleval()); - set->insert(&float_val); - }); - break; - } - case TYPE_DOUBLE: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - double double_val = column.doubleval(); - set->insert(&double_val); - }); - break; - } - case TYPE_DATEV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto date_v2_val = column.intval(); - set->insert(&date_v2_val); - }); - break; - } - case TYPE_DATETIMEV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto date_v2_val = column.longval(); - set->insert(&date_v2_val); - }); - break; - } - case TYPE_DATETIME: - case TYPE_DATE: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - const auto& string_val_ref = column.stringval(); - VecDateTimeValue datetime_val; - datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); - set->insert(&datetime_val); - }); - break; - } - case TYPE_DECIMALV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - const auto& string_val_ref = column.stringval(); - DecimalV2Value decimal_val(string_val_ref); - set->insert(&decimal_val); - }); - break; - } - case TYPE_DECIMAL32: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - int32_t decimal_32_val = column.intval(); - set->insert(&decimal_32_val); - }); - break; - } - case TYPE_DECIMAL64: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - int64_t decimal_64_val = column.longval(); - set->insert(&decimal_64_val); - }); - break; - } - case TYPE_DECIMAL128I: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto string_val = column.stringval(); - StringParser::ParseResult result; - auto int128_val = StringParser::string_to_int<int128_t>( - string_val.c_str(), string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - set->insert(&int128_val); - }); - break; - } - case TYPE_DECIMAL256: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto string_val = column.stringval(); - StringParser::ParseResult result; - auto int_val = StringParser::string_to_int<wide::Int256>( - string_val.c_str(), string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - set->insert(&int_val); - }); - break; - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - const std::string& string_value = column.stringval(); - // string_value is std::string, call insert(data, size) function in StringSet will not cast as StringRef - // so could avoid some cast error at different class object. - set->insert((void*)string_value.data(), string_value.size()); - }); - break; - } - case TYPE_IPV4: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - int32_t tmp = column.intval(); - set->insert(&tmp); - }); - break; - } - case TYPE_IPV6: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { - auto string_val = column.stringval(); - StringParser::ParseResult result; - auto int128_val = StringParser::string_to_int<uint128_t>( - string_val.c_str(), string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - set->insert(&int128_val); - }); - break; - } - default: { - return Status::InternalError("not support assign to in filter, type: " + - type_to_string(_column_return_type)); - } - } - return Status::OK(); - } - - void set_enable_fixed_len_to_uint32_v2() { - if (_context->bloom_filter_func) { - _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2(); - } - } - - // used by shuffle runtime filter - // assign this filter by protobuf - Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data, - bool contain_null) { - // we won't use this class to insert or find any data - // so any type is ok - _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE - ? PrimitiveType::TYPE_INT - : _column_return_type)); - RETURN_IF_ERROR(_context->bloom_filter_func->assign(data, bloom_filter->filter_length(), - contain_null)); - return Status::OK(); - } - - // used by shuffle runtime filter - // assign this filter by protobuf - Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) { - _context->minmax_func.reset(create_minmax_filter(_column_return_type)); - - if (contain_null) { - _context->minmax_func->set_null_aware(true); - _context->minmax_func->set_contain_null(); - } - - switch (_column_return_type) { - case TYPE_BOOLEAN: { - bool min_val = minmax_filter->min_val().boolval(); - bool max_val = minmax_filter->max_val().boolval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_TINYINT: { - auto min_val = static_cast<int8_t>(minmax_filter->min_val().intval()); - auto max_val = static_cast<int8_t>(minmax_filter->max_val().intval()); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_SMALLINT: { - auto min_val = static_cast<int16_t>(minmax_filter->min_val().intval()); - auto max_val = static_cast<int16_t>(minmax_filter->max_val().intval()); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_INT: { - int32_t min_val = minmax_filter->min_val().intval(); - int32_t max_val = minmax_filter->max_val().intval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_BIGINT: { - int64_t min_val = minmax_filter->min_val().longval(); - int64_t max_val = minmax_filter->max_val().longval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_LARGEINT: { - auto min_string_val = minmax_filter->min_val().stringval(); - auto max_string_val = minmax_filter->max_val().stringval(); - StringParser::ParseResult result; - auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(), - min_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(), - max_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_FLOAT: { - auto min_val = static_cast<float>(minmax_filter->min_val().doubleval()); - auto max_val = static_cast<float>(minmax_filter->max_val().doubleval()); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DOUBLE: { - auto min_val = static_cast<double>(minmax_filter->min_val().doubleval()); - auto max_val = static_cast<double>(minmax_filter->max_val().doubleval()); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DATEV2: { - int32_t min_val = minmax_filter->min_val().intval(); - int32_t max_val = minmax_filter->max_val().intval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DATETIMEV2: { - int64_t min_val = minmax_filter->min_val().longval(); - int64_t max_val = minmax_filter->max_val().longval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DATETIME: - case TYPE_DATE: { - const auto& min_val_ref = minmax_filter->min_val().stringval(); - const auto& max_val_ref = minmax_filter->max_val().stringval(); - VecDateTimeValue min_val; - VecDateTimeValue max_val; - min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length()); - max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DECIMALV2: { - const auto& min_val_ref = minmax_filter->min_val().stringval(); - const auto& max_val_ref = minmax_filter->max_val().stringval(); - DecimalV2Value min_val(min_val_ref); - DecimalV2Value max_val(max_val_ref); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DECIMAL32: { - int32_t min_val = minmax_filter->min_val().intval(); - int32_t max_val = minmax_filter->max_val().intval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DECIMAL64: { - int64_t min_val = minmax_filter->min_val().longval(); - int64_t max_val = minmax_filter->max_val().longval(); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DECIMAL128I: { - auto min_string_val = minmax_filter->min_val().stringval(); - auto max_string_val = minmax_filter->max_val().stringval(); - StringParser::ParseResult result; - auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(), - min_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(), - max_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_DECIMAL256: { - auto min_string_val = minmax_filter->min_val().stringval(); - auto max_string_val = minmax_filter->max_val().stringval(); - StringParser::ParseResult result; - auto min_val = StringParser::string_to_int<wide::Int256>( - min_string_val.c_str(), min_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - auto max_val = StringParser::string_to_int<wide::Int256>( - max_string_val.c_str(), max_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - return _context->minmax_func->assign(&min_val, &max_val); - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - auto min_val_ref = minmax_filter->min_val().stringval(); - auto max_val_ref = minmax_filter->max_val().stringval(); - return _context->minmax_func->assign(&min_val_ref, &max_val_ref); - } - case TYPE_IPV4: { - int tmp_min = minmax_filter->min_val().intval(); - int tmp_max = minmax_filter->max_val().intval(); - return _context->minmax_func->assign(&tmp_min, &tmp_max); - } - case TYPE_IPV6: { - auto min_string_val = minmax_filter->min_val().stringval(); - auto max_string_val = minmax_filter->max_val().stringval(); - StringParser::ParseResult result; - auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(), - min_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(), - max_string_val.length(), &result); - DCHECK(result == StringParser::PARSE_SUCCESS); - return _context->minmax_func->assign(&min_val, &max_val); - } - default: - break; - } - return Status::InternalError("not support!"); - } - - void get_bloom_filter_desc(char** data, int* filter_length) { - _context->bloom_filter_func->get_data(data, filter_length); - } - - PrimitiveType column_type() { return _column_return_type; } - - bool is_bloomfilter() const { return get_real_type() == RuntimeFilterType::BLOOM_FILTER; } - - bool contain_null() const { - if (is_bloomfilter()) { - return _context->bloom_filter_func->contain_null(); - } - if (_context->hybrid_set) { - if (get_real_type() != RuntimeFilterType::IN_FILTER) { - throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but real type is {}", - int(get_real_type())); - } - return _context->hybrid_set->contain_null(); - } - if (_context->minmax_func) { - return _context->minmax_func->contain_null(); - } - return false; - } - - bool is_ignored() const { return _context->ignored; } - - void set_ignored() { _context->ignored = true; } - - bool is_disabled() const { return _context->disabled; } - - void set_disabled() { - _context->disabled = true; - _context->minmax_func.reset(); - _context->hybrid_set.reset(); - _context->bloom_filter_func.reset(); - _context->bitmap_filter_func.reset(); - } - - void batch_assign(const PInFilter* filter, - void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, - PColumnValue&)) { - for (int i = 0; i < filter->values_size(); ++i) { - PColumnValue column = filter->values(i); - assign_func(_context->hybrid_set, column); - } - } - - size_t get_in_filter_size() const { - return _context->hybrid_set ? _context->hybrid_set->size() : 0; - } - - std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const { - return _context->bitmap_filter_func; - } - - friend class IRuntimeFilter; - - void set_filter_id(int id) { - if (_context->bloom_filter_func) { - _context->bloom_filter_func->set_filter_id(id); - } - if (_context->bitmap_filter_func) { - _context->bitmap_filter_func->set_filter_id(id); - } - if (_context->hybrid_set) { - _context->hybrid_set->set_filter_id(id); - } - } - -private: - // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. - PrimitiveType _column_return_type; // column type - RuntimeFilterType _filter_type; - int32_t _max_in_num = -1; - - RuntimeFilterContextSPtr _context; - uint32_t _filter_id; -}; Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, @@ -1794,4 +1089,642 @@ Status RuntimePredicateWrapper::get_push_exprs( RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default; RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default; +Status RuntimePredicateWrapper::init(const RuntimeFilterParams* params) { + _max_in_num = params->max_in_num; + switch (_filter_type) { + case RuntimeFilterType::IN_FILTER: { + _context->hybrid_set.reset(create_set(_column_return_type)); + _context->hybrid_set->set_null_aware(params->null_aware); + break; + } + // Only use in nested loop join not need set null aware + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: { + _context->minmax_func.reset(create_minmax_filter(_column_return_type)); + break; + } + case RuntimeFilterType::MINMAX_FILTER: { + _context->minmax_func.reset(create_minmax_filter(_column_return_type)); + _context->minmax_func->set_null_aware(params->null_aware); + break; + } + case RuntimeFilterType::BLOOM_FILTER: { + _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context->bloom_filter_func->init_params(params); + return Status::OK(); + } + case RuntimeFilterType::IN_OR_BLOOM_FILTER: { + _context->hybrid_set.reset(create_set(_column_return_type)); + _context->hybrid_set->set_null_aware(params->null_aware); + _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context->bloom_filter_func->init_params(params); + return Status::OK(); + } + case RuntimeFilterType::BITMAP_FILTER: { + _context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type)); + _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in); + return Status::OK(); + } + default: + return Status::InternalError("Unknown Filter type"); + } + return Status::OK(); +} + +Status RuntimePredicateWrapper::change_to_bloom_filter() { + if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { + return Status::InternalError( + "Can not change to bloom filter because of runtime filter type is {}", + IRuntimeFilter::to_string(_filter_type)); + } + BloomFilterFuncBase* bf = _context->bloom_filter_func.get(); + + if (bf != nullptr) { + insert_to_bloom_filter(bf); + } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size() != 0) { + return Status::InternalError("change to bloom filter need empty set ", + IRuntimeFilter::to_string(_filter_type)); + } + + // release in filter + _context->hybrid_set.reset(); + return Status::OK(); +} + +void RuntimePredicateWrapper::set_filter_id(int id) { + if (_context->bloom_filter_func) { + _context->bloom_filter_func->set_filter_id(id); + } + if (_context->bitmap_filter_func) { + _context->bitmap_filter_func->set_filter_id(id); + } + if (_context->hybrid_set) { + _context->hybrid_set->set_filter_id(id); + } +} + +void RuntimePredicateWrapper::batch_assign( + const PInFilter* filter, + void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&)) { + for (int i = 0; i < filter->values_size(); ++i) { + PColumnValue column = filter->values(i); + assign_func(_context->hybrid_set, column); + } +} + +Status RuntimePredicateWrapper::init_bloom_filter(const size_t build_bf_cardinality) { + if (_filter_type != RuntimeFilterType::BLOOM_FILTER && + _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { + throw Exception(ErrorCode::INTERNAL_ERROR, "init_bloom_filter meet invalid input type {}", + int(_filter_type)); + } + return _context->bloom_filter_func->init_with_cardinality(build_bf_cardinality); +} + +bool RuntimePredicateWrapper::get_build_bf_cardinality() const { + if (_filter_type == RuntimeFilterType::BLOOM_FILTER || + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + return _context->bloom_filter_func->get_build_bf_cardinality(); + } + return false; +} + +void RuntimePredicateWrapper::insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { + if (_context->hybrid_set->size() > 0) { + auto* it = _context->hybrid_set->begin(); + while (it->has_next()) { + bloom_filter->insert(it->get_value()); + it->next(); + } + } + if (_context->hybrid_set->contain_null()) { + bloom_filter->set_contain_null_and_null_aware(); + } +} + +void RuntimePredicateWrapper::insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { + if (is_ignored()) { + throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf"); + } + switch (_filter_type) { + case RuntimeFilterType::IN_FILTER: { + _context->hybrid_set->insert_fixed_len(column, start); + break; + } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: + case RuntimeFilterType::MINMAX_FILTER: { + _context->minmax_func->insert_fixed_len(column, start); + break; + } + case RuntimeFilterType::BLOOM_FILTER: { + _context->bloom_filter_func->insert_fixed_len(column, start); + break; + } + case RuntimeFilterType::IN_OR_BLOOM_FILTER: { + if (is_bloomfilter()) { + _context->bloom_filter_func->insert_fixed_len(column, start); + } else { + _context->hybrid_set->insert_fixed_len(column, start); + } + break; + } + default: + DCHECK(false); + break; + } +} + +void RuntimePredicateWrapper::bitmap_filter_insert_batch(const vectorized::ColumnPtr column, + size_t start) { + std::vector<const BitmapValue*> bitmaps; + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + bitmaps.push_back(&(col.get_data()[i])); + } + } + } else { + const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + bitmaps.push_back(&(col->get_data()[i])); + } + } + _context->bitmap_filter_func->insert_many(bitmaps); +} + +size_t RuntimePredicateWrapper::get_bloom_filter_size() const { + return _context->bloom_filter_func ? _context->bloom_filter_func->get_size() : 0; +} + +Status RuntimePredicateWrapper::merge(const RuntimePredicateWrapper* wrapper) { + if (wrapper->is_disabled()) { + set_disabled(); + return Status::OK(); + } + + if (wrapper->is_ignored() || is_disabled()) { + return Status::OK(); + } + + _context->ignored = false; + + bool can_not_merge_in_or_bloom = + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && + (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && + wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER && + wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER); + + bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER && + _filter_type != wrapper->_filter_type; + + CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other) + << " can not merge runtime filter(id=" << _filter_id << "), current is filter type is " + << IRuntimeFilter::to_string(_filter_type) << ", other filter type is " + << IRuntimeFilter::to_string(wrapper->_filter_type); + + switch (_filter_type) { + case RuntimeFilterType::IN_FILTER: { + _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); + if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { + set_disabled(); + } + break; + } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: + case RuntimeFilterType::MINMAX_FILTER: { + RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get())); + break; + } + case RuntimeFilterType::BLOOM_FILTER: { + RETURN_IF_ERROR( + _context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get())); + break; + } + case RuntimeFilterType::IN_OR_BLOOM_FILTER: { + auto real_filter_type = get_real_type(); + + auto other_filter_type = wrapper->_filter_type; + if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + other_filter_type = wrapper->get_real_type(); + } + + if (real_filter_type == RuntimeFilterType::IN_FILTER) { + // when we meet base rf is in-filter, threre only have two case: + // case1: all input-filter's build_bf_exactly is true, inited by synced global size + // case2: all input-filter's build_bf_exactly is false, inited by default size + if (other_filter_type == RuntimeFilterType::IN_FILTER) { + _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); + if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { + // case2: use default size to init bf + RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length()); + RETURN_IF_ERROR(change_to_bloom_filter()); + } + } else { + // case1&case2: use input bf directly and insert hybrid set data into bf + _context->bloom_filter_func = wrapper->_context->bloom_filter_func; + RETURN_IF_ERROR(change_to_bloom_filter()); + } + } else { + if (other_filter_type == RuntimeFilterType::IN_FILTER) { + // case2: insert data to global filter + wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get()); + } else { + // case1&case2: all input bf must has same size + RETURN_IF_ERROR(_context->bloom_filter_func->merge( + wrapper->_context->bloom_filter_func.get())); + } + } + break; + } + case RuntimeFilterType::BITMAP_FILTER: { + // use input bitmap directly because we assume bitmap filter join always have full data + _context->bitmap_filter_func = wrapper->_context->bitmap_filter_func; + break; + } + default: + return Status::InternalError("unknown runtime filter"); + } + return Status::OK(); +} + +Status RuntimePredicateWrapper::assign(const PInFilter* in_filter, bool contain_null) { + _context->hybrid_set.reset(create_set(_column_return_type)); + if (contain_null) { + _context->hybrid_set->set_null_aware(true); + _context->hybrid_set->insert((const void*)nullptr); + } + + switch (_column_return_type) { + case TYPE_BOOLEAN: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + bool bool_val = column.boolval(); + set->insert(&bool_val); + }); + break; + } + case TYPE_TINYINT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto int_val = static_cast<int8_t>(column.intval()); + set->insert(&int_val); + }); + break; + } + case TYPE_SMALLINT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto int_val = static_cast<int16_t>(column.intval()); + set->insert(&int_val); + }); + break; + } + case TYPE_INT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + int32_t int_val = column.intval(); + set->insert(&int_val); + }); + break; + } + case TYPE_BIGINT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + int64_t long_val = column.longval(); + set->insert(&long_val); + }); + break; + } + case TYPE_LARGEINT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto string_val = column.stringval(); + StringParser::ParseResult result; + auto int128_val = StringParser::string_to_int<int128_t>(string_val.c_str(), + string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + set->insert(&int128_val); + }); + break; + } + case TYPE_FLOAT: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto float_val = static_cast<float>(column.doubleval()); + set->insert(&float_val); + }); + break; + } + case TYPE_DOUBLE: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + double double_val = column.doubleval(); + set->insert(&double_val); + }); + break; + } + case TYPE_DATEV2: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto date_v2_val = column.intval(); + set->insert(&date_v2_val); + }); + break; + } + case TYPE_DATETIMEV2: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto date_v2_val = column.longval(); + set->insert(&date_v2_val); + }); + break; + } + case TYPE_DATETIME: + case TYPE_DATE: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + const auto& string_val_ref = column.stringval(); + VecDateTimeValue datetime_val; + datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); + set->insert(&datetime_val); + }); + break; + } + case TYPE_DECIMALV2: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + const auto& string_val_ref = column.stringval(); + DecimalV2Value decimal_val(string_val_ref); + set->insert(&decimal_val); + }); + break; + } + case TYPE_DECIMAL32: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + int32_t decimal_32_val = column.intval(); + set->insert(&decimal_32_val); + }); + break; + } + case TYPE_DECIMAL64: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + int64_t decimal_64_val = column.longval(); + set->insert(&decimal_64_val); + }); + break; + } + case TYPE_DECIMAL128I: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto string_val = column.stringval(); + StringParser::ParseResult result; + auto int128_val = StringParser::string_to_int<int128_t>(string_val.c_str(), + string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + set->insert(&int128_val); + }); + break; + } + case TYPE_DECIMAL256: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto string_val = column.stringval(); + StringParser::ParseResult result; + auto int_val = StringParser::string_to_int<wide::Int256>(string_val.c_str(), + string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + set->insert(&int_val); + }); + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + const std::string& string_value = column.stringval(); + // string_value is std::string, call insert(data, size) function in StringSet will not cast as StringRef + // so could avoid some cast error at different class object. + set->insert((void*)string_value.data(), string_value.size()); + }); + break; + } + case TYPE_IPV4: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + int32_t tmp = column.intval(); + set->insert(&tmp); + }); + break; + } + case TYPE_IPV6: { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + auto string_val = column.stringval(); + StringParser::ParseResult result; + auto int128_val = StringParser::string_to_int<uint128_t>(string_val.c_str(), + string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + set->insert(&int128_val); + }); + break; + } + default: { + return Status::InternalError("not support assign to in filter, type: " + + type_to_string(_column_return_type)); + } + } + return Status::OK(); +} + +void RuntimePredicateWrapper::set_enable_fixed_len_to_uint32_v2() { + if (_context->bloom_filter_func) { + _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2(); + } +} + +Status RuntimePredicateWrapper::assign(const PBloomFilter* bloom_filter, + butil::IOBufAsZeroCopyInputStream* data, bool contain_null) { + // we won't use this class to insert or find any data + // so any type is ok + _context->bloom_filter_func.reset(create_bloom_filter( + _column_return_type == INVALID_TYPE ? PrimitiveType::TYPE_INT : _column_return_type)); + RETURN_IF_ERROR( + _context->bloom_filter_func->assign(data, bloom_filter->filter_length(), contain_null)); + return Status::OK(); +} + +// used by shuffle runtime filter +// assign this filter by protobuf +Status RuntimePredicateWrapper::assign(const PMinMaxFilter* minmax_filter, bool contain_null) { + _context->minmax_func.reset(create_minmax_filter(_column_return_type)); + + if (contain_null) { + _context->minmax_func->set_null_aware(true); + _context->minmax_func->set_contain_null(); + } + + switch (_column_return_type) { + case TYPE_BOOLEAN: { + bool min_val = minmax_filter->min_val().boolval(); + bool max_val = minmax_filter->max_val().boolval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_TINYINT: { + auto min_val = static_cast<int8_t>(minmax_filter->min_val().intval()); + auto max_val = static_cast<int8_t>(minmax_filter->max_val().intval()); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_SMALLINT: { + auto min_val = static_cast<int16_t>(minmax_filter->min_val().intval()); + auto max_val = static_cast<int16_t>(minmax_filter->max_val().intval()); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_INT: { + int32_t min_val = minmax_filter->min_val().intval(); + int32_t max_val = minmax_filter->max_val().intval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_BIGINT: { + int64_t min_val = minmax_filter->min_val().longval(); + int64_t max_val = minmax_filter->max_val().longval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_LARGEINT: { + auto min_string_val = minmax_filter->min_val().stringval(); + auto max_string_val = minmax_filter->max_val().stringval(); + StringParser::ParseResult result; + auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(), + min_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(), + max_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_FLOAT: { + auto min_val = static_cast<float>(minmax_filter->min_val().doubleval()); + auto max_val = static_cast<float>(minmax_filter->max_val().doubleval()); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DOUBLE: { + auto min_val = static_cast<double>(minmax_filter->min_val().doubleval()); + auto max_val = static_cast<double>(minmax_filter->max_val().doubleval()); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DATEV2: { + int32_t min_val = minmax_filter->min_val().intval(); + int32_t max_val = minmax_filter->max_val().intval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DATETIMEV2: { + int64_t min_val = minmax_filter->min_val().longval(); + int64_t max_val = minmax_filter->max_val().longval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DATETIME: + case TYPE_DATE: { + const auto& min_val_ref = minmax_filter->min_val().stringval(); + const auto& max_val_ref = minmax_filter->max_val().stringval(); + VecDateTimeValue min_val; + VecDateTimeValue max_val; + min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length()); + max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DECIMALV2: { + const auto& min_val_ref = minmax_filter->min_val().stringval(); + const auto& max_val_ref = minmax_filter->max_val().stringval(); + DecimalV2Value min_val(min_val_ref); + DecimalV2Value max_val(max_val_ref); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DECIMAL32: { + int32_t min_val = minmax_filter->min_val().intval(); + int32_t max_val = minmax_filter->max_val().intval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DECIMAL64: { + int64_t min_val = minmax_filter->min_val().longval(); + int64_t max_val = minmax_filter->max_val().longval(); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DECIMAL128I: { + auto min_string_val = minmax_filter->min_val().stringval(); + auto max_string_val = minmax_filter->max_val().stringval(); + StringParser::ParseResult result; + auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(), + min_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(), + max_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_DECIMAL256: { + auto min_string_val = minmax_filter->min_val().stringval(); + auto max_string_val = minmax_filter->max_val().stringval(); + StringParser::ParseResult result; + auto min_val = StringParser::string_to_int<wide::Int256>(min_string_val.c_str(), + min_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + auto max_val = StringParser::string_to_int<wide::Int256>(max_string_val.c_str(), + max_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + return _context->minmax_func->assign(&min_val, &max_val); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + auto min_val_ref = minmax_filter->min_val().stringval(); + auto max_val_ref = minmax_filter->max_val().stringval(); + return _context->minmax_func->assign(&min_val_ref, &max_val_ref); + } + case TYPE_IPV4: { + int tmp_min = minmax_filter->min_val().intval(); + int tmp_max = minmax_filter->max_val().intval(); + return _context->minmax_func->assign(&tmp_min, &tmp_max); + } + case TYPE_IPV6: { + auto min_string_val = minmax_filter->min_val().stringval(); + auto max_string_val = minmax_filter->max_val().stringval(); + StringParser::ParseResult result; + auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(), + min_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(), + max_string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + return _context->minmax_func->assign(&min_val, &max_val); + } + default: + break; + } + return Status::InternalError("not support!"); +} + +void RuntimePredicateWrapper::get_bloom_filter_desc(char** data, int* filter_length) { + _context->bloom_filter_func->get_data(data, filter_length); +} + +bool RuntimePredicateWrapper::contain_null() const { + if (is_bloomfilter()) { + return _context->bloom_filter_func->contain_null(); + } + if (_context->hybrid_set) { + if (get_real_type() != RuntimeFilterType::IN_FILTER) { + throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but real type is {}", + int(get_real_type())); + } + return _context->hybrid_set->contain_null(); + } + if (_context->minmax_func) { + return _context->minmax_func->contain_null(); + } + return false; +} + +size_t RuntimePredicateWrapper::get_in_filter_size() const { + return _context->hybrid_set ? _context->hybrid_set->size() : 0; +} + +void RuntimePredicateWrapper::set_disabled() { + _context->disabled = true; + _context->minmax_func.reset(); + _context->hybrid_set.reset(); + _context->bloom_filter_func.reset(); + _context->bitmap_filter_func.reset(); +} + } // namespace doris diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 441de7d4da3..e38f6901ef4 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -443,4 +443,115 @@ private: WrapperPtr _wrapper; }; +// This class is a wrapper of runtime predicate function +class RuntimePredicateWrapper { +public: + RuntimePredicateWrapper(const RuntimeFilterParams* params) + : RuntimePredicateWrapper(params->column_return_type, params->filter_type, + params->filter_id) {}; + // for a 'tmp' runtime predicate wrapper + // only could called assign method or as a param for merge + RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id) + : _column_return_type(column_type), + _filter_type(type), + _context(new RuntimeFilterContext()), + _filter_id(filter_id) {} + + // init runtime filter wrapper + // alloc memory to init runtime filter function + Status init(const RuntimeFilterParams* params); + + Status change_to_bloom_filter(); + + Status init_bloom_filter(const size_t build_bf_cardinality); + + bool get_build_bf_cardinality() const; + + void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const; + + BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); } + + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start); + + void insert_batch(const vectorized::ColumnPtr& column, size_t start) { + if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) { + bitmap_filter_insert_batch(column, start); + } else { + insert_fixed_len(column, start); + } + } + + void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start); + + RuntimeFilterType get_real_type() const { + if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + if (_context->hybrid_set) { + return RuntimeFilterType::IN_FILTER; + } + return RuntimeFilterType::BLOOM_FILTER; + } + return _filter_type; + } + + size_t get_bloom_filter_size() const; + + Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, + const TExpr& probe_expr); + + Status merge(const RuntimePredicateWrapper* wrapper); + + Status assign(const PInFilter* in_filter, bool contain_null); + + void set_enable_fixed_len_to_uint32_v2(); + + // used by shuffle runtime filter + // assign this filter by protobuf + Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data, + bool contain_null); + + // used by shuffle runtime filter + // assign this filter by protobuf + Status assign(const PMinMaxFilter* minmax_filter, bool contain_null); + + void get_bloom_filter_desc(char** data, int* filter_length); + + PrimitiveType column_type() { return _column_return_type; } + + bool is_bloomfilter() const { return get_real_type() == RuntimeFilterType::BLOOM_FILTER; } + + bool contain_null() const; + + bool is_ignored() const { return _context->ignored; } + + void set_ignored() { _context->ignored = true; } + + bool is_disabled() const { return _context->disabled; } + + void set_disabled(); + + void batch_assign(const PInFilter* filter, + void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, + PColumnValue&)); + + size_t get_in_filter_size() const; + + std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const { + return _context->bitmap_filter_func; + } + + friend class IRuntimeFilter; + + void set_filter_id(int id); + +private: + // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. + PrimitiveType _column_return_type; // column type + RuntimeFilterType _filter_type; + int32_t _max_in_num = -1; + + RuntimeFilterContextSPtr _context; + uint32_t _filter_id; +}; + } // namespace doris diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index a9dd631e358..b732ae155f7 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -35,11 +35,7 @@ public: VRuntimeFilterSlots( const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs, const std::vector<std::shared_ptr<IRuntimeFilter>>& runtime_filters) - : _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters) { - for (auto runtime_filter : _runtime_filters) { - _runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter.get()); - } - } + : _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters) {} Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, std::shared_ptr<pipeline::CountedFinishDependency> dependency) { @@ -139,62 +135,48 @@ public: } void insert(const vectorized::Block* block) { - for (int i = 0; i < _build_expr_context.size(); ++i) { - auto iter = _runtime_filters_map.find(i); - if (iter == _runtime_filters_map.end()) { - continue; - } - - int result_column_id = _build_expr_context[i]->get_last_result_column_id(); + for (auto& filter : _runtime_filters) { + int result_column_id = + _build_expr_context[filter->expr_order()]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; - for (auto* filter : iter->second) { - if (filter->get_ignored() || filter->get_disabled()) { - continue; - } - filter->insert_batch(column, 1); + if (filter->get_ignored() || filter->get_disabled()) { + continue; } + filter->insert_batch(column, 1); } } // publish runtime filter Status publish(RuntimeState* state, bool publish_local) { - for (auto& pair : _runtime_filters_map) { - for (auto& filter : pair.second) { - RETURN_IF_ERROR(filter->publish(state, publish_local)); - } + for (auto& filter : _runtime_filters) { + RETURN_IF_ERROR(filter->publish(state, publish_local)); } return Status::OK(); } void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& it : _runtime_filters_map) { - for (auto& filter : it.second) { - context->runtime_filters[filter->filter_id()] = filter->get_shared_context_ref(); - } + for (auto& filter : _runtime_filters) { + context->runtime_filters[filter->filter_id()] = filter->get_shared_context_ref(); } } Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& it : _runtime_filters_map) { - for (auto& filter : it.second) { - auto filter_id = filter->filter_id(); - auto ret = context->runtime_filters.find(filter_id); - if (ret == context->runtime_filters.end()) { - return Status::Aborted("invalid runtime filter id: {}", filter_id); - } - filter->get_shared_context_ref() = ret->second; + for (auto& filter : _runtime_filters) { + auto filter_id = filter->filter_id(); + auto ret = context->runtime_filters.find(filter_id); + if (ret == context->runtime_filters.end()) { + return Status::Aborted("invalid runtime filter id: {}", filter_id); } + filter->get_shared_context_ref() = ret->second; } return Status::OK(); } - bool empty() { return _runtime_filters_map.empty(); } + bool empty() { return _runtime_filters.empty(); } private: const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context; std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters; - // prob_contition index -> [IRuntimeFilter] - std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map; }; } // namespace doris diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index bb254aae72b..9ca0f0fcd40 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -295,7 +295,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori *_output_row_descriptor); if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); - DCHECK(mutable_columns.size() == local_state->_projections.size()); + DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string(); for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id)); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e8e8ed5d9fe..c8e3429107c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -363,6 +363,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag const auto target_size = request.local_params.size(); _tasks.resize(target_size); _runtime_filter_states.resize(target_size); + _runtime_filter_mgr_map.resize(target_size); _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); @@ -510,7 +511,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag } { std::lock_guard<std::mutex> l(_state_map_lock); - _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr); + _runtime_filter_mgr_map[i] = std::move(runtime_filter_mgr); } return Status::OK(); }; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index d672ad6e923..bd3a350d0a2 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -275,8 +275,7 @@ private: _op_id_to_le_state; std::map<PipelineId, Pipeline*> _pip_id_to_pipeline; - // UniqueId -> runtime mgr - std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map; + std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map; //Here are two types of runtime states: // - _runtime state is at the Fragment level. diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 1651eb50cac..2218d70fde6 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -107,12 +107,15 @@ class MultiCoreTaskQueue { public: explicit MultiCoreTaskQueue(int core_size); +#ifndef BE_TEST ~MultiCoreTaskQueue(); - - void close(); - // Get the task by core id. PipelineTask* take(int core_id); +#else + virtual ~MultiCoreTaskQueue(); + virtual PipelineTask* take(int core_id); +#endif + void close(); // TODO combine these methods to `push_back(task, core_id = -1)` Status push_back(PipelineTask* task); diff --git a/be/test/pipeline/dummy_task_queue.h b/be/test/pipeline/dummy_task_queue.h new file mode 100644 index 00000000000..bace0ecae96 --- /dev/null +++ b/be/test/pipeline/dummy_task_queue.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/task_queue.h" + +namespace doris::pipeline { + +class DummyTaskQueue final : public MultiCoreTaskQueue { + explicit DummyTaskQueue(int core_size) : MultiCoreTaskQueue(core_size) {} + ~DummyTaskQueue() override = default; + PipelineTask* take(int core_id) override { + PipelineTask* task = nullptr; + do { + DCHECK(_prio_task_queues.size() > core_id) + << " list size: " << _prio_task_queues.size() << " core_id: " << core_id + << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); + task = _prio_task_queues[core_id].try_take(false); + if (task) { + break; + } + task = _steal_take(core_id); + if (task) { + break; + } + task = _prio_task_queues[core_id].take(1); + if (task) { + break; + } + } while (false); + if (task) { + task->pop_out_runnable_queue(); + } + return task; + } +}; +} // namespace doris::pipeline diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp index d1c0af85f58..6466d6c2927 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -22,12 +22,15 @@ #include "common/exception.h" #include "common/status.h" +#include "dummy_task_queue.h" +#include "exprs/bloom_filter_func.h" +#include "exprs/hybrid_set.h" +#include "exprs/runtime_filter.h" #include "pipeline/dependency.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/pipeline_fragment_context.h" -#include "pipeline/task_queue.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "thrift_builder.h" @@ -45,10 +48,13 @@ class PipelineTest : public testing::Test { public: PipelineTest() : _obj_pool(new ObjectPool()), - _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) { + _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {} + ~PipelineTest() override = default; + void SetUp() override { _query_options = TQueryOptionsBuilder() .set_enable_local_exchange(true) .set_enable_local_shuffle(true) + .set_runtime_filter_max_in_num(15) .build(); auto fe_address = TNetworkAddress(); fe_address.hostname = LOCALHOST; @@ -56,10 +62,12 @@ public: _query_ctx = QueryContext::create_shared(_query_id, ExecEnv::GetInstance(), _query_options, fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND); + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + TRuntimeFilterParamsBuilder().build()); ExecEnv::GetInstance()->set_stream_mgr(_mgr.get()); - _task_queue = std::make_unique<MultiCoreTaskQueue>(1); + _task_queue = std::make_unique<DummyTaskQueue>(1); } - ~PipelineTest() override = default; + void TearDown() override {} private: std::shared_ptr<Pipeline> _build_pipeline(int num_instances, Pipeline* parent = nullptr) { @@ -111,6 +119,7 @@ private: _pipeline_profiles.clear(); _pipeline_tasks.clear(); _runtime_states.clear(); + _runtime_filter_mgrs.clear(); } int _next_fragment_id() { return _fragment_id++; } int _next_node_id() { return _next_node_idx++; } @@ -125,7 +134,7 @@ private: std::shared_ptr<QueryContext> _query_ctx; TUniqueId _query_id = TUniqueId(); TQueryOptions _query_options; - std::unique_ptr<MultiCoreTaskQueue> _task_queue; + std::unique_ptr<DummyTaskQueue> _task_queue; // Fragment level // Fragment0 -> Fragment1 @@ -149,6 +158,9 @@ private: std::vector<std::vector<std::unique_ptr<PipelineTask>>> _pipeline_tasks; std::vector<std::vector<std::unique_ptr<RuntimeState>>> _runtime_states; + // Instance level + std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgrs; + const std::string LOCALHOST = BackendOptions::get_localhost(); const int DUMMY_PORT = config::brpc_port; }; @@ -184,7 +196,7 @@ TEST_F(PipelineTest, HAPPY_PATH) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(0) .set_slotIdx(0) .set_isMaterialized(true) @@ -489,7 +501,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(0) .set_slotIdx(0) .set_isMaterialized(true) @@ -585,7 +597,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(0) .set_slotIdx(0) .set_isMaterialized(true) @@ -604,7 +616,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(0) .set_slotIdx(0) .set_isMaterialized(true) @@ -623,7 +635,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(0) .set_slotIdx(0) .set_isMaterialized(true) @@ -640,7 +652,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .set_scalar_type(TPrimitiveType::INT) .build()) .build()) - .set_nullIndicatorBit(0) + .set_nullIndicatorBit(-1) .set_byteOffset(4) .set_slotIdx(1) .set_isMaterialized(true) @@ -720,6 +732,73 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .append_vintermediate_tuple_id_list(1) .build()) .append_row_tuples(2, false) + .append_projections( + TExprBuilder() + .append_nodes( + TExprNodeBuilder( + TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types( + TTypeNodeBuilder() + .set_type( + TTypeNodeType:: + SCALAR) + .set_scalar_type( + TPrimitiveType:: + INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(0, 0).build()) + .build()) + .build()) + .append_projections( + TExprBuilder() + .append_nodes( + TExprNodeBuilder( + TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types( + TTypeNodeBuilder() + .set_type( + TTypeNodeType:: + SCALAR) + .set_scalar_type( + TPrimitiveType:: + INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(1, 1).build()) + .build()) + .build()) + .append_runtime_filters( + TRuntimeFilterDescBuilder( + 0, + TExprBuilder() + .append_nodes( + TExprNodeBuilder( + TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types( + TTypeNodeBuilder() + .set_type( + TTypeNodeType:: + SCALAR) + .set_scalar_type( + TPrimitiveType:: + INT) + .build()) + .build(), + 0) + .set_slot_ref( + TSlotRefBuilder(1, 1).build()) + .build()) + .build(), + 0, std::map<TPlanNodeId, TExpr> {}) + .set_bloom_filter_size_bytes(1048576) + .set_build_bf_exactly(false) + .build()) .build(); { @@ -850,6 +929,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { { // Build pipeline task int task_id = 0; + _runtime_filter_mgrs.resize(parallelism); + for (int j = 0; j < parallelism; j++) { + auto runtime_filter_state = RuntimeFilterParamsContext::create(_query_ctx.get()); + _runtime_filter_mgrs[j] = std::make_unique<RuntimeFilterMgr>( + _query_id, runtime_filter_state, _query_ctx->query_mem_tracker, false); + } for (size_t i = 0; i < _pipelines.size(); i++) { EXPECT_EQ(_pipelines[i]->id(), i); _pipeline_profiles[_pipelines[i]->id()] = std::make_shared<RuntimeProfile>( @@ -871,6 +956,8 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { local_runtime_state->set_task_num(_pipelines[i]->num_tasks()); local_runtime_state->set_task_execution_context( std::static_pointer_cast<TaskExecutionContext>(_context.back())); + local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get()); + _runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get()); std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> le_state_map; @@ -891,6 +978,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { } std::shared_ptr<vectorized::VDataStreamRecvr> downstream_recvr; + auto downstream_pipeline_profile = std::make_shared<RuntimeProfile>("Downstream Pipeline"); { // Build downstream recvr auto context = _build_fragment_context(); @@ -900,13 +988,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { downstream_runtime_state->set_task_execution_context( std::static_pointer_cast<TaskExecutionContext>(context)); - auto downstream_pipeline_profile = std::make_shared<RuntimeProfile>("Downstream Pipeline"); auto* memory_used_counter = downstream_pipeline_profile->AddHighWaterMarkCounter( "MemoryUsage", TUnit::BYTES, "", 1); downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr( downstream_runtime_state.get(), memory_used_counter, _pipelines.front()->operators().back()->row_desc(), dest_ins_id, dest_node_id, - parallelism, downstream_pipeline_profile.get(), false, 20480); + parallelism, downstream_pipeline_profile.get(), false, 2048000); } for (size_t i = 0; i < _pipelines.size(); i++) { for (int j = 0; j < parallelism; j++) { @@ -914,21 +1001,16 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->prepare(scan_ranges, j, tsink, _query_ctx.get()), Status::OK()); + if (i == 1) { + auto& local_state = _runtime_states[i][j] + ->get_sink_local_state() + ->cast<HashJoinBuildSinkLocalState>(); + EXPECT_EQ(local_state._runtime_filters.size(), 1); + EXPECT_EQ(local_state._should_build_hash_table, true); + } } } - // Construct input block - vectorized::Block block; - { - vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>(); - - auto int_col0 = vectorized::ColumnInt32::create(); - int_col0->insert_many_vals(1, 10); - block.insert({std::move(int_col0), int_type, "test_int_col0"}); - } - auto block_mem_usage = block.allocated_bytes(); - EXPECT_GT(block_mem_usage - 1, 0); - { for (size_t i = 0; i < _pipelines.size(); i++) { for (int j = 0; j < parallelism; j++) { @@ -960,24 +1042,146 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { { for (int i = _pipelines.size() - 1; i >= 0; i--) { for (int j = 0; j < parallelism; j++) { + bool eos = false; + EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK()); + EXPECT_EQ(_pipeline_tasks[i][j]->_opened, true); + EXPECT_EQ(eos, false); + } + } + } + for (int i = _pipelines.size() - 1; i >= 0; i--) { + for (int j = 0; j < parallelism; j++) { + { + vectorized::Block block; + { + vectorized::DataTypePtr int_type = + std::make_shared<vectorized::DataTypeInt32>(); + + auto int_col0 = vectorized::ColumnInt32::create(); + if (j == 0 || i == 0) { + int_col0->insert_many_vals(j, 10); + } else { + size_t ndv = 16; + for (size_t n = 0; n < ndv; n++) { + int_col0->insert_many_vals(n, 1); + } + } + + block.insert({std::move(int_col0), int_type, "test_int_col0"}); + } auto& local_state = _runtime_states[i][j] ->get_local_state(_pipelines[i]->operators().front()->operator_id()) ->cast<ExchangeLocalState>(); - local_state.stream_recvr->_sender_queues[0]->decrement_senders(0); - - bool eos = false; - EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK()); - EXPECT_EQ(_pipeline_tasks[i][j]->_is_blocked(), false); - EXPECT_EQ(eos, true); - EXPECT_EQ(_pipeline_tasks[i][j]->is_pending_finish(), false); - EXPECT_EQ(_pipeline_tasks[i][j]->close(Status::OK()), Status::OK()); + EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]->_source_dependency->ready(), + false); + EXPECT_EQ(local_state.stream_recvr->_sender_queues[0] + ->_source_dependency->_blocked_task.size(), + i == 1 ? 1 : 0); + local_state.stream_recvr->_sender_queues[0]->add_block(&block, true); + } + } + } + { + // Pipeline 1 is blocked by exchange dependency so tasks are ready after data reached. + // Pipeline 0 is blocked by hash join dependency and is still waiting for upstream tasks done. + for (int j = 0; j < parallelism; j++) { + // Task is ready and be push into runnable task queue. + EXPECT_EQ(_task_queue->take(0) != nullptr, true); + } + EXPECT_EQ(_task_queue->take(0), nullptr); + for (int j = 0; j < parallelism; j++) { + EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false); + } + } + { + // Pipeline 1 ran first and build hash table in join build operator. + for (int j = 0; j < parallelism; j++) { + bool eos = false; + EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK()); + EXPECT_EQ(eos, false); + } + for (int j = 0; j < parallelism; j++) { + auto& local_state = + _runtime_states[1][j] + ->get_local_state(_pipelines[1]->operators().front()->operator_id()) + ->cast<ExchangeLocalState>(); + local_state.stream_recvr->_sender_queues[0]->decrement_senders(0); + + bool eos = false; + EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK()); + EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false); + EXPECT_EQ(eos, true); + auto& sink_local_state = _runtime_states[1][j] + ->get_sink_local_state() + ->cast<HashJoinBuildSinkLocalState>(); + EXPECT_EQ(sink_local_state._runtime_filters_disabled, false); + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters.size(), 1); + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->need_sync_filter_size(), + false); + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->_runtime_filter_type, + RuntimeFilterType::IN_OR_BLOOM_FILTER); + EXPECT_EQ(_pipeline_tasks[1][j]->is_pending_finish(), false); + EXPECT_EQ(_pipeline_tasks[1][j]->close(Status::OK()), Status::OK()); + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]->get_real_type(), + j == 0 ? RuntimeFilterType::IN_FILTER : RuntimeFilterType::BLOOM_FILTER) + << " " << j << " " + << IRuntimeFilter::to_string( + sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->get_real_type()); + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->_wrapper->is_ignored(), + false); + if (j == 0) { + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->_wrapper->_context->hybrid_set->size(), + 1); + } else { + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->_wrapper->_context->bloom_filter_func->_build_bf_exactly, + false); + + EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0] + ->_wrapper->_context->bloom_filter_func->_bloom_filter_length, + 1048576); } } - { - EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0); - EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0); + } + { + // Pipeline 0 ran once hash table is built. + for (int j = 0; j < parallelism; j++) { + EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false); } + for (int j = 0; j < parallelism; j++) { + bool eos = false; + EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK()); + EXPECT_EQ(eos, false); + } + for (int j = 0; j < parallelism; j++) { + auto& local_state = + _runtime_states[0][j] + ->get_local_state(_pipelines[0]->operators().front()->operator_id()) + ->cast<ExchangeLocalState>(); + local_state.stream_recvr->_sender_queues[0]->decrement_senders(0); + + bool eos = false; + EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK()); + EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false); + EXPECT_EQ(eos, true); + EXPECT_EQ(_pipeline_tasks[0][j]->is_pending_finish(), false); + EXPECT_EQ(_pipeline_tasks[0][j]->close(Status::OK()), Status::OK()); + } + } + { + // [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] join [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] produces 100 rows in instance 0. + // [2, 2, 2, 2, 2, 2, 2, 2, 2, 2] join [2, 2, 2, 2, 2, 2, 2, 2, 2, 2] produces 100 rows in instance 1. + EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 2); + EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.front()._block->rows(), + 10 * 10); + EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.back()._block->rows(), 10); + EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0); } downstream_recvr->close(); } diff --git a/be/test/pipeline/thrift_builder.h b/be/test/pipeline/thrift_builder.h index bdcead88616..1af1ca760bd 100644 --- a/be/test/pipeline/thrift_builder.h +++ b/be/test/pipeline/thrift_builder.h @@ -58,7 +58,7 @@ public: return *this; } TQueryOptionsBuilder& set_enable_new_shuffle_hash_method(bool enable_new_shuffle_hash_method) { - _query_options.enable_new_shuffle_hash_method = enable_new_shuffle_hash_method; + _query_options.__set_enable_new_shuffle_hash_method(enable_new_shuffle_hash_method); return *this; } TQueryOptionsBuilder& set_enable_local_shuffle(bool enable_local_shuffle) { @@ -66,11 +66,23 @@ public: return *this; } TQueryOptionsBuilder& set_runtime_filter_wait_infinitely(bool runtime_filter_wait_infinitely) { - _query_options.runtime_filter_wait_infinitely = runtime_filter_wait_infinitely; + _query_options.__set_runtime_filter_wait_infinitely(runtime_filter_wait_infinitely); return *this; } TQueryOptionsBuilder& set_enable_local_merge_sort(bool enable_local_merge_sort) { - _query_options.enable_local_merge_sort = enable_local_merge_sort; + _query_options.__set_enable_local_merge_sort(enable_local_merge_sort); + return *this; + } + TQueryOptionsBuilder& set_runtime_filter_max_in_num(int64_t runtime_filter_max_in_num) { + _query_options.__set_runtime_filter_max_in_num(runtime_filter_max_in_num); + return *this; + } + TQueryOptionsBuilder& set_runtime_bloom_filter_min_size(int64_t runtime_bloom_filter_min_size) { + _query_options.__set_runtime_bloom_filter_min_size(runtime_bloom_filter_min_size); + return *this; + } + TQueryOptionsBuilder& set_runtime_bloom_filter_max_size(int64_t runtime_bloom_filter_max_size) { + _query_options.__set_runtime_bloom_filter_max_size(runtime_bloom_filter_max_size); return *this; } @@ -117,6 +129,16 @@ public: _plan_node.__set_output_tuple_id(output_tuple_id); return *this; } + TPlanNodeBuilder& append_projections(TExpr& projections) { + _plan_node.__isset.projections = true; + _plan_node.projections.push_back(projections); + return *this; + } + TPlanNodeBuilder& append_runtime_filters(TRuntimeFilterDesc& runtime_filter) { + _plan_node.__isset.runtime_filters = true; + _plan_node.runtime_filters.push_back(runtime_filter); + return *this; + } TPlanNode& build() { return _plan_node; } @@ -127,6 +149,55 @@ private: TPlanNode _plan_node; }; +class TRuntimeFilterDescBuilder { +public: + explicit TRuntimeFilterDescBuilder( + int filter_id, TExpr& src_expr, int expr_order, + std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool is_broadcast_join = false, + bool has_local_targets = true, bool has_remote_targets = false, + TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) + : _desc() { + _desc.__set_filter_id(filter_id); + _desc.__set_src_expr(src_expr); + _desc.__set_expr_order(expr_order); + _desc.__set_planId_to_target_expr(planId_to_target_expr); + _desc.__set_is_broadcast_join(is_broadcast_join); + _desc.__set_has_local_targets(has_local_targets); + _desc.__set_has_remote_targets(has_remote_targets); + _desc.__set_type(type); + } + explicit TRuntimeFilterDescBuilder( + int filter_id, TExpr&& src_expr, int expr_order, + std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool is_broadcast_join = false, + bool has_local_targets = true, bool has_remote_targets = false, + TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) + : _desc() { + _desc.__set_filter_id(filter_id); + _desc.__set_src_expr(src_expr); + _desc.__set_expr_order(expr_order); + _desc.__set_planId_to_target_expr(planId_to_target_expr); + _desc.__set_is_broadcast_join(is_broadcast_join); + _desc.__set_has_local_targets(has_local_targets); + _desc.__set_has_remote_targets(has_remote_targets); + _desc.__set_type(type); + } + + TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t bloom_filter_size_bytes) { + _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes); + return *this; + } + TRuntimeFilterDescBuilder& set_build_bf_exactly(bool build_bf_exactly) { + _desc.__set_build_bf_exactly(build_bf_exactly); + return *this; + } + TRuntimeFilterDesc& build() { return _desc; } + TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete; + void operator=(const TRuntimeFilterDescBuilder&) = delete; + +private: + TRuntimeFilterDesc _desc; +}; + class TExchangeNodeBuilder { public: explicit TExchangeNodeBuilder() : _plan_node() {} @@ -269,7 +340,10 @@ private: class TTypeDescBuilder { public: - explicit TTypeDescBuilder() : _desc() {} + explicit TTypeDescBuilder() : _desc() { + _desc.__set_result_is_nullable(false); + _desc.__set_is_nullable(false); + } TTypeDescBuilder& set_types(TTypeNode type_node) { _desc.types.push_back(type_node); @@ -423,6 +497,7 @@ public: _expr_node.__set_type(type); _expr_node.__set_num_children(num_children); _expr_node.__set_opcode(opcode); + _expr_node.__set_is_nullable(false); } explicit TExprNodeBuilder(TExprNodeType::type node_type, TTypeDesc&& type, int num_children, TExprOpcode::type opcode = TExprOpcode::INVALID_OPCODE) @@ -477,4 +552,27 @@ private: TEqJoinCondition _eq_conjuncts; }; +class TRuntimeFilterParamsBuilder { +public: + explicit TRuntimeFilterParamsBuilder( + TNetworkAddress runtime_filter_merge_addr = TNetworkAddress(), + std::map<int, std::vector<TRuntimeFilterTargetParams>> rid_to_target_param = {}, + std::map<int, TRuntimeFilterDesc> rid_to_runtime_filter = {}, + std::map<int, int> runtime_filter_builder_num = {}, + std::map<int, std::vector<TRuntimeFilterTargetParamsV2>> rid_to_target_paramv2 = {}) + : _params() { + _params.__set_runtime_filter_merge_addr(runtime_filter_merge_addr); + _params.__set_rid_to_target_param(rid_to_target_param); + _params.__set_rid_to_runtime_filter(rid_to_runtime_filter); + _params.__set_runtime_filter_builder_num(runtime_filter_builder_num); + _params.__set_rid_to_target_paramv2(rid_to_target_paramv2); + } + TRuntimeFilterParams& build() { return _params; } + TRuntimeFilterParamsBuilder(const TRuntimeFilterParamsBuilder&) = delete; + void operator=(const TRuntimeFilterParamsBuilder&) = delete; + +private: + TRuntimeFilterParams _params; +}; + } // namespace doris::pipeline diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 70db19bfa33..9ab20842c25 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -392,18 +392,21 @@ struct TRuntimeFilterTargetParamsV2 { } struct TRuntimeFilterParams { - // Runtime filter merge instance address + // Runtime filter merge instance address. Used if this filter has a remote target 1: optional Types.TNetworkAddress runtime_filter_merge_addr // deprecated 2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param // Runtime filter ID to the runtime filter desc + // Used if this filter has a remote target 3: optional map<i32, PlanNodes.TRuntimeFilterDesc> rid_to_runtime_filter // Number of Runtime filter producers + // Used if this filter has a remote target 4: optional map<i32, i32> runtime_filter_builder_num + // Used if this filter has a remote target 5: optional map<i32, list<TRuntimeFilterTargetParamsV2>> rid_to_target_paramv2 } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index be920c2baf5..c7f8d247d7c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1252,7 +1252,7 @@ struct TRuntimeFilterDesc { // The order of Expr in join predicate 3: required i32 expr_order - // Map of target node id to the target expr + // Map of target node id to the target expr. Used by consumer 4: required map<Types.TPlanNodeId, Exprs.TExpr> planId_to_target_expr // Indicates if the source join node of this filter is a broadcast or --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org