This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 83edcdead96 [enhancement](random_sink) change tablet search algorithm from random to round-robin for random distribution table (#26611) 83edcdead96 is described below commit 83edcdead96d1e8994cd666dbfa15be558084477 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Wed Nov 15 19:55:31 2023 +0800 [enhancement](random_sink) change tablet search algorithm from random to round-robin for random distribution table (#26611) 1. fix a race condition when getting the tablet load index 2. change tablet search algorithm from random to round-robin for random distribution table when load_to_single_tablet is set to false --- be/src/exec/tablet_info.h | 18 +- be/src/olap/base_compaction.cpp | 2 +- be/src/vec/sink/vrow_distribution.cpp | 1 - be/src/vec/sink/vtablet_finder.cpp | 10 + be/src/vec/sink/vtablet_finder.h | 17 +- be/src/vec/sink/writer/vtablet_writer.cpp | 1 - .../main/java/org/apache/doris/catalog/Env.java | 12 +- .../org/apache/doris/planner/OlapTableSink.java | 18 +- .../doris/planner/SingleTabletLoadRecorderMgr.java | 112 --------- .../doris/planner/TabletLoadIndexRecorderMgr.java | 89 +++++++ gensrc/thrift/Descriptors.thrift | 2 +- .../test_insert_random_distribution_table.groovy | 279 +++++++++++++++++++++ 12 files changed, 410 insertions(+), 151 deletions(-) diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index bb9fbd8bc60..2e5f40bec35 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -127,7 +127,7 @@ struct VOlapTablePartition { int64_t num_buckets = 0; std::vector<OlapTableIndexTablets> indexes; bool is_mutable; - // -1 indicates load_to_single_tablet = false + // -1 indicates partition with hash distribution int64_t load_tablet_idx = -1; VOlapTablePartition(vectorized::Block* partition_block) @@ -187,7 +187,7 @@ public: const std::vector<VOlapTablePartition*>& partitions, std::vector<uint32_t>& tablet_indexes /*result*/, 
/*TODO: check if flat hash map will be better*/ - std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) const { + std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer = nullptr) const { std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)> compute_function; if (!_distributed_slot_locs.empty()) { @@ -212,10 +212,9 @@ public: compute_function = [](vectorized::Block* block, uint32_t row, const VOlapTablePartition& partition) -> uint32_t { if (partition.load_tablet_idx == -1) { - // load_to_single_tablet = false, just do random + // for compatible with old version, just do random return butil::fast_rand() % partition.num_buckets; } - // load_to_single_tablet = ture, do round-robin return partition.load_tablet_idx % partition.num_buckets; }; } @@ -226,14 +225,15 @@ public: } } else { // use buffer for (auto index : indexes) { - auto& partition_id = partitions[index]->id; - if (auto it = partition_tablets_buffer->find(partition_id); + auto* partition = partitions[index]; + if (auto it = partition_tablets_buffer->find(partition); it != partition_tablets_buffer->end()) { tablet_indexes[index] = it->second; // tablet + } else { + // compute and save in buffer + (*partition_tablets_buffer)[partition] = tablet_indexes[index] = + compute_function(block, index, *partitions[index]); } - // compute and save in buffer - (*partition_tablets_buffer)[partition_id] = tablet_indexes[index] = - compute_function(block, index, *partitions[index]); } } } diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index eab41c73acb..6ae006709a7 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -98,7 +98,7 @@ Status BaseCompaction::execute_compact_impl() { void BaseCompaction::_filter_input_rowset() { // if dup_key and no delete predicate - // we skip big files too save resources + // we skip big files to save resources if (_tablet->keys_type() != KeysType::DUP_KEYS) { return; } diff 
--git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 78d3b062045..0071629175f 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -269,7 +269,6 @@ Status VRowDistribution::generate_rows_distribution( RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); - _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); auto num_rows = block->rows(); _tablet_finder->filter_bitmap().Reset(num_rows); diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 5c37da6a7cf..865a3066d62 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -95,8 +95,18 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); } else { + // for random distribution _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, &_partition_to_tablet_map); + if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { + for (auto it : _partition_to_tablet_map) { + // do round-robin for next batch + if (it.first->load_tablet_idx != -1) { + it.first->load_tablet_idx++; + } + } + _partition_to_tablet_map.clear(); + } } return Status::OK(); diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 3426f7cb67d..bccdc39a066 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -30,12 +30,13 @@ namespace doris::vectorized { class OlapTabletFinder { public: - // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we + // FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we // should compute tablet index for every row - // 
FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should + // FIND_TABLET_EVERY_BATCH is used for random distribution info, which indicates that we should // compute tablet index for every row batch - // FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should - // only compute tablet index in the corresponding partition once for the whole time in olap table sink + // FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true, + // which indicates that we should only compute tablet index in the corresponding partition once for the + // whole time in olap table sink enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode) @@ -50,12 +51,6 @@ public: return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; } - void clear_for_new_batch() { - if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { - _partition_to_tablet_map.clear(); - } - } - bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } const vectorized::flat_hash_set<int64_t>& partition_ids() { return _partition_ids; } @@ -71,7 +66,7 @@ public: private: VOlapTablePartitionParam* _vpartition; FindTabletMode _find_tablet_mode; - std::map<int64_t, int64_t> _partition_to_tablet_map; + std::map<VOlapTablePartition*, int64_t> _partition_to_tablet_map; vectorized::flat_hash_set<int64_t> _partition_ids; int64_t _num_filtered_rows = 0; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 703959da80c..eb54d4b5dc3 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1603,7 +1603,6 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { _state->update_num_bytes_load_total(bytes); 
DorisMetrics::instance()->load_rows->increment(rows); DorisMetrics::instance()->load_bytes->increment(bytes); - // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. bool load_block_to_single_tablet = diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1ed32913459..24945649912 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -207,7 +207,7 @@ import org.apache.doris.persist.TruncateTableInfo; import org.apache.doris.persist.meta.MetaHeader; import org.apache.doris.persist.meta.MetaReader; import org.apache.doris.persist.meta.MetaWriter; -import org.apache.doris.planner.SingleTabletLoadRecorderMgr; +import org.apache.doris.planner.TabletLoadIndexRecorderMgr; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.policy.PolicyMgr; @@ -331,7 +331,7 @@ public class Env { private LoadManager loadManager; private ProgressManager progressManager; private StreamLoadRecordMgr streamLoadRecordMgr; - private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr; + private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr; private RoutineLoadManager routineLoadManager; private SqlBlockRuleMgr sqlBlockRuleMgr; private ExportMgr exportMgr; @@ -682,7 +682,7 @@ public class Env { this.progressManager = new ProgressManager(); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000L); - this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr(); + this.tabletLoadIndexRecorderMgr = new TabletLoadIndexRecorderMgr(); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); this.routineLoadScheduler = new 
RoutineLoadScheduler(routineLoadManager); @@ -1549,7 +1549,7 @@ public class Env { cooldownConfHandler.start(); } streamLoadRecordMgr.start(); - singleTabletLoadRecorderMgr.start(); + tabletLoadIndexRecorderMgr.start(); getInternalCatalog().getIcebergTableCreationRecordMgr().start(); new InternalSchemaInitializer().start(); if (Config.enable_hms_events_incremental_sync) { @@ -3758,8 +3758,8 @@ public class Env { return streamLoadRecordMgr; } - public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() { - return singleTabletLoadRecorderMgr; + public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() { + return tabletLoadIndexRecorderMgr; } public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 23bec446f4b..fa3b4abee9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.Index; @@ -109,8 +110,6 @@ public class OlapTableSink extends DataSink { private boolean isStrictMode = false; - private boolean loadToSingleTablet; - public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds, boolean singleReplicaLoad) { this.dstTable = dstTable; @@ -134,7 +133,6 @@ public class OlapTableSink extends DataSink { "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); 
- this.loadToSingleTablet = loadToSingleTablet; tDataSink = new TDataSink(getDataSinkType()); tDataSink.setOlapTableSink(tSink); @@ -344,11 +342,12 @@ public class OlapTableSink extends DataSink { tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId)); - if (loadToSingleTablet) { - int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() - .getCurrentLoadTabletIndex(dbId, table.getId(), partitionId); + if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, table.getId(), partition); tPartition.setLoadTabletIdx(tabletIndex); } + partitionParam.addToPartitions(tPartition); DistributionInfo distInfo = partition.getDistributionInfo(); @@ -403,9 +402,10 @@ public class OlapTableSink extends DataSink { index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); tPartition.setNumBuckets(index.getTablets().size()); } - if (loadToSingleTablet) { - int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() - .getCurrentLoadTabletIndex(dbId, table.getId(), partition.getId()); + + if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, table.getId(), partition); tPartition.setLoadTabletIdx(tabletIndex); } partitionParam.addToPartitions(tPartition); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java deleted file mode 100644 index 89f21c47dc2..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java +++ /dev/null @@ -1,112 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license 
agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner; - -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.MasterDaemon; - -import lombok.Getter; -import org.apache.commons.lang3.tuple.Triple; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.concurrent.ConcurrentHashMap; - -public class SingleTabletLoadRecorderMgr extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(SingleTabletLoadRecorderMgr.class); - private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days - - // <<db_id, table_id, partition_id> -> load_tablet_record> - // 0 =< load_tablet_index < number_buckets - private final ConcurrentHashMap<Triple<Long, Long, Long>, TabletUpdateRecord> loadTabletRecordMap = - new ConcurrentHashMap<>(); - - public SingleTabletLoadRecorderMgr() { - super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS); - } - - @Override - protected void runAfterCatalogReady() { - long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS; - 
loadTabletRecordMap.entrySet().removeIf(entry -> - entry.getValue().getUpdateTimestamp() < expiryTime - ); - LOG.info("Remove expired load tablet record successfully."); - } - - public int getCurrentLoadTabletIndex(long dbId, long tableId, long partitionId) throws UserException { - Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partitionId); - TabletUpdateRecord record = loadTabletRecordMap.get(key); - int numBuckets = -1; - if (record == null) { - numBuckets = getNumBuckets(dbId, tableId, partitionId); - } - return createOrUpdateLoadTabletRecord(key, numBuckets); - } - - private int getNumBuckets(long dbId, long tableId, long partitionId) throws UserException { - OlapTable olapTable = (OlapTable) Env.getCurrentInternalCatalog().getDb(dbId) - .flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() == TableIf.TableType.OLAP) - .orElse(null); - if (olapTable == null) { - throw new UserException("Olap table[" + dbId + "." + tableId + "] is not exist."); - } - return olapTable.getPartition(partitionId) - .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL) - .get(0).getTablets().size(); - } - - private int createOrUpdateLoadTabletRecord(Triple<Long, Long, Long> key, int numBuckets) { - TabletUpdateRecord record = loadTabletRecordMap.compute(key, (k, existingRecord) -> { - if (existingRecord == null) { - return new TabletUpdateRecord(0, numBuckets); - } else { - existingRecord.updateRecord(); - return existingRecord; - } - }); - return record.getTabletIndex(); - } - - static class TabletUpdateRecord { - @Getter - // 0 =< load_tablet_index < number_buckets - int tabletIndex; - int numBuckets; - @Getter - long updateTimestamp = System.currentTimeMillis(); - - TabletUpdateRecord(int tabletIndex, int numBuckets) { - this.tabletIndex = tabletIndex; - this.numBuckets = numBuckets; - } - - public synchronized void updateRecord() { - this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 
0 : this.tabletIndex + 1; - // To reduce the compute time cost, only update timestamp when index is 0 - if (this.tabletIndex == 0) { - this.updateTimestamp = System.currentTimeMillis(); - } - } - - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java new file mode 100644 index 00000000000..3b14445653d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.MasterDaemon; + +import lombok.Getter; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; + +public class TabletLoadIndexRecorderMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(TabletLoadIndexRecorderMgr.class); + private static final long TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days + private static final long TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS = 3600000; // 1 hour + private static final int TIMES_FOR_UPDATE_TIMESTAMP = 1000; + + // <<db_id, table_id, partition_id> -> load_tablet_record> + // 0 =< load_tablet_index < number_buckets + private final ConcurrentHashMap<Triple<Long, Long, Long>, TabletLoadIndexRecord> loadTabletRecordMap = + new ConcurrentHashMap<>(); + + public TabletLoadIndexRecorderMgr() { + super("tablet_load_index_recorder", TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS); + } + + @Override + protected void runAfterCatalogReady() { + int originRecordSize = loadTabletRecordMap.size(); + long expireTime = System.currentTimeMillis() - TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS; + loadTabletRecordMap.entrySet().removeIf(entry -> + entry.getValue().getUpdateTimestamp() < expireTime + ); + int currentRecordSize = loadTabletRecordMap.size(); + LOG.info("Remove expired load tablet index record successfully, before {}, current {}", + originRecordSize, currentRecordSize); + } + + public int getCurrentTabletLoadIndex(long dbId, long tableId, Partition partition) throws UserException { + Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partition.getId()); + return loadTabletRecordMap.compute(key, (k, existingRecord) -> + existingRecord == null ? 
new TabletLoadIndexRecord(partition.getVisibleVersion() - 1, + partition.getDistributionInfo().getBucketNum()) : existingRecord).getAndIncrement(); + } + + static class TabletLoadIndexRecord { + int loadIndex; + int numBuckets; + @Getter + long updateTimestamp = System.currentTimeMillis(); + + TabletLoadIndexRecord(long initialIndex, int numBuckets) { + this.loadIndex = (int) (initialIndex % numBuckets); + this.numBuckets = numBuckets; + } + + public synchronized int getAndIncrement() { + int tabletLoadIndex = loadIndex % numBuckets; + loadIndex++; + // To reduce the compute time cost, only update timestamp when load index is + // greater than or equal to both TIMES_FOR_UPDATE_TIMESTAMP and numBuckets + if (loadIndex >= Math.max(TIMES_FOR_UPDATE_TIMESTAMP, numBuckets)) { + loadIndex = loadIndex % numBuckets; + this.updateTimestamp = System.currentTimeMillis(); + } + return tabletLoadIndex; + } + + } +} diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 21bea8cac59..abaf8f8967d 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -178,7 +178,7 @@ struct TOlapTablePartition { 9: optional bool is_mutable = true // only used in List Partition 10: optional bool is_default_partition; - // only used in load_to_single_tablet + // only used in random distribution scenario to make data distributed even 11: optional i64 load_tablet_idx } diff --git a/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy new file mode 100644 index 00000000000..4630264ab41 --- /dev/null +++ b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_insert_random_distribution_table", "p0") { + def tableName = "test_insert_random_distribution_table" + + // ${tableName} unpartitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` char(100) NULL, + `v1` char(100) NULL, + `v2` text NULL, + `v3` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY RANDOM BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql "set batch_size=2" + // insert first time + sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), ('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), ('2023-10-16', '2', '3', '4', 55)" + + sql "sync" + def totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 4) + def res = sql "show tablets from ${tableName}" + def tabletId1 = res[0][0] + def tabletId2 = res[1][0] + def tabletId3 = res[2][0] + def tabletId4 = res[3][0] + def tabletId5 = res[4][0] + + def rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" + def rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" + def rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" + def rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" + def rowCount5 = sql 
"select count() from ${tableName} tablet(${tabletId5})" + + assertEquals(rowCount1[0][0], 3) + assertEquals(rowCount2[0][0], 1) + assertEquals(rowCount3[0][0], 0) + assertEquals(rowCount4[0][0], 0) + assertEquals(rowCount5[0][0], 0) + + sql "set batch_size=2" + // insert second time + sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), ('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), ('2023-10-16', '2', '3', '4', 55)" + + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 8) + + rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" + rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" + rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" + rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" + rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})" + + assertEquals(rowCount1[0][0], 3) + assertEquals(rowCount2[0][0], 4) + assertEquals(rowCount3[0][0], 1) + assertEquals(rowCount4[0][0], 0) + assertEquals(rowCount5[0][0], 0) + + sql "set batch_size=2" + // insert third time + sql "insert into ${tableName} values('2021-11-14', '2', '3', '4', 55), ('2022-12-13', '3', '31', '4', 55), ('2023-10-14', '23', '45', '66', 88), ('2023-10-16', '2', '3', '4', 55)" + + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 12) + + rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" + rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" + rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" + rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" + rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})" + + assertEquals(rowCount1[0][0], 3) + assertEquals(rowCount2[0][0], 4) + assertEquals(rowCount3[0][0], 4) + assertEquals(rowCount4[0][0], 1) + assertEquals(rowCount5[0][0], 0) + + // 
${tableName} partitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` text NULL, + `k3` char(50) NULL, + `k4` varchar(200) NULL, + `k5` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')), + PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')), + PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14'))) + DISTRIBUTED BY RANDOM BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql "set batch_size=1" + // insert first time + sql "insert into ${tableName} values('2023-10-11', '2', '3', '4', 55), ('2023-10-11', '3', '31', '4', 55), ('2023-10-11', '23', '45', '66', 88), ('2023-10-12', '2', '3', '4', 55),('2023-10-12', '12', '13', '4', 55)" + + sql "sync" + totalCount = sql "select count() from ${tableName}" + def partition1 = "p20231011" + def partition2 = "p20231012" + def partition3 = "p20231013" + assertEquals(totalCount[0][0], 5) + res = sql "show tablets from ${tableName} partition ${partition1}" + def tabletId11 = res[0][0] + def tabletId12 = res[1][0] + def tabletId13 = res[2][0] + def tabletId14 = res[3][0] + def tabletId15 = res[4][0] + + res = sql "show tablets from ${tableName} partition ${partition2}" + def tabletId21 = res[0][0] + def tabletId22 = res[1][0] + def tabletId23 = res[2][0] + def tabletId24 = res[3][0] + def tabletId25 = res[4][0] + + res = sql "show tablets from ${tableName} partition ${partition3}" + def tabletId31 = res[0][0] + def tabletId32 = res[1][0] + def tabletId33 = res[2][0] + def tabletId34 = res[3][0] + def tabletId35 = res[4][0] + + def rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" + def rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})" + def rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" + def rowCount14 = sql 
"select count() from ${tableName} tablet(${tabletId14})" + def rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" + + def rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" + def rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" + def rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" + def rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" + def rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" + + def rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})" + def rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})" + def rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" + def rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" + def rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" + + assertEquals(rowCount11[0][0], 2) + assertEquals(rowCount12[0][0], 1) + assertEquals(rowCount13[0][0], 0) + assertEquals(rowCount14[0][0], 0) + assertEquals(rowCount15[0][0], 0) + + assertEquals(rowCount21[0][0], 1) + assertEquals(rowCount22[0][0], 1) + assertEquals(rowCount23[0][0], 0) + assertEquals(rowCount24[0][0], 0) + assertEquals(rowCount25[0][0], 0) + + assertEquals(rowCount31[0][0], 0) + assertEquals(rowCount32[0][0], 0) + assertEquals(rowCount33[0][0], 0) + assertEquals(rowCount34[0][0], 0) + assertEquals(rowCount35[0][0], 0) + + sql "set batch_size=1" + // insert second time + sql "insert into ${tableName} values('2023-10-12', '2', '3', '4', 55), ('2023-10-12', '3', '31', '4', 55), ('2023-10-12', '23', '45', '66', 88), ('2023-10-13', '2', '3', '4', 55),('2023-10-11', '11', '13', '4', 55)" + + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 10) + + rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" + rowCount12 = sql "select count() from ${tableName} 
tablet(${tabletId12})" + rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" + rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})" + rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" + + rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" + rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" + rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" + rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" + rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" + + rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})" + rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})" + rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" + rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" + rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" + + assertEquals(rowCount11[0][0], 2) + assertEquals(rowCount12[0][0], 2) + assertEquals(rowCount13[0][0], 0) + assertEquals(rowCount14[0][0], 0) + assertEquals(rowCount15[0][0], 0) + + assertEquals(rowCount21[0][0], 1) + assertEquals(rowCount22[0][0], 3) + assertEquals(rowCount23[0][0], 1) + assertEquals(rowCount24[0][0], 0) + assertEquals(rowCount25[0][0], 0) + + assertEquals(rowCount31[0][0], 0) + assertEquals(rowCount32[0][0], 1) + assertEquals(rowCount33[0][0], 0) + assertEquals(rowCount34[0][0], 0) + assertEquals(rowCount35[0][0], 0) + + sql "set batch_size=1" + // insert third time + sql "insert into ${tableName} values('2023-10-13', '2', '3', '4', 55), ('2023-10-13', '3', '31', '4', 55), ('2023-10-12', '23', '45', '66', 88), ('2023-10-13', '2', '3', '4', 55),('2023-10-11', '11', '13', '4', 55),('2023-10-11', '13', '145', '4', 55)" + sql "set batch_size=4064" + + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 16) 
+ + rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" + rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})" + rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" + rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})" + rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" + + rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" + rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" + rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" + rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" + rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" + + rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})" + rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})" + rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" + rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" + rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" + + assertEquals(rowCount11[0][0], 2) + assertEquals(rowCount12[0][0], 2) + assertEquals(rowCount13[0][0], 2) + assertEquals(rowCount14[0][0], 0) + assertEquals(rowCount15[0][0], 0) + + assertEquals(rowCount21[0][0], 1) + assertEquals(rowCount22[0][0], 3) + assertEquals(rowCount23[0][0], 2) + assertEquals(rowCount24[0][0], 0) + assertEquals(rowCount25[0][0], 0) + + assertEquals(rowCount31[0][0], 0) + assertEquals(rowCount32[0][0], 1) + assertEquals(rowCount33[0][0], 2) + assertEquals(rowCount34[0][0], 1) + assertEquals(rowCount35[0][0], 0) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org