liaoxin01 commented on code in PR #33752:
URL: https://github.com/apache/doris/pull/33752#discussion_r1568435848
##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -114,25 +112,35 @@ Status MultiTablePipe::dispatch(const std::string& table,
const char* data, size
} else {
pipe = iter->second;
}
- RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
- "append failed in unplanned kafka
pipe");
-
++_unplanned_row_cnt;
- if (_unplanned_row_cnt >= _row_threshold ||
- _unplanned_pipes.size() >= _wait_tables_threshold) {
- LOG(INFO) << fmt::format(
- "unplanned row cnt={} reach row_threshold={}
or "
- "wait_plan_table_threshold={}, "
- "plan them",
- _unplanned_row_cnt, _row_threshold,
_wait_tables_threshold)
- << ", ctx: " << _ctx->brief();
- Status st = request_and_exec_plans();
- _unplanned_row_cnt = 0;
- if (!st.ok()) {
- return st;
- }
+ }
+
+ // It is necessary to determine whether the sum of pipe_current_capacity
and size is greater than pipe_max_capacity,
+ // otherwise the following situation may occur:
+ // the pipe is full but still cannot trigger the request and exec plan
condition,
+ // causing one stream multi table load can not finish
+ auto pipe_current_capacity = pipe->current_capacity();
+ auto pipe_max_capacity = pipe->max_capacity();
+ if (_unplanned_row_cnt >= _row_threshold || _unplanned_pipes.size() >=
_wait_tables_threshold ||
Review Comment:
Should this code be placed in the `else` branch above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]