This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 94a6eacd9df branch-3.0: [fix](suites) Fix syncer ingest binlog with 
multiple replicas #44444 (#44486)
94a6eacd9df is described below

commit 94a6eacd9df00c75d384454214ba7dcd2f0d654f
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 22 20:07:09 2024 +0800

    branch-3.0: [fix](suites) Fix syncer ingest binlog with multiple replicas 
#44444 (#44486)
    
    Cherry-picked from #44444
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../apache/doris/regression/suite/Syncer.groovy    | 95 ++++++++++++----------
 .../doris/regression/suite/SyncerContext.groovy    | 27 +++++-
 .../ccr_mow_syncer_p0/test_ingest_binlog.groovy    | 36 ++++----
 3 files changed, 98 insertions(+), 60 deletions(-)

diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 894b42824ef..2195e7e745a 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -700,8 +700,13 @@ class Syncer {
                 // step 3.2: get partition/indexId/tabletId
                 partitionSQl += "/" + meta.indexId.toString()
                 sqlInfo = sendSql.call(partitionSQl, toSrc)
+                Map<Long, Long> replicaMap = Maps.newHashMap()
                 for (List<Object> row : sqlInfo) {
-                    meta.tabletMeta.put(row[0] as Long, row[2] as Long)
+                    Long tabletId = row[0] as Long
+                    if (!meta.tabletMeta.containsKey(tabletId)) {
+                        meta.tabletMeta.put(tabletId, new TabletMeta())
+                    }
+                    meta.tabletMeta[tabletId].replicas.put(row[1] as Long, 
row[2] as Long)
                 }
                 if (meta.tabletMeta.isEmpty()) {
                     logger.error("Target cluster get 
(partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
@@ -816,49 +821,57 @@ class Syncer {
                     while (srcTabletIter.hasNext()) {
                         Entry srcTabletMap = srcTabletIter.next()
                         Entry tarTabletMap = tarTabletIter.next()
+                        TabletMeta srcTabletMeta = srcTabletMap.value
+                        TabletMeta tarTabletMeta = tarTabletMap.value
+
+                        Iterator srcReplicaIter = 
srcTabletMeta.replicas.iterator()
+                        Iterator tarReplicaIter = 
tarTabletMeta.replicas.iterator()
+                        while (srcReplicaIter.hasNext()) {
+                            Entry srcReplicaMap = srcReplicaIter.next()
+                            Entry tarReplicaMap = tarReplicaIter.next()
+                            BackendClientImpl srcClient = 
context.sourceBackendClients.get(srcReplicaMap.value)
+                            if (srcClient == null) {
+                                logger.error("Can't find src 
tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}")
+                                return false
+                            }
+                            BackendClientImpl tarClient = 
context.targetBackendClients.get(tarReplicaMap.value)
+                            if (tarClient == null) {
+                                logger.error("Can't find target 
tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}")
+                                return false
+                            }
 
-                        BackendClientImpl srcClient = 
context.sourceBackendClients.get(srcTabletMap.value)
-                        if (srcClient == null) {
-                            logger.error("Can't find src 
tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
-                            return false
-                        }
-                        BackendClientImpl tarClient = 
context.targetBackendClients.get(tarTabletMap.value)
-                        if (tarClient == null) {
-                            logger.error("Can't find target 
tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
-                            return false
-                        }
-
-                        tarPartition.value.version = srcPartition.value.version
-                        long partitionId = fakePartitionId == -1 ? 
tarPartition.key : fakePartitionId
-                        long version = fakeVersion == -1 ? 
partitionRecord.version : fakeVersion
-
-                        TIngestBinlogRequest request = new 
TIngestBinlogRequest()
-                        TUniqueId uid = new TUniqueId(-1, -1)
-                        request.setTxnId(txnId)
-                        request.setRemoteTabletId(srcTabletMap.key)
-                        request.setBinlogVersion(version)
-                        request.setRemoteHost(srcClient.address.hostname)
-                        request.setRemotePort(srcClient.httpPort.toString())
-                        request.setPartitionId(partitionId)
-                        request.setLocalTabletId(tarTabletMap.key)
-                        request.setLoadId(uid)
-                        logger.info("request -> ${request}")
-                        TIngestBinlogResult result = 
tarClient.client.ingestBinlog(request)
-                        if (!checkIngestBinlog(result)) {
-                            logger.error("Ingest binlog error! result: 
${result}")
-                            return false
-                        }
+                            tarPartition.value.version = 
srcPartition.value.version
+                            long partitionId = fakePartitionId == -1 ? 
tarPartition.key : fakePartitionId
+                            long version = fakeVersion == -1 ? 
partitionRecord.version : fakeVersion
+
+                            TIngestBinlogRequest request = new 
TIngestBinlogRequest()
+                            TUniqueId uid = new TUniqueId(-1, -1)
+                            request.setTxnId(txnId)
+                            request.setRemoteTabletId(srcTabletMap.key)
+                            request.setBinlogVersion(version)
+                            request.setRemoteHost(srcClient.address.hostname)
+                            
request.setRemotePort(srcClient.httpPort.toString())
+                            request.setPartitionId(partitionId)
+                            request.setLocalTabletId(tarTabletMap.key)
+                            request.setLoadId(uid)
+                            logger.info("request -> ${request}")
+                            TIngestBinlogResult result = 
tarClient.client.ingestBinlog(request)
+                            if (!checkIngestBinlog(result)) {
+                                logger.error("Ingest binlog error! result: 
${result}")
+                                return false
+                            }
 
-                        if (context.txnInsert) {
-                            List<TTabletCommitInfo> tabletCommitInfos = 
subTxnIdToTabletCommitInfos.get(txnId)
-                            if (tabletCommitInfos == null) {
-                                tabletCommitInfos = new 
ArrayList<TTabletCommitInfo>()
-                                subTxnIdToTabletCommitInfos.put(txnId, 
tabletCommitInfos)
-                                subTxnIdToTableId.put(txnId, tarTableMeta.id)
+                            if (context.txnInsert) {
+                                List<TTabletCommitInfo> tabletCommitInfos = 
subTxnIdToTabletCommitInfos.get(txnId)
+                                if (tabletCommitInfos == null) {
+                                    tabletCommitInfos = new 
ArrayList<TTabletCommitInfo>()
+                                    subTxnIdToTabletCommitInfos.put(txnId, 
tabletCommitInfos)
+                                    subTxnIdToTableId.put(txnId, 
tarTableMeta.id)
+                                }
+                                tabletCommitInfos.add(new 
TTabletCommitInfo(tarTabletMap.key, tarReplicaMap.value))
+                            } else {
+                                addCommitInfo(tarTabletMap.key, 
tarReplicaMap.value)
                             }
-                            tabletCommitInfos.add(new 
TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
-                        } else {
-                            addCommitInfo(tarTabletMap.key, tarTabletMap.value)
                         }
                     }
                 }
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index b86f012aa87..3202db4011f 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -32,10 +32,22 @@ import groovy.util.logging.Slf4j
 
 import java.sql.Connection
 
+class TabletMeta {
+    public TreeMap<Long, Long> replicas
+
+    TabletMeta() {
+        this.replicas = new TreeMap<Long, Long>()
+    }
+
+    String toString() {
+        return "TabletMeta: { replicas: " + replicas.toString() + " }"
+    }
+}
+
 class PartitionMeta {
     public long version
     public long indexId
-    public TreeMap<Long, Long> tabletMeta
+    public TreeMap<Long, TabletMeta> tabletMeta
 
     PartitionMeta(long indexId, long version) {
         this.indexId = indexId
@@ -219,6 +231,19 @@ class SyncerContext {
                 } else if (srcTabletMeta.size() != tarTabletMeta.size()) {
                     return false
                 }
+
+                Iterator srcTabletIter = srcTabletMeta.iterator()
+                Iterator tarTabletIter = tarTabletMeta.iterator()
+                while (srcTabletIter.hasNext()) {
+                    Map srcReplicaMap = srcTabletIter.next().value.replicas
+                    Map tarReplicaMap = tarTabletIter.next().value.replicas
+
+                    if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) {
+                        return false
+                    } else if (srcReplicaMap.size() != tarReplicaMap.size()) {
+                        return false
+                    }
+                }
             }
         })
 
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy 
b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
index 233d57aa7b3..e07529718ee 100644
--- a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
@@ -27,16 +27,16 @@ suite("test_mow_ingest_binlog") {
     def test_num = 0
     sql "DROP TABLE IF EXISTS ${tableName}"
     sql """
-           CREATE TABLE if NOT EXISTS ${tableName} 
+           CREATE TABLE if NOT EXISTS ${tableName}
            (
                `test` INT,
                `id` INT
            )
            ENGINE=OLAP
            UNIQUE KEY(`test`, `id`)
-           DISTRIBUTED BY HASH(id) BUCKETS 1 
-           PROPERTIES ( 
-                "enable_unique_key_merge_on_write" = "true",
+           DISTRIBUTED BY HASH(id) BUCKETS 1
+           PROPERTIES (
+               "enable_unique_key_merge_on_write" = "true",
                "replication_allocation" = "tag.location.default: 1"
            )
         """
@@ -44,19 +44,19 @@ suite("test_mow_ingest_binlog") {
 
     target_sql "DROP TABLE IF EXISTS ${tableName}"
     target_sql """
-                  CREATE TABLE if NOT EXISTS ${tableName} 
-                  (
-                      `test` INT,
-                      `id` INT
-                  )
-                  ENGINE=OLAP
-                  UNIQUE KEY(`test`, `id`)
-                  DISTRIBUTED BY HASH(id) BUCKETS 1 
-                  PROPERTIES ( 
-                        "enable_unique_key_merge_on_write" = "true",
-                      "replication_allocation" = "tag.location.default: 1"
-                  )
-              """
+          CREATE TABLE if NOT EXISTS ${tableName}
+          (
+              `test` INT,
+              `id` INT
+          )
+          ENGINE=OLAP
+          UNIQUE KEY(`test`, `id`)
+          DISTRIBUTED BY HASH(id) BUCKETS 1
+          PROPERTIES (
+               "enable_unique_key_merge_on_write" = "true",
+              "replication_allocation" = "tag.location.default: 1"
+          )
+        """
     assertTrue(syncer.getTargetMeta("${tableName}"))
 
 
@@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") {
 
     // End Test 2
     syncer.closeBackendClients()
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to