This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8df2c5b7f8e [Fix](load) Fix data loss when node channel been cancelled 
before close wait (#36662)
8df2c5b7f8e is described below

commit 8df2c5b7f8ee1a42ca50706b53d0f9488f34a04a
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Mon Jun 24 14:28:22 2024 +0800

    [Fix](load) Fix data loss when node channel been cancelled before close 
wait (#36662)
    
    Fix data loss when a node channel has been cancelled before close wait.
    When an error occurs in `VNodeChannel::try_send_pending_block`,
    `VNodeChannel::cancel` is invoked, which marks the `VNodeChannel` as
    closed. In `VTabletWriter::close`, channels that were already closed were
    skipped before `VNodeChannel::close_wait` was called. So if
    `VNodeChannel::cancel` ran first, the error-handling code was bypassed
    entirely and the transaction was still considered successful.
---
 be/src/cloud/injection_point_action.cpp   | 10 ++++++++
 be/src/vec/sink/writer/vtablet_writer.cpp | 42 ++++++++++++++++++-------------
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/be/src/cloud/injection_point_action.cpp 
b/be/src/cloud/injection_point_action.cpp
index d99dcfd534d..e8c57ed91e8 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -98,6 +98,16 @@ void register_suites() {
             should_ret = true;
         });
     });
+    suite_map.emplace("test_cancel_node_channel", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("VNodeChannel::try_send_block", [](auto&& args) {
+            LOG(INFO) << "injection VNodeChannel::try_send_block";
+            auto* arg0 = try_any_cast<Status*>(args[0]);
+            *arg0 = Status::InternalError<false>("test_cancel_node_channel 
injection error");
+        });
+        sp->set_call_back("VOlapTableSink::close",
+                          [](auto&&) { 
std::this_thread::sleep_for(std::chrono::seconds(5)); });
+    });
 }
 
 void set_sleep(const std::string& point, HttpRequest* req) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index b31796fe724..22d9bbd840b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -45,6 +45,7 @@
 #include <vector>
 
 #include "cloud/config.h"
+#include "common/sync_point.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
@@ -627,6 +628,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
                                     &uncompressed_bytes, &compressed_bytes,
                                     
state->fragement_transmission_compression_type(),
                                     _parent->_transfer_large_data_by_brpc);
+        TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.to_string()));
             _send_block_callback->clear_in_flight();
@@ -1325,22 +1327,22 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-static Status cancel_channel_and_check_intolerable_failure(
-        Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
-        const std::shared_ptr<VNodeChannel> nch) {
-    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << 
err_msg;
-    ich->mark_as_failed(nch.get(), err_msg, -1);
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+                                                           const std::string& 
err_msg,
+                                                           IndexChannel& ich, 
VNodeChannel& nch) {
+    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << 
err_msg;
+    ich.mark_as_failed(&nch, err_msg, -1);
     // cancel the node channel in best effort
-    nch->cancel(err_msg);
+    nch.cancel(err_msg);
 
     // check if index has intolerable failure
-    Status index_st = ich->check_intolerable_failure();
+    Status index_st = ich.check_intolerable_failure();
     if (!index_st.ok()) {
-        status = index_st;
-    } else if (Status st = ich->check_tablet_received_rows_consistency(); 
!st.ok()) {
-        status = st;
-    } else if (Status st = ich->check_tablet_filtered_rows_consistency(); 
!st.ok()) {
-        status = st;
+        status = std::move(index_st);
+    } else if (Status st = ich.check_tablet_received_rows_consistency(); 
!st.ok()) {
+        status = std::move(st);
+    } else if (Status st = ich.check_tablet_filtered_rows_consistency(); 
!st.ok()) {
+        status = std::move(st);
     }
     return status;
 }
@@ -1416,7 +1418,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close(true);
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
                 if (!status.ok()) {
@@ -1432,7 +1435,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                                        << "close1 wait finished!";
                             if (!s.ok()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, s.to_string(), index_channel, 
ch);
+                                        std::move(status), s.to_string(), 
*index_channel, *ch);
                             }
                         });
                 if (!status.ok()) {
@@ -1450,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close();
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
             } else { // not has_incremental_node_channel
@@ -1464,7 +1468,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close();
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
             }
@@ -1491,6 +1496,7 @@ Status VTabletWriter::close(Status exec_status) {
 
     // will make the last batch of request-> close_wait will wait this 
finished.
     _do_try_close(_state, exec_status);
+    TEST_INJECTION_POINT("VOlapTableSink::close");
 
     // If _close_status is not ok, all nodes have been canceled in try_close.
     if (_close_status.ok()) {
@@ -1520,7 +1526,7 @@ Status VTabletWriter::close(Status exec_status) {
                      &total_add_batch_exec_time_ns, &add_batch_exec_time, 
&total_wait_exec_time_ns,
                      &wait_exec_time,
                      &total_add_batch_num](const 
std::shared_ptr<VNodeChannel>& ch) {
-                        if (!status.ok() || ch->is_closed()) {
+                        if (!status.ok() || (ch->is_closed() && 
!ch->is_cancelled())) {
                             return;
                         }
                         // in pipeline, all node channels are done or 
canceled, will not block.
@@ -1528,7 +1534,7 @@ Status VTabletWriter::close(Status exec_status) {
                         auto s = ch->close_wait(_state);
                         if (!s.ok()) {
                             status = 
cancel_channel_and_check_intolerable_failure(
-                                    status, s.to_string(), index_channel, ch);
+                                    std::move(status), s.to_string(), 
*index_channel, *ch);
                         }
                         ch->time_report(&node_add_batch_counter_map, 
&serialize_batch_ns,
                                         &channel_stat, &queue_push_lock_ns, 
&actual_consume_ns,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to