github-actions[bot] commented on code in PR #18828:
URL: https://github.com/apache/doris/pull/18828#discussion_r1174719070


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1154,10 +1155,101 @@ Status FragmentMgr::apply_filter(const 
PPublishFilterRequest* request,
     return runtime_filter_mgr->update_filter(request, attach_data);
 }
 
+Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
+                                   butil::IOBufAsZeroCopyInputStream* 
attach_data) {
+    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+
+    const auto& fragment_instance_ids = request->fragment_instance_ids();
+    if (fragment_instance_ids.size() > 0) {
+        // Create a runtime filter by the first instance and reuse it by 
others. This
+        // runtime filter is allocated by the object pool hold by query 
context.
+        std::shared_ptr<RuntimePredicateWrapper> wrapper = nullptr;
+        UniqueId fragment_instance_id = fragment_instance_ids[0];
+        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
+
+        std::shared_ptr<FragmentExecState> fragment_state;
+        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+        ObjectPool* pool;
+        if (is_pipeline) {
+            std::unique_lock<std::mutex> lock(_lock);
+            auto iter = _pipeline_map.find(tfragment_instance_id);
+            if (iter == _pipeline_map.end()) {
+                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
+                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
+            }
+            pip_context = iter->second;
+
+            DCHECK(pip_context != nullptr);
+            runtime_filter_mgr = 
pip_context->get_runtime_state()->runtime_filter_mgr();
+            pool = &pip_context->get_query_context()->obj_pool;
+        } else {
+            std::unique_lock<std::mutex> lock(_lock);
+            auto iter = _fragment_map.find(tfragment_instance_id);
+            if (iter == _fragment_map.end()) {
+                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
+                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
+            }
+            fragment_state = iter->second;
+
+            DCHECK(fragment_state != nullptr);
+            runtime_filter_mgr = 
fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+            pool = &fragment_state->get_fragments_ctx()->obj_pool;

Review Comment:
   warning: no member named 'get_fragments_ctx' in 'doris::FragmentExecState' [clang-diagnostic-error]
   ```cpp
               pool = &fragment_state->get_fragments_ctx()->obj_pool;
                                       ^
   ```
   



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -35,6 +35,7 @@
 #include "exprs/runtime_filter.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
+#include "runtime/query_fragments_ctx.h"

Review Comment:
   warning: 'runtime/query_fragments_ctx.h' file not found [clang-diagnostic-error]
   ```cpp
   #include "runtime/query_fragments_ctx.h"
            ^
   ```
   



##########
be/src/vec/exec/scan/vscan_node.cpp:
##########
@@ -317,7 +317,8 @@ Status VScanNode::_register_runtime_filter() {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
         RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
-                RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id()));
+                RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id(), false,
+                &_state->get_query_fragments_ctx()->obj_pool));

Review Comment:
   warning: no member named 'get_query_fragments_ctx' in 'doris::RuntimeState' [clang-diagnostic-error]
   ```cpp
                   &_state->get_query_fragments_ctx()->obj_pool));
                            ^
   ```
   **be/src/common/status.h:509:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           Status _status_ = (stmt);       \
                              ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to