github-actions[bot] commented on code in PR #18828: URL: https://github.com/apache/doris/pull/18828#discussion_r1174719070
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -1154,10 +1155,101 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, return runtime_filter_mgr->update_filter(request, attach_data); } +Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, + butil::IOBufAsZeroCopyInputStream* attach_data) { + bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); + + const auto& fragment_instance_ids = request->fragment_instance_ids(); + if (fragment_instance_ids.size() > 0) { + // Create a runtime filter by the first instance and reuse it by others. This + // runtime filter is allocated by the object pool hold by query context. + std::shared_ptr<RuntimePredicateWrapper> wrapper = nullptr; + UniqueId fragment_instance_id = fragment_instance_ids[0]; + TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); + + std::shared_ptr<FragmentExecState> fragment_state; + std::shared_ptr<pipeline::PipelineFragmentContext> pip_context; + + RuntimeFilterMgr* runtime_filter_mgr = nullptr; + ObjectPool* pool; + if (is_pipeline) { + std::unique_lock<std::mutex> lock(_lock); + auto iter = _pipeline_map.find(tfragment_instance_id); + if (iter == _pipeline_map.end()) { + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; + return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + } + pip_context = iter->second; + + DCHECK(pip_context != nullptr); + runtime_filter_mgr = pip_context->get_runtime_state()->runtime_filter_mgr(); + pool = &pip_context->get_query_context()->obj_pool; + } else { + std::unique_lock<std::mutex> lock(_lock); + auto iter = _fragment_map.find(tfragment_instance_id); + if (iter == _fragment_map.end()) { + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; + return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + } + fragment_state = iter->second; + + DCHECK(fragment_state != nullptr); + runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr(); + pool = &fragment_state->get_fragments_ctx()->obj_pool; Review Comment: warning: no member named 'get_fragments_ctx' in 'doris::FragmentExecState' [clang-diagnostic-error] ```cpp pool = &fragment_state->get_fragments_ctx()->obj_pool; ^ ``` ########## be/src/runtime/runtime_filter_mgr.cpp: ########## @@ -35,6 +35,7 @@ #include "exprs/runtime_filter.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/query_fragments_ctx.h" Review Comment: warning: 'runtime/query_fragments_ctx.h' file not found [clang-diagnostic-error] ```cpp #include "runtime/query_fragments_ctx.h" ^ ``` ########## be/src/vec/exec/scan/vscan_node.cpp: ########## @@ -317,7 +317,8 @@ Status VScanNode::_register_runtime_filter() { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id())); + RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(), false, + &_state->get_query_fragments_ctx()->obj_pool)); Review Comment: warning: no member named 'get_query_fragments_ctx' in 'doris::RuntimeState' [clang-diagnostic-error] ```cpp &_state->get_query_fragments_ctx()->obj_pool)); ^ ``` **be/src/common/status.h:509:** expanded from macro 'RETURN_IF_ERROR' ```cpp Status _status_ = (stmt); \ ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org