liaoxin01 commented on code in PR #33752:
URL: https://github.com/apache/doris/pull/33752#discussion_r1568435848


##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -114,25 +112,35 @@ Status MultiTablePipe::dispatch(const std::string& table, 
const char* data, size
         } else {
             pipe = iter->second;
         }
-        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
-                                       "append failed in unplanned kafka 
pipe");
-
         ++_unplanned_row_cnt;
-        if (_unplanned_row_cnt >= _row_threshold ||
-            _unplanned_pipes.size() >= _wait_tables_threshold) {
-            LOG(INFO) << fmt::format(
-                                 "unplanned row cnt={} reach row_threshold={} 
or "
-                                 "wait_plan_table_threshold={}, "
-                                 "plan them",
-                                 _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold)
-                      << ", ctx: " << _ctx->brief();
-            Status st = request_and_exec_plans();
-            _unplanned_row_cnt = 0;
-            if (!st.ok()) {
-                return st;
-            }
+    }
+
+    // It is necessary to determine whether the sum of pipe_current_capacity 
and size is greater than pipe_max_capacity,
+    // otherwise the following situation may occur:
+    // the pipe is full but still cannot trigger the request and exec plan 
condition,
+    // causing one stream multi table load can not finish
+    auto pipe_current_capacity = pipe->current_capacity();
+    auto pipe_max_capacity = pipe->max_capacity();
+    if (_unplanned_row_cnt >= _row_threshold || _unplanned_pipes.size() >= 
_wait_tables_threshold ||

Review Comment:
   Should this code be placed inside the `else` branch above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to