This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e4674ab669f28c1b1292625719083ec44a5b153
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Jan 26 15:58:43 2024 +0800

    [fix](partial update) mishandling of exceptions in the publish phase may 
result in data loss (#30366)
---
 be/src/olap/tablet.cpp                             |  44 +++++-
 be/src/olap/tablet.h                               |   6 +-
 be/src/olap/txn_manager.cpp                        |  63 +-------
 be/src/olap/txn_manager.h                          |  40 ++++-
 .../fault_injection_p0/concurrency_update1.csv     |  21 +++
 .../fault_injection_p0/concurrency_update2.csv     |  21 +++
 .../fault_injection_p0/concurrency_update3.csv     |  21 +++
 ..._partial_update_publish_conflict_with_error.out |  47 ++++++
 ...rtial_update_publish_conflict_with_error.groovy | 171 +++++++++++++++++++++
 9 files changed, 362 insertions(+), 72 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 2af47a5b430..19d598d1275 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3272,16 +3272,28 @@ Status Tablet::commit_phase_update_delete_bitmap(
     return Status::OK();
 }
 
-Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
-                                    const RowsetIdUnorderedSet& pre_rowset_ids,
-                                    DeleteBitmapPtr delete_bitmap, int64_t 
txn_id,
-                                    RowsetWriter* rowset_writer) {
+Status Tablet::update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id) {
     SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
     RowsetIdUnorderedSet rowset_ids_to_del;
+    RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
 
+    std::unique_ptr<RowsetWriter> rowset_writer;
+    RETURN_IF_ERROR(
+            create_transient_rowset_writer(rowset, &rowset_writer, 
txn_info->partial_update_info));
+
+    DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
+    // Partial update might generate new segments when there are conflicts
+    // while publishing, and mark the same key in original segments as delete.
+    // When the new segment flush fails or the rowset build fails, the 
deletion marker for the
+    // duplicate key of the original segment should not remain in 
`txn_info->delete_bitmap`,
+    // so we need to make a copy of `txn_info->delete_bitmap` and make changes 
on it.
+    if (txn_info->partial_update_info && 
txn_info->partial_update_info->is_partial_update) {
+        delete_bitmap = 
std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
+    }
+
     OlapStopWatch watch;
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
@@ -3299,7 +3311,8 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     }
     auto t2 = watch.get_elapse_time_us();
 
-    _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, 
&rowset_ids_to_del);
+    _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, 
&rowset_ids_to_add,
+                           &rowset_ids_to_del);
     for (const auto& to_del : rowset_ids_to_del) {
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
     }
@@ -3313,7 +3326,7 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
 
     auto token = _engine.calc_delete_bitmap_executor()->create_token();
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
-                                       cur_version - 1, token.get(), 
rowset_writer));
+                                       cur_version - 1, token.get(), 
rowset_writer.get()));
     RETURN_IF_ERROR(token->wait());
 
     std::stringstream ss;
@@ -3345,6 +3358,25 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
         _remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
     }
 
+    if (txn_info->partial_update_info && 
txn_info->partial_update_info->is_partial_update) {
+        
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", 
{
+            if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+                
LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random 
failed")
+                        .tag("txn_id", txn_id);
+                return Status::InternalError(
+                        "debug update_delete_bitmap partial update write 
rowset random failed");
+            }
+        });
+        // build rowset writer and merge transient rowset
+        RETURN_IF_ERROR(rowset_writer->flush());
+        RowsetSharedPtr transient_rowset;
+        RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
+        rowset->merge_rowset_meta(transient_rowset->rowset_meta());
+
+        // erase segment cache because we will add a segment to this rowset
+        SegmentLoader::instance()->erase_segments(rowset->rowset_id(), 
rowset->num_segments());
+    }
+
     // update version without write lock, compaction and publish_txn
     // will update delete bitmap, handle compaction with _rowset_update_lock
     // and publish_txn runs sequential so no need to lock here
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d6ad0285233..22721a835a3 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,6 +70,7 @@ class TupleDescriptor;
 class CalcDeleteBitmapToken;
 enum CompressKind : int;
 class RowsetBinlogMetasPB;
+struct TabletTxnInfo;
 
 namespace io {
 class RemoteFileSystem;
@@ -471,10 +472,7 @@ public:
             const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t 
txn_id,
             CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = 
nullptr);
 
-    Status update_delete_bitmap(const RowsetSharedPtr& rowset,
-                                const RowsetIdUnorderedSet& pre_rowset_ids,
-                                DeleteBitmapPtr delete_bitmap, int64_t txn_id,
-                                RowsetWriter* rowset_writer = nullptr);
+    Status update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id);
     void calc_compaction_output_rowset_delete_bitmap(
             const std::vector<RowsetSharedPtr>& input_rowsets,
             const RowIdConversion& rowid_conversion, uint64_t start_version, 
uint64_t end_version,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 7a82ef4c6ae..8e0a2439712 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -63,46 +63,6 @@ using std::vector;
 namespace doris {
 using namespace ErrorCode;
 
-struct TabletTxnInfo {
-    PUniqueId load_id;
-    RowsetSharedPtr rowset;
-    PendingRowsetGuard pending_rs_guard;
-    bool unique_key_merge_on_write {false};
-    DeleteBitmapPtr delete_bitmap;
-    // records rowsets calc in commit txn
-    RowsetIdUnorderedSet rowset_ids;
-    int64_t creation_time;
-    bool ingest {false};
-    std::shared_ptr<PartialUpdateInfo> partial_update_info;
-    TxnState state {TxnState::PREPARED};
-
-    TabletTxnInfo() = default;
-
-    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
-            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
-
-    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
-            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), 
ingest(ingest_arg) {}
-
-    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool 
merge_on_write,
-                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& 
ids)
-            : load_id(load_id),
-              rowset(rowset),
-              unique_key_merge_on_write(merge_on_write),
-              delete_bitmap(delete_bitmap),
-              rowset_ids(ids),
-              creation_time(UnixSeconds()) {}
-
-    void prepare() { state = TxnState::PREPARED; }
-    void commit() { state = TxnState::COMMITTED; }
-    void rollback() { state = TxnState::ROLLEDBACK; }
-    void abort() {
-        if (state == TxnState::PREPARED) {
-            state = TxnState::ABORTED;
-        }
-    }
-};
-
 TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, 
int32_t txn_shard_size)
         : _engine(engine),
           _txn_map_shard_size(txn_map_shard_size),
@@ -521,33 +481,14 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
     });
     // update delete_bitmap
     if (tablet_txn_info->unique_key_merge_on_write) {
-        std::unique_ptr<RowsetWriter> rowset_writer;
-        RETURN_IF_ERROR(tablet->create_transient_rowset_writer(
-                rowset, &rowset_writer, tablet_txn_info->partial_update_info));
-
         int64_t t2 = MonotonicMicros();
-        RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, 
tablet_txn_info->rowset_ids,
-                                                     
tablet_txn_info->delete_bitmap, transaction_id,
-                                                     rowset_writer.get()));
+        RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), 
transaction_id));
         int64_t t3 = MonotonicMicros();
         stats->calc_delete_bitmap_time_us = t3 - t2;
-        if (tablet_txn_info->partial_update_info &&
-            tablet_txn_info->partial_update_info->is_partial_update) {
-            // build rowset writer and merge transient rowset
-            RETURN_IF_ERROR(rowset_writer->flush());
-            RowsetSharedPtr transient_rowset;
-            RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
-            rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
-            // erase segment cache cause we will add a segment to rowset
-            SegmentLoader::instance()->erase_segments(rowset->rowset_id(), 
rowset->num_segments());
-        }
-        stats->partial_update_write_segment_us = MonotonicMicros() - t3;
-        int64_t t4 = MonotonicMicros();
         RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
                 tablet->data_dir(), tablet->tablet_id(), 
tablet_txn_info->delete_bitmap,
                 version.second));
-        stats->save_meta_time_us = MonotonicMicros() - t4;
+        stats->save_meta_time_us = MonotonicMicros() - t3;
     }
 
     /// Step 3:  add to binlog
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index f7e67d0a462..e33958a66aa 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -62,7 +62,45 @@ enum class TxnState {
     DELETED = 5,
 };
 
-struct TabletTxnInfo;
+struct TabletTxnInfo {
+    PUniqueId load_id;
+    RowsetSharedPtr rowset;
+    PendingRowsetGuard pending_rs_guard;
+    bool unique_key_merge_on_write {false};
+    DeleteBitmapPtr delete_bitmap;
+    // records rowsets calc in commit txn
+    RowsetIdUnorderedSet rowset_ids;
+    int64_t creation_time;
+    bool ingest {false};
+    std::shared_ptr<PartialUpdateInfo> partial_update_info;
+    TxnState state {TxnState::PREPARED};
+
+    TabletTxnInfo() = default;
+
+    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
+            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
+
+    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
+            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), 
ingest(ingest_arg) {}
+
+    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool 
merge_on_write,
+                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& 
ids)
+            : load_id(load_id),
+              rowset(rowset),
+              unique_key_merge_on_write(merge_on_write),
+              delete_bitmap(delete_bitmap),
+              rowset_ids(ids),
+              creation_time(UnixSeconds()) {}
+
+    void prepare() { state = TxnState::PREPARED; }
+    void commit() { state = TxnState::COMMITTED; }
+    void rollback() { state = TxnState::ROLLEDBACK; }
+    void abort() {
+        if (state == TxnState::PREPARED) {
+            state = TxnState::ABORTED;
+        }
+    }
+};
 
 struct CommitTabletTxnInfo {
     TTransactionId transaction_id {0};
diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv 
b/regression-test/data/fault_injection_p0/concurrency_update1.csv
new file mode 100644
index 00000000000..5bc6c8de802
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update1.csv
@@ -0,0 +1,21 @@
+0
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv 
b/regression-test/data/fault_injection_p0/concurrency_update2.csv
new file mode 100644
index 00000000000..23f43edc585
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update2.csv
@@ -0,0 +1,21 @@
+0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv 
b/regression-test/data/fault_injection_p0/concurrency_update3.csv
new file mode 100644
index 00000000000..e2dd51a2dcc
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/concurrency_update3.csv
@@ -0,0 +1,21 @@
+0,b0
+1,b1
+2,b2
+3,b3
+4,b4
+5,b5
+6,b6
+7,b7
+8,b8
+9,b9
+10,b10
+11,b11
+12,b12
+13,b13
+14,b14
+15,b15
+16,b16
+17,b17
+18,b18
+19,b19
+20,b20
diff --git 
a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
 
b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
new file mode 100644
index 00000000000..4e4d3f19f6e
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out
@@ -0,0 +1,47 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0      \N      \N      \N      \N      \N
+1      \N      \N      \N      \N      \N
+10     \N      \N      \N      \N      \N
+11     \N      \N      \N      \N      \N
+12     \N      \N      \N      \N      \N
+13     \N      \N      \N      \N      \N
+14     \N      \N      \N      \N      \N
+15     \N      \N      \N      \N      \N
+16     \N      \N      \N      \N      \N
+17     \N      \N      \N      \N      \N
+18     \N      \N      \N      \N      \N
+19     \N      \N      \N      \N      \N
+2      \N      \N      \N      \N      \N
+20     \N      \N      \N      \N      \N
+3      \N      \N      \N      \N      \N
+4      \N      \N      \N      \N      \N
+5      \N      \N      \N      \N      \N
+6      \N      \N      \N      \N      \N
+7      \N      \N      \N      \N      \N
+8      \N      \N      \N      \N      \N
+9      \N      \N      \N      \N      \N
+
+-- !sql --
+0      aaaaaaaaaa      b0      \N      \N      \N
+1      aaaaaaaaaa      b1      \N      \N      \N
+10     aaaaaaaaaa      b10     \N      \N      \N
+11     aaaaaaaaaa      b11     \N      \N      \N
+12     aaaaaaaaaa      b12     \N      \N      \N
+13     aaaaaaaaaa      b13     \N      \N      \N
+14     aaaaaaaaaa      b14     \N      \N      \N
+15     aaaaaaaaaa      b15     \N      \N      \N
+16     aaaaaaaaaa      b16     \N      \N      \N
+17     aaaaaaaaaa      b17     \N      \N      \N
+18     aaaaaaaaaa      b18     \N      \N      \N
+19     aaaaaaaaaa      b19     \N      \N      \N
+2      aaaaaaaaaa      b2      \N      \N      \N
+20     aaaaaaaaaa      b20     \N      \N      \N
+3      aaaaaaaaaa      b3      \N      \N      \N
+4      aaaaaaaaaa      b4      \N      \N      \N
+5      aaaaaaaaaa      b5      \N      \N      \N
+6      aaaaaaaaaa      b6      \N      \N      \N
+7      aaaaaaaaaa      b7      \N      \N      \N
+8      aaaaaaaaaa      b8      \N      \N      \N
+9      aaaaaaaaaa      b9      \N      \N      \N
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
 
b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
new file mode 100644
index 00000000000..29b5bcaaed6
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy
@@ -0,0 +1,171 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_publish_conflict_with_error", "nonConcurrent") {
+    def dbName = context.config.getDbNameByFile(context.file)
+    def tableName = "test_partial_update_publish_conflict_with_error"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+        k1 varchar(20) not null,
+        v1 varchar(20),
+        v2 varchar(20),
+        v3 varchar(20),
+        v4 varchar(20),
+        v5 varchar(20))
+        UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES(
+            "replication_num" = "1",
+            "light_schema_change" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true")"""
+
+    // base data
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', "k1"
+
+        file 'concurrency_update1.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync;"
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+    // NOTE: use streamload 2pc to construct the conflict of publish
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " 
http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = {txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && 
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} 
where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals(st, "VISIBLE")
+    }
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def dbug_point = 
'Tablet.update_delete_bitmap.partial_update_write_rowset_fail'
+
+    // concurrent load 1
+    String txnId1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,tmp,v1=substr(tmp,1,10)'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'concurrency_update2.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    String txnId2
+    // concurrent load 2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,v2'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'concurrency_update3.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+    sql "sync;"
+
+    // complete load 1 first
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 10)
+
+    // inject failure on publish
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs(dbug_point, [percent : 1.0])
+        do_streamload_2pc_commit(txnId2)
+        sleep(3000)
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs(dbug_point)
+    }
+    // publish will retry until success
+    // FE retry may take a longer time, wait for 20 secs
+    wait_for_publish(txnId2, 20)
+
+    sql "sync;"
+    qt_sql """ select * from ${tableName} order by k1;"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to