liaoxin01 commented on code in PR #33752: URL: https://github.com/apache/doris/pull/33752#discussion_r1568435848
########## be/src/io/fs/multi_table_pipe.cpp: ########## @@ -114,25 +112,35 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size } else { pipe = iter->second; } - RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size), - "append failed in unplanned kafka pipe"); - ++_unplanned_row_cnt; - if (_unplanned_row_cnt >= _row_threshold || - _unplanned_pipes.size() >= _wait_tables_threshold) { - LOG(INFO) << fmt::format( - "unplanned row cnt={} reach row_threshold={} or " - "wait_plan_table_threshold={}, " - "plan them", - _unplanned_row_cnt, _row_threshold, _wait_tables_threshold) - << ", ctx: " << _ctx->brief(); - Status st = request_and_exec_plans(); - _unplanned_row_cnt = 0; - if (!st.ok()) { - return st; - } + } + + // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity, + // otherwise the following situation may occur: + // the pipe is full but still cannot trigger the request and exec plan condition, + // causing one stream multi table load can not finish + auto pipe_current_capacity = pipe->current_capacity(); + auto pipe_max_capacity = pipe->max_capacity(); + if (_unplanned_row_cnt >= _row_threshold || _unplanned_pipes.size() >= _wait_tables_threshold || Review Comment: Should this code be placed in the else branch above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org