This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 09a7a355478 [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)
09a7a355478 is described below

commit 09a7a355478bd5f18572da508b6780f28817c02b
Author: Xin Liao <liao...@selectdb.com>
AuthorDate: Mon Mar 31 11:18:42 2025 +0800

    [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)

    Cherry-picked from #48948

    Co-authored-by: kang <35803862+ghkan...@users.noreply.github.com>
    Co-authored-by: lik40 <li...@chinatelecom.cn>
---
 be/src/vec/sink/writer/async_result_writer.cpp | 54 +++++++++++++-------------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 29877cb268a..0cc37e3458b 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -108,41 +108,43 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
     }

     DCHECK(_dependency);
-    if (_writer_status.ok()) {
-        while (true) {
-            ThreadCpuStopWatch cpu_time_stop_watch;
-            cpu_time_stop_watch.start();
-            Defer defer {[&]() {
-                if (state && state->get_query_ctx()) {
-                    state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
-                }
-            }};
-            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                std::unique_lock l(_m);
-                while (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                    // Add 1s to check to avoid lost signal
-                    _cv.wait_for(l, std::chrono::seconds(1));
-                }
+    while (_writer_status.ok()) {
+        ThreadCpuStopWatch cpu_time_stop_watch;
+        cpu_time_stop_watch.start();
+        Defer defer {[&]() {
+            if (state && state->get_query_ctx()) {
+                state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+            }
+        }};
+
+        //1) wait scan operator write data
+        {
+            std::unique_lock l(_m);
+            while (!_eos && _data_queue.empty() && _writer_status.ok()) {
+                // Add 1s to check to avoid lost signal
+                _cv.wait_for(l, std::chrono::seconds(1));
             }

+            //check if eos or writer error
             if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
                 _data_queue.clear();
                 break;
             }
+        }

-            auto block = _get_block_from_queue();
-            auto status = write(state, *block);
-            if (!status.ok()) [[unlikely]] {
-                std::unique_lock l(_m);
-                _writer_status.update(status);
-                if (_is_finished()) {
-                    _dependency->set_ready();
-                }
-                break;
+        //2) get the block from data queue and write to downstream
+        auto block = _get_block_from_queue();
+        auto status = write(state, *block);
+        if (!status.ok()) [[unlikely]] {
+            std::unique_lock l(_m);
+            _writer_status.update(status);
+            if (_is_finished()) {
+                _dependency->set_ready();
             }
-
-            _return_free_block(std::move(block));
+            break;
         }
+
+        _return_free_block(std::move(block));
     }

     bool need_finish = false;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org