This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch bvelodb-doris-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ff45ade636894c68867e02df8ad669fd2e63a9e0
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Thu Nov 21 14:07:36 2024 +0800

    [fix](move-memtable) immediately return error when close_wait fails (#44344)
    
    
    Problem Summary:
    
    #38003 introduced a problem where the last sink node could report
    success even when close_wait timed out, which may cause data loss.
    
    Previously we made that change hoping to tolerate minority replica
    failures in this step.
    However, it turns out the last sink node could miss tablet reports from
    downstream nodes when close_wait fails.
    
    This PR fixes the problem by returning the close_wait error immediately.
    The most common close_wait error is a timeout, and a timeout should not
    be tolerated on a per-replica basis anyway.
---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 7 ++++---
 be/src/vec/sink/writer/vtablet_writer_v2.h   | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c693e20c3a8..14f328eee78 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -604,7 +604,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // close_wait on all non-incremental streams, even if this is not the 
last sink.
         // because some per-instance data structures are now shared among all 
sinks
         // due to sharing delta writers and load stream stubs.
-        _close_wait(false);
+        RETURN_IF_ERROR(_close_wait(false));
 
         // send CLOSE_LOAD on all incremental streams if this is the last sink.
         // this must happen after all non-incremental streams are closed,
@@ -614,7 +614,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         }
 
         // close_wait on all incremental streams, even if this is not the last 
sink.
-        _close_wait(true);
+        RETURN_IF_ERROR(_close_wait(true));
 
         // calculate and submit commit info
         if (is_last_sink) {
@@ -663,7 +663,7 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-void VTabletWriterV2::_close_wait(bool incremental) {
+Status VTabletWriterV2::_close_wait(bool incremental) {
     SCOPED_TIMER(_close_load_timer);
     auto st = _load_stream_map->for_each_st(
             [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> 
Status {
@@ -688,6 +688,7 @@ void VTabletWriterV2::_close_wait(bool incremental) {
     if (!st.ok()) {
         LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << 
print_id(_load_id);
     }
+    return st;
 }
 
 void VTabletWriterV2::_calc_tablets_to_commit() {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index b50044ede93..9f9743de3a2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -146,7 +146,7 @@ private:
 
     void _calc_tablets_to_commit();
 
-    void _close_wait(bool incremental);
+    Status _close_wait(bool incremental);
 
     void _cancel(Status status);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to