This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 64aeeb971b5 [Fix](partial-update) Correct the alignment process when 
the table has sequence column and add cases (#25346)
64aeeb971b5 is described below

commit 64aeeb971b591b5cdad48f665437362ae203dd98
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Wed Oct 18 11:32:51 2023 +0800

    [Fix](partial-update) Correct the alignment process when the table has 
sequence column and add cases (#25346)
    
    This PR fix the alignment process during publish phase when conflict occurs 
during concurrent partial updates: if we encounter a row with the same key and 
larger value in sequence column, it means that there exists another load which 
introduces a row with the same keys and larger sequence column value published 
successfully after the commit phase of the current load. We should act as 
follows:
    
    - If the columns we update include sequence column, we should delete the 
current row becase the partial update on the current row has been overwritten 
by the previous one with larger sequence column value.
    - Otherwise, we should combine the values of the missing columns in the 
previous row and the values of the including columns in the current row into a 
new row.
---
 be/src/olap/tablet.cpp                             |  27 ++-
 .../test_partial_update_parallel.out               |  21 ++
 .../test_partial_update_parallel.groovy            | 243 ++++++++++++++++++++-
 3 files changed, 287 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index df3776e8ed2..7e468b1563d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2934,6 +2934,15 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     Version dummy_version(end_version + 1, end_version + 1);
     auto rowset_schema = rowset->tablet_schema();
     bool is_partial_update = rowset_writer && 
rowset_writer->is_partial_update();
+    bool have_input_seq_column = false;
+    if (is_partial_update && rowset_schema->has_sequence_col()) {
+        std::vector<uint32_t> including_cids =
+                rowset_writer->get_partial_update_info()->update_cids;
+        have_input_seq_column =
+                rowset_schema->has_sequence_col() &&
+                (std::find(including_cids.cbegin(), including_cids.cend(),
+                           rowset_schema->sequence_col_idx()) != 
including_cids.cend());
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
@@ -3002,12 +3011,24 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 continue;
             }
 
-            // sequence id smaller than the previous one, so delete current row
-            if (st.is<KEY_ALREADY_EXISTS>()) {
+            if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || 
have_input_seq_column)) {
+                // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row 
with the same key and larger value
+                // in seqeunce column.
+                // - If the current load is not a partial update, we just 
delete current row.
+                // - Otherwise, it means that we are doing the alignment 
process in publish phase due to conflicts
+                // during concurrent partial updates. And there exists another 
load which introduces a row with
+                // the same keys and larger sequence column value published 
successfully after the commit phase
+                // of the current load.
+                //     - If the columns we update include sequence column, we 
should delete the current row becase the
+                //       partial update on the current row has been 
`overwritten` by the previous one with larger sequence
+                //       column value.
+                //     - Otherwise, we should combine the values of the 
missing columns in the previous row and the values
+                //       of the including columns in the current row into a 
new row.
                 delete_bitmap->add({rowset_id, seg->id(), 
DeleteBitmap::TEMP_VERSION_COMMON},
                                    row_id);
                 continue;
-            } else if (is_partial_update && rowset_writer != nullptr) {
+            }
+            if (is_partial_update && rowset_writer != nullptr) {
                 // In publish version, record rows to be deleted for 
concurrent update
                 // For example, if version 5 and 6 update a row, but version 6 
only see
                 // version 4 when write, and when publish version, version 5's 
value will
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
index bcd7e86c53c..f4a51133a81 100644
--- 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
@@ -6,3 +6,24 @@
 4      "bbbbbbbb"      4444    499     40
 5      "cccccccccccc"  5555    599     50
 
+-- !sql --
+1      "ddddddddddd"   1111    199     10
+2      "eeeeee"        2222    299     20
+3      "aaaaa" 3333    399     30
+4      "bbbbbbbb"      4444    499     40
+5      "cccccccccccc"  5555    599     50
+
+-- !sql --
+1      "ddddddddddd"   1111    199     10      0       5       10
+2      "eeeeee"        2222    299     20      0       5       20
+3      "aaaaa" 3333    399     30      0       5       30
+4      "bbbbbbbb"      4444    499     40      0       5       40
+5      "cccccccccccc"  5555    599     50      0       5       50
+
+-- !sql --
+1      "ddddddddddd"   1111    199     10      0       5       10
+2      "eeeeee"        2222    299     20      0       5       20
+3      "aaaaa" 3333    399     30      0       5       30
+4      "bbbbbbbb"      4444    499     40      0       5       40
+5      "cccccccccccc"  5555    599     50      0       5       50
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index 19522e8064e..ba0c1766aa1 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -17,7 +17,156 @@
 
 suite("test_primary_key_partial_update_parallel", "p0") {
 
+    // case 1: concurrent partial update
     def tableName = "test_primary_key_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 2: concurrent partial update with row store column
+    tableName = "test_primary_key_row_store_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true", "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 3: concurrent partial update with sequence column
+    tableName = "test_primary_key_seq_partial_update"
 
     // create table
     sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -29,14 +178,21 @@ suite("test_primary_key_partial_update_parallel", "p0") {
                 `test` int(11) NULL COMMENT "null test",
                 `dft` int(11) DEFAULT "4321")
                 UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
-                PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true")
+                PROPERTIES(
+                    "replication_num" = "1", 
+                    "enable_unique_key_merge_on_write" = "true",
+                    "function_column.sequence_col" = "dft")
     """
 
     sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
         (2, "doris2", 2000, 223, 2),
         (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
         (5, "doris5", 5000, 523, 5),
         (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
         (3, "doris3", 3000, 323, 3);"""
 
     t1 = Thread.startDaemon {
@@ -85,9 +241,94 @@ suite("test_primary_key_partial_update_parallel", "p0") {
     t2.join()
     t3.join()
 
+    sql "set show_hidden_columns=true;"
     sql "sync"
 
     qt_sql """ select * from ${tableName} order by id;"""
+    sql "set show_hidden_columns=false;"
+    sql "sync"
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 4: concurrent partial update with row store column and sequence 
column
+    tableName = "test_primary_key_row_store_seq_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES(
+                    "replication_num" = "1", 
+                    "enable_unique_key_merge_on_write" = "true",
+                    "function_column.sequence_col" = "dft",
+                    "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "set show_hidden_columns=true;"
+    sql "sync"
+
+    qt_sql """ select 
id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__
 from ${tableName} order by id;"""
 
     sql """ DROP TABLE IF EXISTS ${tableName}; """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to