yiguolei commented on code in PR #39031:
URL: https://github.com/apache/doris/pull/39031#discussion_r1708839354


##########
be/src/pipeline/dependency.h:
##########
@@ -847,28 +849,74 @@ struct LocalExchangeSharedState : public BasicSharedState 
{
     void add_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
         mem_trackers[channel_id]->consume(delta);
         if (update_total_mem_usage) {
-            add_total_mem_usage(delta);
+            add_total_mem_usage(delta, channel_id);
         }
     }
 
     void sub_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
         mem_trackers[channel_id]->release(delta);
         if (update_total_mem_usage) {
-            sub_total_mem_usage(delta);
+            sub_total_mem_usage(delta, channel_id);
         }
     }
 
-    void add_total_mem_usage(size_t delta) {
+    virtual void add_total_mem_usage(size_t delta, int channel_id = 0) {
         if (mem_usage.fetch_add(delta) + delta > 
config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
 
-    void sub_total_mem_usage(size_t delta) {
+    virtual void sub_total_mem_usage(size_t delta, int channel_id = 0) {
         if (mem_usage.fetch_sub(delta) - delta <= 
config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
     }
 };
 
+struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
+    LocalMergeExchangeSharedState(int num_instances)
+            : LocalExchangeSharedState(num_instances),
+              _queues_mem_usege(num_instances),
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / 
num_instances) {
+        for (size_t i = 0; i < num_instances; i++) {
+            _queues_mem_usege[i] = 0;
+        }
+    }
+
+    void create_dependencies(int operator_id, int node_id) override {
+        sink_deps.resize(source_deps.size());
+        for (size_t i = 0; i < source_deps.size(); i++) {
+            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
+                                                          
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+            source_deps[i]->set_shared_state(this);
+            sink_deps[i] = std::make_shared<Dependency>(
+                    operator_id, node_id, 
"LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
+            sink_deps[i]->set_shared_state(this);
+        }
+    }
+
+    void sub_total_mem_usage(size_t delta, int channel_id) override {
+        if (_queues_mem_usege[channel_id].fetch_sub(delta) - delta <= 
_each_queue_limit) {

Review Comment:
   typo error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to