This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2c95a647961 branch-3.1: [Fix](case) fix 
`test_partial_update_publish_seq` #53565 (#53622)
2c95a647961 is described below

commit 2c95a647961b45755d794982df5c3ebb555acbfd
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jul 28 10:26:33 2025 +0800

    branch-3.1: [Fix](case) fix `test_partial_update_publish_seq` #53565 
(#53622)
    
    Cherry-picked from #53565
    
    Co-authored-by: bobhan1 <[email protected]>
---
 .../publish}/test_partial_update_publish_seq.out   | Bin 1714 -> 1716 bytes
 .../test_partial_update_publish_seq.groovy         | 180 ---------------------
 .../publish/test_partial_update_publish_seq.groovy | 158 ++++++++++++++++++
 3 files changed, 158 insertions(+), 180 deletions(-)

diff --git 
a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_seq.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
similarity index 97%
rename from 
regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_seq.out
rename to 
regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
index a7fa43e0ad9..44b849dedc0 100644
Binary files 
a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_seq.out
 and 
b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
 differ
diff --git 
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_seq.groovy
 
b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_seq.groovy
deleted file mode 100644
index 1fdbb667230..00000000000
--- 
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_seq.groovy
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.junit.Assert
-import java.util.concurrent.TimeUnit
-import org.awaitility.Awaitility
-
-suite("test_partial_update_publish_seq", "nonConcurrent") {
-
-    def enable_block_in_publish = {
-        if (isCloudMode()) {
-            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
-            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
-        } else {
-            
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
-            
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
-        }
-    }
-
-    def disable_block_in_publish = {
-        if (isCloudMode()) {
-            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
-            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
-        } else {
-            
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
-            
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
-        }
-    }
-
-    def inspect_rows = { sqlStr ->
-        sql "set skip_delete_sign=true;"
-        sql "set skip_delete_bitmap=true;"
-        sql "sync"
-        qt_inspect sqlStr
-        sql "set skip_delete_sign=false;"
-        sql "set skip_delete_bitmap=false;"
-        sql "sync"
-    }
-
-
-    try {
-        GetDebugPoint().clearDebugPointsForAllFEs()
-        GetDebugPoint().clearDebugPointsForAllBEs()
-
-        def table1 = "test_partial_update_publish_seq_map"
-        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
-        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
-                `k1` int NOT NULL,
-                `c1` int,
-                `c2` int,
-                `c3` int,
-                `c4` int
-                )UNIQUE KEY(k1)
-            DISTRIBUTED BY HASH(k1) BUCKETS 1
-            PROPERTIES (
-                "enable_mow_light_delete" = "false",
-                "disable_auto_compaction" = "true",
-                "function_column.sequence_col" = "c1",
-                "replication_num" = "1"); """
-
-        sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
-        sql "sync;"
-        qt_seq_map_0 "select * from ${table1} order by k1;"
-
-
-        // with seq map val, >/=/< conflicting seq val
-        enable_block_in_publish()
-        def t1 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,c2) 
values(1,10,99),(2,10,99),(3,10,99);"
-        }
-        Thread.sleep(2000)
-        def t2 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,c3) 
values(1,20,88),(2,10,88),(3,5,88);"
-        }
-        Thread.sleep(2000)
-        disable_block_in_publish()
-        t1.join()
-        t2.join()
-        qt_seq_map_1 "select * from ${table1} order by k1;"
-        inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
-
-        // without seq map val, the filled seq val >/=/< conflicting seq val
-        enable_block_in_publish()
-        t1 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,c2) 
values(1,9,77),(2,10,77),(3,50,77);"
-        }
-        Thread.sleep(2000)
-        t2 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c4) values(1,33),(2,33),(3,33);"
-        }
-        Thread.sleep(2000)
-        disable_block_in_publish()
-        t1.join()
-        t2.join()
-        qt_seq_map_2 "select * from ${table1} order by k1;"
-        inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
-
-        // with delete sign and seq col val, >/=/< conflicting seq val
-        enable_block_in_publish()
-        t1 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,c2) 
values(1,80,66),(2,100,66),(3,120,66);"
-        }
-        Thread.sleep(2000)
-        t2 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,__DORIS_DELETE_SIGN__) 
values(1,100,1),(2,100,1),(3,100,1);"
-        }
-        Thread.sleep(2000)
-        disable_block_in_publish()
-        t1.join()
-        t2.join()
-        qt_seq_map_3 "select * from ${table1} order by k1;"
-        inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
-
-
-        sql "truncate table ${table1};"
-        sql "insert into ${table1} 
values(1,10,1,1,1),(2,10,2,2,2),(3,10,3,3,3);"
-        sql "sync;"
-        // with delete sign and without seq col val, >/=/< conflicting seq val
-        enable_block_in_publish()
-        t1 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c1,c2) 
values(1,20,55),(2,100,55),(3,120,55);"
-        }
-        Thread.sleep(2000)
-        t2 = Thread.start {
-            sql "set enable_unique_key_partial_update=true;"
-            sql "set enable_insert_strict=false;"
-            sql "sync;"
-            sql "insert into ${table1}(k1,c4,__DORIS_DELETE_SIGN__) 
values(1,100,1),(2,100,1),(3,100,1);"
-        }
-        Thread.sleep(2000)
-        disable_block_in_publish()
-        t1.join()
-        t2.join()
-        qt_seq_map_4 "select * from ${table1} order by k1;"
-        inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
-
-
-    } catch(Exception e) {
-        logger.info(e.getMessage())
-        throw e
-    } finally {
-        GetDebugPoint().clearDebugPointsForAllFEs()
-        GetDebugPoint().clearDebugPointsForAllBEs()
-    }
-}
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
new file mode 100644
index 00000000000..042c8276777
--- /dev/null
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_partial_update_publish_seq") {
+    if (isCloudMode()) {
+        logger.info("skip test_partial_update_publish_seq in cloud mode")
+        return
+    }
+    def inspect_rows = { sqlStr ->
+        sql "set skip_delete_sign=true;"
+        sql "set skip_delete_bitmap=true;"
+        sql "sync"
+        qt_inspect sqlStr
+        sql "set skip_delete_sign=false;"
+        sql "set skip_delete_bitmap=false;"
+        sql "sync"
+    }
+
+    def dbName = context.config.getDbNameByFile(context.file)
+    def table1 = "test_partial_update_publish_seq_map"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int,
+            `c3` int,
+            `c4` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_mow_light_delete" = "false",
+            "disable_auto_compaction" = "true",
+            "function_column.sequence_col" = "c1",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+    sql "sync;"
+    qt_seq_map_0 "select * from ${table1} order by k1;"
+
+    def load_data = { String cols, String data ->
+        def txnId
+        streamLoad {
+            table "${table1}"
+            set 'format', 'csv'
+            set 'column_separator', ','
+            set 'strict_mode', 'false'
+            set 'columns', cols
+            set 'two_phase_commit', 'true'
+            set 'partial_columns', 'true'
+            inputStream new ByteArrayInputStream(data.getBytes())
+            time 60000
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                txnId = json.TxnId
+                assertEquals("success", json.Status.toLowerCase())
+            } 
+        }
+        return txnId
+    }
+
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " 
http://${context.config.feHttpAddress}/api/${dbName}/${table1}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.text
+        def json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = {txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && 
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} 
where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals(st, "VISIBLE")
+    }
+
+    // with seq map val, >/=/< conflicting seq val
+    def txn1 = load_data("k1,c1,c2", "1,10,99\n2,10,99\n3,10,99\n")
+    def txn2 = load_data("k1,c1,c3", "1,20,88\n2,10,88\n3,5,88\n")
+    do_streamload_2pc_commit(txn1)
+    wait_for_publish(txn1, 10)
+    do_streamload_2pc_commit(txn2)
+    wait_for_publish(txn2, 10)
+    
+    qt_seq_map_1 "select * from ${table1} order by k1;"
+    inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
+
+    // without seq map val, the filled seq val >/=/< conflicting seq val
+    def txn3 = load_data("k1,c1,c2", "1,9,77\n2,10,77\n3,50,77\n")
+    def txn4 = load_data("k1,c4", "1,33\n2,33\n3,33\n")
+    do_streamload_2pc_commit(txn3)
+    wait_for_publish(txn3, 10)
+    do_streamload_2pc_commit(txn4)
+    wait_for_publish(txn4, 10)
+    qt_seq_map_2 "select * from ${table1} order by k1;"
+    inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
+
+    // with delete sign and seq col val, >/=/< conflicting seq val
+    def txn5 = load_data("k1,c1,c2", "1,80,66\n2,100,66\n3,120,66\n")
+    def txn6 = load_data("k1,c1,__DORIS_DELETE_SIGN__", 
"1,100,1\n2,100,1\n3,100,1\n")
+    do_streamload_2pc_commit(txn5)
+    wait_for_publish(txn5, 10)
+    do_streamload_2pc_commit(txn6)
+    wait_for_publish(txn6, 10)
+
+    qt_seq_map_3 "select * from ${table1} order by k1;"
+    inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
+
+
+    sql "truncate table ${table1};"
+    sql "insert into ${table1} values(1,10,1,1,1),(2,10,2,2,2),(3,10,3,3,3);"
+    sql "sync;"
+    // with delete sign and without seq col val, >/=/< conflicting seq val
+
+    def txn7 = load_data("k1,c1,c2", "1,20,55\n2,100,55\n3,120,55\n")
+    def txn8 = load_data("k1,c4,__DORIS_DELETE_SIGN__", 
"1,100,1\n2,100,1\n3,100,1\n")
+    do_streamload_2pc_commit(txn7)
+    wait_for_publish(txn7, 10)
+    do_streamload_2pc_commit(txn8)
+    wait_for_publish(txn8, 10)
+
+    qt_seq_map_4 "select * from ${table1} order by k1;"
+    inspect_rows "select 
*,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from 
${table1} order by k1,__DORIS_VERSION_COL__;"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to