This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8df2c5b7f8e [Fix](load) Fix data loss when node channel been cancelled before close wait (#36662) 8df2c5b7f8e is described below commit 8df2c5b7f8ee1a42ca50706b53d0f9488f34a04a Author: plat1ko <platonekos...@gmail.com> AuthorDate: Mon Jun 24 14:28:22 2024 +0800 [Fix](load) Fix data loss when node channel been cancelled before close wait (#36662) Fix data loss when a node channel has been cancelled before close wait. When an error occurs in `VNodeChannel::try_send_pending_block`, invoking `VNodeChannel::cancel` sets the `VNodeChannel` to closed. In `VTabletWriter::close`, if `VNodeChannel::cancel` is called before `VNodeChannel::close_wait`, the already-closed channel is skipped and its error handling is bypassed, so the transaction is still considered successful. --- be/src/cloud/injection_point_action.cpp | 10 ++++++++ be/src/vec/sink/writer/vtablet_writer.cpp | 42 ++++++++++++++++++------------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index d99dcfd534d..e8c57ed91e8 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -98,6 +98,16 @@ void register_suites() { should_ret = true; }); }); + suite_map.emplace("test_cancel_node_channel", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("VNodeChannel::try_send_block", [](auto&& args) { + LOG(INFO) << "injection VNodeChannel::try_send_block"; + auto* arg0 = try_any_cast<Status*>(args[0]); + *arg0 = Status::InternalError<false>("test_cancel_node_channel injection error"); + }); + sp->set_call_back("VOlapTableSink::close", + [](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); }); + }); } void set_sleep(const std::string& point, HttpRequest* req) { diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index b31796fe724..22d9bbd840b 100644 --- 
a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -45,6 +45,7 @@ #include <vector> #include "cloud/config.h" +#include "common/sync_point.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" @@ -627,6 +628,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { &uncompressed_bytes, &compressed_bytes, state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); + TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); _send_block_callback->clear_in_flight(); @@ -1325,22 +1327,22 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -static Status cancel_channel_and_check_intolerable_failure( - Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich, - const std::shared_ptr<VNodeChannel> nch) { - LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg; - ich->mark_as_failed(nch.get(), err_msg, -1); +static Status cancel_channel_and_check_intolerable_failure(Status status, + const std::string& err_msg, + IndexChannel& ich, VNodeChannel& nch) { + LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg; + ich.mark_as_failed(&nch, err_msg, -1); // cancel the node channel in best effort - nch->cancel(err_msg); + nch.cancel(err_msg); // check if index has intolerable failure - Status index_st = ich->check_intolerable_failure(); + Status index_st = ich.check_intolerable_failure(); if (!index_st.ok()) { - status = index_st; - } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) { - status = st; - } else if (Status st = ich->check_tablet_filtered_rows_consistency(); !st.ok()) { - status = st; + status = std::move(index_st); + } else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) { + 
status = std::move(st); + } else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) { + status = std::move(st); } return status; } @@ -1416,7 +1418,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(true); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); if (!status.ok()) { @@ -1432,7 +1435,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status << "close1 wait finished!"; if (!s.ok()) { status = cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); + std::move(status), s.to_string(), *index_channel, *ch); } }); if (!status.ok()) { @@ -1450,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); } else { // not has_incremental_node_channel @@ -1464,7 +1468,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status ch->mark_close(); if (ch->is_cancelled()) { status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); + std::move(status), ch->get_cancel_msg(), *index_channel, + *ch); } }); } @@ -1491,6 +1496,7 @@ Status VTabletWriter::close(Status exec_status) { // will make the last batch of request-> close_wait will wait this finished. _do_try_close(_state, exec_status); + TEST_INJECTION_POINT("VOlapTableSink::close"); // If _close_status is not ok, all nodes have been canceled in try_close. 
if (_close_status.ok()) { @@ -1520,7 +1526,7 @@ Status VTabletWriter::close(Status exec_status) { &total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns, &wait_exec_time, &total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) { - if (!status.ok() || ch->is_closed()) { + if (!status.ok() || (ch->is_closed() && !ch->is_cancelled())) { return; } // in pipeline, all node channels are done or canceled, will not block. @@ -1528,7 +1534,7 @@ Status VTabletWriter::close(Status exec_status) { auto s = ch->close_wait(_state); if (!s.ok()) { status = cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); + std::move(status), s.to_string(), *index_channel, *ch); } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, &actual_consume_ns, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org