This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8093b4b3342 [branch-2.0-pick] "[Fix](partial-update) Correct the 
alignment process when the table has sequence column and add cases #25346" 
(#25789)
8093b4b3342 is described below

commit 8093b4b3342ebc3100ddc2480f131a998164d229
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Mon Oct 23 23:35:21 2023 +0800

    [branch-2.0-pick] "[Fix](partial-update) Correct the alignment process when 
the table has sequence column and add cases #25346" (#25789)
---
 be/src/olap/tablet.cpp                             |  27 ++-
 .../test_partial_update_parallel.out               |  21 ++
 .../test_partial_update_parallel.groovy            | 243 ++++++++++++++++++++-
 3 files changed, 287 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 37bfedc3293..527a8b1c372 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2965,6 +2965,15 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     Version dummy_version(end_version + 1, end_version + 1);
     auto rowset_schema = rowset->tablet_schema();
     bool is_partial_update = rowset_writer && 
rowset_writer->is_partial_update();
+    bool have_input_seq_column = false;
+    if (is_partial_update && rowset_schema->has_sequence_col()) {
+        std::vector<uint32_t> including_cids =
+                rowset_writer->get_partial_update_info()->update_cids;
+        have_input_seq_column =
+                rowset_schema->has_sequence_col() &&
+                (std::find(including_cids.cbegin(), including_cids.cend(),
+                           rowset_schema->sequence_col_idx()) != 
including_cids.cend());
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
@@ -3033,12 +3042,24 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 continue;
             }
 
-            // sequence id smaller than the previous one, so delete current row
-            if (st.is<KEY_ALREADY_EXISTS>()) {
+            if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || 
have_input_seq_column)) {
+                // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row 
with the same key and larger value
+                // in sequence column.
+                // - If the current load is not a partial update, we just 
delete current row.
+                // - Otherwise, it means that we are doing the alignment 
process in publish phase due to conflicts
+                // during concurrent partial updates. And there exists another 
load which introduces a row with
+                // the same keys and larger sequence column value published 
successfully after the commit phase
+                // of the current load.
+                //     - If the columns we update include sequence column, we 
should delete the current row because the
+                //       partial update on the current row has been 
`overwritten` by the previous one with larger sequence
+                //       column value.
+                //     - Otherwise, we should combine the values of the 
missing columns in the previous row and the values
+                //       of the including columns in the current row into a 
new row.
                 delete_bitmap->add({rowset_id, seg->id(), 
DeleteBitmap::TEMP_VERSION_COMMON},
                                    row_id);
                 continue;
-            } else if (is_partial_update && rowset_writer != nullptr) {
+            }
+            if (is_partial_update && rowset_writer != nullptr) {
                 // In publish version, record rows to be deleted for 
concurrent update
                 // For example, if version 5 and 6 update a row, but version 6 
only see
                 // version 4 when write, and when publish version, version 5's 
value will
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
index bcd7e86c53c..f4a51133a81 100644
--- 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
@@ -6,3 +6,24 @@
 4      "bbbbbbbb"      4444    499     40
 5      "cccccccccccc"  5555    599     50
 
+-- !sql --
+1      "ddddddddddd"   1111    199     10
+2      "eeeeee"        2222    299     20
+3      "aaaaa" 3333    399     30
+4      "bbbbbbbb"      4444    499     40
+5      "cccccccccccc"  5555    599     50
+
+-- !sql --
+1      "ddddddddddd"   1111    199     10      0       5       10
+2      "eeeeee"        2222    299     20      0       5       20
+3      "aaaaa" 3333    399     30      0       5       30
+4      "bbbbbbbb"      4444    499     40      0       5       40
+5      "cccccccccccc"  5555    599     50      0       5       50
+
+-- !sql --
+1      "ddddddddddd"   1111    199     10      0       5       10
+2      "eeeeee"        2222    299     20      0       5       20
+3      "aaaaa" 3333    399     30      0       5       30
+4      "bbbbbbbb"      4444    499     40      0       5       40
+5      "cccccccccccc"  5555    599     50      0       5       50
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index 19522e8064e..ba0c1766aa1 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -17,7 +17,156 @@
 
 suite("test_primary_key_partial_update_parallel", "p0") {
 
+    // case 1: concurrent partial update
     def tableName = "test_primary_key_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 2: concurrent partial update with row store column
+    tableName = "test_primary_key_row_store_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true", "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 3: concurrent partial update with sequence column
+    tableName = "test_primary_key_seq_partial_update"
 
     // create table
     sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -29,14 +178,21 @@ suite("test_primary_key_partial_update_parallel", "p0") {
                 `test` int(11) NULL COMMENT "null test",
                 `dft` int(11) DEFAULT "4321")
                 UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
-                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true")
+                PROPERTIES(
+                    "replication_num" = "1", 
+                    "enable_unique_key_merge_on_write" = "true",
+                    "function_column.sequence_col" = "dft")
     """
 
     sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
         (2, "doris2", 2000, 223, 2),
         (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
         (5, "doris5", 5000, 523, 5),
         (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
         (3, "doris3", 3000, 323, 3);"""
 
     t1 = Thread.startDaemon {
@@ -85,9 +241,94 @@ suite("test_primary_key_partial_update_parallel", "p0") {
     t2.join()
     t3.join()
 
+    sql "set show_hidden_columns=true;"
     sql "sync"
 
     qt_sql """ select * from ${tableName} order by id;"""
+    sql "set show_hidden_columns=false;"
+    sql "sync"
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 4: concurrent partial update with row store column and sequence 
column
+    tableName = "test_primary_key_row_store_seq_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES(
+                    "replication_num" = "1", 
+                    "enable_unique_key_merge_on_write" = "true",
+                    "function_column.sequence_col" = "dft",
+                    "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "set show_hidden_columns=true;"
+    sql "sync"
+
+    qt_sql """ select 
id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__
 from ${tableName} order by id;"""
 
     sql """ DROP TABLE IF EXISTS ${tableName}; """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to