This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 83edcdead96 [enhancement](random_sink) change tablet search algorithm 
from random to round-robin for random distribution table (#26611)
83edcdead96 is described below

commit 83edcdead96d1e8994cd666dbfa15be558084477
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Wed Nov 15 19:55:31 2023 +0800

    [enhancement](random_sink) change tablet search algorithm from random to 
round-robin for random distribution table (#26611)
    
    1. fix a race condition when getting the tablet load index
    2. change the tablet search algorithm from random to round-robin for 
random distribution tables when load_to_single_tablet is set to false
---
 be/src/exec/tablet_info.h                          |  18 +-
 be/src/olap/base_compaction.cpp                    |   2 +-
 be/src/vec/sink/vrow_distribution.cpp              |   1 -
 be/src/vec/sink/vtablet_finder.cpp                 |  10 +
 be/src/vec/sink/vtablet_finder.h                   |  17 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   1 -
 .../main/java/org/apache/doris/catalog/Env.java    |  12 +-
 .../org/apache/doris/planner/OlapTableSink.java    |  18 +-
 .../doris/planner/SingleTabletLoadRecorderMgr.java | 112 ---------
 .../doris/planner/TabletLoadIndexRecorderMgr.java  |  89 +++++++
 gensrc/thrift/Descriptors.thrift                   |   2 +-
 .../test_insert_random_distribution_table.groovy   | 279 +++++++++++++++++++++
 12 files changed, 410 insertions(+), 151 deletions(-)

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index bb9fbd8bc60..2e5f40bec35 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -127,7 +127,7 @@ struct VOlapTablePartition {
     int64_t num_buckets = 0;
     std::vector<OlapTableIndexTablets> indexes;
     bool is_mutable;
-    // -1 indicates load_to_single_tablet = false
+    // -1 indicates partition with hash distribution
     int64_t load_tablet_idx = -1;
 
     VOlapTablePartition(vectorized::Block* partition_block)
@@ -187,7 +187,7 @@ public:
             const std::vector<VOlapTablePartition*>& partitions,
             std::vector<uint32_t>& tablet_indexes /*result*/,
             /*TODO: check if flat hash map will be better*/
-            std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) 
const {
+            std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer 
= nullptr) const {
         std::function<uint32_t(vectorized::Block*, uint32_t, const 
VOlapTablePartition&)>
                 compute_function;
         if (!_distributed_slot_locs.empty()) {
@@ -212,10 +212,9 @@ public:
             compute_function = [](vectorized::Block* block, uint32_t row,
                                   const VOlapTablePartition& partition) -> 
uint32_t {
                 if (partition.load_tablet_idx == -1) {
-                    // load_to_single_tablet = false, just do random
+                    // for compatible with old version, just do random
                     return butil::fast_rand() % partition.num_buckets;
                 }
-                // load_to_single_tablet = ture, do round-robin
                 return partition.load_tablet_idx % partition.num_buckets;
             };
         }
@@ -226,14 +225,15 @@ public:
             }
         } else { // use buffer
             for (auto index : indexes) {
-                auto& partition_id = partitions[index]->id;
-                if (auto it = partition_tablets_buffer->find(partition_id);
+                auto* partition = partitions[index];
+                if (auto it = partition_tablets_buffer->find(partition);
                     it != partition_tablets_buffer->end()) {
                     tablet_indexes[index] = it->second; // tablet
+                } else {
+                    // compute and save in buffer
+                    (*partition_tablets_buffer)[partition] = 
tablet_indexes[index] =
+                            compute_function(block, index, *partitions[index]);
                 }
-                // compute and save in buffer
-                (*partition_tablets_buffer)[partition_id] = 
tablet_indexes[index] =
-                        compute_function(block, index, *partitions[index]);
             }
         }
     }
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index eab41c73acb..6ae006709a7 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -98,7 +98,7 @@ Status BaseCompaction::execute_compact_impl() {
 
 void BaseCompaction::_filter_input_rowset() {
     // if dup_key and no delete predicate
-    // we skip big files too save resources
+    // we skip big files to save resources
     if (_tablet->keys_type() != KeysType::DUP_KEYS) {
         return;
     }
diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 78d3b062045..0071629175f 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -269,7 +269,6 @@ Status VRowDistribution::generate_rows_distribution(
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, 
has_filtered_rows));
 
-    _tablet_finder->clear_for_new_batch();
     _row_distribution_watch.start();
     auto num_rows = block->rows();
     _tablet_finder->filter_bitmap().Reset(num_rows);
diff --git a/be/src/vec/sink/vtablet_finder.cpp 
b/be/src/vec/sink/vtablet_finder.cpp
index 5c37da6a7cf..865a3066d62 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -95,8 +95,18 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, 
Block* block, int row
     if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
         _vpartition->find_tablets(block, qualified_rows, partitions, 
tablet_index);
     } else {
+        // for random distribution
         _vpartition->find_tablets(block, qualified_rows, partitions, 
tablet_index,
                                   &_partition_to_tablet_map);
+        if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
+            for (auto it : _partition_to_tablet_map) {
+                // do round-robin for next batch
+                if (it.first->load_tablet_idx != -1) {
+                    it.first->load_tablet_idx++;
+                }
+            }
+            _partition_to_tablet_map.clear();
+        }
     }
 
     return Status::OK();
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 3426f7cb67d..bccdc39a066 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -30,12 +30,13 @@ namespace doris::vectorized {
 
 class OlapTabletFinder {
 public:
-    // FIND_TABLET_EVERY_ROW is used for both hash and random distribution 
info, which indicates that we
+    // FIND_TABLET_EVERY_ROW is used for hash distribution info, which 
indicates that we
     // should compute tablet index for every row
-    // FIND_TABLET_EVERY_BATCH is only used for random distribution info, 
which indicates that we should
+    // FIND_TABLET_EVERY_BATCH is used for random distribution info, which 
indicates that we should
     // compute tablet index for every row batch
-    // FIND_TABLET_EVERY_SINK is only used for random distribution info, which 
indicates that we should
-    // only compute tablet index in the corresponding partition once for the 
whole time in olap table sink
+    // FIND_TABLET_EVERY_SINK is used for random distribution info when 
load_to_single_tablet set to true,
+    // which indicates that we should only compute tablet index in the 
corresponding partition once for the
+    // whole time in olap table sink
     enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, 
FIND_TABLET_EVERY_SINK };
 
     OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
@@ -50,12 +51,6 @@ public:
         return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
     }
 
-    void clear_for_new_batch() {
-        if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
-            _partition_to_tablet_map.clear();
-        }
-    }
-
     bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
 
     const vectorized::flat_hash_set<int64_t>& partition_ids() { return 
_partition_ids; }
@@ -71,7 +66,7 @@ public:
 private:
     VOlapTablePartitionParam* _vpartition;
     FindTabletMode _find_tablet_mode;
-    std::map<int64_t, int64_t> _partition_to_tablet_map;
+    std::map<VOlapTablePartition*, int64_t> _partition_to_tablet_map;
     vectorized::flat_hash_set<int64_t> _partition_ids;
 
     int64_t _num_filtered_rows = 0;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 703959da80c..eb54d4b5dc3 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1603,7 +1603,6 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     _state->update_num_bytes_load_total(bytes);
     DorisMetrics::instance()->load_rows->increment(rows);
     DorisMetrics::instance()->load_bytes->increment(bytes);
-
     // Random distribution and the block belongs to a single tablet, we could 
optimize to append the whole
     // block into node channel.
     bool load_block_to_single_tablet =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1ed32913459..24945649912 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -207,7 +207,7 @@ import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.persist.meta.MetaHeader;
 import org.apache.doris.persist.meta.MetaReader;
 import org.apache.doris.persist.meta.MetaWriter;
-import org.apache.doris.planner.SingleTabletLoadRecorderMgr;
+import org.apache.doris.planner.TabletLoadIndexRecorderMgr;
 import org.apache.doris.plugin.PluginInfo;
 import org.apache.doris.plugin.PluginMgr;
 import org.apache.doris.policy.PolicyMgr;
@@ -331,7 +331,7 @@ public class Env {
     private LoadManager loadManager;
     private ProgressManager progressManager;
     private StreamLoadRecordMgr streamLoadRecordMgr;
-    private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr;
+    private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr;
     private RoutineLoadManager routineLoadManager;
     private SqlBlockRuleMgr sqlBlockRuleMgr;
     private ExportMgr exportMgr;
@@ -682,7 +682,7 @@ public class Env {
         this.progressManager = new ProgressManager();
         this.streamLoadRecordMgr = new 
StreamLoadRecordMgr("stream_load_record_manager",
                 Config.fetch_stream_load_record_interval_second * 1000L);
-        this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr();
+        this.tabletLoadIndexRecorderMgr = new TabletLoadIndexRecorderMgr();
         this.loadEtlChecker = new LoadEtlChecker(loadManager);
         this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
         this.routineLoadScheduler = new 
RoutineLoadScheduler(routineLoadManager);
@@ -1549,7 +1549,7 @@ public class Env {
             cooldownConfHandler.start();
         }
         streamLoadRecordMgr.start();
-        singleTabletLoadRecorderMgr.start();
+        tabletLoadIndexRecorderMgr.start();
         getInternalCatalog().getIcebergTableCreationRecordMgr().start();
         new InternalSchemaInitializer().start();
         if (Config.enable_hms_events_incremental_sync) {
@@ -3758,8 +3758,8 @@ public class Env {
         return streamLoadRecordMgr;
     }
 
-    public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() {
-        return singleTabletLoadRecorderMgr;
+    public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() {
+        return tabletLoadIndexRecorderMgr;
     }
 
     public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 23bec446f4b..fa3b4abee9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.Index;
@@ -109,8 +110,6 @@ public class OlapTableSink extends DataSink {
 
     private boolean isStrictMode = false;
 
-    private boolean loadToSingleTablet;
-
     public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, 
List<Long> partitionIds,
             boolean singleReplicaLoad) {
         this.dstTable = dstTable;
@@ -134,7 +133,6 @@ public class OlapTableSink extends DataSink {
                     "if load_to_single_tablet set to true," + " the olap table 
must be with random distribution");
         }
         tSink.setLoadToSingleTablet(loadToSingleTablet);
-        this.loadToSingleTablet = loadToSingleTablet;
         tDataSink = new TDataSink(getDataSinkType());
         tDataSink.setOlapTableSink(tSink);
 
@@ -344,11 +342,12 @@ public class OlapTableSink extends DataSink {
                         tPartition.setNumBuckets(index.getTablets().size());
                     }
                     
tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
-                    if (loadToSingleTablet) {
-                        int tabletIndex = 
Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
-                                .getCurrentLoadTabletIndex(dbId, 
table.getId(), partitionId);
+                    if (partition.getDistributionInfo().getType() == 
DistributionInfoType.RANDOM) {
+                        int tabletIndex = 
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+                                .getCurrentTabletLoadIndex(dbId, 
table.getId(), partition);
                         tPartition.setLoadTabletIdx(tabletIndex);
                     }
+
                     partitionParam.addToPartitions(tPartition);
 
                     DistributionInfo distInfo = 
partition.getDistributionInfo();
@@ -403,9 +402,10 @@ public class OlapTableSink extends DataSink {
                             
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                     tPartition.setNumBuckets(index.getTablets().size());
                 }
-                if (loadToSingleTablet) {
-                    int tabletIndex = 
Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
-                            .getCurrentLoadTabletIndex(dbId, table.getId(), 
partition.getId());
+
+                if (partition.getDistributionInfo().getType() == 
DistributionInfoType.RANDOM) {
+                    int tabletIndex = 
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+                            .getCurrentTabletLoadIndex(dbId, table.getId(), 
partition);
                     tPartition.setLoadTabletIdx(tabletIndex);
                 }
                 partitionParam.addToPartitions(tPartition);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
deleted file mode 100644
index 89f21c47dc2..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.MasterDaemon;
-
-import lombok.Getter;
-import org.apache.commons.lang3.tuple.Triple;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SingleTabletLoadRecorderMgr extends MasterDaemon {
-    private static final Logger LOG = 
LogManager.getLogger(SingleTabletLoadRecorderMgr.class);
-    private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 
60 * 60 * 1000, 1 days
-
-    // <<db_id, table_id, partition_id> -> load_tablet_record>
-    // 0 =< load_tablet_index < number_buckets
-    private final ConcurrentHashMap<Triple<Long, Long, Long>, 
TabletUpdateRecord> loadTabletRecordMap =
-            new ConcurrentHashMap<>();
-
-    public SingleTabletLoadRecorderMgr() {
-        super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS);
-    }
-
-    @Override
-    protected void runAfterCatalogReady() {
-        long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS;
-        loadTabletRecordMap.entrySet().removeIf(entry ->
-                entry.getValue().getUpdateTimestamp() < expiryTime
-        );
-        LOG.info("Remove expired load tablet record successfully.");
-    }
-
-    public int getCurrentLoadTabletIndex(long dbId, long tableId, long 
partitionId) throws UserException {
-        Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partitionId);
-        TabletUpdateRecord record = loadTabletRecordMap.get(key);
-        int numBuckets = -1;
-        if (record == null) {
-            numBuckets = getNumBuckets(dbId, tableId, partitionId);
-        }
-        return createOrUpdateLoadTabletRecord(key, numBuckets);
-    }
-
-    private int getNumBuckets(long dbId, long tableId, long partitionId) 
throws UserException {
-        OlapTable olapTable = (OlapTable) 
Env.getCurrentInternalCatalog().getDb(dbId)
-                .flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() 
== TableIf.TableType.OLAP)
-                .orElse(null);
-        if (olapTable == null) {
-            throw new UserException("Olap table[" + dbId + "." + tableId + "] 
is not exist.");
-        }
-        return olapTable.getPartition(partitionId)
-                .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
-                .get(0).getTablets().size();
-    }
-
-    private int createOrUpdateLoadTabletRecord(Triple<Long, Long, Long> key, 
int numBuckets) {
-        TabletUpdateRecord record =  loadTabletRecordMap.compute(key, (k, 
existingRecord) -> {
-            if (existingRecord == null) {
-                return new TabletUpdateRecord(0, numBuckets);
-            } else {
-                existingRecord.updateRecord();
-                return existingRecord;
-            }
-        });
-        return record.getTabletIndex();
-    }
-
-    static class TabletUpdateRecord {
-        @Getter
-        // 0 =< load_tablet_index < number_buckets
-        int tabletIndex;
-        int numBuckets;
-        @Getter
-        long updateTimestamp = System.currentTimeMillis();
-
-        TabletUpdateRecord(int tabletIndex, int numBuckets) {
-            this.tabletIndex = tabletIndex;
-            this.numBuckets = numBuckets;
-        }
-
-        public synchronized void updateRecord() {
-            this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 0 : 
this.tabletIndex + 1;
-            // To reduce the compute time cost, only update timestamp when 
index is 0
-            if (this.tabletIndex == 0) {
-                this.updateTimestamp = System.currentTimeMillis();
-            }
-        }
-
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java
new file mode 100644
index 00000000000..3b14445653d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.MasterDaemon;
+
+import lombok.Getter;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TabletLoadIndexRecorderMgr extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(TabletLoadIndexRecorderMgr.class);
+    private static final long TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS = 86400000; 
// 1 * 24 * 60 * 60 * 1000, 1 days
+    private static final long TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS = 
3600000; // 1 hour
+    private static final int TIMES_FOR_UPDATE_TIMESTAMP = 1000;
+
+    // <<db_id, table_id, partition_id> -> load_tablet_record>
+    // 0 =< load_tablet_index < number_buckets
+    private final ConcurrentHashMap<Triple<Long, Long, Long>, 
TabletLoadIndexRecord> loadTabletRecordMap =
+            new ConcurrentHashMap<>();
+
+    public TabletLoadIndexRecorderMgr() {
+        super("tablet_load_index_recorder", 
TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        int originRecordSize = loadTabletRecordMap.size();
+        long expireTime = System.currentTimeMillis() - 
TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS;
+        loadTabletRecordMap.entrySet().removeIf(entry ->
+                entry.getValue().getUpdateTimestamp() < expireTime
+        );
+        int currentRecordSize = loadTabletRecordMap.size();
+        LOG.info("Remove expired load tablet index record successfully, before 
{}, current {}",
+                originRecordSize, currentRecordSize);
+    }
+
+    public int getCurrentTabletLoadIndex(long dbId, long tableId, Partition 
partition) throws UserException {
+        Triple<Long, Long, Long> key = Triple.of(dbId, tableId, 
partition.getId());
+        return loadTabletRecordMap.compute(key, (k, existingRecord) ->
+                existingRecord == null ? new 
TabletLoadIndexRecord(partition.getVisibleVersion() - 1,
+                    partition.getDistributionInfo().getBucketNum()) : 
existingRecord).getAndIncrement();
+    }
+
+    static class TabletLoadIndexRecord {
+        int loadIndex;
+        int numBuckets;
+        @Getter
+        long updateTimestamp = System.currentTimeMillis();
+
+        TabletLoadIndexRecord(long initialIndex, int numBuckets) {
+            this.loadIndex = (int) (initialIndex % numBuckets);
+            this.numBuckets = numBuckets;
+        }
+
+        public synchronized int getAndIncrement() {
+            int tabletLoadIndex = loadIndex % numBuckets;
+            loadIndex++;
+            // To reduce the compute time cost, only update timestamp when 
load index is
+            // greater than or equal to both TIMES_FOR_UPDATE_TIMESTAMP and 
numBuckets
+            if (loadIndex >= Math.max(TIMES_FOR_UPDATE_TIMESTAMP, numBuckets)) 
{
+                loadIndex = loadIndex % numBuckets;
+                this.updateTimestamp = System.currentTimeMillis();
+            }
+            return tabletLoadIndex;
+        }
+
+    }
+}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 21bea8cac59..abaf8f8967d 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -178,7 +178,7 @@ struct TOlapTablePartition {
     9: optional bool is_mutable = true
     // only used in List Partition
     10: optional bool is_default_partition;
-    // only used in load_to_single_tablet
+    // only used in random distribution scenario to make data distributed even 
     11: optional i64 load_tablet_idx
 }
 
diff --git 
a/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy
 
b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy
new file mode 100644
index 00000000000..4630264ab41
--- /dev/null
+++ 
b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_random_distribution_table", "p0") {
+    def tableName = "test_insert_random_distribution_table"
+
+    // Case 1: unpartitioned table with RANDOM BUCKETS 5; "tablet N" below means the N-th row returned by `show tablets`
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          `k1` date NULL,
+          `k2` char(100) NULL,
+          `v1` char(100) NULL,
+          `v2` text NULL,
+          `v3` int(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY RANDOM BUCKETS 5
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql "set batch_size=2"
+    // first insert (4 rows): the asserts below expect 3 rows in tablet 1 and 1 row in tablet 2
+    sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), 
('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), 
('2023-10-16', '2', '3', '4', 55)"
+
+    sql "sync"
+    def totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 4)
+    def res = sql "show tablets from ${tableName}"
+    def tabletId1 = res[0][0]
+    def tabletId2 = res[1][0]
+    def tabletId3 = res[2][0]
+    def tabletId4 = res[3][0]
+    def tabletId5 = res[4][0]
+
+    def rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
+    def rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
+    def rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
+    def rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
+    def rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
+
+    assertEquals(rowCount1[0][0], 3)
+    assertEquals(rowCount2[0][0], 1)
+    assertEquals(rowCount3[0][0], 0)
+    assertEquals(rowCount4[0][0], 0)
+    assertEquals(rowCount5[0][0], 0)
+
+    sql "set batch_size=2"
+    // second insert (same 4 rows): expected growth is +3 on tablet 2 and +1 on tablet 3
+    sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), 
('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), 
('2023-10-16', '2', '3', '4', 55)"
+
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 8)
+
+    rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
+    rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
+    rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
+    rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
+    rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
+
+    assertEquals(rowCount1[0][0], 3)
+    assertEquals(rowCount2[0][0], 4)
+    assertEquals(rowCount3[0][0], 1)
+    assertEquals(rowCount4[0][0], 0)
+    assertEquals(rowCount5[0][0], 0)
+
+    sql "set batch_size=2"
+    // third insert (same 4 rows): expected growth is +3 on tablet 3 and +1 on tablet 4
+    sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), 
('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), 
('2023-10-16', '2', '3', '4', 55)"
+
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 12)
+
+    rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
+    rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
+    rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
+    rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
+    rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
+
+    assertEquals(rowCount1[0][0], 3)
+    assertEquals(rowCount2[0][0], 4)
+    assertEquals(rowCount3[0][0], 4)
+    assertEquals(rowCount4[0][0], 1)
+    assertEquals(rowCount5[0][0], 0)
+
+    // Case 2: table recreated with three daily RANGE partitions on k1 and RANDOM BUCKETS 10
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          `k1` date NULL,
+          `k2` text NULL,
+          `k3` char(50) NULL,
+          `k4` varchar(200) NULL,
+          `k5` int(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')),
+        PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')),
+        PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14')))
+        DISTRIBUTED BY RANDOM BUCKETS 10
+        PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1"
+        );
+        """
+    sql "set batch_size=1"
+    // first insert: 3 rows for p20231011 and 2 rows for p20231012; only the first 5 tablets listed per partition are checked
+    sql "insert into ${tableName} values('2023-10-11', '2', '3', '4', 55), 
('2023-10-11', '3', '31', '4', 55), ('2023-10-11', '23', '45', '66', 88), 
('2023-10-12', '2', '3', '4', 55),('2023-10-12', '12', '13', '4', 55)"
+
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    def partition1 = "p20231011"
+    def partition2 = "p20231012"
+    def partition3 = "p20231013"
+    assertEquals(totalCount[0][0], 5)
+    res = sql "show tablets from ${tableName} partition ${partition1}"
+    def tabletId11 = res[0][0]
+    def tabletId12 = res[1][0]
+    def tabletId13 = res[2][0]
+    def tabletId14 = res[3][0]
+    def tabletId15 = res[4][0]
+
+    res = sql "show tablets from ${tableName} partition ${partition2}"
+    def tabletId21 = res[0][0]
+    def tabletId22 = res[1][0]
+    def tabletId23 = res[2][0]
+    def tabletId24 = res[3][0]
+    def tabletId25 = res[4][0]
+
+    res = sql "show tablets from ${tableName} partition ${partition3}"
+    def tabletId31 = res[0][0]
+    def tabletId32 = res[1][0]
+    def tabletId33 = res[2][0]
+    def tabletId34 = res[3][0]
+    def tabletId35 = res[4][0]
+
+    def rowCount11 = sql "select count() from ${tableName} 
tablet(${tabletId11})"
+    def rowCount12 = sql "select count() from ${tableName} 
tablet(${tabletId12})"
+    def rowCount13 = sql "select count() from ${tableName} 
tablet(${tabletId13})"
+    def rowCount14 = sql "select count() from ${tableName} 
tablet(${tabletId14})"
+    def rowCount15 = sql "select count() from ${tableName} 
tablet(${tabletId15})"
+
+    def rowCount21 = sql "select count() from ${tableName} 
tablet(${tabletId21})"
+    def rowCount22 = sql "select count() from ${tableName} 
tablet(${tabletId22})"
+    def rowCount23 = sql "select count() from ${tableName} 
tablet(${tabletId23})"
+    def rowCount24 = sql "select count() from ${tableName} 
tablet(${tabletId24})"
+    def rowCount25 = sql "select count() from ${tableName} 
tablet(${tabletId25})"
+
+    def rowCount31 = sql "select count() from ${tableName} 
tablet(${tabletId31})"
+    def rowCount32 = sql "select count() from ${tableName} 
tablet(${tabletId32})"
+    def rowCount33 = sql "select count() from ${tableName} 
tablet(${tabletId33})"
+    def rowCount34 = sql "select count() from ${tableName} 
tablet(${tabletId34})"
+    def rowCount35 = sql "select count() from ${tableName} 
tablet(${tabletId35})"
+
+    assertEquals(rowCount11[0][0], 2)
+    assertEquals(rowCount12[0][0], 1)
+    assertEquals(rowCount13[0][0], 0)
+    assertEquals(rowCount14[0][0], 0)
+    assertEquals(rowCount15[0][0], 0)
+
+    assertEquals(rowCount21[0][0], 1)
+    assertEquals(rowCount22[0][0], 1)
+    assertEquals(rowCount23[0][0], 0)
+    assertEquals(rowCount24[0][0], 0)
+    assertEquals(rowCount25[0][0], 0)
+
+    assertEquals(rowCount31[0][0], 0)
+    assertEquals(rowCount32[0][0], 0)
+    assertEquals(rowCount33[0][0], 0)
+    assertEquals(rowCount34[0][0], 0)
+    assertEquals(rowCount35[0][0], 0)
+
+    sql "set batch_size=1"
+    // second insert: 3 rows for p20231012, 1 row for p20231013 and 1 row for p20231011
+    sql "insert into ${tableName} values('2023-10-12', '2', '3', '4', 55), 
('2023-10-12', '3', '31', '4', 55), ('2023-10-12', '23', '45', '66', 88), 
('2023-10-13', '2', '3', '4', 55),('2023-10-11', '11', '13', '4', 55)"
+
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 10)
+
+    rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})"
+    rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})"
+    rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})"
+    rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})"
+    rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})"
+
+    rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})"
+    rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})"
+    rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})"
+    rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})"
+    rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})"
+
+    rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})"
+    rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})"
+    rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})"
+    rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})"
+    rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})"
+
+    assertEquals(rowCount11[0][0], 2)
+    assertEquals(rowCount12[0][0], 2)
+    assertEquals(rowCount13[0][0], 0)
+    assertEquals(rowCount14[0][0], 0)
+    assertEquals(rowCount15[0][0], 0)
+
+    assertEquals(rowCount21[0][0], 1)
+    assertEquals(rowCount22[0][0], 3)
+    assertEquals(rowCount23[0][0], 1)
+    assertEquals(rowCount24[0][0], 0)
+    assertEquals(rowCount25[0][0], 0)
+
+    assertEquals(rowCount31[0][0], 0)
+    assertEquals(rowCount32[0][0], 1)
+    assertEquals(rowCount33[0][0], 0)
+    assertEquals(rowCount34[0][0], 0)
+    assertEquals(rowCount35[0][0], 0)
+
+    sql "set batch_size=1"
+    // third insert: 3 rows for p20231013, 1 row for p20231012 and 2 rows for p20231011; batch_size is set to 4064 right after it
+    sql "insert into ${tableName} values('2023-10-13', '2', '3', '4', 55), 
('2023-10-13', '3', '31', '4', 55), ('2023-10-12', '23', '45', '66', 88), 
('2023-10-13', '2', '3', '4', 55),('2023-10-11', '11', '13', '4', 
55),('2023-10-11', '13', '145', '4', 55)"
+    sql "set batch_size=4064"
+
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 16)
+
+    rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})"
+    rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})"
+    rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})"
+    rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})"
+    rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})"
+
+    rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})"
+    rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})"
+    rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})"
+    rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})"
+    rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})"
+
+    rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})"
+    rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})"
+    rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})"
+    rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})"
+    rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})"
+
+    assertEquals(rowCount11[0][0], 2)
+    assertEquals(rowCount12[0][0], 2)
+    assertEquals(rowCount13[0][0], 2)
+    assertEquals(rowCount14[0][0], 0)
+    assertEquals(rowCount15[0][0], 0)
+
+    assertEquals(rowCount21[0][0], 1)
+    assertEquals(rowCount22[0][0], 3)
+    assertEquals(rowCount23[0][0], 2)
+    assertEquals(rowCount24[0][0], 0)
+    assertEquals(rowCount25[0][0], 0)
+
+    assertEquals(rowCount31[0][0], 0)
+    assertEquals(rowCount32[0][0], 1)
+    assertEquals(rowCount33[0][0], 2)
+    assertEquals(rowCount34[0][0], 1)
+    assertEquals(rowCount35[0][0], 0)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to