This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 94a6eacd9df branch-3.0: [fix](suites) Fix syncer ingest binlog with multiple replicas #44444 (#44486) 94a6eacd9df is described below commit 94a6eacd9df00c75d384454214ba7dcd2f0d654f Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Nov 22 20:07:09 2024 +0800 branch-3.0: [fix](suites) Fix syncer ingest binlog with multiple replicas #44444 (#44486) Cherry-picked from #44444 Co-authored-by: walter <maoch...@selectdb.com> --- .../apache/doris/regression/suite/Syncer.groovy | 95 ++++++++++++---------- .../doris/regression/suite/SyncerContext.groovy | 27 +++++- .../ccr_mow_syncer_p0/test_ingest_binlog.groovy | 36 ++++---- 3 files changed, 98 insertions(+), 60 deletions(-) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy index 894b42824ef..2195e7e745a 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy @@ -700,8 +700,13 @@ class Syncer { // step 3.2: get partition/indexId/tabletId partitionSQl += "/" + meta.indexId.toString() sqlInfo = sendSql.call(partitionSQl, toSrc) + Map<Long, Long> replicaMap = Maps.newHashMap() for (List<Object> row : sqlInfo) { - meta.tabletMeta.put(row[0] as Long, row[2] as Long) + Long tabletId = row[0] as Long + if (!meta.tabletMeta.containsKey(tabletId)) { + meta.tabletMeta.put(tabletId, new TabletMeta()) + } + meta.tabletMeta[tabletId].replicas.put(row[1] as Long, row[2] as Long) } if (meta.tabletMeta.isEmpty()) { logger.error("Target cluster get (partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.") @@ -816,49 +821,57 @@ class Syncer { while (srcTabletIter.hasNext()) { Entry srcTabletMap = srcTabletIter.next() Entry tarTabletMap = tarTabletIter.next() + TabletMeta srcTabletMeta = srcTabletMap.value + TabletMeta tarTabletMeta = tarTabletMap.value + + Iterator srcReplicaIter = srcTabletMeta.replicas.iterator() + Iterator tarReplicaIter = tarTabletMeta.replicas.iterator() + while (srcReplicaIter.hasNext()) { + Entry srcReplicaMap = srcReplicaIter.next() + Entry tarReplicaMap = tarReplicaIter.next() + BackendClientImpl srcClient = context.sourceBackendClients.get(srcReplicaMap.value) + if (srcClient == null) { + logger.error("Can't find src tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}") + return false + } + BackendClientImpl tarClient = context.targetBackendClients.get(tarReplicaMap.value) + if (tarClient == null) { + logger.error("Can't find target tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}") + return false + } - BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value) - if (srcClient == null) { - logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}") - return false - } - BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value) - if (tarClient == null) { - logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}") - return false - } - - tarPartition.value.version = srcPartition.value.version - long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId - long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion - - TIngestBinlogRequest request = new TIngestBinlogRequest() - TUniqueId uid = new TUniqueId(-1, -1) - request.setTxnId(txnId) - request.setRemoteTabletId(srcTabletMap.key) - request.setBinlogVersion(version) - request.setRemoteHost(srcClient.address.hostname) - request.setRemotePort(srcClient.httpPort.toString()) - request.setPartitionId(partitionId) - request.setLocalTabletId(tarTabletMap.key) - request.setLoadId(uid) - logger.info("request -> ${request}") - TIngestBinlogResult result = tarClient.client.ingestBinlog(request) - if (!checkIngestBinlog(result)) { - logger.error("Ingest binlog error! result: ${result}") - return false - } + tarPartition.value.version = srcPartition.value.version + long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId + long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion + + TIngestBinlogRequest request = new TIngestBinlogRequest() + TUniqueId uid = new TUniqueId(-1, -1) + request.setTxnId(txnId) + request.setRemoteTabletId(srcTabletMap.key) + request.setBinlogVersion(version) + request.setRemoteHost(srcClient.address.hostname) + request.setRemotePort(srcClient.httpPort.toString()) + request.setPartitionId(partitionId) + request.setLocalTabletId(tarTabletMap.key) + request.setLoadId(uid) + logger.info("request -> ${request}") + TIngestBinlogResult result = tarClient.client.ingestBinlog(request) + if (!checkIngestBinlog(result)) { + logger.error("Ingest binlog error! result: ${result}") + return false + } - if (context.txnInsert) { - List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId) - if (tabletCommitInfos == null) { - tabletCommitInfos = new ArrayList<TTabletCommitInfo>() - subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos) - subTxnIdToTableId.put(txnId, tarTableMeta.id) + if (context.txnInsert) { + List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId) + if (tabletCommitInfos == null) { + tabletCommitInfos = new ArrayList<TTabletCommitInfo>() + subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos) + subTxnIdToTableId.put(txnId, tarTableMeta.id) + } + tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarReplicaMap.value)) + } else { + addCommitInfo(tarTabletMap.key, tarReplicaMap.value) } - tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value)) - } else { - addCommitInfo(tarTabletMap.key, tarTabletMap.value) } } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy index b86f012aa87..3202db4011f 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy @@ -32,10 +32,22 @@ import groovy.util.logging.Slf4j import java.sql.Connection +class TabletMeta { + public TreeMap<Long, Long> replicas + + TabletMeta() { + this.replicas = new TreeMap<Long, Long>() + } + + String toString() { + return "TabletMeta: { replicas: " + replicas.toString() + " }" + } +} + class PartitionMeta { public long version public long indexId - public TreeMap<Long, Long> tabletMeta + public TreeMap<Long, TabletMeta> tabletMeta PartitionMeta(long indexId, long version) { this.indexId = indexId @@ -219,6 +231,19 @@ class SyncerContext { } else if (srcTabletMeta.size() != tarTabletMeta.size()) { return false } + + Iterator srcTabletIter = srcTabletMeta.iterator() + Iterator tarTabletIter = tarTabletMeta.iterator() + while (srcTabletIter.hasNext()) { + Map srcReplicaMap = srcTabletIter.next().value.replicas + Map tarReplicaMap = tarTabletIter.next().value.replicas + + if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) { + return false + } else if (srcReplicaMap.size() != tarReplicaMap.size()) { + return false + } + } } }) diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy index 233d57aa7b3..e07529718ee 100644 --- a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy +++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy @@ -27,16 +27,16 @@ suite("test_mow_ingest_binlog") { def test_num = 0 sql "DROP TABLE IF EXISTS ${tableName}" sql """ - CREATE TABLE if NOT EXISTS ${tableName} + CREATE TABLE if NOT EXISTS ${tableName} ( `test` INT, `id` INT ) ENGINE=OLAP UNIQUE KEY(`test`, `id`) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", "replication_allocation" = "tag.location.default: 1" ) """ @@ -44,19 +44,19 @@ suite("test_mow_ingest_binlog") { target_sql "DROP TABLE IF EXISTS ${tableName}" target_sql """ - CREATE TABLE if NOT EXISTS ${tableName} - ( - `test` INT, - `id` INT - ) - ENGINE=OLAP - UNIQUE KEY(`test`, `id`) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "replication_allocation" = "tag.location.default: 1" - ) - """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ assertTrue(syncer.getTargetMeta("${tableName}")) @@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") { // End Test 2 syncer.closeBackendClients() -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org