liaoxin01 commented on code in PR #51989: URL: https://github.com/apache/doris/pull/51989#discussion_r2160357384
########## be/src/vec/sink/writer/vtablet_writer_v2.cpp: ########## @@ -705,32 +705,87 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -Status VTabletWriterV2::_close_wait(bool incremental) { +std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_incremental_streams() { + std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams; + auto streams_for_node = _load_stream_map->get_streams_for_node(); + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (stream->is_incremental()) { + incremental_streams.insert(stream); + } + } + } + return incremental_streams; +} + +std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() { + std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams; + auto streams_for_node = _load_stream_map->get_streams_for_node(); + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (!stream->is_incremental()) { + non_incremental_streams.insert(stream); + } + } + } + return non_incremental_streams; +} + +Status VTabletWriterV2::_close_wait( + std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams) { SCOPED_TIMER(_close_load_timer); - auto st = _load_stream_map->for_each_st( - [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { - if (streams.is_incremental() != incremental) { - return Status::OK(); - } - int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - auto st = streams.close_wait(_state, remain_ms); - if (!st.ok()) { - LOG(WARNING) << 
"close_wait timeout on streams to dst_id=" << dst_id - << ", load_id=" << print_id(_load_id) << ": " << st; - } - return st; - }); - if (!st.ok()) { - LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); + Status status; + auto streams_for_node = _load_stream_map->get_streams_for_node(); + while (true) { + RETURN_IF_ERROR(_check_timeout()); + RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); Review Comment: Will this return an error when one stream fails to close? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org