This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a7f791059c5 [opt](load)change `load_to_single_tablet` tablet search 
algorithm from random to round-robin #25256 (#25504)
a7f791059c5 is described below

commit a7f791059c569d5900462af562fbc88265cbd9b1
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Tue Oct 17 16:40:44 2023 +0800

    [opt](load)change `load_to_single_tablet` tablet search algorithm from 
random to round-robin #25256 (#25504)
---
 be/src/exec/tablet_info.cpp                        |  21 +-
 be/src/exec/tablet_info.h                          |   4 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +
 .../org/apache/doris/planner/OlapTableSink.java    |  13 +
 .../doris/planner/SingleTabletLoadRecorderMgr.java | 112 ++++++++
 gensrc/thrift/Descriptors.thrift                   |   2 +
 .../stream_load/test_load_to_single_tablet.json    |  10 +
 .../stream_load/test_load_to_single_tablet.groovy  | 299 +++++++++++++++++++++
 8 files changed, 463 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index f000e4d2065..34bd9053b43 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -252,11 +252,18 @@ Status VOlapTablePartitionParam::init() {
         }
     }
     if (_distributed_slot_locs.empty()) {
-        _compute_tablet_index = [](BlockRow* key, int64_t num_buckets) -> 
uint32_t {
-            return butil::fast_rand() % num_buckets;
+        _compute_tablet_index = [](BlockRow* key,
+                                   const VOlapTablePartition& partition) -> 
uint32_t {
+            if (partition.load_tablet_idx == -1) {
+                // load_to_single_tablet = false, just do random
+                return butil::fast_rand() % partition.num_buckets;
+            }
+            // load_to_single_tablet = true, do round-robin
+            return partition.load_tablet_idx % partition.num_buckets;
         };
     } else {
-        _compute_tablet_index = [this](BlockRow* key, int64_t num_buckets) -> 
uint32_t {
+        _compute_tablet_index = [this](BlockRow* key,
+                                       const VOlapTablePartition& partition) 
-> uint32_t {
             uint32_t hash_val = 0;
             for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
                 auto slot_desc = _slots[_distributed_slot_locs[i]];
@@ -269,7 +276,7 @@ Status VOlapTablePartitionParam::init() {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
             }
-            return hash_val % num_buckets;
+            return hash_val % partition.num_buckets;
         };
     }
 
@@ -282,6 +289,10 @@ Status VOlapTablePartitionParam::init() {
         auto part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
         part->id = t_part.id;
         part->is_mutable = t_part.is_mutable;
+        // only load_to_single_tablet = true will set load_tablet_idx
+        if (t_part.__isset.load_tablet_idx) {
+            part->load_tablet_idx = t_part.load_tablet_idx;
+        }
 
         if (!_is_in_partition) {
             if (t_part.__isset.start_keys) {
@@ -359,7 +370,7 @@ bool VOlapTablePartitionParam::find_partition(BlockRow* 
block_row,
 
 uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
                                                const VOlapTablePartition& 
partition) const {
-    return _compute_tablet_index(block_row, partition.num_buckets);
+    return _compute_tablet_index(block_row, partition);
 }
 
 Status VOlapTablePartitionParam::_create_partition_keys(const 
std::vector<TExprNode>& t_exprs,
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 90cc713f95b..7f1e1ddf4c5 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -123,6 +123,8 @@ struct VOlapTablePartition {
     int64_t num_buckets = 0;
     std::vector<OlapTableIndexTablets> indexes;
     bool is_mutable;
+    // -1 indicates load_to_single_tablet = false
+    int64_t load_tablet_idx = -1;
 
     VOlapTablePartition(vectorized::Block* partition_block)
             : start_key {partition_block, -1}, end_key {partition_block, -1} {}
@@ -182,7 +184,7 @@ private:
 
     Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, 
uint16_t pos);
 
-    std::function<uint32_t(BlockRow*, int64_t)> _compute_tablet_index;
+    std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> 
_compute_tablet_index;
 
     // check if this partition contain this key
     bool _part_contains(VOlapTablePartition* part, BlockRow* key) const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e836b69f77d..ca6d0ee0986 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -199,6 +199,7 @@ import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.persist.meta.MetaHeader;
 import org.apache.doris.persist.meta.MetaReader;
 import org.apache.doris.persist.meta.MetaWriter;
+import org.apache.doris.planner.SingleTabletLoadRecorderMgr;
 import org.apache.doris.plugin.PluginInfo;
 import org.apache.doris.plugin.PluginMgr;
 import org.apache.doris.policy.PolicyMgr;
@@ -308,6 +309,7 @@ public class Env {
     private LoadManager loadManager;
     private ProgressManager progressManager;
     private StreamLoadRecordMgr streamLoadRecordMgr;
+    private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr;
     private RoutineLoadManager routineLoadManager;
     private SqlBlockRuleMgr sqlBlockRuleMgr;
     private ExportMgr exportMgr;
@@ -640,6 +642,7 @@ public class Env {
         this.progressManager = new ProgressManager();
         this.streamLoadRecordMgr = new 
StreamLoadRecordMgr("stream_load_record_manager",
                 Config.fetch_stream_load_record_interval_second * 1000L);
+        this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr();
         this.loadEtlChecker = new LoadEtlChecker(loadManager);
         this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
         this.routineLoadScheduler = new 
RoutineLoadScheduler(routineLoadManager);
@@ -1491,6 +1494,7 @@ public class Env {
             cooldownConfHandler.start();
         }
         streamLoadRecordMgr.start();
+        singleTabletLoadRecorderMgr.start();
         getInternalCatalog().getIcebergTableCreationRecordMgr().start();
         new InternalSchemaInitializer().start();
         if (Config.enable_hms_events_incremental_sync) {
@@ -3678,6 +3682,10 @@ public class Env {
         return streamLoadRecordMgr;
     }
 
+    public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() {
+        return singleTabletLoadRecorderMgr;
+    }
+
     public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
         return getInternalCatalog().getIcebergTableCreationRecordMgr();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d073f0b1394..276da22d04a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -104,6 +104,8 @@ public class OlapTableSink extends DataSink {
 
     private boolean isStrictMode = false;
 
+    private boolean loadToSingleTablet;
+
     public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, 
List<Long> partitionIds,
             boolean singleReplicaLoad) {
         this.dstTable = dstTable;
@@ -126,6 +128,7 @@ public class OlapTableSink extends DataSink {
                     "if load_to_single_tablet set to true," + " the olap table 
must be with random distribution");
         }
         tSink.setLoadToSingleTablet(loadToSingleTablet);
+        this.loadToSingleTablet = loadToSingleTablet;
         tDataSink = new TDataSink(TDataSinkType.OLAP_TABLE_SINK);
         tDataSink.setOlapTableSink(tSink);
 
@@ -335,6 +338,11 @@ public class OlapTableSink extends DataSink {
                         tPartition.setNumBuckets(index.getTablets().size());
                     }
                     
tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
+                    if (loadToSingleTablet) {
+                        int tabletIndex = 
Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
+                                .getCurrentLoadTabletIndex(dbId, 
table.getId(), partitionId);
+                        tPartition.setLoadTabletIdx(tabletIndex);
+                    }
                     partitionParam.addToPartitions(tPartition);
 
                     DistributionInfo distInfo = 
partition.getDistributionInfo();
@@ -366,6 +374,11 @@ public class OlapTableSink extends DataSink {
                             
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                     tPartition.setNumBuckets(index.getTablets().size());
                 }
+                if (loadToSingleTablet) {
+                    int tabletIndex = 
Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
+                            .getCurrentLoadTabletIndex(dbId, table.getId(), 
partition.getId());
+                    tPartition.setLoadTabletIdx(tabletIndex);
+                }
                 partitionParam.addToPartitions(tPartition);
                 
partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo()));
                 break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
new file mode 100644
index 00000000000..58ebd5424ee
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.MasterDaemon;
+
+import lombok.Getter;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SingleTabletLoadRecorderMgr extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(SingleTabletLoadRecorderMgr.class);
+    private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 
60 * 60 * 1000, 1 day
+
+    // <<db_id, table_id, partition_id> -> load_tablet_record>
+    // 0 <= load_tablet_index < number_buckets
+    private final ConcurrentHashMap<Triple<Long, Long, Long>, 
TabletUpdateRecord> loadTabletRecordMap =
+            new ConcurrentHashMap<>();
+
+    public SingleTabletLoadRecorderMgr() {
+        super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS;
+        loadTabletRecordMap.entrySet().removeIf(entry ->
+                entry.getValue().getUpdateTimestamp() < expiryTime
+        );
+        LOG.info("Remove expired load tablet record successfully.");
+    }
+
+    public int getCurrentLoadTabletIndex(long dbId, long tableId, long 
partitionId) throws UserException {
+        Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partitionId);
+        TabletUpdateRecord record = loadTabletRecordMap.get(key);
+        int numBuckets = -1;
+        if (record == null) {
+            numBuckets = getNumBuckets(dbId, tableId, partitionId);
+        }
+        return createOrUpdateLoadTabletRecord(key, numBuckets);
+    }
+
+    private int getNumBuckets(long dbId, long tableId, long partitionId) 
throws UserException {
+        OlapTable olapTable = (OlapTable) 
Env.getCurrentInternalCatalog().getDb(dbId)
+                .flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() 
== TableIf.TableType.OLAP)
+                .orElse(null);
+        if (olapTable == null) {
+            throw new UserException("Olap table[" + dbId + "." + tableId + "] 
is not exist.");
+        }
+        return olapTable.getPartition(partitionId)
+            .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+            .get(0).getTablets().size();
+    }
+
+    private int createOrUpdateLoadTabletRecord(Triple<Long, Long, Long> key, 
int numBuckets) {
+        TabletUpdateRecord record =  loadTabletRecordMap.compute(key, (k, 
existingRecord) -> {
+            if (existingRecord == null) {
+                return new TabletUpdateRecord(0, numBuckets);
+            } else {
+                existingRecord.updateRecord();
+                return existingRecord;
+            }
+        });
+        return record.getTabletIndex();
+    }
+
+    static class TabletUpdateRecord {
+        @Getter
+        // 0 <= load_tablet_index < number_buckets
+        int tabletIndex;
+        int numBuckets;
+        @Getter
+        long updateTimestamp = System.currentTimeMillis();
+
+        TabletUpdateRecord(int tabletIndex, int numBuckets) {
+            this.tabletIndex = tabletIndex;
+            this.numBuckets = numBuckets;
+        }
+
+        public synchronized void updateRecord() {
+            this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 0 : 
this.tabletIndex + 1;
+            // To reduce the compute time cost, only update timestamp when 
index is 0
+            if (this.tabletIndex == 0) {
+                this.updateTimestamp = System.currentTimeMillis();
+            }
+        }
+
+    }
+}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 9e68c6fe09e..326a5bd0d0c 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -171,6 +171,8 @@ struct TOlapTablePartition {
     9: optional bool is_mutable = true
     // only used in List Partition
     10: optional bool is_default_partition;
+    // only used in load_to_single_tablet
+    11: optional i64 load_tablet_idx
 }
 
 struct TOlapTablePartitionParam {
diff --git 
a/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json 
b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json
new file mode 100644
index 00000000000..be1ebe5a86e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json
@@ -0,0 +1,10 @@
+{"k1":"2023-10-11", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1}
+{"k1":"2023-10-11", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2}
+{"k1":"2023-10-11", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3}
+{"k1":"2023-10-11", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4}
+{"k1":"2023-10-11", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5}
+{"k1":"2023-10-12", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1}
+{"k1":"2023-10-12", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2}
+{"k1":"2023-10-12", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3}
+{"k1":"2023-10-12", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4}
+{"k1":"2023-10-12", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5}
diff --git 
a/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy 
b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy
new file mode 100644
index 00000000000..82a24a8ad30
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+/**
+ *   @Params url is the complete URL to GET (it is passed unchanged to new URL(url))
+ *   @Return response body
+ */
+def http_get(url) {
+    def conn = new URL(url).openConnection()
+    conn.setRequestMethod("GET")
+    // NOTE: no token for root (or any other header) is set on this request
+    return conn.getInputStream().getText()
+}
+
+suite("test_load_to_single_tablet", "p0") {
+    sql "show tables"
+
+    def tableName = "test_load_to_single_tablet"
+
+    // test unpartitioned table
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          `k1` date NULL,
+          `k2` text NULL,
+          `k3` char(50) NULL,
+          `k4` varchar(200) NULL,
+          `k5` int(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY RANDOM BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    // load first time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    def totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 10)
+    def res = sql "show tablets from ${tableName}"
+    def tabletMetaUrl1 = res[0][17]
+    def tabletMetaUrl2 = res[1][17]
+    def tabletMetaUrl3 = res[2][17]
+    def tabletMetaRes1 = http_get(tabletMetaUrl1)
+    def tabletMetaRes2 = http_get(tabletMetaUrl2)
+    def tabletMetaRes3 = http_get(tabletMetaUrl3)
+
+    def obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    def obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    def obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+    def rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows
+    def rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows
+    def rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows
+
+    assertEquals(rowCount1, 10)
+    assertEquals(rowCount2, 0)
+    assertEquals(rowCount3, 0)
+
+    // load second time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 20)
+    tabletMetaRes1 = http_get(tabletMetaUrl1)
+    tabletMetaRes2 = http_get(tabletMetaUrl2)
+    tabletMetaRes3 = http_get(tabletMetaUrl3)
+
+    obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+
+    rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + 
obj1.rs_metas[2].num_rows
+    rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + 
obj2.rs_metas[2].num_rows
+    rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + 
obj3.rs_metas[2].num_rows
+    assertEquals(rowCount1, 10)
+    assertEquals(rowCount2, 10)
+    assertEquals(rowCount3, 0)
+
+    // load third time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 30)
+    tabletMetaRes1 = http_get(tabletMetaUrl1)
+    tabletMetaRes2 = http_get(tabletMetaUrl2)
+    tabletMetaRes3 = http_get(tabletMetaUrl3)
+
+    obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+
+    rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + 
obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows
+    rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + 
obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows
+    rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + 
obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows
+    assertEquals(rowCount1, 10)
+    assertEquals(rowCount2, 10)
+    assertEquals(rowCount3, 10)
+
+    // test partitioned table
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          `k1` date NULL,
+          `k2` text NULL,
+          `k3` char(50) NULL,
+          `k4` varchar(200) NULL,
+          `k5` int(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')),
+        PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')),
+        PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14')),
+        PARTITION p20231014 VALUES [('2023-10-14'), ('2023-10-15')))
+        DISTRIBUTED BY RANDOM BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    // load first time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 10)
+    res = sql "show tablets from ${tableName} partitions(p20231011, p20231012)"
+    tabletMetaUrl1 = res[0][17]
+    tabletMetaUrl2 = res[1][17]
+    tabletMetaUrl3 = res[2][17]
+    tabletMetaUrl4 = res[10][17]
+    tabletMetaUrl5 = res[11][17]
+    tabletMetaUrl6 = res[12][17]
+    tabletMetaRes1 = http_get(tabletMetaUrl1)
+    tabletMetaRes2 = http_get(tabletMetaUrl2)
+    tabletMetaRes3 = http_get(tabletMetaUrl3)
+    tabletMetaRes4 = http_get(tabletMetaUrl4)
+    tabletMetaRes5 = http_get(tabletMetaUrl5)
+    tabletMetaRes6 = http_get(tabletMetaUrl6)
+
+    obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+    obj4 = new JsonSlurper().parseText(tabletMetaRes4)
+    obj5 = new JsonSlurper().parseText(tabletMetaRes5)
+    obj6 = new JsonSlurper().parseText(tabletMetaRes6)
+
+    rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows
+    rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows
+    rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows
+    def rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows
+    def rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows
+    def rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows
+    assertEquals(rowCount1, 5)
+    assertEquals(rowCount2, 0)
+    assertEquals(rowCount3, 0)
+    assertEquals(rowCount4, 5)
+    assertEquals(rowCount5, 0)
+    assertEquals(rowCount6, 0)
+
+    // load second time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 20)
+    tabletMetaRes1 = http_get(tabletMetaUrl1)
+    tabletMetaRes2 = http_get(tabletMetaUrl2)
+    tabletMetaRes3 = http_get(tabletMetaUrl3)
+    tabletMetaRes4 = http_get(tabletMetaUrl4)
+    tabletMetaRes5 = http_get(tabletMetaUrl5)
+    tabletMetaRes6 = http_get(tabletMetaUrl6)
+
+    obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+    obj4 = new JsonSlurper().parseText(tabletMetaRes4)
+    obj5 = new JsonSlurper().parseText(tabletMetaRes5)
+    obj6 = new JsonSlurper().parseText(tabletMetaRes6)
+
+    rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + 
obj1.rs_metas[2].num_rows
+    rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + 
obj2.rs_metas[2].num_rows
+    rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + 
obj3.rs_metas[2].num_rows
+    rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + 
obj4.rs_metas[2].num_rows
+    rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + 
obj5.rs_metas[2].num_rows
+    rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + 
obj6.rs_metas[2].num_rows
+    assertEquals(rowCount1, 5)
+    assertEquals(rowCount2, 5)
+    assertEquals(rowCount3, 0)
+    assertEquals(rowCount4, 5)
+    assertEquals(rowCount5, 5)
+    assertEquals(rowCount6, 0)
+
+    // load third time
+    streamLoad {
+        table "${tableName}"
+
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'load_to_single_tablet', 'true'
+
+        file 'test_load_to_single_tablet.json'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    totalCount = sql "select count() from ${tableName}"
+    assertEquals(totalCount[0][0], 30)
+    tabletMetaRes1 = http_get(tabletMetaUrl1)
+    tabletMetaRes2 = http_get(tabletMetaUrl2)
+    tabletMetaRes3 = http_get(tabletMetaUrl3)
+    tabletMetaRes4 = http_get(tabletMetaUrl4)
+    tabletMetaRes5 = http_get(tabletMetaUrl5)
+    tabletMetaRes6 = http_get(tabletMetaUrl6)
+
+    obj1 = new JsonSlurper().parseText(tabletMetaRes1)
+    obj2 = new JsonSlurper().parseText(tabletMetaRes2)
+    obj3 = new JsonSlurper().parseText(tabletMetaRes3)
+    obj4 = new JsonSlurper().parseText(tabletMetaRes4)
+    obj5 = new JsonSlurper().parseText(tabletMetaRes5)
+    obj6 = new JsonSlurper().parseText(tabletMetaRes6)
+
+    rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + 
obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows
+    rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + 
obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows
+    rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + 
obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows
+    rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + 
obj4.rs_metas[2].num_rows + obj4.rs_metas[3].num_rows
+    rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + 
obj5.rs_metas[2].num_rows + obj5.rs_metas[3].num_rows
+    rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + 
obj6.rs_metas[2].num_rows + obj6.rs_metas[3].num_rows
+    assertEquals(rowCount1, 5)
+    assertEquals(rowCount2, 5)
+    assertEquals(rowCount3, 5)
+    assertEquals(rowCount4, 5)
+    assertEquals(rowCount5, 5)
+    assertEquals(rowCount6, 5)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to