This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ced6e165ee4 branch-3.1: [fix](load) Fix the idempotence of the
createPartition RPC #57198 (#59179)
ced6e165ee4 is described below
commit ced6e165ee46433857812ab8fc237ef6427b50f6
Author: Refrain <[email protected]>
AuthorDate: Mon Dec 22 14:04:06 2025 +0800
branch-3.1: [fix](load) Fix the idempotence of the createPartition RPC
#57198 (#59179)
picked from #57198
---
be/src/vec/sink/vrow_distribution.cpp | 5 +
.../transaction/CloudGlobalTransactionMgr.java | 9 ++
.../apache/doris/service/FrontendServiceImpl.java | 63 ++++++++++-
.../transaction/AutoPartitionCacheManager.java | 121 +++++++++++++++++++++
.../doris/transaction/GlobalTransactionMgr.java | 10 ++
.../transaction/GlobalTransactionMgrIface.java | 2 +
gensrc/thrift/FrontendService.thrift | 2 +
.../test_create_partition_idempotence.groovy | 94 ++++++++++++++++
.../test_auto_partition_idempotence.groovy | 89 +++++++++++++++
9 files changed, 390 insertions(+), 5 deletions(-)
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index 1f0bdd96b85..c5a912ae14c 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -29,6 +29,7 @@
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
@@ -101,6 +102,10 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_partitionValues(_partitions_need_create);
request.__set_be_endpoint(be_endpoint);
request.__set_write_single_replica(_write_single_replica);
+ if (_state && _state->get_query_ctx()) {
+ // Pass query_id to FE so it can determine if this is a multi-instance
load by checking Coordinator
+ request.__set_query_id(_state->get_query_ctx()->query_id());
+ }
VLOG_NOTICE << "automatic partition rpc begin request " << request;
TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 98ea8b211d8..c19472b6949 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -105,6 +105,7 @@ import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.AutoPartitionCacheManager;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.SubTransactionState;
@@ -207,6 +208,12 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
// dbId -> txnId -> signature
private Map<Long, Map<Long, Long>> txnLastSignatureMap =
Maps.newConcurrentMap();
+ private final AutoPartitionCacheManager autoPartitionCacheManager = new
AutoPartitionCacheManager();
+
+ public AutoPartitionCacheManager getAutoPartitionCacheMgr() {
+ return autoPartitionCacheManager;
+ }
+
public CloudGlobalTransactionMgr() {
this.callbackFactory = new TxnStateCallbackFactory();
}
@@ -1503,6 +1510,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
List<Table> tablesToUnlock = getTablesNeedCommitLock(tableList);
decreaseWaitingLockCount(tablesToUnlock);
MetaLockUtils.commitUnlockTables(tablesToUnlock);
+ autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
}
@Override
@@ -1564,6 +1572,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
@Override
public void abortTransaction(Long dbId, Long transactionId, String reason)
throws UserException {
cleanSubTransactions(transactionId);
+ autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
abortTransaction(dbId, transactionId, reason, null, null);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d2bf0095968..134ded342f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3667,6 +3667,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
public TCreatePartitionResult createPartition(TCreatePartitionRequest
request) throws TException {
LOG.info("Receive create partition request: {}", request);
long dbId = request.getDbId();
+ long txnId = request.getTxnId();
long tableId = request.getTableId();
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
@@ -3709,6 +3710,29 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+        // Cache tablet locations only when needed:
+        // 1. Only multi-instance loads can end up with an inconsistent replica distribution,
+        //    because their instances send concurrent createPartition RPCs for the same txn.
+        // 2. For BE-initiated loads (e.g. stream load committed/aborted by BE), a BE crash may
+        //    leave the transaction's cache entry in memory with nothing to clean it up.
+        // So we skip caching for those loads.
+ boolean needUseCache = false;
+ if (request.isSetQueryId()) {
+ Coordinator coordinator =
QeProcessorImpl.INSTANCE.getCoordinator(request.getQueryId());
+ if (coordinator != null) {
+                // Only multi-instance loads need a consistent tablet replica view; single-instance
+                // loads (e.g. stream load from FE) skip the cache. BE-initiated loads such as stream
+                // load have no coordinator at all and never reach this branch.
+ Map<String, Integer> beToInstancesNum =
coordinator.getBeToInstancesNum();
+ int instanceNum = beToInstancesNum.values().stream()
+ .mapToInt(Integer::intValue)
+ .sum();
+ if (instanceNum > 1) {
+ needUseCache = true;
+ }
+ }
+ }
+
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
ArrayList<List<TNullableStringLiteral>> partitionValues = new
ArrayList<>();
@@ -3763,11 +3787,15 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
// build partition & tablets
- List<TOlapTablePartition> partitions = Lists.newArrayList();
- List<TTabletLocation> tablets = Lists.newArrayList();
+ List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
+ List<TOlapTablePartition> partitions = Lists.newArrayList();
for (String partitionName : addPartitionClauseMap.keySet()) {
Partition partition = table.getPartition(partitionName);
+            // Build this partition's tablet locations in local lists first; getOrSetAutoPartitionInfo
+            // then atomically either caches them or replaces them with the view another request cached.
+ List<TTabletLocation> partitionTablets = new ArrayList<>();
+ List<TTabletLocation> partitionSlaveTablets = new ArrayList<>();
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
int partColNum = partitionInfo.getPartitionColumns().size();
@@ -3787,6 +3815,18 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
partitions.add(tPartition);
// tablet
+ if (needUseCache
+ &&
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ .getAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
+ partitionSlaveTablets)) {
+ // fast path, if cached
+ tablets.addAll(partitionTablets);
+ slaveTablets.addAll(partitionSlaveTablets);
+ LOG.debug("Fast path: use cached auto partition info, txnId:
{}, partitionId: {}, "
+ + "tablets: {}, slaveTablets: {}", txnId,
partition.getId(),
+ partitionTablets.size(), partitionSlaveTablets.size());
+ continue;
+ }
int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ 1;
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -3817,15 +3857,28 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
Long masterNode = nodes[random.nextInt(nodes.length)];
Multimap<Long, Long> slaveBePathsMap = bePathsMap;
slaveBePathsMap.removeAll(masterNode);
- tablets.add(new TTabletLocation(tablet.getId(),
+ partitionTablets.add(new
TTabletLocation(tablet.getId(),
Lists.newArrayList(Sets.newHashSet(masterNode))));
- slaveTablets.add(new TTabletLocation(tablet.getId(),
+ partitionSlaveTablets.add(new
TTabletLocation(tablet.getId(),
Lists.newArrayList(slaveBePathsMap.keySet())));
} else {
- tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ partitionTablets.add(new
TTabletLocation(tablet.getId(),
+ Lists.newArrayList(bePathsMap.keySet())));
}
}
}
+
+ if (needUseCache) {
+ Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ .getOrSetAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
+ partitionSlaveTablets);
+ LOG.debug("Cache auto partition info, txnId: {}, partitionId:
{}, "
+ + "tablets: {}, slaveTablets: {}", txnId,
partition.getId(),
+ partitionTablets.size(), partitionSlaveTablets.size());
+ }
+
+ tablets.addAll(partitionTablets);
+ slaveTablets.addAll(partitionSlaveTablets);
}
result.setPartitions(partitions);
result.setTablets(tablets);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
new file mode 100644
index 00000000000..f24d2f45b33
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/*
+ ** AutoPartitionCacheManager solves the following problem:
+ **
+ * RPC [P1, P2] RPC [P2, P3]
+ * | |
+ * P1:t1, t2 |
+ * ↓ |
+ * P2:t3, t4 |
+ * ↓
+ * P2:exist
+ * ↓
+ * P3:t5,t6
+ * --------------------------------------
+ * tablet rebalance during ...
+ * t1 - be1 t3 - be1 <-
+ * t2 - be2 t4 - be1
+ * t3 - be2 <- t5 - be2
+ * t4 - be1 t6 - be2
+ * --------------------------------------
+ * We ensure that only one view of P2's replica distribution (t3, t4 above) takes effect for this txn,
+ * so that (assuming a single replica) the same tablet is not written on different backends by different instances.
+*/
+
+/**
+ * Caches, per transaction and per partition, the tablet locations that {@code createPartition}
+ * returns for auto-partitioned tables. Repeated or concurrent createPartition RPCs of the same
+ * transaction therefore all observe a single tablet-to-backend distribution, which makes the
+ * RPC idempotent with respect to replica placement.
+ *
+ * <p>Entries are keyed by txnId and removed with {@link #clearAutoPartitionInfo(Long)}.
+ * NOTE(review): {@link #getOrSetAutoPartitionInfo} re-creates the txn entry if it is called after
+ * the entry was cleared (e.g. an RPC that arrives after commit/abort); such an entry is never
+ * removed again. Confirm callers only cache for transactions that are still running.
+ */
+public class AutoPartitionCacheManager {
+    private static final Logger LOG = LogManager.getLogger(AutoPartitionCacheManager.class);
+
+    /**
+     * Read-only snapshot of one partition's tablet locations.
+     */
+    public static class PartitionTabletCache {
+        public final List<TTabletLocation> tablets;
+        public final List<TTabletLocation> slaveTablets;
+
+        /**
+         * Creates a snapshot. Both lists are copied and wrapped as unmodifiable, so neither the
+         * caller nor readers of the public fields can change the cached view afterwards.
+         */
+        public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation> slaveTablets) {
+            this.tablets = Collections.unmodifiableList(new ArrayList<>(tablets));
+            this.slaveTablets = Collections.unmodifiableList(new ArrayList<>(slaveTablets));
+        }
+    }
+
+    // txnId -> partitionId -> PartitionTabletCache
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, PartitionTabletCache>> autoPartitionInfo =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Looks up the cached tablet locations of a partition without modifying the cache.
+     *
+     * @param txnId transaction id
+     * @param partitionId partition id
+     * @param partitionTablets output; on a hit it is cleared and filled with the cached tablets
+     * @param partitionSlaveTablets output; on a hit it is cleared and filled with the cached slave tablets
+     * @return true if the partition was cached for this transaction; false otherwise, in which case
+     *         the output lists are left untouched
+     */
+    public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
+        ConcurrentHashMap<Long, PartitionTabletCache> partitionMap = autoPartitionInfo.get(txnId);
+        PartitionTabletCache cached = partitionMap == null ? null : partitionMap.get(partitionId);
+        if (cached == null) {
+            return false;
+        }
+        copyInto(cached, partitionTablets, partitionSlaveTablets);
+        return true;
+    }
+
+    /**
+     * Atomically caches the caller's tablet locations for a partition. If another request already
+     * cached locations for the same txn and partition, the caller's lists are overwritten with the
+     * cached ones, so every caller returns the same distribution.
+     *
+     * @param txnId transaction id
+     * @param partitionId partition id
+     * @param partitionTablets in/out: locations computed by the caller; replaced on a cache hit
+     * @param partitionSlaveTablets in/out: slave locations computed by the caller; replaced on a cache hit
+     */
+    public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
+        ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
+                autoPartitionInfo.computeIfAbsent(txnId, k -> new ConcurrentHashMap<>());
+
+        // computeIfAbsent runs the mapping function atomically and only for an absent key, so this
+        // flag tells whether this call published the entry or found an existing one.
+        final AtomicBoolean inserted = new AtomicBoolean(false);
+        PartitionTabletCache cached = partitionMap.computeIfAbsent(partitionId, k -> {
+            inserted.set(true);
+            return new PartitionTabletCache(partitionTablets, partitionSlaveTablets);
+        });
+
+        if (!inserted.get()) {
+            // Another request cached this partition first: adopt its view.
+            copyInto(cached, partitionTablets, partitionSlaveTablets);
+            LOG.debug("Get cached auto partition info from cache, txnId: {}, partitionId: {}, "
+                    + "tablets: {}, slaveTablets: {}", txnId, partitionId,
+                    cached.tablets.size(), cached.slaveTablets.size());
+        }
+    }
+
+    /**
+     * Drops every cached partition of the given transaction. This is a no-op if nothing is cached.
+     */
+    public void clearAutoPartitionInfo(Long txnId) {
+        autoPartitionInfo.remove(txnId);
+    }
+
+    // Overwrites the two output lists with the contents of a cached snapshot.
+    private static void copyInto(PartitionTabletCache cached,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
+        partitionTablets.clear();
+        partitionTablets.addAll(cached.tablets);
+        partitionSlaveTablets.clear();
+        partitionSlaveTablets.addAll(cached.slaveTablets);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 995f43a3330..614448891a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -87,11 +87,18 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
private Env env;
+ private final AutoPartitionCacheManager autoPartitionCacheManager;
+
public GlobalTransactionMgr(Env env) {
this.env = env;
this.dbIdToDatabaseTransactionMgrs = Maps.newConcurrentMap();
this.idGenerator = new TransactionIdGenerator();
this.callbackFactory = new TxnStateCallbackFactory();
+ this.autoPartitionCacheManager = new AutoPartitionCacheManager();
+ }
+
+ public AutoPartitionCacheManager getAutoPartitionCacheMgr() {
+ return autoPartitionCacheManager;
}
@Override
@@ -395,6 +402,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
dbTransactionMgr.abortTransaction(txnId, reason,
txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
+ autoPartitionCacheManager.clearAutoPartitionInfo(txnId);
}
}
@@ -419,6 +427,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
dbTransactionMgr.abortTransaction2PC(transactionId);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
+ autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
}
}
@@ -529,6 +538,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
Map<Long, Set<Long>> backendPartitions) throws UserException {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
dbTransactionMgr.finishTransaction(transactionId,
partitionVisibleVersions, backendPartitions);
+ autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index 8f3e4d682f6..d56193fe683 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -220,4 +220,6 @@ public interface GlobalTransactionMgrIface extends Writable
{
long dbId, List<Long> tableIdList) throws UserException;
public int getQueueLength();
+
+ public AutoPartitionCacheManager getAutoPartitionCacheMgr();
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index bdd18eb2646..ae2a22297a9 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1563,6 +1563,8 @@ struct TCreatePartitionRequest {
// be_endpoint = <ip>:<heartbeat_port> to distinguish a particular BE
5: optional string be_endpoint
6: optional bool write_single_replica = false
+    // query_id lets FE find the load's Coordinator; FE caches tablet locations only for multi-instance loads
+ 7: optional Types.TUniqueId query_id
}
struct TCreatePartitionResult {
diff --git
a/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
b/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
new file mode 100644
index 00000000000..494754f657b
--- /dev/null
+++
b/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+/*
+Test Description:
+    The source table has two buckets and is loaded in two batches (1 row, then 20,000 rows).
+    Every row has the same `date` and the table is distributed by HASH(`date`), so all rows
+    actually hash to the same tablet.
+    Running "INSERT INTO ... SELECT ..." with parallel_pipeline_task_num = 2 is meant to start two
+    instances, each of which sends the same createPartition RPC to the FE.
+
+    The MockRebalance debug point is meant to count createPartition calls and, on the second call,
+    deliberately return a different tablet distribution, so the test can check that the tablet
+    distribution cache keeps both instances on a single view.
+*/
+
+
+suite('test_create_partition_idempotence', 'docker') {
+    if (!isCloudMode()) {
+        logger.info("Skip test_create_partition_idempotence, only run in cloud mode")
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'enable_debug_points = true',
+    ]
+    options.cloudMode = true
+    options.beNum = 3
+
+    docker(options) {
+        def sourceTable = "test_partition_source_table"
+        sql "DROP TABLE IF EXISTS ${sourceTable}"
+        sql """
+            CREATE TABLE ${sourceTable} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            DISTRIBUTED BY HASH(`date`) BUCKETS 2
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        // 1 row plus 20,000 rows (number 0..19999), all with the same date.
+        sql """ INSERT INTO ${sourceTable} VALUES ("2025-11-04", 1, "test1"); """
+        sql """ INSERT INTO ${sourceTable} SELECT "2025-11-04", number, "test" FROM numbers("number" = "20000"); """
+
+        def tableName = "test_partition_idempotence_table"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            AUTO PARTITION BY RANGE (date_trunc(`date`, 'day')) ()
+            DISTRIBUTED BY HASH(id) BUCKETS 10
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        // Intended to make the INSERT below run with more than one instance.
+        sql """ set parallel_pipeline_task_num = 2 """
+        sql """ set load_stream_per_node = 2 """
+
+        // NOTE(review): none of the FrontendServiceImpl hunks in this patch check this debug point;
+        // confirm it exists on this branch, otherwise the INSERT does not exercise the cache.
+        def mockRebalance = "FE.FrontendServiceImpl.createPartition.MockRebalance"
+        GetDebugPoint().enableDebugPointForAllFEs(mockRebalance)
+        try {
+            sql """ INSERT INTO ${tableName} SELECT * FROM ${sourceTable}; """
+        } finally {
+            // Disable even if the INSERT fails, so the debug point cannot leak into later statements.
+            GetDebugPoint().disableDebugPointForAllFEs(mockRebalance)
+        }
+
+        // All source rows share one date, so exactly one partition must have been auto-created.
+        def result = sql "SELECT count(DISTINCT `date`) FROM ${tableName}"
+        logger.info("Distinct date count: ${result}")
+        assertEquals(1, result[0][0])
+
+        // No row may be lost or duplicated: 1 + 20,000 rows.
+        def count = sql "SELECT count(*) FROM ${tableName}"
+        logger.info("Total row count: ${count}")
+        assertEquals(20001, count[0][0])
+    }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
new file mode 100644
index 00000000000..971998f057b
--- /dev/null
+++
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+/*
+Test Description:
+    The source table has two buckets and is loaded in two batches (1 row, then 20,000 rows).
+    Every row has the same `date` and the table is distributed by HASH(`date`), so all rows
+    actually hash to the same tablet.
+    Running "INSERT INTO ... SELECT ..." with parallel_pipeline_task_num = 2 is meant to start two
+    instances, each of which sends the same createPartition RPC to the FE.
+
+    The MockRebalance debug point is meant to count createPartition calls and, on the second call,
+    deliberately return a different tablet distribution, so the test can check that the tablet
+    distribution cache keeps both instances on a single view.
+*/
+
+
+suite('test_create_partition_idempotence_non_cloud', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'enable_debug_points = true',
+    ]
+    options.cloudMode = false
+    options.beNum = 3
+
+    docker(options) {
+        def sourceTable = "test_partition_source_table"
+        sql "DROP TABLE IF EXISTS ${sourceTable}"
+        sql """
+            CREATE TABLE ${sourceTable} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            DISTRIBUTED BY HASH(`date`) BUCKETS 2
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        // 1 row plus 20,000 rows (number 0..19999), all with the same date.
+        sql """ INSERT INTO ${sourceTable} VALUES ("2025-11-04", 1, "test1"); """
+        sql """ INSERT INTO ${sourceTable} SELECT "2025-11-04", number, "test" FROM numbers("number" = "20000"); """
+
+        def tableName = "test_partition_idempotence_table"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            AUTO PARTITION BY RANGE (date_trunc(`date`, 'day')) ()
+            DISTRIBUTED BY HASH(id) BUCKETS 10
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        // Intended to make the INSERT below run with more than one instance.
+        sql """ set parallel_pipeline_task_num = 2 """
+        sql """ set load_stream_per_node = 2 """
+
+        // NOTE(review): none of the FrontendServiceImpl hunks in this patch check this debug point;
+        // confirm it exists on this branch, otherwise the INSERT does not exercise the cache.
+        def mockRebalance = "FE.FrontendServiceImpl.createPartition.MockRebalance"
+        GetDebugPoint().enableDebugPointForAllFEs(mockRebalance)
+        try {
+            sql """ INSERT INTO ${tableName} SELECT * FROM ${sourceTable}; """
+        } finally {
+            // Disable even if the INSERT fails, so the debug point cannot leak into later statements.
+            GetDebugPoint().disableDebugPointForAllFEs(mockRebalance)
+        }
+
+        // All source rows share one date, so exactly one partition must have been auto-created.
+        def result = sql "SELECT count(DISTINCT `date`) FROM ${tableName}"
+        logger.info("Distinct date count: ${result}")
+        assertEquals(1, result[0][0])
+
+        // No row may be lost or duplicated: 1 + 20,000 rows.
+        def count = sql "SELECT count(*) FROM ${tableName}"
+        logger.info("Total row count: ${count}")
+        assertEquals(20001, count[0][0])
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]