This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 22aacd849c1638f6fcd87cd1ad58f70eed5fbe8f
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Dec 16 22:07:00 2022 +0800

    [fix](load) delta writer init failed might cause data inconsistency between 
multiple replicas (#15058)
    
    In the following case, data inconsistency would happen between multiple 
replicas:
    
    The current delta writer only writes a few lines of data (which means the 
write() method is only called once).
    The writer fails in init() (which is called the first time we call 
write()), and the current tablet is recorded in _broken_tablets.
    The delta writer is then closed; in the close() method, it finds that it is 
not initialized, treats this case as an empty load, and tries to init again, 
which would create an empty rowset.
    The tablet sink receives the error report in the RPC response and marks the 
replica as failed, but since a quorum of replicas succeeded, the following load 
commit operation will still succeed.
    FE sends a publish version task to each BE, and the one with the empty 
rowset will publish the version successfully.
    We end up with 2 replicas with data and 1 empty replica.
---
 be/src/olap/delta_writer.cpp       |  4 ++--
 be/src/runtime/tablets_channel.cpp | 17 ++++++++++++-----
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index d330e430fe..75f402f202 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -318,7 +318,7 @@ Status DeltaWriter::close() {
         // if this delta writer is not initialized, but close() is called.
         // which means this tablet has no data loaded, but at least one tablet
         // in same partition has data loaded.
-        // so we have to also init this DeltaWriter, so that it can create a 
empty rowset
+        // so we have to also init this DeltaWriter, so that it can create an 
empty rowset
         // for this tablet when being closed.
         RETURN_NOT_OK(init());
     }
@@ -409,7 +409,7 @@ Status DeltaWriter::cancel() {
 
 Status DeltaWriter::cancel_with_status(const Status& st) {
     std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init || _is_cancelled) {
+    if (_is_cancelled) {
         return Status::OK();
     }
     _mem_table.reset();
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index d6c81fb845..5bee7e2050 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -121,6 +121,14 @@ Status TabletsChannel::close(
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
                 }
+                // to make sure tablet writer in `_broken_tablets` won't call 
`close_wait` method.
+                // `close_wait` might create the rowset and commit txn 
directly, and the subsequent
+                // publish version task will success, which can cause the 
replica inconsistency.
+                if (_broken_tablets.find(it.second->tablet_id()) != 
_broken_tablets.end()) {
+                    LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is 
broken but not cancelled"
+                                 << ", tablet_id=" << it.first << ", 
transaction_id=" << _txn_id;
+                    continue;
+                }
                 need_wait_writers.insert(it.second);
             } else {
                 auto st = it.second->cancel();
@@ -182,11 +190,9 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
                                  const bool write_single_replica) {
     Status st = writer->close_wait(slave_tablet_nodes, write_single_replica);
     if (st.ok()) {
-        if (_broken_tablets.find(writer->tablet_id()) == 
_broken_tablets.end()) {
-            PTabletInfo* tablet_info = tablet_vec->Add();
-            tablet_info->set_tablet_id(writer->tablet_id());
-            tablet_info->set_schema_hash(writer->schema_hash());
-        }
+        PTabletInfo* tablet_info = tablet_vec->Add();
+        tablet_info->set_tablet_id(writer->tablet_id());
+        tablet_info->set_schema_hash(writer->schema_hash());
     } else {
         PTabletError* tablet_error = tablet_errors->Add();
         tablet_error->set_tablet_id(writer->tablet_id());
@@ -487,6 +493,7 @@ Status TabletsChannel::add_batch(const 
TabletWriterAddRequest& request,
             PTabletError* error = tablet_errors->Add();
             error->set_tablet_id(tablet_to_rowidxs_it.first);
             error->set_msg(err_msg);
+            tablet_writer_it->second->cancel_with_status(st);
             _broken_tablets.insert(tablet_to_rowidxs_it.first);
             // continue write to other tablet.
             // the error will return back to sender.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to