This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new db5d95ea8d0 branch-3.1: Revert "[refactor](sink) refactor vtablet writer v2 sequential close … #52566 (#52634)
db5d95ea8d0 is described below

commit db5d95ea8d0a89b52b9dd30da97bdff658304045
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 18:06:41 2025 +0800

    branch-3.1: Revert "[refactor](sink) refactor vtablet writer v2 sequential close … #52566 (#52634)
    
    Cherry-picked from #52566
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/sink/load_stream_map_pool.h             |   9 --
 be/src/vec/sink/load_stream_stub.cpp               |  72 +++++++-------
 be/src/vec/sink/load_stream_stub.h                 |  10 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 107 +++++----------------
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  12 +--
 .../test_writer_v2_fault_injection.groovy          |  17 +---
 6 files changed, 69 insertions(+), 158 deletions(-)

diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index bf4f3df2cc9..4ecae2f16be 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -98,15 +98,6 @@ public:
     // only call this method after release() returns true.
     void close_load(bool incremental);
 
-    std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> 
get_streams_for_node() {
-        decltype(_streams_for_node) snapshot;
-        {
-            std::lock_guard<std::mutex> lock(_mutex);
-            snapshot = _streams_for_node;
-        }
-        return snapshot;
-    }
-
 private:
     const UniqueId _load_id;
     const int64_t _src_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 62295129dd2..d57ec02645c 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -114,7 +114,9 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
         LOG(WARNING) << "stub is not exist when on_closed, " << *this;
         return;
     }
+    std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
     stub->_is_closed.store(true);
+    stub->_close_cv.notify_all();
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler& handler) {
@@ -328,30 +330,37 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
     return Status::OK();
 }
 
-Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* 
is_closed) {
+Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
-    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
-                    { return Status::InternalError("close failed"); });
-    *is_closed = true;
     if (!_is_open.load()) {
         // we don't need to close wait on non-open streams
         return Status::OK();
     }
     if (!_is_closing.load()) {
-        *is_closed = false;
         return _status;
     }
-    if (state->get_query_ctx()->is_cancelled()) {
-        return state->get_query_ctx()->exec_status();
-    }
     if (_is_closed.load()) {
-        RETURN_IF_ERROR(_check_cancel());
-        if (!_is_eos.load()) {
-            return Status::InternalError("Stream closed without EOS, {}", 
to_string());
+        return _check_cancel();
+    }
+    DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
+    std::unique_lock<bthread::Mutex> lock(_close_mutex);
+    auto timeout_sec = timeout_ms / 1000;
+    while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
+        //the query maybe cancel, so need check after wait 1s
+        timeout_sec = timeout_sec - 1;
+        LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << 
timeout_sec
+                  << ", is_closed=" << _is_closed.load()
+                  << ", is_cancelled=" << 
state->get_query_ctx()->is_cancelled();
+        int ret = _close_cv.wait_for(lock, 1000000);
+        if (ret != 0 && timeout_sec <= 0) {
+            return Status::InternalError("stream close_wait timeout, error={}, 
timeout_ms={}, {}",
+                                         ret, timeout_ms, to_string());
         }
-        return Status::OK();
     }
-    *is_closed = false;
+    RETURN_IF_ERROR(_check_cancel());
+    if (!_is_eos.load()) {
+        return Status::InternalError("stream closed without eos, {}", 
to_string());
+    }
     return Status::OK();
 }
 
@@ -365,7 +374,11 @@ void LoadStreamStub::cancel(Status reason) {
         _cancel_st = reason;
         _is_cancelled.store(true);
     }
-    _is_closed.store(true);
+    {
+        std::lock_guard<bthread::Mutex> lock(_close_mutex);
+        _is_closed.store(true);
+        _close_cv.notify_all();
+    }
 }
 
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const 
Slice> data) {
@@ -424,34 +437,12 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf, 
Status st) {
         switch (hdr.opcode()) {
         case PStreamHeader::ADD_SEGMENT:
         case PStreamHeader::APPEND_DATA: {
-            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
-                add_failed_tablet(hdr.tablet_id(), st);
-                return;
-            });
-            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
-                add_failed_tablet(hdr.tablet_id(), st);
-                return;
-            });
             add_failed_tablet(hdr.tablet_id(), st);
         } break;
         case PStreamHeader::CLOSE_LOAD: {
-            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
-                brpc::StreamClose(_stream_id);
-                return;
-            });
             brpc::StreamClose(_stream_id);
         } break;
         case PStreamHeader::GET_SCHEMA: {
-            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
-                // Just log and let wait_for_schema timeout
-                std::ostringstream oss;
-                for (const auto& tablet : hdr.tablets()) {
-                    oss << " " << tablet.tablet_id();
-                }
-                LOG(WARNING) << "failed to send GET_SCHEMA request, 
tablet_id:" << oss.str() << ", "
-                             << *this;
-                return;
-            });
             // Just log and let wait_for_schema timeout
             std::ostringstream oss;
             for (const auto& tablet : hdr.tablets()) {
@@ -565,4 +556,13 @@ Status LoadStreamStubs::close_load(const 
std::vector<PTabletID>& tablets_to_comm
     return status;
 }
 
+Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
+    MonotonicStopWatch watch;
+    watch.start();
+    for (auto& stream : _streams) {
+        RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - 
watch.elapsed_time() / 1000 / 1000));
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index afce7119b38..9816770c82e 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -152,7 +152,7 @@ public:
 
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
-    Status close_finish_check(RuntimeState* state, bool* is_closed);
+    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
 
     // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
     void cancel(Status reason);
@@ -220,8 +220,6 @@ private:
     void _handle_failure(butil::IOBuf& buf, Status st);
 
     Status _check_cancel() {
-        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
-                        { return Status::InternalError("stream cancelled"); });
         if (!_is_cancelled.load()) {
             return Status::OK();
         }
@@ -246,7 +244,9 @@ protected:
     Status _cancel_st;
 
     bthread::Mutex _open_mutex;
+    bthread::Mutex _close_mutex;
     bthread::Mutex _cancel_mutex;
+    bthread::ConditionVariable _close_cv;
 
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
@@ -307,6 +307,8 @@ public:
 
     Status close_load(const std::vector<PTabletID>& tablets_to_commit);
 
+    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+
     std::unordered_set<int64_t> success_tablets() {
         std::unordered_set<int64_t> s;
         for (auto& stream : _streams) {
@@ -325,8 +327,6 @@ public:
         return m;
     }
 
-    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
-
 private:
     std::vector<std::shared_ptr<LoadStreamStub>> _streams;
     std::atomic<bool> _open_success = false;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 8fc2cfdaeb0..f432595efa5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -639,7 +639,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // close_wait on all non-incremental streams, even if this is not the 
last sink.
         // because some per-instance data structures are now shared among all 
sinks
         // due to sharing delta writers and load stream stubs.
-        RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
+        RETURN_IF_ERROR(_close_wait(false));
 
         // send CLOSE_LOAD on all incremental streams if this is the last sink.
         // this must happen after all non-incremental streams are closed,
@@ -649,7 +649,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         }
 
         // close_wait on all incremental streams, even if this is not the last 
sink.
-        RETURN_IF_ERROR(_close_wait(_incremental_streams()));
+        RETURN_IF_ERROR(_close_wait(true));
 
         // calculate and submit commit info
         if (is_last_sink) {
@@ -695,87 +695,32 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-std::unordered_set<std::shared_ptr<LoadStreamStub>> 
VTabletWriterV2::_incremental_streams() {
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
-    auto streams_for_node = _load_stream_map->get_streams_for_node();
-    for (const auto& [dst_id, streams] : streams_for_node) {
-        for (const auto& stream : streams->streams()) {
-            if (stream->is_incremental()) {
-                incremental_streams.insert(stream);
-            }
-        }
-    }
-    return incremental_streams;
-}
-
-std::unordered_set<std::shared_ptr<LoadStreamStub>> 
VTabletWriterV2::_non_incremental_streams() {
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> 
non_incremental_streams;
-    auto streams_for_node = _load_stream_map->get_streams_for_node();
-    for (const auto& [dst_id, streams] : streams_for_node) {
-        for (const auto& stream : streams->streams()) {
-            if (!stream->is_incremental()) {
-                non_incremental_streams.insert(stream);
-            }
-        }
-    }
-    return non_incremental_streams;
-}
-
-Status VTabletWriterV2::_close_wait(
-        std::unordered_set<std::shared_ptr<LoadStreamStub>> 
unfinished_streams) {
+Status VTabletWriterV2::_close_wait(bool incremental) {
     SCOPED_TIMER(_close_load_timer);
-    Status status;
-    auto streams_for_node = _load_stream_map->get_streams_for_node();
-    while (true) {
-        RETURN_IF_ERROR(_check_timeout());
-        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, 
streams_for_node));
-        if (!status.ok() || unfinished_streams.empty()) {
-            LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
-                      << ", status: " << status << ", txn_id: " << _txn_id
-                      << ", load_id: " << print_id(_load_id);
-            break;
-        }
-        bthread_usleep(1000 * 10);
-    }
-    if (!status.ok()) {
-        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << 
print_id(_load_id);
-    }
-    return status;
-}
-
-Status VTabletWriterV2::_check_timeout() {
-    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 
1000 -
-                        _timeout_watch.elapsed_time() / 1000 / 1000;
-    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 
0; });
-    if (remain_ms <= 0) {
-        LOG(WARNING) << "load timed out before close waiting, load_id=" << 
print_id(_load_id);
-        return Status::TimedOut("load timed out before close waiting");
-    }
-    return Status::OK();
-}
-
-Status VTabletWriterV2::_check_streams_finish(
-        std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams, Status& status,
-        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& 
streams_for_node) {
-    for (const auto& [dst_id, streams] : streams_for_node) {
-        for (const auto& stream : streams->streams()) {
-            if (!unfinished_streams.contains(stream)) {
-                continue;
-            }
-            bool is_closed = false;
-            auto stream_st = stream->close_finish_check(_state, &is_closed);
-            if (!stream_st.ok()) {
-                status = stream_st;
-                unfinished_streams.erase(stream);
-                LOG(WARNING) << "close_wait failed: " << stream_st
-                             << ", load_id=" << print_id(_load_id);
-            }
-            if (is_closed) {
-                unfinished_streams.erase(stream);
-            }
-        }
+    auto st = _load_stream_map->for_each_st(
+            [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> 
Status {
+                if (streams.is_incremental() != incremental) {
+                    return Status::OK();
+                }
+                int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                    _timeout_watch.elapsed_time() / 1000 / 
1000;
+                DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { 
remain_ms = 0; });
+                if (remain_ms <= 0) {
+                    LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                 << print_id(_load_id);
+                    return Status::TimedOut("load timed out before close 
waiting");
+                }
+                auto st = streams.close_wait(_state, remain_ms);
+                if (!st.ok()) {
+                    LOG(WARNING) << "close_wait timeout on streams to dst_id=" 
<< dst_id
+                                 << ", load_id=" << print_id(_load_id) << ": " 
<< st;
+                }
+                return st;
+            });
+    if (!st.ok()) {
+        LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << 
print_id(_load_id);
     }
-    return status;
+    return st;
 }
 
 void VTabletWriterV2::_calc_tablets_to_commit() {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index cc87002a097..46a3974bba8 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,17 +148,7 @@ private:
 
     void _calc_tablets_to_commit();
 
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
-
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> 
_non_incremental_streams();
-
-    Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>> 
unfinished_streams);
-
-    Status _check_timeout();
-
-    Status _check_streams_finish(
-            std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams, Status& status,
-            const std::unordered_map<int64_t, 
std::shared_ptr<LoadStreamStubs>>& streams_for_node);
+    Status _close_wait(bool incremental);
 
     void _cancel(Status status);
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 1a473d90e52..30854cfb50b 100644
--- 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
             file "baseall.txt"
         }
 
-        def load_with_injection = { injection, error_msg="", success=false->
+        def load_with_injection = { injection, error_msg, success=false->
             try {
                 GetDebugPoint().enableDebugPointForAllBEs(injection)
                 sql "insert into test select * from baseall where k1 <= 3"
@@ -104,21 +104,6 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
         // DeltaWriterV2 stream_size is 0
         load_with_injection("DeltaWriterV2.init.stream_size", "failed to find 
tablet schema")
 
-        // injection cases for VTabletWriterV2 close logic
-        // Test LoadStreamStub close_finish_check close failed
-        load_with_injection("LoadStreamStub::close_finish_check.close_failed")
-        // Test LoadStreamStub _check_cancel when cancelled
-        load_with_injection("LoadStreamStub._check_cancel.cancelled")
-        // Test LoadStreamStub _send_with_retry stream write failed
-        
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
-        // Test LoadStreamStub _handle_failure for different opcodes
-        
load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
-        
load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
-        load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
-        load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
-        // Test LoadStreamStub skip send segment
-        load_with_injection("LoadStreamStub.skip_send_segment")
-
         sql """ set enable_memtable_on_sink_node=false """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to