github-actions[bot] commented on code in PR #29466: URL: https://github.com/apache/doris/pull/29466#discussion_r1440237568
########## be/src/vec/sink/writer/async_result_writer.cpp: ########## @@ -93,48 +95,70 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) { if (auto status = open(state, profile); !status.ok()) { - force_close(status); + auto close_st = close(status); + return; } - if (_writer_status.ok()) { - while (true) { - if (!_eos && _data_queue.empty() && _writer_status.ok()) { - std::unique_lock l(_m); - while (!_eos && _data_queue.empty() && _writer_status.ok()) { - _cv.wait(l); - } - } - - if ((_eos && _data_queue.empty()) || !_writer_status.ok()) { - _data_queue.clear(); - break; - } - - auto block = _get_block_from_queue(); - auto status = write(block); - if (!status.ok()) [[unlikely]] { - std::unique_lock l(_m); - _writer_status = status; - if (_dependency && _is_finished()) { - _dependency->set_ready(); - } - break; - } + while (true) { + std::unique_lock<std::mutex> lock(_m); + + while (!_is_closed && !_eos && _data_queue.empty()) { + _cv.wait_for(lock, std::chrono::seconds(1)); + } + + if (_is_closed) { + break; + } + + if (_eos && _data_queue.empty()) { + break; + } + + // if _eos == true && !_data_queue.empty() + // we will write block to downstream one by one until _data_queue is empty + // and loop will be breaked then. + // release lock before IO + lock.unlock(); + + auto block = _get_block_from_queue(); + // for VFileResultWriter, we have three types of transformer: + // 1. VCSVTransformer + // 2. VParquetTransformer + // 3. FORMAT_ORC + // what if method write blocks for a very long time? seems that we cannot + // notify them to stop unless they are using async writing pattern. 
+ auto write_st = write(block); + + if (write_st.ok()) { _return_free_block(std::move(block)); + continue; } - } - // if not in transaction or status is in error or force close we can do close in - // async IO thread - if (!_writer_status.ok() || !in_transaction()) { - _writer_status = close(_writer_status); - _need_normal_close = false; + lock.lock(); + // from this point on, there will be no further block added to _data_queue + _is_closed = true; + + if (_dependency && _is_finished()) { + _dependency->set_ready(); + } + + lock.unlock(); + break; } + + std::lock_guard<std::mutex> lg(_m); + // we need this to notify try_close to return. _writer_thread_closed = true; + _data_queue.clear(); + if (_finish_dependency) { _finish_dependency->set_ready(); } + // notify try_close thread + _cv.notify_one(); + + return; Review Comment: warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow] be/src/vec/sink/writer/async_result_writer.cpp:159: ```diff - - return; - } + } ``` ########## be/src/vec/sink/vresult_file_sink.cpp: ########## @@ -110,44 +113,54 @@ Status VResultFileSink::open(RuntimeState* state) { return AsyncWriterSink::open(state); } -Status VResultFileSink::close(RuntimeState* state, Status exec_status) { +Status VResultFileSink::try_close(RuntimeState* state, Status exec_status) { Review Comment: warning: method 'try_close' can be made const [readability-make-member-function-const] ```suggestion Status VResultFileSink::try_close(RuntimeState* state, Status exec_status) const { ``` be/src/vec/sink/vresult_file_sink.h:65: ```diff - Status try_close(RuntimeState* rs, Status exec_status) override; + Status try_close(RuntimeState* rs, Status exec_status) const override; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org