github-actions[bot] commented on code in PR #29466: URL: https://github.com/apache/doris/pull/29466#discussion_r1440172276
##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -110,44 +113,54 @@ Status VResultFileSink::open(RuntimeState* state) {
     return AsyncWriterSink::open(state);
 }
-Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) {

Review Comment:
   warning: method 'try_close' can be made const [readability-make-member-function-const]

   ```suggestion
   Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) const {
   ```

   be/src/vec/sink/vresult_file_sink.h:65:
   ```diff
   -     Status try_close(RuntimeState* rs, Status exec_status) override;
   +     Status try_close(RuntimeState* rs, Status exec_status) const override;
   ```

##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -110,44 +113,54 @@
     return AsyncWriterSink::open(state);
 }
-Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) {
     if (_closed) {
+        LOG_WARNING("Closing a closed result file sink");
         return Status::OK();
     }
-    Status final_status = exec_status;
-    // close the writer
-    if (_writer && _writer->need_normal_close()) {
-        Status st = _writer->close();
-        if (!st.ok() && exec_status.ok()) {
-            // close file writer failed, should return this error to client
-            final_status = st;
-        }
+    if (_writer == nullptr) {
+        LOG_WARNING("writer of result file sink is not initialized");
+        return Status::RuntimeError("writer of result file sink is not initialized");
     }
+
+    // TaskScheduler should call close if exec_status is not ok.
+    // actually we should remove arg exec_status.
+    RETURN_IF_ERROR(exec_status);
+
+    // here we should rename _writer->cloes to _writer->try_close
+    // to make read code easily
+    // check of EOF should be left to operator
+    RETURN_IF_ERROR(_writer->try_close(exec_status));
+
     if (_is_top_sink) {
-        // close sender, this is normal path end
         if (_sender) {
-            _sender->update_num_written_rows(_writer == nullptr ?
0 : _writer->get_written_rows());
-            static_cast<void>(_sender->close(final_status));
+            _sender->update_num_written_rows(_writer->get_written_rows());
+            // WIP: close -> flush
+            RETURN_IF_ERROR(_sender->close(exec_status));
         }
-        static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
-                state->fragment_instance_id()));
+                rs->fragment_instance_id()));
     } else {
-        if (final_status.ok()) {
-            auto st = _stream_sender->send(state, _output_block.get(), true);
-            if (!st.template is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(st);
-            }
-        }
-        RETURN_IF_ERROR(_stream_sender->close(state, final_status));
+        RETURN_IF_ERROR(_stream_sender->send(rs, _output_block.get(), true));
+        // WIP: close -> flush
+        RETURN_IF_ERROR(_stream_sender->close(rs, exec_status));
         _output_block->clear();
     }
-    _closed = true;
     return Status::OK();
 }
+Status VResultFileSink::close(RuntimeState* state, Status exec_status) {

Review Comment:
   warning: method 'close' can be made static [readability-convert-member-functions-to-static]

   be/src/vec/sink/vresult_file_sink.h:68:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     static Status close(RuntimeState* state, Status exec_status) override;
   ```

##########
be/src/vec/sink/writer/async_result_writer.h:
##########
@@ -98,19 +115,19 @@
     void _return_free_block(std::unique_ptr<Block>);
-private:
+protected:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]

   ```suggestion
   ```
   <details>
   <summary>Additional context</summary>

   **be/src/vec/sink/writer/async_result_writer.h:109:** previously declared here
   ```cpp
   protected:
   ^
   ```

   </details>

##########
be/src/vec/sink/writer/async_result_writer.cpp:
##########
@@ -93,48 +95,68 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile*
profil
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
     if (auto status = open(state, profile); !status.ok()) {
-        force_close(status);
+        auto close_st = close(status);
+        return;
     }
-    if (_writer_status.ok()) {
-        while (true) {
-            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                std::unique_lock l(_m);
-                while (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                    _cv.wait(l);
-                }
-            }
-
-            if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
-                _data_queue.clear();
-                break;
-            }
-
-            auto block = _get_block_from_queue();
-            auto status = write(block);
-            if (!status.ok()) [[unlikely]] {
-                std::unique_lock l(_m);
-                _writer_status = status;
-                if (_dependency && _is_finished()) {
-                    _dependency->set_ready();
-                }
-                break;
-            }
+    while (true) {
+        std::unique_lock<std::mutex> lock(_m);
+
+        while (!_is_closed && !_eos && _data_queue.empty()) {
+            _cv.wait_for(lock, std::chrono::seconds(1));
+        }
+
+        if (_is_closed) {
+            break;
+        }
+
+        if (_eos && _data_queue.empty()) {
+            break;
+        }
+
+        // if _eos == true && !_data_queue.empty()
+        // we will write block to downstream one by one until _data_queue is empty
+        // and loop will be breaked then.
+
+        // release lock before IO
+        lock.unlock();
+
+        auto block = _get_block_from_queue();
+        // for VFileResultWriter, we have three types of transformer:
+        // 1. VCSVTransformer
+        // 2. VParquetTransformer
+        // 3. FORMAT_ORC
+        // what if method write blocks for a very long time? seems that we cannot
+        // notify them to stop unless they are using async writing pattern.
+        auto write_st = write(block);
+        if (write_st.ok()) {
             _return_free_block(std::move(block));
+            continue;
+        }
+
+        lock.lock();
+        // from this point on, there will be no further block added to _data_queue
+        _is_closed = true;
+
+        if (_dependency && _is_finished()) {
+            _dependency->set_ready();
         }
-    }
-    // if not in transaction or status is in error or force close we can do close in
-    // async IO thread
-    if (!_writer_status.ok() || !in_transaction()) {
-        _writer_status = close(_writer_status);
-        _need_normal_close = false;
+        lock.unlock();
+        break;
     }
+
+    std::lock_guard<std::mutex> lg(_m);
+    // we need this to notify try_close to return.
     _writer_thread_closed = true;
+    _data_queue.clear();
+
+    if (_finish_dependency) {
         _finish_dependency->set_ready();
     }
+
+    return;
 }

Review Comment:
   warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow]

   ```suggestion
   }
   ```

##########
be/src/vec/sink/writer/async_result_writer.h:
##########
@@ -19,8 +19,10 @@
 #include <concurrentqueue.h>

Review Comment:
   warning: 'concurrentqueue.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <concurrentqueue.h>
            ^
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org