This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 325eed6e8957c7166d437c4d8fdec5372c26df72
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sat Sep 2 20:40:06 2023 +0800

    [Feature](CCR) Support MoW for CCR (#22798)
---
 be/src/service/backend_service.cpp                 |  49 +++-
 .../test_binlog_config_change.groovy               | 108 ++++++++
 .../test_create_table_with_binlog_config.groovy    |  88 ++++++
 .../ccr_mow_syncer_p0/test_get_binlog.groovy       | 142 ++++++++++
 .../ccr_mow_syncer_p0/test_ingest_binlog.groovy    | 121 +++++++++
 .../ccr_mow_syncer_p0/test_multi_buckets.groovy    |  98 +++++++
 .../suites/ccr_mow_syncer_p0/test_txn_case.groovy  | 301 +++++++++++++++++++++
 .../ccr_mow_syncer_p1/test_backup_restore.groovy   |  68 +++++
 8 files changed, 973 insertions(+), 2 deletions(-)

diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 5b4ca8f075..822c0e34f4 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -46,10 +46,12 @@
 #include "http/http_client.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
+#include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
 #include "runtime/exec_env.h"
 #include "runtime/external_scan_context_mgr.h"
@@ -629,7 +631,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
         }
     }
 
-    // Step 6: create rowset && commit
+    // Step 6: create rowset && calculate delete bitmap && commit
     // Step 6.1: create rowset
     RowsetSharedPtr rowset;
     status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
@@ -645,7 +647,44 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
         return;
     }
 
-    // Step 6.2: commit txn
+    // Step 6.2 calculate delete bitmap before commit
+    auto calc_delete_bitmap_token =
+            StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
+    RowsetIdUnorderedSet pre_rowset_ids;
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        status = beta_rowset->load_segments(&segments);
+        if (!status) {
+            LOG(WARNING) << "failed to load segments from rowset"
+                         << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+        if (segments.size() > 1) {
+            // calculate delete bitmap between segments
+            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
+                                                                       delete_bitmap);
+            if (!status) {
+                LOG(WARNING) << "failed to calculate delete bitmap"
+                             << ". tablet_id: " << local_tablet->tablet_id()
+                             << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
+                             << ", status=" << status.to_string();
+                status.to_thrift(&tstatus);
+                return;
+            }
+        }
+
+        local_tablet->commit_phase_update_delete_bitmap(rowset, pre_rowset_ids, delete_bitmap,
+                                                        segments, txn_id,
+                                                        calc_delete_bitmap_token.get(), nullptr);
+        calc_delete_bitmap_token->wait();
+        calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap);
+    }
+
+    // Step 6.3: commit txn
     Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
             local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
             rowset_meta->txn_id(), rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash(),
@@ -661,6 +700,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
         return;
     }
 
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
+                partition_id, txn_id, local_tablet_id, local_tablet->schema_hash(),
+                local_tablet->tablet_uid(), true, delete_bitmap, pre_rowset_ids);
+    }
+
     tstatus.__set_status_code(TStatusCode::OK);
 }
 } // namespace doris
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy
new file mode 100644
index 0000000000..0e8a2022eb
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_binlog_config_change") {
+
+    def syncer = getSyncer()
+    def tableName = "tbl_binlog_config_change"
+    def test_num = 0
+    def insert_num = 5
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE if NOT EXISTS ${tableName} 
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        UNIQUE KEY(`test`, `id`)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 
+        PROPERTIES ( 
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+    target_sql "DROP TABLE IF EXISTS ${tableName}"
+    target_sql """
+        CREATE TABLE if NOT EXISTS ${tableName} 
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        UNIQUE KEY(`test`, `id`)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 
+        PROPERTIES ( 
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    assertTrue(syncer.getTargetMeta("${tableName}"))
+
+    // test 1: target cluster follow source cluster
+    logger.info("=== Test 1: Target cluster follow source cluster case ===")
+    test_num = 1
+    for (int index = 0; index < insert_num; index++) {
+        sql """
+            INSERT INTO ${tableName} VALUES (${test_num}, ${index})
+            """
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        syncer.closeBackendClients()
+    }
+
+    def res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == insert_num)
+
+    // TODO: bugfix
+    // test 2: source cluster disable and re-enable binlog
+    // target_sql "DROP TABLE IF EXISTS ${tableName}"
+    // target_sql """
+    //     CREATE TABLE if NOT EXISTS ${tableName} 
+    //     (
+    //         `test` INT,
+    //         `id` INT
+    //     )
+    //     ENGINE=OLAP
+    //     UNIQUE KEY(`test`, `id`)
+    //     DISTRIBUTED BY HASH(id) BUCKETS 1 
+    //     PROPERTIES ( 
+    //         "replication_allocation" = "tag.location.default: 1"
+    //     )
+    // """
+    // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")"""
+    // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+    // syncer.context.seq = -1
+
+    // assertTrue(syncer.getBinlog("${tableName}"))
+    // assertTrue(syncer.beginTxn("${tableName}"))
+    // assertTrue(syncer.ingestBinlog())
+    // assertTrue(syncer.commitTxn())
+    // assertTrue(syncer.checkTargetVersion())
+
+    // res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    // assertTrue(res.size() == insert_num)
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy
new file mode 100644
index 0000000000..e4610bbe0d
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_create_table_with_binlog_config") {
+    sql "drop database if exists test_table_binlog"
+
+    sql """
+        create database test_table_binlog
+        """
+    result = sql "show create database test_table_binlog"
+    logger.info("${result}")
+
+    // Case 1: database disable binlog, create table with binlog disable
+    sql """
+        CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" );
+        """
+    result = sql "show create table test_table_binlog.t1"
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "false"'))
+    sql """
+        drop table if exists test_table_binlog.t1
+        """
+
+    // Case 2: database disable binlog, create table with binlog enable
+    sql """
+        CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" );
+        """
+    result = sql "show create table test_table_binlog.t1"
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
+    sql """
+        drop table if exists test_table_binlog.t1
+        """
+
+    // Case 3: database enable binlog, create table with binlog disable
+    sql """
+        alter database test_table_binlog set properties ("binlog.enable" = "true")
+        """
+    assertThrows(Exception.class, {
+        sql """
+            CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" );
+            """
+    })
+    sql """
+        drop table if exists test_table_binlog.t1
+        """
+
+    // Case 4: database enable binlog, create table with binlog enable
+    sql """
+        alter database test_table_binlog set properties ("binlog.enable" = "true")
+        """
+    sql """
+        CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" );
+        """
+    result = sql "show create table test_table_binlog.t1"
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
+    sql """
+        drop table if exists test_table_binlog.t1
+        """
+
+    // Case 5: database enable binlog, create table inherit database binlog config
+    sql """
+        CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1" );
+        """
+    result = sql "show create table test_table_binlog.t1"
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
+    sql """
+        drop table if exists test_table_binlog.t1
+        """
+
+    sql "drop database if exists test_table_binlog"
+}
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy
new file mode 100644
index 0000000000..18a680380c
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_get_binlog_case") {
+
+    def create_table = { TableName ->
+        sql "DROP TABLE IF EXISTS ${TableName}"
+        sql """
+            CREATE TABLE if NOT EXISTS ${TableName} 
+            (
+                `test` INT,
+                `id` INT
+            )
+            ENGINE=OLAP
+            UNIQUE KEY(`test`, `id`)
+            DISTRIBUTED BY HASH(id) BUCKETS 1 
+            PROPERTIES ( 
+                "enable_unique_key_merge_on_write" = "true",
+                "replication_allocation" = "tag.location.default: 1"
+            )
+        """
+    }
+    
+    def syncer = getSyncer()
+    def seqTableName = "tbl_get_binlog_case"
+    def test_num = 0
+    def insert_num = 5
+    long seq = -1
+    create_table.call(seqTableName)
+    sql """ALTER TABLE ${seqTableName} set ("binlog.enable" = "true")"""
+    sql """
+            INSERT INTO ${seqTableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${seqTableName}"))
+    long firstSeq = syncer.context.seq
+
+
+
+
+    logger.info("=== Test 1: normal case ===")
+    test_num = 1
+    for (int index = 0; index < insert_num; index++) {
+        sql """
+            INSERT INTO ${seqTableName} VALUES (${test_num}, ${index})
+            """
+        assertTrue(syncer.getBinlog("${seqTableName}"))
+    }
+
+    long endSeq = syncer.context.seq
+
+
+
+
+    logger.info("=== Test 2: Abnormal seq case ===")
+    logger.info("=== Test 2.1: too old seq case ===")
+    syncer.context.seq = -1
+    assertTrue(syncer.context.seq == -1)
+    assertTrue(syncer.getBinlog("${seqTableName}"))
+    assertTrue(syncer.context.seq == firstSeq)
+
+
+    logger.info("=== Test 2.2: too new seq case ===")
+    syncer.context.seq = endSeq + 100
+    assertTrue((syncer.getBinlog("${seqTableName}")) == false)
+
+
+    logger.info("=== Test 2.3: not find table case ===")
+    assertTrue(syncer.getBinlog("this_is_an_invalid_tbl") == false)
+
+
+    logger.info("=== Test 2.4: seq between first and end case ===")
+    long midSeq = (firstSeq + endSeq) / 2
+    syncer.context.seq = midSeq
+    assertTrue(syncer.getBinlog("${seqTableName}"))
+    long test5Seq = syncer.context.seq
+    assertTrue(firstSeq <= test5Seq && test5Seq <= endSeq)
+
+    
+
+
+
+    logger.info("=== Test 3: Get binlog with different priv user case ===")
+    logger.info("=== Test 3.1: read only user get binlog case ===")
+    // TODO: bugfix
+    // syncer.context.seq = -1
+    // readOnlyUser = "read_only_user"
+    // sql """DROP USER IF EXISTS ${readOnlyUser}"""
+    // sql """CREATE USER ${readOnlyUser} IDENTIFIED BY '123456'"""
+    // sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${readOnlyUser}"""
+    // sql """GRANT SELECT_PRIV ON TEST_${context.dbName}.${seqTableName} TO ${readOnlyUser}"""
+    // syncer.context.user = "${readOnlyUser}"
+    // syncer.context.passwd = "123456"
+    // assertTrue(syncer.getBinlog("${seqTableName}"))
+
+    
+    logger.info("=== Test 3.2: no priv user get binlog case ===")
+    syncer.context.seq = -1
+    noPrivUser = "no_priv_user"
+    emptyTable = "tbl_empty_test"
+    sql "DROP TABLE IF EXISTS ${emptyTable}"
+    sql """
+        CREATE TABLE if NOT EXISTS ${emptyTable} 
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        UNIQUE KEY(`test`, `id`)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 
+        PROPERTIES ( 
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    sql """DROP USER IF EXISTS ${noPrivUser}"""
+    sql """CREATE USER ${noPrivUser} IDENTIFIED BY '123456'"""
+    sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}"""
+    sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO ${noPrivUser}"""
+    syncer.context.user = "${noPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue((syncer.getBinlog("${seqTableName}")) == false)
+    
+
+    logger.info("=== Test 3.3: Non-existent user set in syncer get binlog case ===")
+    syncer.context.user = "this_is_an_invalid_user"
+    syncer.context.passwd = "this_is_an_invalid_user"
+    assertTrue(syncer.getBinlog("${seqTableName}", false) == false)
+}
\ No newline at end of file
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
new file mode 100644
index 0000000000..4b1b273ab8
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_ingest_binlog") {
+
+    def syncer = getSyncer()
+    def tableName = "tbl_ingest_binlog"
+    def insert_num = 5
+    def test_num = 0
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+           CREATE TABLE if NOT EXISTS ${tableName} 
+           (
+               `test` INT,
+               `id` INT
+           )
+           ENGINE=OLAP
+           UNIQUE KEY(`test`, `id`)
+           DISTRIBUTED BY HASH(id) BUCKETS 1 
+           PROPERTIES ( 
+                "enable_unique_key_merge_on_write" = "true",
+               "replication_allocation" = "tag.location.default: 1"
+           )
+        """
+    sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+    target_sql "DROP TABLE IF EXISTS ${tableName}"
+    target_sql """
+                  CREATE TABLE if NOT EXISTS ${tableName} 
+                  (
+                      `test` INT,
+                      `id` INT
+                  )
+                  ENGINE=OLAP
+                  UNIQUE KEY(`test`, `id`)
+                  DISTRIBUTED BY HASH(id) BUCKETS 1 
+                  PROPERTIES ( 
+                        "enable_unique_key_merge_on_write" = "true",
+                      "replication_allocation" = "tag.location.default: 1"
+                  )
+              """
+    assertTrue(syncer.getTargetMeta("${tableName}"))
+
+
+
+
+    logger.info("=== Test 1: Common ingest binlog case ===")
+    test_num = 1
+    for (int index = 0; index < insert_num; index++) {
+        sql """
+            INSERT INTO ${tableName} VALUES (${test_num}, ${index})
+        """
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        syncer.closeBackendClients()
+    }
+
+    res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == insert_num)
+
+
+
+
+    logger.info("=== Test 2: Wrong IngestBinlogRequest case ===")
+    test_num = 2
+    sql """
+            INSERT INTO ${tableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${tableName}"))
+    assertTrue(syncer.beginTxn("${tableName}"))
+    assertTrue(syncer.getBackendClients())
+
+
+    logger.info("=== Test 2.1: Wrong txnId case ===")
+    // TODO: bugfix
+    // def originTxnId = syncer.context.txnId
+    // syncer.context.txnId = -1
+    // assertTrue(syncer.ingestBinlog() == false)
+    // syncer.context.txnId = originTxnId
+
+
+    logger.info("=== Test 2.2: Wrong binlog version case ===")
+    // -1 means use the number of syncer.context
+    // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
+    assertTrue(syncer.ingestBinlog(-1, 1) == false)
+
+
+    logger.info("=== Test 2.3: Wrong partitionId case ===")
+    // TODO: bugfix
+    // assertTrue(syncer.ingestBinlog(1, -1) == false)
+
+
+    logger.info("=== Test 2.4: Right case ===")
+    assertTrue(syncer.ingestBinlog())
+    assertTrue(syncer.commitTxn())
+    assertTrue(syncer.checkTargetVersion())
+    res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+
+
+    // End Test 2
+    syncer.closeBackendClients()
+}
\ No newline at end of file
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy
new file mode 100644
index 0000000000..7f4051552f
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_multi_buckets") {
+
+    def syncer = getSyncer()
+    def tableName = "tbl_multi_buckets"
+    def test_num = 0
+    def insert_num = 5
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE if NOT EXISTS ${tableName} 
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        UNIQUE KEY(`test`, `id`)
+        DISTRIBUTED BY HASH(id) BUCKETS 3
+        PROPERTIES ( 
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+    target_sql "DROP TABLE IF EXISTS ${tableName}"
+    target_sql """
+               CREATE TABLE if NOT EXISTS ${tableName} 
+               (
+                   `test` INT,
+                   `id` INT
+               )
+               ENGINE=OLAP
+               UNIQUE KEY(`test`, `id`)
+               DISTRIBUTED BY HASH(id) BUCKETS 3
+               PROPERTIES ( 
+                    "enable_unique_key_merge_on_write" = "true",
+                   "replication_allocation" = "tag.location.default: 1"
+               )
+               """
+    assertTrue(syncer.getTargetMeta("${tableName}"))
+
+
+
+
+    logger.info("=== Test 1: Blank row set case ===")
+    test_num = 1
+    sql """
+        INSERT INTO ${tableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${tableName}"))
+    assertTrue(syncer.beginTxn("${tableName}"))
+    assertTrue(syncer.getBackendClients())
+    assertTrue(syncer.ingestBinlog())
+    assertTrue(syncer.commitTxn())
+    syncer.closeBackendClients()
+    assertTrue(syncer.checkTargetVersion())
+    def res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+
+
+
+
+    logger.info("=== Test 2: Upsert case ===")
+    test_num = 2
+    for (int index = 0; index < insert_num; index++) {
+        sql """
+            INSERT INTO ${tableName} VALUES (${test_num}, ${index})
+        """
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        syncer.closeBackendClients()
+    }
+
+    res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == insert_num)
+
+}
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy
new file mode 100644
index 0000000000..05f44e4808
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_txn_case") {
+    // TODO: bugfix (presumably the reason "LOAD_PRIV" is commented out of fullPriv below -- confirm)
+    // recursionPriv calls callback once for every num-sized combination of fullPrivList entries at index >= idx, joined with ", ".
+    def fullPriv = ["SELECT_PRIV"/*, "LOAD_PRIV"*/, "ALTER_PRIV", "CREATE_PRIV", "DROP_PRIV"]
+    def nowPriv = [] // scratch list handed to recursionPriv; every push there is undone by a pop
+    def recursionPriv = { fullPrivList, idx, nowPrivList, num, callback ->
+        for (; (num - nowPrivList.size() <= fullPrivList.size() - idx) && (nowPrivList.size()) < num; ++idx) { // stop once too few entries remain to reach num
+            nowPrivList.push(fullPrivList[idx])
+            call(fullPrivList, idx + 1, nowPrivList, num, callback) // recursion: an unqualified call(...) inside a closure re-invokes that closure
+            nowPrivList.pop()
+        }
+        if (nowPrivList.size() == num) { // a complete combination has been collected
+            String privStr = ""
+            for (int i = 0; i < num; ++i) {
+                privStr += nowPrivList[i]
+                if (i < num - 1) {
+                    privStr += ", "
+                }
+            }
+            callback.call(privStr)
+        }
+    }
+    // Fixture: the same 1-bucket merge-on-write UNIQUE KEY table is created through sql and again through target_sql.
+    def syncer = getSyncer()
+    def txnTableName = "tbl_txn_case"
+    def test_num = 0 // stored in the `test` column so each test case can select only its own rows
+    sql "DROP TABLE IF EXISTS ${txnTableName}"
+    sql """
+           CREATE TABLE if NOT EXISTS ${txnTableName} 
+           (
+               `test` INT,
+               `id` INT
+           )
+           ENGINE=OLAP
+           UNIQUE KEY(`test`, `id`)
+           DISTRIBUTED BY HASH(id) BUCKETS 1 
+           PROPERTIES ( 
+                "enable_unique_key_merge_on_write" = "true",
+               "replication_allocation" = "tag.location.default: 1"
+           )
+        """
+    sql """ALTER TABLE ${txnTableName} set ("binlog.enable" = "true")"""
+    // Same DDL through target_sql (presumably the downstream cluster); "binlog.enable" is only set through sql above.
+    target_sql "DROP TABLE IF EXISTS ${txnTableName}"
+    target_sql """
+                  CREATE TABLE if NOT EXISTS ${txnTableName} 
+                  (
+                      `test` INT,
+                      `id` INT
+                  )
+                  ENGINE=OLAP
+                  UNIQUE KEY(`test`, `id`)
+                  DISTRIBUTED BY HASH(id) BUCKETS 1 
+                  PROPERTIES ( 
+                        "enable_unique_key_merge_on_write" = "true",
+                      "replication_allocation" = "tag.location.default: 1"
+                  )
+              """
+    assertTrue(syncer.getTargetMeta("${txnTableName}"))
+
+    
+
+    // Baseline: a row inserted via sql must be readable via target_sql once every syncer step below has returned true.
+    logger.info("=== Test 1: common txn case ===")
+    test_num = 1
+    sql """
+            INSERT INTO ${txnTableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${txnTableName}"))
+    assertTrue(syncer.getBackendClients())
+    assertTrue(syncer.beginTxn("${txnTableName}"))
+    assertTrue(syncer.ingestBinlog())
+    assertTrue(syncer.commitTxn())
+    assertTrue(syncer.checkTargetVersion())
+    def res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+
+
+
+    // Negative beginTxn cases. No getBinlog call happens here, so the syncer still carries whatever Test 1 left in it.
+    logger.info("=== Test 2: Wrong BeginTxnRequest context case ===")
+
+
+    logger.info("=== Test 2.1: Begin a txn with non-existent table case ===")
+    assertTrue(syncer.beginTxn("tbl_non_existent") == false)
+
+
+    logger.info("=== Test 2.2: Begin a txn with duplicate labels case ===")
+    assertTrue(syncer.beginTxn("${txnTableName}") == false) // NOTE(review): presumably rejected because Test 1's label is reused -- confirm
+
+
+    // End Test 2: release the backend clients that were opened in Test 1
+    syncer.closeBackendClients()
+
+
+
+    // Test 3 fetches one new binlog, then retries beginTxn after swapping the credentials stored in syncer.context.
+    logger.info("=== Test 3: Begin a txn with different priv user case ===")
+    test_num = 3
+    sql """
+            INSERT INTO ${txnTableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${txnTableName}"))
+
+
+    logger.info("=== Test 3.1: Begin a txn with non-existent user set in syncer case ===")
+    syncer.context.user = "this_is_an_invalid_user"
+    syncer.context.passwd = "this_is_an_invalid_user"
+    assertTrue(syncer.beginTxn("${txnTableName}") == false)
+
+    // 3.2: noPrivUser is granted ALL on the default db and on emptyTable only; no grant names txnTableName.
+    logger.info("=== Test 3.2: Begin a txn with no priv user case ===")
+    def noPrivUser = "no_priv_user"
+    def emptyTable = "tbl_empty_test"
+    target_sql "DROP TABLE IF EXISTS ${emptyTable}"
+    target_sql """
+                CREATE TABLE if NOT EXISTS ${emptyTable} 
+                (
+                    `test` INT,
+                    `id` INT
+                )
+                ENGINE=OLAP
+                UNIQUE KEY(`test`, `id`)
+                DISTRIBUTED BY HASH(id) BUCKETS 1 
+                PROPERTIES ( 
+                    "enable_unique_key_merge_on_write" = "true",
+                    "replication_allocation" = "tag.location.default: 1"
+                )
+            """
+    target_sql """DROP USER IF EXISTS ${noPrivUser}"""
+    target_sql """CREATE USER ${noPrivUser} IDENTIFIED BY '123456'"""
+    target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}"""
+    target_sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO ${noPrivUser}"""
+    syncer.context.user = "${noPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue(syncer.beginTxn("${txnTableName}") == false)
+
+    // TODO: bugfix
+    // Enumerate privilege combinations via recursionPriv (defined at the top of this suite);
+    // unless all required privileges are granted, beginTxn must not succeed.
+    logger.info("=== Test 3.3: Begin a txn with low priv user case ===")
+    def lowPrivUser = "low_priv_user"
+    target_sql """DROP USER IF EXISTS ${lowPrivUser}"""
+    target_sql """CREATE USER ${lowPrivUser} IDENTIFIED BY '123456'"""
+    target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${lowPrivUser}"""
+    syncer.context.user = "${lowPrivUser}"
+    syncer.context.passwd = "123456"
+    // Grants one privilege combination on txnTableName, expects beginTxn to fail, then revokes the same grant.
+    def beginTxnCallback = { privStr ->
+        target_sql """GRANT ${privStr} ON TEST_${context.dbName}.${txnTableName} TO ${lowPrivUser}"""
+        assertTrue((syncer.beginTxn("${txnTableName}")) == false)
+        target_sql """REVOKE ${privStr} ON TEST_${context.dbName}.${txnTableName} FROM ${lowPrivUser}"""
+    }
+    // Sizes 1..4 cover every non-empty subset of the four entries currently in fullPriv (LOAD_PRIV is never granted).
+    for (int i = 1; i <= 4; ++i) {
+        recursionPriv.call(fullPriv, 0, nowPriv, i, beginTxnCallback)
+    }
+    // NOTE(review): the label says SHOW_PRIV, yet the grant below lists SELECT/LOAD/ALTER/CREATE/DROP_PRIV; this user must succeed.
+    logger.info("=== Test 3.4: Complete the txn with SHOW_PRIV user case ===")
+    def showPrivUser = "show_priv_user"
+    target_sql """DROP USER IF EXISTS ${showPrivUser}"""
+    target_sql """CREATE USER ${showPrivUser} IDENTIFIED BY '123456'"""
+    target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${showPrivUser}"""
+    target_sql """
+                  GRANT 
+                  SELECT_PRIV, LOAD_PRIV, ALTER_PRIV, CREATE_PRIV, DROP_PRIV 
+                  ON TEST_${context.dbName}.${txnTableName}
+                  TO ${showPrivUser}
+               """
+    syncer.context.user = "${showPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue(syncer.beginTxn("${txnTableName}"))
+    assertTrue(syncer.getBackendClients())
+    assertTrue(syncer.ingestBinlog())
+    assertTrue(syncer.commitTxn())
+    assertTrue(syncer.checkTargetVersion())
+    res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+
+    // End Test 3: switch the syncer back to the credentials from context.config and release the backend clients
+    syncer.context.user = context.config.feSyncerUser
+    syncer.context.passwd = context.config.feSyncerPassword
+    syncer.closeBackendClients()
+
+
+
+    // Test 4 opens a txn and ingests the binlog, then calls commitTxn with a tampered context before the real commit.
+    logger.info("=== Test 4: Wrong CommitTxnRequest context case ===")
+    test_num = 4
+    sql """
+            INSERT INTO ${txnTableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${txnTableName}"))
+    assertTrue(syncer.getBackendClients())
+    assertTrue(syncer.beginTxn("${txnTableName}"))
+    assertTrue(syncer.ingestBinlog())
+
+    // 4.1: commitTxn must fail while context.txnId is -1; the saved id is put back for the commit in 4.4.
+    logger.info("=== Test 4.1: Wrong txnId case ===")
+    def originTxnId = syncer.context.txnId
+    syncer.context.txnId = -1
+    assertTrue(syncer.commitTxn() == false)
+    syncer.context.txnId = originTxnId
+
+    // 4.2 and 4.3 are only logged: their assertions stay commented out until the TODOs are resolved.
+    logger.info("=== Test 4.2: Wrong commit info case ===")
+    // TODO: bugfix
+    // def originCommitInfos = syncer.resetCommitInfos()
+    // syncer.context.addCommitInfo(-1, -1)
+    // assertTrue(syncer.commitTxn() == false)
+
+
+    logger.info("=== Test 4.3: Empty commit info case ===")
+    // TODO: bugfix
+    // assertTrue(syncer.commitTxn() == false)
+
+    // 4.4: only the first (valid) commit runs; the duplicate-commit check at the end is still commented out.
+    logger.info("=== Test 4.4: duplicate txnId case ===")
+    // TODO: bugfix
+    // def lastCommitInfo = syncer.copyCommitInfos()
+    assertTrue(syncer.commitTxn())
+    assertTrue(syncer.checkTargetVersion())
+    res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+    // syncer.context.commitInfos = lastCommitInfo
+    // assertTrue(syncer.commitTxn() == false)
+
+    // End Test 4
+    syncer.closeBackendClients()
+    
+
+
+    // Test 5 begins and ingests a txn with the credentials restored after Test 3, then attempts commitTxn as other users.
+    logger.info("=== Test 5: User root beginTxn, Other user commitTxn case ===")
+    test_num = 5
+    sql """
+            INSERT INTO ${txnTableName} VALUES (${test_num}, 0)
+        """
+    assertTrue(syncer.getBinlog("${txnTableName}"))
+    assertTrue(syncer.getBackendClients())
+    assertTrue(syncer.beginTxn("${txnTableName}"))
+    assertTrue(syncer.ingestBinlog())
+
+
+    logger.info("=== Test 5.1: Non-existent user commitTxn case ===")
+    syncer.context.user = "this_is_an_invalid_user"
+    syncer.context.passwd = "this_is_an_invalid_user"
+    assertTrue(syncer.commitTxn() == false)
+
+
+    logger.info("=== Test 5.2: No priv user commitTxn case ===")
+    syncer.context.user = "${noPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue(syncer.commitTxn() == false)
+
+
+    logger.info("=== Test 5.3: Low priv user commitTxn case ===")
+    syncer.context.user = "${lowPrivUser}"
+    syncer.context.passwd = "123456"
+    // Grants one privilege combination on txnTableName, expects commitTxn to fail, then revokes the same grant.
+    def commitTxnCallback = { privStr ->
+        target_sql """GRANT ${privStr} ON TEST_${context.dbName}.${txnTableName} TO ${lowPrivUser}"""
+        assertTrue(syncer.commitTxn() == false)
+        target_sql """REVOKE ${privStr} ON TEST_${context.dbName}.${txnTableName} FROM ${lowPrivUser}"""
+    }
+    for (int i = 1; i <= 4; ++i) {
+        recursionPriv.call(fullPriv, 0, nowPriv, i, commitTxnCallback)
+    }
+
+    // 5.4: showPrivUser (created and granted in Test 3.4) is the only user here whose commitTxn is expected to succeed.
+    logger.info("=== Test 5.4: SHOW_PRIV user commitTxn case ===")
+    syncer.context.user = "${showPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue(syncer.commitTxn())
+    assertTrue(syncer.checkTargetVersion())
+    res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}"""
+    assertTrue(res.size() == 1)
+
+    // End Test 5: switch the syncer back to the credentials from context.config and release the backend clients
+    syncer.context.user = context.config.feSyncerUser
+    syncer.context.passwd = context.config.feSyncerPassword
+    syncer.closeBackendClients()
+    
+}
\ No newline at end of file
diff --git a/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy b/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy
new file mode 100644
index 0000000000..b0844cded5
--- /dev/null
+++ b/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_backup_restore") {
+    // Backs up a merge-on-write table with BACKUP SNAPSHOT, restores it through the syncer, then compares row counts.
+    def syncer = getSyncer()
+    def tableName = "tbl_backup_restore"
+    def test_num = 0
+    def insert_num = 5 // number of single-row INSERTs, and the row count asserted before backup and after restore
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+           CREATE TABLE if NOT EXISTS ${tableName} 
+           (
+               `test` INT,
+               `id` INT
+           )
+           ENGINE=OLAP
+           UNIQUE KEY(`test`, `id`)
+           DISTRIBUTED BY HASH(id) BUCKETS 1 
+           PROPERTIES ( 
+                "enable_unique_key_merge_on_write" = "true",
+               "replication_allocation" = "tag.location.default: 1",
+               "binlog.enable" = "true"
+           )
+        """
+    // Unlike the binlog suites, this one issues no CREATE TABLE through target_sql; the table is only queried there after restore.
+    logger.info("=== Test 1: Common backup and restore ===")
+    test_num = 1
+    def snapshotName = "snapshot_test_1"
+    for (int i = 0; i < insert_num; ++i) {
+        sql """
+               INSERT INTO ${tableName} VALUES (${test_num}, ${i})
+            """ 
+    }
+    def res = sql "SELECT * FROM ${tableName}"
+    assertTrue(res.size() == insert_num)
+    sql """ 
+            BACKUP SNAPSHOT ${context.dbName}.${snapshotName} 
+            TO `__keep_on_local__` 
+            ON (${tableName})
+            PROPERTIES ("type" = "full")
+        """
+    while (syncer.checkSnapshotFinish() == false) { // NOTE(review): polls every 3 s with no upper bound; blocks forever if this never turns true
+        Thread.sleep(3000)
+    }
+    assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}"))
+    assertTrue(syncer.restoreSnapshot(true))
+    while (syncer.checkRestoreFinish() == false) { // NOTE(review): same unbounded 3 s polling as the snapshot wait above
+        Thread.sleep(3000)
+    }
+    res = target_sql "SELECT * FROM ${tableName}"
+    assertTrue(res.size() == insert_num)
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to