This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ced6e165ee4 branch-3.1: [fix](load) Fix the idempotence of the createPartition RPC #57198 (#59179)
ced6e165ee4 is described below

commit ced6e165ee46433857812ab8fc237ef6427b50f6
Author: Refrain <[email protected]>
AuthorDate: Mon Dec 22 14:04:06 2025 +0800

    branch-3.1: [fix](load) Fix the idempotence of the createPartition RPC #57198 (#59179)
    
    picked from #57198
---
 be/src/vec/sink/vrow_distribution.cpp              |   5 +
 .../transaction/CloudGlobalTransactionMgr.java     |   9 ++
 .../apache/doris/service/FrontendServiceImpl.java  |  63 ++++++++++-
 .../transaction/AutoPartitionCacheManager.java     | 121 +++++++++++++++++++++
 .../doris/transaction/GlobalTransactionMgr.java    |  10 ++
 .../transaction/GlobalTransactionMgrIface.java     |   2 +
 gensrc/thrift/FrontendService.thrift               |   2 +
 .../test_create_partition_idempotence.groovy       |  94 ++++++++++++++++
 .../test_auto_partition_idempotence.groovy         |  89 +++++++++++++++
 9 files changed, 390 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 1f0bdd96b85..c5a912ae14c 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -29,6 +29,7 @@
 #include "common/status.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
@@ -101,6 +102,10 @@ Status VRowDistribution::automatic_create_partition() {
     request.__set_partitionValues(_partitions_need_create);
     request.__set_be_endpoint(be_endpoint);
     request.__set_write_single_replica(_write_single_replica);
+    if (_state && _state->get_query_ctx()) {
+        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
+        request.__set_query_id(_state->get_query_ctx()->query_id());
+    }
 
     VLOG_NOTICE << "automatic partition rpc begin request " << request;
     TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 98ea8b211d8..c19472b6949 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -105,6 +105,7 @@ import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.AutoPartitionCacheManager;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgrIface;
 import org.apache.doris.transaction.SubTransactionState;
@@ -207,6 +208,12 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     // dbId -> txnId -> signature
     private Map<Long, Map<Long, Long>> txnLastSignatureMap = 
Maps.newConcurrentMap();
 
+    private final AutoPartitionCacheManager autoPartitionCacheManager = new 
AutoPartitionCacheManager();
+
+    public AutoPartitionCacheManager getAutoPartitionCacheMgr() {
+        return autoPartitionCacheManager;
+    }
+
     public CloudGlobalTransactionMgr() {
         this.callbackFactory = new TxnStateCallbackFactory();
     }
@@ -1503,6 +1510,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         List<Table> tablesToUnlock = getTablesNeedCommitLock(tableList);
         decreaseWaitingLockCount(tablesToUnlock);
         MetaLockUtils.commitUnlockTables(tablesToUnlock);
+        autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
     }
 
     @Override
@@ -1564,6 +1572,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     @Override
     public void abortTransaction(Long dbId, Long transactionId, String reason) 
throws UserException {
         cleanSubTransactions(transactionId);
+        autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
         abortTransaction(dbId, transactionId, reason, null, null);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d2bf0095968..134ded342f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3667,6 +3667,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     public TCreatePartitionResult createPartition(TCreatePartitionRequest 
request) throws TException {
         LOG.info("Receive create partition request: {}", request);
         long dbId = request.getDbId();
+        long txnId = request.getTxnId();
         long tableId = request.getTableId();
         TCreatePartitionResult result = new TCreatePartitionResult();
         TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
@@ -3709,6 +3710,29 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return result;
         }
 
+        // Cache tablet location only when needed:
+        // 1. From a requirement perspective: Only multi-instance ingestion may trigger inconsistent replica
+        //    distribution issues due to concurrent createPartition RPCs.
+        // 2. From a necessity perspective: For BE-initiated loads (e.g., stream load commit/abort from BE),
+        //    if a BE crashes, the cache for the related transaction may remain in memory and cannot be cleaned up.
+        //    So we skip caching for them.
+        boolean needUseCache = false;
+        if (request.isSetQueryId()) {
+            Coordinator coordinator = 
QeProcessorImpl.INSTANCE.getCoordinator(request.getQueryId());
+            if (coordinator != null) {
+                // Single-instance imports (like stream load from FE) do not need the cache either;
+                // only multi-instance imports need consistent tablet replica information.
+                // (The coordinator may be null for stream load or other BE-initiated loads; those skip caching.)
+                Map<String, Integer> beToInstancesNum = 
coordinator.getBeToInstancesNum();
+                int instanceNum = beToInstancesNum.values().stream()
+                        .mapToInt(Integer::intValue)
+                        .sum();
+                if (instanceNum > 1) {
+                    needUseCache = true;
+                }
+            }
+        }
+
         OlapTable olapTable = (OlapTable) table;
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
         ArrayList<List<TNullableStringLiteral>> partitionValues = new 
ArrayList<>();
@@ -3763,11 +3787,15 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
 
         // build partition & tablets
-        List<TOlapTablePartition> partitions = Lists.newArrayList();
-        List<TTabletLocation> tablets = Lists.newArrayList();
+        List<TTabletLocation> tablets = new ArrayList<>();
         List<TTabletLocation> slaveTablets = new ArrayList<>();
+        List<TOlapTablePartition> partitions = Lists.newArrayList();
         for (String partitionName : addPartitionClauseMap.keySet()) {
             Partition partition = table.getPartition(partitionName);
+            // For thread safety, collect this partition's tablet distribution in per-partition lists and hand
+            // them to getOrSetAutoPartitionInfo, instead of checking the cache and then setting it separately.
+            List<TTabletLocation> partitionTablets = new ArrayList<>();
+            List<TTabletLocation> partitionSlaveTablets = new ArrayList<>();
             TOlapTablePartition tPartition = new TOlapTablePartition();
             tPartition.setId(partition.getId());
             int partColNum = partitionInfo.getPartitionColumns().size();
@@ -3787,6 +3815,18 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
             partitions.add(tPartition);
             // tablet
+            if (needUseCache
+                    && 
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+                            .getAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
+                                    partitionSlaveTablets)) {
+                // fast path, if cached
+                tablets.addAll(partitionTablets);
+                slaveTablets.addAll(partitionSlaveTablets);
+                LOG.debug("Fast path: use cached auto partition info, txnId: 
{}, partitionId: {}, "
+                        + "tablets: {}, slaveTablets: {}", txnId, 
partition.getId(),
+                        partitionTablets.size(), partitionSlaveTablets.size());
+                continue;
+            }
             int quorum = 
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2
                     + 1;
             for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -3817,15 +3857,28 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                         Long masterNode = nodes[random.nextInt(nodes.length)];
                         Multimap<Long, Long> slaveBePathsMap = bePathsMap;
                         slaveBePathsMap.removeAll(masterNode);
-                        tablets.add(new TTabletLocation(tablet.getId(),
+                        partitionTablets.add(new 
TTabletLocation(tablet.getId(),
                                 
Lists.newArrayList(Sets.newHashSet(masterNode))));
-                        slaveTablets.add(new TTabletLocation(tablet.getId(),
+                        partitionSlaveTablets.add(new 
TTabletLocation(tablet.getId(),
                                 Lists.newArrayList(slaveBePathsMap.keySet())));
                     } else {
-                        tablets.add(new TTabletLocation(tablet.getId(), 
Lists.newArrayList(bePathsMap.keySet())));
+                        partitionTablets.add(new 
TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(bePathsMap.keySet())));
                     }
                 }
             }
+
+            if (needUseCache) {
+                Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+                        .getOrSetAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
+                                partitionSlaveTablets);
+                LOG.debug("Cache auto partition info, txnId: {}, partitionId: 
{}, "
+                        + "tablets: {}, slaveTablets: {}", txnId, 
partition.getId(),
+                        partitionTablets.size(), partitionSlaveTablets.size());
+            }
+
+            tablets.addAll(partitionTablets);
+            slaveTablets.addAll(partitionSlaveTablets);
         }
         result.setPartitions(partitions);
         result.setTablets(tablets);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
new file mode 100644
index 00000000000..f24d2f45b33
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/*
+    ** This class, AutoPartitionCacheManager, is used to solve the following problem:
+    **
+    * RPC [P1, P2]              RPC [P2, P3]
+    *       |                         |
+    *    P1:t1, t2                    |
+    *       ↓                         |
+    *    P2:t3, t4                    |
+    *                                 ↓
+    *                             P2:exist
+    *                                 ↓
+    *                             P3:t5,t6
+    * --------------------------------------
+    *       tablet rebalance during ...
+    *     t1 - be1                 t3 - be1 <-
+    *     t2 - be2                 t4 - be1
+    *     t3 - be2 <-              t5 - be2
+    *     t4 - be1                 t6 - be2
+    * --------------------------------------
+    * We ensure that only one view of the replica distribution in P2:t3,t4 above takes effect for this txn
+    * to avoid tablets being written to multiple instances within the same transaction (assuming single replica)
+*/
+
+// AutoPartitionCacheManager is used to manage the cache of auto partition info.
+// To keep the createPartition RPC idempotent during incremental partition creation
+// for automatic partitioned tables, cache tablet locations per partition.
+public class AutoPartitionCacheManager {
+    private static final Logger LOG = 
LogManager.getLogger(AutoPartitionCacheManager.class);
+
+    public static class PartitionTabletCache {
+        public final List<TTabletLocation> tablets;
+        public final List<TTabletLocation> slaveTablets;
+
+        public PartitionTabletCache(List<TTabletLocation> tablets, 
List<TTabletLocation> slaveTablets) {
+            this.tablets = tablets;
+            this.slaveTablets = slaveTablets;
+        }
+    }
+
+    // txnId -> partitionId -> PartitionTabletCache
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
PartitionTabletCache>> autoPartitionInfo =
+            new ConcurrentHashMap<>();
+
+    // Returns true if cached, false otherwise. This function only reads the cache.
+    public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets) {
+        ConcurrentHashMap<Long, PartitionTabletCache> partitionMap = 
autoPartitionInfo.get(txnId);
+        if (partitionMap == null) {
+            return false;
+        }
+
+        PartitionTabletCache cached = partitionMap.get(partitionId);
+        if (cached == null) {
+            return false;
+        }
+
+        partitionTablets.clear();
+        partitionTablets.addAll(cached.tablets);
+        partitionSlaveTablets.clear();
+        partitionSlaveTablets.addAll(cached.slaveTablets);
+        return true;
+    }
+
+    public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets) {
+        ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
+                autoPartitionInfo.computeIfAbsent(txnId, k -> new 
ConcurrentHashMap<>());
+
+        final AtomicBoolean needUpdate = new AtomicBoolean(false);
+        PartitionTabletCache cached = 
partitionMap.computeIfAbsent(partitionId, k -> {
+            needUpdate.set(true);
+            return new PartitionTabletCache(
+                    new ArrayList<>(partitionTablets),
+                    new ArrayList<>(partitionSlaveTablets)
+            );
+        });
+
+        if (!needUpdate.get()) {
+            partitionTablets.clear();
+            partitionSlaveTablets.clear();
+            partitionTablets.addAll(cached.tablets);
+            partitionSlaveTablets.addAll(cached.slaveTablets);
+            LOG.debug("Get cached auto partition info from cache, txnId: {}, 
partitionId: {}, "
+                    + "tablets: {}, slaveTablets: {}", txnId, partitionId,
+                    cached.tablets.size(), cached.slaveTablets.size());
+        }
+    }
+
+    public void clearAutoPartitionInfo(Long txnId) {
+        autoPartitionInfo.remove(txnId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 995f43a3330..614448891a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -87,11 +87,18 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
     private Env env;
 
+    private final AutoPartitionCacheManager autoPartitionCacheManager;
+
     public GlobalTransactionMgr(Env env) {
         this.env = env;
         this.dbIdToDatabaseTransactionMgrs = Maps.newConcurrentMap();
         this.idGenerator = new TransactionIdGenerator();
         this.callbackFactory = new TxnStateCallbackFactory();
+        this.autoPartitionCacheManager = new AutoPartitionCacheManager();
+    }
+
+    public AutoPartitionCacheManager getAutoPartitionCacheMgr() {
+        return autoPartitionCacheManager;
     }
 
     @Override
@@ -395,6 +402,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             dbTransactionMgr.abortTransaction(txnId, reason, 
txnCommitAttachment);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
+            autoPartitionCacheManager.clearAutoPartitionInfo(txnId);
         }
     }
 
@@ -419,6 +427,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             dbTransactionMgr.abortTransaction2PC(transactionId);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
+            autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
         }
     }
 
@@ -529,6 +538,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             Map<Long, Set<Long>> backendPartitions) throws UserException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.finishTransaction(transactionId, 
partitionVisibleVersions, backendPartitions);
+        autoPartitionCacheManager.clearAutoPartitionInfo(transactionId);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index 8f3e4d682f6..d56193fe683 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -220,4 +220,6 @@ public interface GlobalTransactionMgrIface extends Writable 
{
                 long dbId, List<Long> tableIdList) throws UserException;
 
     public int getQueueLength();
+
+    public AutoPartitionCacheManager getAutoPartitionCacheMgr();
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index bdd18eb2646..ae2a22297a9 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1563,6 +1563,8 @@ struct TCreatePartitionRequest {
     // be_endpoint = <ip>:<heartbeat_port> to distinguish a particular BE
     5: optional string be_endpoint
     6: optional bool write_single_replica = false
+    // query_id lets FE look up the coordinator to determine whether this is a multi-instance load
+    7: optional Types.TUniqueId query_id
 }
 
 struct TCreatePartitionResult {
diff --git 
a/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
 
b/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
new file mode 100644
index 00000000000..494754f657b
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/tablets/test_create_partition_idempotence.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+/*
+Test Description:
+    We create a two-tablet table with skewed data, where one tablet has 1 row and the other tablet has 20,000 rows.
+    By running "insert into ... select ..." we trigger two instances, which will send the same create-partition RPC to the FE.
+
+    Enabling MockRebalance will count how many times createPartition is called. On the second time this RPC is received,
+    it will deliberately return a different tablet distribution to check whether the tablet distribution cache is working.
+*/
+
+
+suite('test_create_partition_idempotence', 'docker') {
+    if (!isCloudMode()) {
+        logger.info("Skip test_create_partition_idempotence, only run in cloud 
mode")
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'enable_debug_points = true',
+    ]
+    options.cloudMode = true
+    options.beNum = 3 
+
+    docker(options) {
+        def sourceTable = "test_partition_source_table"
+        sql "DROP TABLE IF EXISTS ${sourceTable}"
+        sql """
+            CREATE TABLE ${sourceTable} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            DISTRIBUTED BY HASH(`date`) BUCKETS 2
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        sql """ INSERT INTO ${sourceTable} VALUES ("2025-11-04", 1, "test1"); 
"""
+        sql """ INSERT INTO ${sourceTable} SELECT "2025-11-04", number, "test" 
FROM numbers("number" = "20000"); """
+
+        def tableName = "test_partition_idempotence_table"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            AUTO PARTITION BY RANGE (date_trunc(`date`, 'day')) ()
+            DISTRIBUTED BY HASH(id) BUCKETS 10
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        sql """ set parallel_pipeline_task_num = 2 """
+        sql """ set load_stream_per_node = 2 """
+
+        
GetDebugPoint().enableDebugPointForAllFEs("FE.FrontendServiceImpl.createPartition.MockRebalance")
+    
+        sql """ INSERT INTO ${tableName} SELECT * FROM ${sourceTable}; """
+
+        
GetDebugPoint().disableDebugPointForAllFEs("FE.FrontendServiceImpl.createPartition.MockRebalance")
+
+        def result = sql "SELECT count(DISTINCT `date`) FROM ${tableName}"
+        logger.info("Distinct date count: ${result}")
+        assertEquals(1, result[0][0])
+
+        def count = sql "SELECT count(*) FROM ${tableName}"
+        logger.info("Total row count: ${count}")
+        assertEquals(20001, count[0][0])
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
 
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
new file mode 100644
index 00000000000..971998f057b
--- /dev/null
+++ 
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_idempotence.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+/*
+Test Description:
+    We create a two-tablet table with skewed data, where one tablet has 1 row and the other tablet has 20,000 rows.
+    By running "insert into ... select ..." we trigger two instances, which will send the same create-partition RPC to the FE.
+
+    Enabling MockRebalance will count how many times createPartition is called. On the second time this RPC is received,
+    it will deliberately return a different tablet distribution to check whether the tablet distribution cache is working.
+*/
+
+
+suite('test_create_partition_idempotence_non_cloud', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'enable_debug_points = true',
+    ]
+    options.cloudMode = false
+    options.beNum = 3 
+
+    docker(options) {
+        def sourceTable = "test_partition_source_table"
+        sql "DROP TABLE IF EXISTS ${sourceTable}"
+        sql """
+            CREATE TABLE ${sourceTable} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            DISTRIBUTED BY HASH(`date`) BUCKETS 2
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        sql """ INSERT INTO ${sourceTable} VALUES ("2025-11-04", 1, "test1"); 
"""
+        sql """ INSERT INTO ${sourceTable} SELECT "2025-11-04", number, "test" 
FROM numbers("number" = "20000"); """
+
+        def tableName = "test_partition_idempotence_table"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                `date` DATE NOT NULL,
+                `id` INT,
+                `value` VARCHAR(100)
+            )
+            AUTO PARTITION BY RANGE (date_trunc(`date`, 'day')) ()
+            DISTRIBUTED BY HASH(id) BUCKETS 10
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        sql """ set parallel_pipeline_task_num = 2 """
+        sql """ set load_stream_per_node = 2 """
+
+        
GetDebugPoint().enableDebugPointForAllFEs("FE.FrontendServiceImpl.createPartition.MockRebalance")
+    
+        sql """ INSERT INTO ${tableName} SELECT * FROM ${sourceTable}; """
+
+        
GetDebugPoint().disableDebugPointForAllFEs("FE.FrontendServiceImpl.createPartition.MockRebalance")
+
+        def result = sql "SELECT count(DISTINCT `date`) FROM ${tableName}"
+        logger.info("Distinct date count: ${result}")
+        assertEquals(1, result[0][0])
+
+        def count = sql "SELECT count(*) FROM ${tableName}"
+        logger.info("Total row count: ${count}")
+        assertEquals(20001, count[0][0])
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to