This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new a7f791059c5 [opt](load)change `load_to_single_tablet` tablet search algorithm from random to round-robin #25256 (#25504) a7f791059c5 is described below commit a7f791059c569d5900462af562fbc88265cbd9b1 Author: qiye <jianliang5...@gmail.com> AuthorDate: Tue Oct 17 16:40:44 2023 +0800 [opt](load)change `load_to_single_tablet` tablet search algorithm from random to round-robin #25256 (#25504) --- be/src/exec/tablet_info.cpp | 21 +- be/src/exec/tablet_info.h | 4 +- .../main/java/org/apache/doris/catalog/Env.java | 8 + .../org/apache/doris/planner/OlapTableSink.java | 13 + .../doris/planner/SingleTabletLoadRecorderMgr.java | 112 ++++++++ gensrc/thrift/Descriptors.thrift | 2 + .../stream_load/test_load_to_single_tablet.json | 10 + .../stream_load/test_load_to_single_tablet.groovy | 299 +++++++++++++++++++++ 8 files changed, 463 insertions(+), 6 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index f000e4d2065..34bd9053b43 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -252,11 +252,18 @@ Status VOlapTablePartitionParam::init() { } } if (_distributed_slot_locs.empty()) { - _compute_tablet_index = [](BlockRow* key, int64_t num_buckets) -> uint32_t { - return butil::fast_rand() % num_buckets; + _compute_tablet_index = [](BlockRow* key, + const VOlapTablePartition& partition) -> uint32_t { + if (partition.load_tablet_idx == -1) { + // load_to_single_tablet = false, just do random + return butil::fast_rand() % partition.num_buckets; + } + // load_to_single_tablet = ture, do round-robin + return partition.load_tablet_idx % partition.num_buckets; }; } else { - _compute_tablet_index = [this](BlockRow* key, int64_t num_buckets) -> uint32_t { + _compute_tablet_index = [this](BlockRow* key, + const VOlapTablePartition& partition) -> uint32_t { uint32_t hash_val = 0; for (int i = 0; i < _distributed_slot_locs.size(); ++i) { auto slot_desc = _slots[_distributed_slot_locs[i]]; @@ -269,7 +276,7 @@ Status VOlapTablePartitionParam::init() { hash_val = HashUtil::zlib_crc_hash_null(hash_val); } } - return hash_val % num_buckets; + return hash_val % partition.num_buckets; }; } @@ -282,6 +289,10 @@ Status VOlapTablePartitionParam::init() { auto part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part->id = t_part.id; part->is_mutable = t_part.is_mutable; + // only load_to_single_tablet = true will set load_tablet_idx + if (t_part.__isset.load_tablet_idx) { + part->load_tablet_idx = t_part.load_tablet_idx; + } if (!_is_in_partition) { if (t_part.__isset.start_keys) { @@ -359,7 +370,7 @@ bool VOlapTablePartitionParam::find_partition(BlockRow* block_row, uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const { - return _compute_tablet_index(block_row, partition.num_buckets); + return _compute_tablet_index(block_row, partition); } Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 90cc713f95b..7f1e1ddf4c5 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -123,6 +123,8 @@ struct VOlapTablePartition { int64_t num_buckets = 0; std::vector<OlapTableIndexTablets> indexes; bool is_mutable; + // -1 indicates load_to_single_tablet = false + int64_t load_tablet_idx = -1; VOlapTablePartition(vectorized::Block* partition_block) : start_key {partition_block, -1}, end_key 
{partition_block, -1} {} @@ -182,7 +184,7 @@ private: Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos); - std::function<uint32_t(BlockRow*, int64_t)> _compute_tablet_index; + std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> _compute_tablet_index; // check if this partition contain this key bool _part_contains(VOlapTablePartition* part, BlockRow* key) const { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e836b69f77d..ca6d0ee0986 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -199,6 +199,7 @@ import org.apache.doris.persist.TruncateTableInfo; import org.apache.doris.persist.meta.MetaHeader; import org.apache.doris.persist.meta.MetaReader; import org.apache.doris.persist.meta.MetaWriter; +import org.apache.doris.planner.SingleTabletLoadRecorderMgr; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.policy.PolicyMgr; @@ -308,6 +309,7 @@ public class Env { private LoadManager loadManager; private ProgressManager progressManager; private StreamLoadRecordMgr streamLoadRecordMgr; + private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr; private RoutineLoadManager routineLoadManager; private SqlBlockRuleMgr sqlBlockRuleMgr; private ExportMgr exportMgr; @@ -640,6 +642,7 @@ public class Env { this.progressManager = new ProgressManager(); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000L); + this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr(); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); @@ -1491,6 +1494,7 @@ public class Env { cooldownConfHandler.start(); } streamLoadRecordMgr.start(); + singleTabletLoadRecorderMgr.start(); getInternalCatalog().getIcebergTableCreationRecordMgr().start(); new InternalSchemaInitializer().start(); if (Config.enable_hms_events_incremental_sync) { @@ -3678,6 +3682,10 @@ public class Env { return streamLoadRecordMgr; } + public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() { + return singleTabletLoadRecorderMgr; + } + public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() { return getInternalCatalog().getIcebergTableCreationRecordMgr(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index d073f0b1394..276da22d04a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -104,6 +104,8 @@ public class OlapTableSink extends DataSink { private boolean isStrictMode = false; + private boolean loadToSingleTablet; + public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds, boolean singleReplicaLoad) { this.dstTable = dstTable; @@ -126,6 +128,7 @@ public class OlapTableSink extends DataSink { "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); + this.loadToSingleTablet = loadToSingleTablet; tDataSink = new TDataSink(TDataSinkType.OLAP_TABLE_SINK); 
tDataSink.setOlapTableSink(tSink); @@ -335,6 +338,11 @@ public class OlapTableSink extends DataSink { tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId)); + if (loadToSingleTablet) { + int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() + .getCurrentLoadTabletIndex(dbId, table.getId(), partitionId); + tPartition.setLoadTabletIdx(tabletIndex); + } partitionParam.addToPartitions(tPartition); DistributionInfo distInfo = partition.getDistributionInfo(); @@ -366,6 +374,11 @@ public class OlapTableSink extends DataSink { index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); tPartition.setNumBuckets(index.getTablets().size()); } + if (loadToSingleTablet) { + int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() + .getCurrentLoadTabletIndex(dbId, table.getId(), partition.getId()); + tPartition.setLoadTabletIdx(tabletIndex); + } partitionParam.addToPartitions(tPartition); partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo())); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java new file mode 100644 index 00000000000..58ebd5424ee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.MasterDaemon; + +import lombok.Getter; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; + +public class SingleTabletLoadRecorderMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(SingleTabletLoadRecorderMgr.class); + private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days + + // <<db_id, table_id, partition_id> -> load_tablet_record> + // 0 =< load_tablet_index < number_buckets + private final ConcurrentHashMap<Triple<Long, Long, Long>, TabletUpdateRecord> loadTabletRecordMap = + new ConcurrentHashMap<>(); + + public SingleTabletLoadRecorderMgr() { + super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS); + } + + @Override + protected void runAfterCatalogReady() { + long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS; + loadTabletRecordMap.entrySet().removeIf(entry -> + entry.getValue().getUpdateTimestamp() < expiryTime + ); + LOG.info("Remove expired load tablet record successfully."); + } + + public int getCurrentLoadTabletIndex(long dbId, long tableId, long partitionId) throws UserException { + Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partitionId); + TabletUpdateRecord record = loadTabletRecordMap.get(key); + int numBuckets = -1; + if (record == null) { + numBuckets = getNumBuckets(dbId, tableId, partitionId); + } + return createOrUpdateLoadTabletRecord(key, numBuckets); + } + + private int getNumBuckets(long dbId, long tableId, long partitionId) throws UserException { + OlapTable olapTable = (OlapTable) Env.getCurrentInternalCatalog().getDb(dbId) + .flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() == TableIf.TableType.OLAP) + .orElse(null); + if (olapTable == null) { + throw new UserException("Olap table[" + dbId + "." + tableId + "] is not exist."); + } + return olapTable.getPartition(partitionId) + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL) + .get(0).getTablets().size(); + } + + private int createOrUpdateLoadTabletRecord(Triple<Long, Long, Long> key, int numBuckets) { + TabletUpdateRecord record = loadTabletRecordMap.compute(key, (k, existingRecord) -> { + if (existingRecord == null) { + return new TabletUpdateRecord(0, numBuckets); + } else { + existingRecord.updateRecord(); + return existingRecord; + } + }); + return record.getTabletIndex(); + } + + static class TabletUpdateRecord { + @Getter + // 0 =< load_tablet_index < number_buckets + int tabletIndex; + int numBuckets; + @Getter + long updateTimestamp = System.currentTimeMillis(); + + TabletUpdateRecord(int tabletIndex, int numBuckets) { + this.tabletIndex = tabletIndex; + this.numBuckets = numBuckets; + } + + public synchronized void updateRecord() { + this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 
0 : this.tabletIndex + 1; + // To reduce the compute time cost, only update timestamp when index is 0 + if (this.tabletIndex == 0) { + this.updateTimestamp = System.currentTimeMillis(); + } + } + + } +} diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 9e68c6fe09e..326a5bd0d0c 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -171,6 +171,8 @@ struct TOlapTablePartition { 9: optional bool is_mutable = true // only used in List Partition 10: optional bool is_default_partition; + // only used in load_to_single_tablet + 11: optional i64 load_tablet_idx } struct TOlapTablePartitionParam { diff --git a/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json new file mode 100644 index 00000000000..be1ebe5a86e --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json @@ -0,0 +1,10 @@ +{"k1":"2023-10-11", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1} +{"k1":"2023-10-11", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2} +{"k1":"2023-10-11", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3} +{"k1":"2023-10-11", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4} +{"k1":"2023-10-11", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5} +{"k1":"2023-10-12", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1} +{"k1":"2023-10-12", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2} +{"k1":"2023-10-12", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3} +{"k1":"2023-10-12", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4} +{"k1":"2023-10-12", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5} diff --git a/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy new file mode 100644 index 00000000000..82a24a8ad30 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import groovy.json.JsonSlurper + +/** + * @Params url is "/xxx" + * @Return response body + */ +def http_get(url) { + def conn = new URL(url).openConnection() + conn.setRequestMethod("GET") + //token for root + return conn.getInputStream().getText() +} + +suite("test_load_to_single_tablet", "p0") { + sql "show tables" + + def tableName = "test_load_to_single_tablet" + + // test unpartitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` text NULL, + `k3` char(50) NULL, + `k4` varchar(200) NULL, + `k5` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY RANDOM BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // load first time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + + sql "sync" + def totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 10) + def res = sql "show tablets from ${tableName}" + def tabletMetaUrl1 = res[0][17] + def tabletMetaUrl2 = res[1][17] + def tabletMetaUrl3 = res[2][17] + def tabletMetaRes1 = http_get(tabletMetaUrl1) + def tabletMetaRes2 = http_get(tabletMetaUrl2) + def tabletMetaRes3 = http_get(tabletMetaUrl3) + + def obj1 = new JsonSlurper().parseText(tabletMetaRes1) + def obj2 = new JsonSlurper().parseText(tabletMetaRes2) + def obj3 = new JsonSlurper().parseText(tabletMetaRes3) + def rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + def rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + def rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 0) + assertEquals(rowCount3, 0) + + // load second time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 20) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 10) + assertEquals(rowCount3, 0) + + // load third time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 30) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new 
JsonSlurper().parseText(tabletMetaRes3) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 10) + assertEquals(rowCount3, 10) + + // test partitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` text NULL, + `k3` char(50) NULL, + `k4` varchar(200) NULL, + `k5` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')), + PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')), + PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14')), + PARTITION p20231014 VALUES [('2023-10-14'), ('2023-10-15'))) + DISTRIBUTED BY RANDOM BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // load first time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 10) + res = sql "show tablets from ${tableName} partitions(p20231011, p20231012)" + tabletMetaUrl1 = res[0][17] + tabletMetaUrl2 = res[1][17] + tabletMetaUrl3 = res[2][17] + tabletMetaUrl4 = res[10][17] + tabletMetaUrl5 = res[11][17] + tabletMetaUrl6 = res[12][17] + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + def rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + def rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + def rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 0) + assertEquals(rowCount3, 0) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 0) + assertEquals(rowCount6, 0) + + // load second time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 20) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + 
tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + obj4.rs_metas[2].num_rows + rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + obj5.rs_metas[2].num_rows + rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + obj6.rs_metas[2].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 5) + assertEquals(rowCount3, 0) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 5) + assertEquals(rowCount6, 0) + + // load third time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 30) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows + rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + obj4.rs_metas[2].num_rows + obj4.rs_metas[3].num_rows + rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + obj5.rs_metas[2].num_rows + obj5.rs_metas[3].num_rows + rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + obj6.rs_metas[2].num_rows + obj6.rs_metas[3].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 5) + assertEquals(rowCount3, 5) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 5) + assertEquals(rowCount6, 5) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
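
For readers skimming the patch, here is a minimal standalone sketch of the round-robin idea that `load_to_single_tablet` now uses: the FE keeps a per-(db, table, partition) counter and sends the BE a `load_tablet_idx`, and the BE maps that index onto a bucket with a modulo instead of calling `butil::fast_rand()`. The class and method names below are illustrative assumptions only, not the committed Doris API; the real logic lives in `SingleTabletLoadRecorderMgr.java` and `tablet_info.cpp` above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: a per-partition round-robin counter, analogous in spirit
// to SingleTabletLoadRecorderMgr. Names and structure here are assumptions.
public class RoundRobinTabletPicker {
    // key = "dbId:tableId:partitionId" -> monotonically increasing load counter
    private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /** Returns the tablet index in [0, numBuckets) for the next single-tablet load. */
    public int nextTabletIndex(long dbId, long tableId, long partitionId, int numBuckets) {
        String key = dbId + ":" + tableId + ":" + partitionId;
        int counter = counters.computeIfAbsent(key, k -> new AtomicInteger(0)).getAndIncrement();
        // Same effect as the BE side: load_tablet_idx % num_buckets
        return counter % numBuckets;
    }

    public static void main(String[] args) {
        RoundRobinTabletPicker picker = new RoundRobinTabletPicker();
        // Three consecutive loads into a 10-bucket random-distribution partition
        // land on tablet indexes 0, 1, 2 instead of three random tablets.
        for (int i = 0; i < 3; i++) {
            System.out.println(picker.nextTabletIndex(1L, 2L, 3L, 10));
        }
    }
}
```

Compared with the previous random choice, consecutive single-tablet loads now rotate through the buckets, which is what the regression test above verifies: each stream load of 10 rows lands wholly in the next tablet of each written partition rather than piling onto an arbitrary one.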