github-actions[bot] commented on code in PR #30294:
URL: https://github.com/apache/doris/pull/30294#discussion_r1464236729


##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -95,25 +95,42 @@ class PipelineXTask : public PipelineTask {
     bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; }
 
     std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; }
+    std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { return _shared_states; }
 
-    void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency) {
+    void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency,
+                                 std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) {
         for (auto dep : multi_upstream_dependency) {
             int dst_id = dep->id();
             if (!_upstream_dependency.contains(dst_id)) {
                 _upstream_dependency.insert({dst_id, {dep}});
             } else {
                 _upstream_dependency[dst_id].push_back(dep);
             }
+
+            if (shared_states.contains(dst_id) && !_shared_states.contains(dst_id)) {
+                // Shared state is created by upstream task's sink operator and shared by source operator of this task.
+                _shared_states.insert({dst_id, shared_states[dst_id]});
+            } else if (_shared_states.contains(dst_id) && !shared_states.contains(dst_id)) {
+                // Shared state is created by this task's source operator and shared by upstream task's sink operator.
+                shared_states.insert({dst_id, _shared_states[dst_id]});
+            }
         }
     }
 
     std::vector<DependencySPtr>& get_upstream_dependency(int id) {
         if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
-            _upstream_dependency.insert({id, {DependencySPtr {}}});
+            _upstream_dependency.insert({id, {}});
         }
         return _upstream_dependency[id];
     }
 
+    BasicSharedState* get_shared_state(int id) {

Review Comment:
   warning: method 'get_shared_state' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static BasicSharedState* get_shared_state(int id) {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to