This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch short-circuit-in
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/short-circuit-in by this push: new e00b9400562 [Enhancement](Short Circuit) short circuit query supports `IN` (#39468) e00b9400562 is described below commit e00b9400562a23bdf8ee416f47366b53534ce2c0 Author: Xr Ling <63634816+lxr...@users.noreply.github.com> AuthorDate: Wed Jan 1 22:19:08 2025 +0800 [Enhancement](Short Circuit) short circuit query supports `IN` (#39468) ## Proposed changes short circuit query support `IN` <!--Describe your changes.--> --------- Co-authored-by: lihangyu <15605149...@163.com> Co-authored-by: Liuyushiii <liuyushi...@163.com> Co-authored-by: eldenmoon <lihan...@selectdb.com> --- be/src/olap/base_tablet.cpp | 1 - be/src/service/internal_service.cpp | 29 + be/src/service/internal_service.h | 5 + be/src/service/point_query_executor.cpp | 5 + .../org/apache/doris/catalog/PartitionKey.java | 23 + .../LogicalResultSinkToShortCircuitPointQuery.java | 3 +- .../doris/planner/HashDistributionPruner.java | 30 +- .../org/apache/doris/planner/OlapScanNode.java | 81 ++- .../PartitionPruneV2ForShortCircuitPlan.java | 41 +- .../doris/planner/PartitionPrunerV2Base.java | 52 +- .../doris/planner/RangePartitionPrunerV2.java | 66 ++- .../org/apache/doris/qe/PointQueryExecutor.java | 599 +++++++++++++++++---- .../org/apache/doris/rpc/BackendServiceClient.java | 5 + .../org/apache/doris/rpc/BackendServiceProxy.java | 12 + gensrc/proto/internal_service.proto | 11 + .../data/point_query_p0/test_point_IN_query.out | 27 + .../data/point_query_p0/test_point_query.out | 179 ++++++ .../point_query_p0/test_point_query_partition.out | 25 + .../data/point_query_p0/test_rowstore.out | 30 ++ .../data/point_query_p0/test_rowstore_query.out | 6 +- .../prepared_stmt_p0/prepared_stmt_in_list.out | 6 + .../org/apache/doris/regression/suite/Suite.groovy | 6 +- .../test_row_store_page_size.groovy | 4 +- .../test_dynamic_partition_point_query.groovy | 122 +++++ .../point_query_p0/test_point_IN_query.groovy | 85 +++ .../suites/point_query_p0/test_point_query.groovy | 400 ++++++++++++++ .../point_query_p0/test_point_query_ck.groovy | 80 ++- .../test_point_query_partition.groovy | 33 +- .../suites/point_query_p0/test_rowstore.groovy | 70 +++ .../point_query_p0/test_rowstore_query.groovy | 3 +- .../prepared_stmt_p0/prepared_stmt_in_list.groovy | 39 ++ 31 files changed, 1910 insertions(+), 168 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 40c5443477d..dc3b50688b6 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -495,7 +495,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest } auto& segments = segment_caches[i]->get_segments(); DCHECK_EQ(segments.size(), num_segments); - for (auto id : picked_segments) { Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid, &loc, stats, encoded_seq_value); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index fb0b2f090bc..b58b697036d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -948,6 +948,35 @@ void PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro } } +void PInternalService::tablet_batch_fetch_data(google::protobuf::RpcController* controller, + const PTabletBatchKeyLookupRequest* batchRequest, + PTabletBatchKeyLookupResponse* batchResponse, + google::protobuf::Closure* done) { + int request_count = batchRequest->sub_key_lookup_req_size(); + 
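// Pre-size the repeated sub-response field; one sub-response is appended per sub-request in the worker lambda below. +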
batchResponse->mutable_sub_key_lookup_res()->Reserve(request_count); + [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller); + bool ret = + _light_work_pool.try_offer([this, batchRequest, batchResponse, done, request_count]() { + Status st = Status::OK(); + brpc::ClosureGuard guard(done); + for (int i = 0; i < request_count; ++i) { + batchResponse->add_sub_key_lookup_res(); + const PTabletKeyLookupRequest* request = &batchRequest->sub_key_lookup_req(i); + PTabletKeyLookupResponse* response = + batchResponse->mutable_sub_key_lookup_res(i); + Status status = _tablet_fetch_data(request, response); + status.to_protobuf(response->mutable_status()); + if (!status.ok()) { + st = status; + } + } + st.to_protobuf(batchResponse->mutable_status()); + }); + if (!ret) { + offer_failed(batchResponse, done, _light_work_pool); + } +} + void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller, const PJdbcTestConnectionRequest* request, PJdbcTestConnectionResult* result, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 66a0f867393..0c683ccc3ff 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -226,6 +226,11 @@ public: PTabletKeyLookupResponse* response, google::protobuf::Closure* done) override; + void tablet_batch_fetch_data(google::protobuf::RpcController* controller, + const PTabletBatchKeyLookupRequest* batchRequest, + PTabletBatchKeyLookupResponse* batchResponse, + google::protobuf::Closure* done) override; + void test_jdbc_connection(google::protobuf::RpcController* controller, const PJdbcTestConnectionRequest* request, PJdbcTestConnectionResult* result, diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index ea991e158a1..3f77a9f2d2b 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -262,6 +262,11 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, auto cache_handle = LookupConnectionCache::instance()->get(uuid); _binary_row_format = request->is_binary_row(); _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id())); + + if (_tablet->tablet_meta()->replica_id() != request->replica_id()) { + return Status::OK(); + } + if (cache_handle != nullptr) { _reusable = cache_handle; _profile_metrics.hit_lookup_cache = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index 29bfda8b201..854cdd27f50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -79,6 +80,24 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { types = Lists.newArrayList(); } + private PartitionKey(PartitionKey other) { + this.keys = new ArrayList<>(other.keys.size()); + for (LiteralExpr expr : other.keys) { + try { + String value = expr.getStringValue(); + if ("null".equalsIgnoreCase(value)) { + this.keys.add(NullLiteral.create(expr.getType())); + } else { + this.keys.add(LiteralExpr.create(value, expr.getType())); + } + } catch (Exception e) { + throw new RuntimeException("Create partition key failed: " + 
e.getMessage()); + } + } + this.originHiveKeys = new ArrayList<>(other.originHiveKeys); + this.types = new ArrayList<>(other.types); + } + public void setDefaultListPartition(boolean isDefaultListPartitionKey) { this.isDefaultListPartitionKey = isDefaultListPartitionKey; } @@ -205,6 +224,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { return createListPartitionKeyWithTypes(values, types, false); } + public static PartitionKey clone(PartitionKey other) { + return new PartitionKey(other); + } + public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) { keys.add(keyValue); types.add(keyType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java index c087dcbb37b..56103f863d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.InPredicate; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -53,7 +54,7 @@ public class LogicalResultSinkToShortCircuitPointQuery implements RewriteRuleFac private boolean filterMatchShortCircuitCondition(LogicalFilter<LogicalOlapScan> filter) { return filter.getConjuncts().stream().allMatch( // all conjuncts match with pattern `key = ?` - expression -> (expression instanceof EqualTo) + expression -> ((expression instanceof EqualTo) || expression instanceof InPredicate) && (removeCast(expression.child(0)).isKeyColumnFromTable() || (expression.child(0) instanceof SlotReference && ((SlotReference) expression.child(0)).getName().equals(Column.DELETE_SIGN))) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java index 2a60e4029a6..38bfe6820fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java @@ -26,9 +26,8 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.Config; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.Collection; import java.util.List; @@ -49,8 +48,6 @@ import java.util.Set; * If depth is larger than 'max_distribution_pruner_recursion_depth', all buckets will be return without pruning. 
*/ public class HashDistributionPruner implements DistributionPruner { - private static final Logger LOG = LogManager.getLogger(HashDistributionPruner.class); - // partition list, sort by the hash code private List<Long> bucketsList; // partition columns @@ -61,6 +58,18 @@ public class HashDistributionPruner implements DistributionPruner { private boolean isBaseIndexSelected; + /* + * This map maintains a relationship between distribution keys and their corresponding tablet IDs. + * For example, if the distribution columns are (k1, k2, k3), + * and the tuple (1, 2, 3) is hashed into bucket 1001, + * then the `distributionKey2TabletIDs` would map the key (1, 2, 3) to the tablet ID 1001. + * (1, 2, 3) -> 1001 + * Map structure: + * - Key: PartitionKey, representing a specific combination of distribution columns (e.g., k1, k2, k3). + * - Value: Set<Long>, containing the tablet IDs associated with the corresponding distribution key. + */ + private Map<PartitionKey, Set<Long>> distributionKey2TabletIDs = Maps.newHashMap(); + public HashDistributionPruner(List<Long> bucketsList, List<Column> columns, Map<String, PartitionColumnFilter> filters, int hashMod, boolean isBaseIndexSelected) { this.bucketsList = bucketsList; @@ -70,13 +79,21 @@ public class HashDistributionPruner implements DistributionPruner { this.isBaseIndexSelected = isBaseIndexSelected; } + public Map<PartitionKey, Set<Long>> getDistributionKeysTabletIDs() { + return distributionKey2TabletIDs; + } + // columnId: which column to compute // hashKey: the key which to compute hash value public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) { if (columnId == distributionColumns.size()) { // compute Hash Key long hashValue = hashKey.getHashValue(); - return Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod))); + List<Long> result = + Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod))); + distributionKey2TabletIDs.computeIfAbsent(PartitionKey.clone(hashKey), + k -> Sets.newHashSet(result)).addAll(result); + return result; } Column keyColumn = distributionColumns.get(columnId); String columnName = isBaseIndexSelected ? 
keyColumn.getName() @@ -119,9 +136,6 @@ public class HashDistributionPruner implements DistributionPruner { Collection<Long> subList = prune(columnId + 1, hashKey, newComplex); resultSet.addAll(subList); hashKey.popColumn(); - if (resultSet.size() >= bucketsList.size()) { - break; - } } return resultSet; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 92175523f22..3756dc985c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; -import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; @@ -52,6 +51,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Partition.PartitionState; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; @@ -98,9 +98,13 @@ import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; +import com.google.common.collect.Table; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -192,7 +196,21 @@ public class OlapScanNode extends ScanNode { private Set<Long> sampleTabletIds = Sets.newHashSet(); private TableSample tableSample; + private HashSet<Long> scanBackendIds = new HashSet<>(); + + private PartitionPruner partitionPruner = null; + private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap(); + + // Maps partition column names to a RangeMap that associates ColumnBound ranges with lists of partition IDs, + // similar to the implementation in PartitionPrunerV2Base. + private Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = Maps.newHashMap(); + + private Map<PartitionKey, Set<Long>> distributionKeys2TabletID = Maps.newHashMap(); + + /// tablet id -> (backend id -> replica) + private Table<Long, Long, Replica> scanBackendReplicaTable = HashBasedTable.create(); + // a bucket seq may map to many tablets, and each tablet has a // TScanRangeLocations. 
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create(); @@ -257,6 +275,14 @@ public class OlapScanNode extends ScanNode { return scanBackendIds; } + public Map<String, RangeMap<ColumnBound, List<Long>>> getPartitionCol2PartitionID() { + return partitionCol2PartitionID; + } + + public Map<PartitionKey, Set<Long>> getDistributionKeys2TabletID() { + return distributionKeys2TabletID; + } + public void setSampleTabletIds(List<Long> sampleTablets) { if (sampleTablets != null) { this.sampleTabletIds.addAll(sampleTablets); @@ -292,6 +318,10 @@ public class OlapScanNode extends ScanNode { return scanTabletIds; } + public Table<Long, Long, Replica> getScanBackendReplicaTable() { + return scanBackendReplicaTable; + } + public void setForceOpenPreAgg(boolean forceOpenPreAgg) { this.forceOpenPreAgg = forceOpenPreAgg; } @@ -656,9 +686,9 @@ public class OlapScanNode extends ScanNode { cardinality = (long) statsDeriveResult.getRowCount(); } + // get the pruned partition IDs private Collection<Long> partitionPrune(PartitionInfo partitionInfo, PartitionNames partitionNames) throws AnalysisException { - PartitionPruner partitionPruner = null; Map<Long, PartitionItem> keyItemMap; if (partitionNames != null) { keyItemMap = Maps.newHashMap(); @@ -675,13 +705,12 @@ public class OlapScanNode extends ScanNode { if (partitionInfo.getType() == PartitionType.RANGE) { if (isPointQuery() && partitionInfo.getPartitionColumns().size() == 1) { // short circuit, a quick path to find partition - ColumnRange filterRange = columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName()); - LiteralExpr lowerBound = filterRange.getRangeSet().get().asRanges().stream() - .findFirst().get().lowerEndpoint().getValue(); - LiteralExpr upperBound = filterRange.getRangeSet().get().asRanges().stream() - .findFirst().get().upperEndpoint().getValue(); + Column col = partitionInfo.getPartitionColumns().get(0); + // todo: support range query + Set<Range<ColumnBound>> filterRanges = + columnNameToRange.get(col.getName()).getRangeSet().get().asRanges(); cachedPartitionPruner.update(keyItemMap); - return cachedPartitionPruner.prune(lowerBound, upperBound); + return cachedPartitionPruner.prune(filterRanges, col.getName(), partitionCol2PartitionID); } partitionPruner = new RangePartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), columnNameToRange); @@ -699,12 +728,22 @@ public class OlapScanNode extends ScanNode { switch (distributionInfo.getType()) { case HASH: { HashDistributionInfo info = (HashDistributionInfo) distributionInfo; - distributionPruner = new HashDistributionPruner(table.getTabletIdsInOrder(), + distributionPruner = + new HashDistributionPruner(table.getTabletIdsInOrder(), info.getDistributionColumns(), columnFilters, info.getBucketNum(), getSelectedIndexId() == olapTable.getBaseIndexId()); - return distributionPruner.prune(); + HashDistributionPruner hashPruner = (HashDistributionPruner) distributionPruner; + Collection<Long> resultIDs = hashPruner.prune(); + Map<PartitionKey, Set<Long>> newPrunedIDs = hashPruner.getDistributionKeysTabletIDs(); + for (Map.Entry<PartitionKey, Set<Long>> entry : newPrunedIDs.entrySet()) { + distributionKeys2TabletID.merge(entry.getKey(), entry.getValue(), (existingSet, newSet) -> { + existingSet.addAll(newSet); + return existingSet; + }); + } + return resultIDs; } case RANDOM: { return null; @@ -960,6 +999,7 @@ public class OlapScanNode extends ScanNode { collectedStat = true; } scanBackendIds.add(backend.getId()); + 
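// In addition to the backend id set, record tablet id -> (backend id -> replica) so the short-circuit point-query path can later pick one concrete replica per tablet. +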
scanBackendReplicaTable.put(tabletId, backend.getId(), replica); // For skipping missing version of tablet, we only select the backend with the highest last // success version replica to save as much data as possible. if (skipMissingVersion) { @@ -1030,10 +1070,20 @@ public class OlapScanNode extends ScanNode { // Step1: compute partition ids PartitionNames partitionNames = ((BaseTableRef) desc.getRef()).getPartitionNames(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { - selectedPartitionIds = partitionPrune(partitionInfo, partitionNames); - } else { - selectedPartitionIds = olapTable.getPartitionIds(); + switch (partitionInfo.getType()) { + case RANGE: + selectedPartitionIds = partitionPrune(partitionInfo, partitionNames); + if (isPointQuery() && partitionPruner instanceof RangePartitionPrunerV2) { + RangePartitionPrunerV2 rangePartitionPruner = (RangePartitionPrunerV2) partitionPruner; + this.partitionCol2PartitionID = rangePartitionPruner.getPartitionCol2PartitionID(); + } + break; + case LIST: + selectedPartitionIds = partitionPrune(partitionInfo, partitionNames); + break; + default: + selectedPartitionIds = olapTable.getPartitionIds(); + break; } selectedPartitionIds = olapTable.selectNonEmptyPartitionIds(selectedPartitionIds); selectedPartitionNum = selectedPartitionIds.size(); @@ -1336,6 +1386,8 @@ public class OlapScanNode extends ScanNode { // Lazy evaluation selectedIndexId = olapTable.getBaseIndexId(); // Only key columns + distributionKeys2TabletID.clear(); + partitionCol2PartitionID.clear(); computeColumnsFilter(olapTable.getBaseSchemaKeyColumns(), olapTable.getPartitionInfo()); computePartitionInfo(); scanBackendIds.clear(); @@ -1343,6 +1395,7 @@ public class OlapScanNode extends ScanNode { bucketSeq2locations.clear(); scanReplicaIds.clear(); sampleTabletIds.clear(); + scanBackendReplicaTable.clear(); try { createScanRangeLocations(); } catch (AnalysisException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java index 0b98cac9724..fe6c7474bc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java @@ -23,18 +23,24 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.AnalysisException; +import com.google.common.collect.Lists; import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base { private static final Logger LOG = LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class); // map to record literal range to find specific partition - private RangeMap<LiteralExpr, Long> partitionRangeMapByLiteral = new RangeMap<>(); + private RangeMap<ColumnBound, List<Long>> partitionRangeMap = TreeRangeMap.create(); // last timestamp partitionRangeMapByLiteral updated private long 
lastPartitionRangeMapUpdateTimestampMs = 0; @@ -42,19 +48,36 @@ public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base { super(); } + public static <C extends Comparable<C>, V> Set<V> + getOverlappingRangeValues(RangeMap<C, List<V>> partRangeMap, Set<Range<C>> ranges) { + Set<V> partitionIds = Sets.newHashSet(); + for (Range<C> range : ranges) { + Map<Range<C>, List<V>> overlappingRanges = partRangeMap.subRangeMap(range).asMapOfRanges(); + for (Map.Entry<Range<C>, List<V>> entry : overlappingRanges.entrySet()) { + partitionIds.addAll(entry.getValue()); + } + } + return partitionIds; + } + + public RangeMap<ColumnBound, List<Long>> getPartitionColValue2PartitionID() { + return partitionRangeMap; + } + public boolean update(Map<Long, PartitionItem> keyItemMap) { // interval to update partitionRangeMapByLiteral long partitionRangeMapUpdateIntervalS = 10; if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs > partitionRangeMapUpdateIntervalS * 1000) { - partitionRangeMapByLiteral = new RangeMap<>(); + partitionRangeMap = TreeRangeMap.create(); // recalculate map for (Entry<Long, PartitionItem> entry : keyItemMap.entrySet()) { Range<PartitionKey> range = entry.getValue().getItems(); LiteralExpr partitionLowerBound = (LiteralExpr) range.lowerEndpoint().getKeys().get(0); LiteralExpr partitionUpperBound = (LiteralExpr) range.upperEndpoint().getKeys().get(0); - Range<LiteralExpr> partitionRange = Range.closedOpen(partitionLowerBound, partitionUpperBound); - partitionRangeMapByLiteral.put(partitionRange, entry.getKey()); + Range<ColumnBound> partitionRange = + Range.closedOpen(ColumnBound.of(partitionLowerBound), ColumnBound.of(partitionUpperBound)); + partitionRangeMap.put(partitionRange, Lists.newArrayList(entry.getKey())); } if (LOG.isDebugEnabled()) { LOG.debug("update partitionRangeMapByLiteral"); @@ -65,9 +88,13 @@ public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base { return false; } - public Collection<Long> prune(LiteralExpr lowerBound, LiteralExpr upperBound) throws AnalysisException { - Range<LiteralExpr> filterRangeValue = Range.closed(lowerBound, upperBound); - return partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue); + public Collection<Long> prune(Set<Range<ColumnBound>> partitionColumnRange, + String partitionColName, + Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID) { + Set<Long> overlappingRangeValues = getOverlappingRangeValues(partitionRangeMap, partitionColumnRange); + partitionCol2PartitionID.putIfAbsent( + partitionColName, partitionRangeMap); + return overlappingRangeValues; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java index 1d9f163ca80..60fcc523c38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java @@ -28,6 +28,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeMap; import java.util.Collection; import java.util.Collections; @@ -48,6 +50,19 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { // currently only used for list partition private Map.Entry<Long, PartitionItem> defaultPartition; + 
/* + * This map maintains the relationship between partition columns and their corresponding partition IDs. + * For example, if the partition columns are (k1, k2, k3), and there is a partition `p0` with the range + * [(1, 5, 10), (5, 10, 20)), then the `partitionCol2PartitionID` map should be: + * k1 -> [1, 5) -> p0 + * Map structure: + * - Key: String, representing the name of a partition column (e.g., k1). + * - Value: RangeMap<ColumnBound, List<Long>>, where each range of column bounds is mapped to a list of + * partition IDs. For instance, the range [1, 5) for column `k1` would map to partition ID `p0`. + */ + // todo: `List<Long>` is not neccessary, `Long` is enough + protected Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = Maps.newHashMap(); + // Only called in PartitionPruneV2ByShortCircuitPlan constructor PartitionPrunerV2Base() { this.idToPartitionItem = null; @@ -76,6 +91,10 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { findDefaultPartition(idToPartitionItem); } + public Map<String, RangeMap<ColumnBound, List<Long>>> getPartitionCol2PartitionID() { + return partitionCol2PartitionID; + } + private Collection<Long> handleDefaultPartition(Collection<Long> result) { if (this.defaultPartition != null) { Set<Long> r = result.stream().collect(Collectors.toSet()); @@ -107,11 +126,12 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { @Override public Collection<Long> prune() throws AnalysisException { Map<Column, FinalFilters> columnToFilters = Maps.newHashMap(); - for (Column column : partitionColumns) { + for (Column column : partitionColumns) { // partition col is key ColumnRange columnRange = columnNameToRange.get(column.getName()); if (columnRange == null) { columnToFilters.put(column, FinalFilters.noFilters()); } else { + // add the partiton&key col columnToFilters.put(column, getFinalFilters(columnRange, column)); } } @@ -160,22 +180,32 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { * partitions. 
*/ private Collection<Long> pruneSingleColumnPartition(Map<Column, FinalFilters> columnToFilters) { - FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(0)); + Column partitionCol = partitionColumns.get(0); + FinalFilters finalFilters = columnToFilters.get(partitionCol); switch (finalFilters.type) { case CONSTANT_FALSE_FILTERS: return Collections.emptySet(); case HAVE_FILTERS: genSingleColumnRangeMap(); Preconditions.checkNotNull(singleColumnRangeMap); - return finalFilters.filters.stream() - .map(filter -> { - RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter); - return filtered.asMapOfRanges().values().stream() - .map(UniqueId::getPartitionId) - .collect(Collectors.toSet()); - }) - .flatMap(Set::stream) - .collect(Collectors.toSet()); + partitionCol2PartitionID.put(partitionCol.getName(), TreeRangeMap.create()); + Set<Long> resultPartID = Sets.newHashSet(); + finalFilters.filters.forEach(filter -> { + RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter); + + filtered.asMapOfRanges().forEach((range, partID) -> { + RangeMap<ColumnBound, List<Long>> rangeMap = + partitionCol2PartitionID.get(partitionCol.getName()); + List<Long> partitionIds = rangeMap.get(range.lowerEndpoint()); + if (partitionIds == null) { + partitionIds = Lists.newArrayList(); + rangeMap.put(range, partitionIds); + } + partitionIds.add(partID.getPartitionId()); + resultPartID.add(partID.getPartitionId()); + }); + }); + return resultPartID; case NO_FILTERS: default: return idToPartitionItem.keySet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java index 8acba72f156..5d523b01bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java @@ -57,13 +57,43 @@ public class RangePartitionPrunerV2 extends PartitionPrunerV2Base { public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) { RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create(); - idToPartitionItem.forEach((id, item) -> { + idToPartitionItem.forEach((id, item) -> { // partition id , key range Range<PartitionKey> range = item.getItems(); candidate.put(mapPartitionKeyRange(range, 0), new RangePartitionUniqueId(id)); }); return candidate; } + public static <C extends Comparable<C>> Range<C> buildRange(C lowerBound, BoundType lowerBoundType, + C upperBound, BoundType upperBoundType) { + if (lowerBound.compareTo(upperBound) > 0) { + // swap + C temp = lowerBound; + lowerBound = upperBound; + upperBound = temp; + + BoundType tempBoundType = lowerBoundType; + lowerBoundType = upperBoundType; + upperBoundType = tempBoundType; + } + if (lowerBoundType == BoundType.CLOSED && upperBoundType == BoundType.OPEN) { + // [lowerBound, upperBound) + return Range.closedOpen(lowerBound, upperBound); + } else if (lowerBoundType == BoundType.OPEN && upperBoundType == BoundType.CLOSED) { + // (lowerBound, upperBound] + return Range.openClosed(lowerBound, upperBound); + } else if (lowerBoundType == BoundType.CLOSED && upperBoundType == BoundType.CLOSED) { + // [lowerBound, upperBound] + return Range.closed(lowerBound, upperBound); + } else if (lowerBoundType == BoundType.OPEN && upperBoundType == BoundType.OPEN) { + // (lowerBound, upperBound) + return Range.open(lowerBound, upperBound); + } else { + throw 
new IllegalArgumentException("Unsupported BoundType combination: " + + lowerBoundType + " and " + upperBoundType); + } + } + /** * This is just like the logic in v1 version, but we support disjunctive predicates here. */ @@ -154,8 +184,38 @@ public class RangePartitionPrunerV2 extends PartitionPrunerV2Base { && filter.hasUpperBound() && filter.upperBoundType() == BoundType.CLOSED && filter.lowerEndpoint() == filter.upperEndpoint()) { // Equal to predicate, e.g., col=1, the filter range is [1, 1]. - minKey.pushColumn(filter.lowerEndpoint().getValue(), column.getDataType()); - maxKey.pushColumn(filter.upperEndpoint().getValue(), column.getDataType()); + ColumnBound lowerFilter = filter.lowerEndpoint(); + ColumnBound upperFilter = filter.upperEndpoint(); + minKey.pushColumn(lowerFilter.getValue(), column.getDataType()); + maxKey.pushColumn(upperFilter.getValue(), column.getDataType()); + + // Locate the partition to which the filter belongs + List<Long> partID = Lists.newArrayList(); + for (Map.Entry<Range<PartitionKey>, Long> rangeMapEntry : rangeMap.asMapOfRanges().entrySet()) { + Range<PartitionKey> partitionColRange = rangeMapEntry.getKey(); + PartitionKey upperPartitionKeys = partitionColRange.upperEndpoint(); + int partitionLess = upperPartitionKeys.getKeys() + .get(columnIdx).compareTo(lowerFilter.getValue()); + if (partitionLess < 0) { + continue; + } + PartitionKey lowerPartitionKeys = partitionColRange.lowerEndpoint(); + int partitionGreater = lowerPartitionKeys.getKeys() + .get(columnIdx).compareTo(upperFilter.getValue()); + if (partitionGreater > 0) { + break; + } + Range<ColumnBound> keyColRange = buildRange( + ColumnBound.of(lowerPartitionKeys.getKeys().get(columnIdx)), + partitionColRange.lowerBoundType(), + ColumnBound.of(upperPartitionKeys.getKeys().get(columnIdx)), + partitionColRange.upperBoundType()); + if (filter.isConnected(keyColRange)) { + partID.add(rangeMapEntry.getValue()); + } + } + partitionCol2PartitionID.putIfAbsent(column.getName(), TreeRangeMap.create()); + partitionCol2PartitionID.get(column.getName()).put(filter, partID); result.addAll(doPruneMulti(columnToFilters, rangeMap, columnIdx + 1, minKey, maxKey)); minKey.popColumn(); maxKey.popColumn(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java index 9e4030b768b..d65c6032b86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java @@ -19,19 +19,26 @@ package org.apache.doris.qe; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Replica; import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.exceptions.AnalysisException; import 
org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.planner.ColumnBound; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.KeyTuple; @@ -48,6 +55,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.RangeMap; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TDeserializer; @@ -55,31 +65,78 @@ import org.apache.thrift.TException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; public class PointQueryExecutor implements CoordInterface { private static final Logger LOG = LogManager.getLogger(PointQueryExecutor.class); - private long tabletID = 0; + private static final ConcurrentHashMap<Class<? extends Expr>, ConjunctHandler> handlers = new ConcurrentHashMap<>(); + + private class RoundRobinScheduler<T> { + private final List<T> list; + private AtomicInteger currentIndex; + + public RoundRobinScheduler(List<T> list) { + this.list = list; + this.currentIndex = new AtomicInteger(0); + } + + public T next() { + if (list.isEmpty()) { + return null; + } + int index = currentIndex.getAndUpdate(i -> (i + 1) % list.size()); + return list.get(index); + } + } + + private List<Long> scanTabletIDs = new ArrayList<>(); private long timeoutMs = Config.point_query_timeout_ms; // default 10s private boolean isCancel = false; private List<Backend> candidateBackends; + + // key: tablet id, value: backend and replica id + private HashMap<Long, Pair<Backend, Long>> pickedCandidateReplicas = Maps.newHashMap(); + + RoundRobinScheduler<Backend> roundRobinscheduler = null; + + // tablet id -> backend id -> replica + Table<Long, Long, Replica> replicaMetaTable = null; + private final int maxMsgSizeOfResultReceiver; // used for snapshot read in cloud mode - private List<Long> snapshotVisibleVersions; + // Key: cloud partition id, Value: snapshot visible version + private HashMap<CloudPartition, Long> snapshotVisibleVersions; private final ShortCircuitQueryContext shortCircuitQueryContext; + Map<PartitionKey, Set<Long>> distributionKeys2TabletID = null; + + // Maps partition column names to a RangeMap that associates ColumnBound ranges with lists of partition IDs, + // similar to the implementation in PartitionPrunerV2Base. 
+ Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = null; + + List<Set<Long>> keyTupleIndex2TabletID = null; + + List<List<String>> allKeyTuples = null; + + private List<Integer> distributionKeyColumns = Lists.newArrayList(); + + private List<Integer> partitionKeyColumns = Lists.newArrayList(); + public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) { ctx.sanitize(); this.shortCircuitQueryContext = ctx; @@ -97,10 +154,13 @@ public class PointQueryExecutor implements CoordInterface { partitions.add((CloudPartition) table.getPartition(id)); } } - snapshotVisibleVersions = CloudPartition.getSnapshotVisibleVersion(partitions); - // Only support single partition at present - Preconditions.checkState(snapshotVisibleVersions.size() == 1); - LOG.debug("set cloud version {}", snapshotVisibleVersions.get(0)); + snapshotVisibleVersions = (snapshotVisibleVersions == null) ? Maps.newHashMap() : snapshotVisibleVersions; + List<Long> versionList = CloudPartition.getSnapshotVisibleVersion(partitions); + for (int i = 0; i < versionList.size(); ++i) { + snapshotVisibleVersions.put(partitions.get(i), versionList.get(i)); + } + + LOG.debug("set cloud version {}", snapshotVisibleVersions); } void setScanRangeLocations() throws Exception { @@ -111,8 +171,9 @@ public class PointQueryExecutor implements CoordInterface { if (scanNode.getScanTabletIds().isEmpty()) { return; } - Preconditions.checkState(scanNode.getScanTabletIds().size() == 1); - this.tabletID = scanNode.getScanTabletIds().get(0); + this.distributionKeys2TabletID = scanNode.getDistributionKeys2TabletID(); + this.partitionCol2PartitionID = scanNode.getPartitionCol2PartitionID(); + this.scanTabletIDs = scanNode.getScanTabletIds(); // update partition version if cloud mode if (Config.isCloudMode() @@ -128,10 +189,9 @@ public class PointQueryExecutor implements CoordInterface { candidateBackends.add(backend); } } - // Random read replicas - Collections.shuffle(this.candidateBackends); + if (LOG.isDebugEnabled()) { - LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID); + LOG.debug("set scan locations, backend ids {}, tablet ids {}", candidateBackends, scanTabletIDs); } } @@ -159,40 +219,260 @@ public class PointQueryExecutor implements CoordInterface { .getMysqlChannel(), null, null); } - private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { - for (int i = 0; i < conjunctVals.size(); ++i) { - BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i); - if (binaryPredicate.getChild(0) instanceof LiteralExpr) { - binaryPredicate.setChild(0, conjunctVals.get(i)); - } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) { - binaryPredicate.setChild(1, conjunctVals.get(i)); + /* + * Interface for handling different conjunct types + */ + private interface ConjunctHandler { + int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException; + } + + private static class InPredicateHandler implements ConjunctHandler { + public static final InPredicateHandler INSTANCE = new InPredicateHandler(); + + private InPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.isNotIn()) { + throw new AnalysisException("Not support NOT IN predicate in point query"); + } + for (int j = 1; j < inPredicate.getChildren().size(); ++j) { + if 
(inPredicate.getChild(j) instanceof LiteralExpr) { + inPredicate.setChild(j, conjunctVals.get(handledConjunctVals++)); + } else { + Preconditions.checkState(false, "Should contains literal in " + inPredicate.toSqlImpl()); + } + } + return handledConjunctVals; + } + } + + private static class BinaryPredicateHandler implements ConjunctHandler { + public static final BinaryPredicateHandler INSTANCE = new BinaryPredicateHandler(); + + private BinaryPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + Expr left = binaryPredicate.getChild(0); + Expr right = binaryPredicate.getChild(1); + + if (isDeleteSign(left) || isDeleteSign(right)) { + return handledConjunctVals; + } + + if (isLiteralExpr(left)) { + binaryPredicate.setChild(0, conjunctVals.get(handledConjunctVals++)); + } else if (isLiteralExpr(right)) { + binaryPredicate.setChild(1, conjunctVals.get(handledConjunctVals++)); } else { Preconditions.checkState(false, "Should contains literal in " + binaryPredicate.toSqlImpl()); } + return handledConjunctVals; + } + + private boolean isLiteralExpr(Expr expr) { + return expr instanceof LiteralExpr; + } + + private boolean isDeleteSign(Expr expr) { + return expr instanceof SlotRef && ((SlotRef) expr).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN); + } + } + + private static void initHandler() { + handlers.put(InPredicate.class, InPredicateHandler.INSTANCE); + handlers.put(BinaryPredicate.class, BinaryPredicateHandler.INSTANCE); + } + + private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { + List<Expr> conjuncts = scanNode.getConjuncts(); + if (handlers.isEmpty()) { + initHandler(); + } + int handledConjunctVals = 0; + for (Expr expr : conjuncts) { + ConjunctHandler handler = handlers.get(expr.getClass()); + if (handler == null) { + throw new AnalysisException("Not support conjunct type " + expr.getClass().getName()); + } + handledConjunctVals = handler.handle(expr, conjunctVals, handledConjunctVals); } + + Preconditions.checkState(handledConjunctVals == conjunctVals.size()); } public void setTimeout(long timeoutMs) { this.timeoutMs = timeoutMs; } - void addKeyTuples( - InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { - // TODO handle IN predicates - Map<String, Expr> columnExpr = Maps.newHashMap(); - KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); - for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) { + /* + * According to the current ordered key tuple, based on the leftmost partition principle, + * get the partition ID to which it belongs + */ + private Set<Long> getLeftMostPartitionIDs(List<String> orderedKeyTuple, + List<Column> keyColumns) { + Set<Long> leftMostPartitionIDs = Sets.newHashSet(); + for (int i = 0; i < partitionKeyColumns.size(); ++i) { + int colIdx = partitionKeyColumns.get(i); + String partitionColName = keyColumns.get(colIdx).getName(); + try { + ColumnBound partitionKey = ColumnBound.of(LiteralExpr.create(orderedKeyTuple.get(colIdx), + keyColumns.get(colIdx).getType())); + List<Long> partitionIDs = Lists.newArrayList( + Optional.ofNullable( + partitionCol2PartitionID.get(partitionColName).get(partitionKey)) + .orElse(Collections.emptyList())); + // Add the first partition column directly + if (i == 0) { + leftMostPartitionIDs.addAll(partitionIDs); + continue; + } + if (leftMostPartitionIDs.isEmpty() || partitionIDs == null) { + break; + } + 
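// Intersect this column's candidate partitions with those matched by the preceding partition columns; an empty intersection means no partition can match. +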
partitionIDs.retainAll(leftMostPartitionIDs); + if (partitionIDs.isEmpty()) { + break; + } + } catch (Exception e) { + throw new AnalysisException("Failed to create partition key for key tuple: " + orderedKeyTuple); + } + } + return leftMostPartitionIDs; + } + + private void pickCandidateBackends() { + for (Long tabletID : scanTabletIDs) { + roundRobinPickReplica(tabletID); + } + } + + private void roundRobinPickReplica(Long tabletID) { + while (true) { + Backend backend = roundRobinscheduler.next(); + if (backend == null) { + break; + } + Map<Long, Replica> beWithReplica = replicaMetaTable.row(tabletID); + if (!beWithReplica.containsKey(backend.getId())) { + continue; + } + pickedCandidateReplicas + .putIfAbsent(tabletID, + Pair.of(backend, beWithReplica.get(backend.getId()).getId())); + break; + } + } + + // Use the leftmost matching partitions to filter out tablets that do not belong to these partitions + private void addTabletIDsForKeyTuple(List<String> orderedKeyTuple, List<Column> keyColumns, + OlapTable olapTable, Set<Long> leftMostPartitionIDs) { + // get part of the key tuple using distribution columns + List<String> keyTupleForDistributionPrune = Lists.newArrayList(); + for (Integer idx : distributionKeyColumns) { + keyTupleForDistributionPrune.add(orderedKeyTuple.get(idx)); + } + Set<Long> tabletIDs = Sets.newHashSet(); + for (PartitionKey key : distributionKeys2TabletID.keySet()) { + List<String> distributionKeys = Lists.newArrayList(); + for (LiteralExpr expr : key.getKeys()) { + distributionKeys.add(expr.getStringValue()); + } + if (distributionKeys.equals(keyTupleForDistributionPrune)) { + Set<Long> originTabletIDs = Sets.newHashSet(distributionKeys2TabletID.get(key)); + // If partitions are not explicitly created, this condition holds true + if (leftMostPartitionIDs.isEmpty()) { + tabletIDs.addAll(originTabletIDs); + } else { + Set<Long> prunedTabletIDs = Sets.newHashSet(); + for (Long partitionID : leftMostPartitionIDs) { + Partition partition = olapTable.getPartition(partitionID); + MaterializedIndex selectedTable = + partition.getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId()); + // filter out tablets that do not belong to this partition + selectedTable.getTablets().forEach(tablet -> { + if (originTabletIDs.contains(tablet.getId())) { + prunedTabletIDs.add(tablet.getId()); + } + }); + tabletIDs.addAll(prunedTabletIDs); + } + } + break; + } + } + keyTupleIndex2TabletID.add(tabletIDs.isEmpty() ? 
null : tabletIDs); + } + + // Get all possible key tuple combinations + void getAllKeyTupleCombination(List<Expr> conjuncts, int index, + List<String> currentKeyTuple, + List<List<String>> result, + List<String> columnExpr, + List<Column> keyColumns) { + if (index == conjuncts.size()) { + List<String> orderedKeyTuple = new ArrayList<>(currentKeyTuple.size()); + OlapTable olapTable = shortCircuitQueryContext.scanNode.getOlapTable(); + + // add key tuple in keys order + for (Column column : keyColumns) { + int colIdx = columnExpr.indexOf(column.getName()); + String currentKey = currentKeyTuple.get(colIdx); + orderedKeyTuple.add(currentKey); + } + result.add(Lists.newArrayList(orderedKeyTuple)); + Set<Long> leftMostPartitionIDs = getLeftMostPartitionIDs(orderedKeyTuple, keyColumns); + addTabletIDsForKeyTuple(orderedKeyTuple, keyColumns, olapTable, leftMostPartitionIDs); + return; + } + + Expr expr = conjuncts.get(index); + if (expr instanceof BinaryPredicate) { BinaryPredicate predicate = (BinaryPredicate) expr; Expr left = predicate.getChild(0); Expr right = predicate.getChild(1); SlotRef columnSlot = left.unwrapSlotRef(); - columnExpr.put(columnSlot.getColumnName(), right); + if (left instanceof SlotRef && ((SlotRef) left).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN)) { + getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns); + return; + } + columnExpr.add(columnSlot.getColumnName()); + currentKeyTuple.add(right.getStringValue()); + getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns); + currentKeyTuple.remove(currentKeyTuple.size() - 1); + columnExpr.remove(columnExpr.size() - 1); + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.isNotIn()) { + throw new AnalysisException("Not support NOT IN predicate in point query"); + } + SlotRef columnSlot = inPredicate.getChild(0).unwrapSlotRef(); + columnExpr.add(columnSlot.getColumnName()); + for (int i = 1; i < inPredicate.getChildren().size(); ++i) { + currentKeyTuple.add(inPredicate.getChild(i).getStringValue()); + getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns); + currentKeyTuple.remove(currentKeyTuple.size() - 1); + } + columnExpr.remove(columnExpr.size() - 1); + } else { + throw new AnalysisException("Not support conjunct type " + expr.getClass().getName()); } - // add key tuple in keys order - for (Column column : shortCircuitQueryContext.scanNode.getOlapTable().getBaseSchemaKeyColumns()) { - kBuilder.addKeyColumnRep(columnExpr.get(column.getName()).getStringValue()); + } + + List<List<String>> addAllKeyTuples(List<Column> keyColumns) { + List<Expr> conjuncts = shortCircuitQueryContext.scanNode.getConjuncts(); + List<List<String>> keyTuples = Lists.newArrayList(); + if (keyTupleIndex2TabletID == null) { + keyTupleIndex2TabletID = Lists.newArrayList(); } - requestBuilder.addKeyTuples(kBuilder); + getAllKeyTupleCombination(conjuncts, 0, new ArrayList<>(), keyTuples, Lists.newArrayList(), keyColumns); + Preconditions.checkState(keyTuples.size() == keyTupleIndex2TabletID.size()); + return keyTuples; } @Override @@ -208,21 +488,71 @@ public class PointQueryExecutor implements CoordInterface { if (candidateBackends == null || candidateBackends.isEmpty()) { return new RowBatch(); } - Iterator<Backend> backendIter = candidateBackends.iterator(); - RowBatch rowBatch = null; - int tryCount = 0; - int maxTry = Math.min(Config.max_point_query_retry_time, 
candidateBackends.size()); + + // pick candidate backends + OlapScanNode scanNode = shortCircuitQueryContext.scanNode; + replicaMetaTable = scanNode.getScanBackendReplicaTable(); + roundRobinscheduler = new RoundRobinScheduler<>(candidateBackends); + pickCandidateBackends(); + + OlapTable olapTable = scanNode.getOlapTable(); + List<Column> keyColumns = olapTable.getBaseSchemaKeyColumns(); + for (int i = 0; i < keyColumns.size(); ++i) { + Column column = keyColumns.get(i); + if (olapTable.isPartitionColumn(column.getName())) { + partitionKeyColumns.add(i); + } + if (olapTable.isDistributionColumn(column.getName())) { + distributionKeyColumns.add(i); + } + } + RowBatch rowBatch = new RowBatch(); Status status = new Status(); - do { - Backend backend = backendIter.next(); - rowBatch = getNextInternal(status, backend); - if (rowBatch != null) { + this.allKeyTuples = addAllKeyTuples(keyColumns); + TResultBatch resultBatch = new TResultBatch(); + resultBatch.setRows(Lists.newArrayList()); + List<byte[]> batchSerialResult = Lists.newArrayList(); + Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequestBuilders = + buildBatchRequest(status); + // send batch request + if (batchRequestBuilders.isEmpty()) { + status.updateStatus(TStatusCode.OK, ""); + rowBatch.setEos(true); + return rowBatch; + } + for (Map.Entry<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> entry : + batchRequestBuilders.entrySet()) { + List<byte[]> subSerialResult = batchGetNext(status, entry.getKey(), entry.getValue()); + + if (!status.ok()) { break; } - if (++tryCount >= maxTry) { - break; + batchSerialResult.addAll(subSerialResult); + } + + // todo: maybe there is a better way + if (!batchSerialResult.isEmpty()) { + TDeserializer deserializer = new TDeserializer( + new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); + for (byte[] serialResult : batchSerialResult) { + TResultBatch tmpResultBatch = new TResultBatch(); + try { + deserializer.deserialize(tmpResultBatch, serialResult); + tmpResultBatch.getRows().forEach(row -> { + resultBatch.addToRows(row); + }); + } catch (TException e) { + if (e.getMessage().contains("MaxMessageSize reached")) { + throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); + } else { + throw e; + } + } } - } while (true); + rowBatch.setBatch(resultBatch); + } + rowBatch.setEos(true); + // handle status code if (!status.ok()) { if (Strings.isNullOrEmpty(status.getErrorMsg())) { @@ -250,45 +580,106 @@ public class PointQueryExecutor implements CoordInterface { // only handles in getNext() } - private RowBatch getNextInternal(Status status, Backend backend) throws TException { - long timeoutTs = System.currentTimeMillis() + timeoutMs; - RowBatch rowBatch = new RowBatch(); - InternalService.PTabletKeyLookupResponse pResult = null; - try { - Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable); - - InternalService.PTabletKeyLookupRequest.Builder requestBuilder - = InternalService.PTabletKeyLookupRequest.newBuilder() - .setTabletId(tabletID) - .setDescTbl(shortCircuitQueryContext.serializedDescTable) - .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr) - .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions) - .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE); - if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) { - requestBuilder.setVersion(snapshotVisibleVersions.get(0)); - } - // Only set cacheID for prepared statement 
excute phase, - // otherwise leading to many redundant cost in BE side - if (shortCircuitQueryContext.cacheID != null - && ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE) { - InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder(); - uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits()); - uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits()); - requestBuilder.setUuid(uuidBuilder); + private void collectBatchRequests( + Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequests, + KeyTuple.Builder kBuilder, Set<Long> tabletIDsOfKeyTuple) { + // check containsKey + for (Long tabletID : tabletIDsOfKeyTuple) { + Preconditions.checkState(pickedCandidateReplicas.containsKey(tabletID)); + Pair<Backend, Long> beWithReplicaID = pickedCandidateReplicas.get(tabletID); + Backend candidate = beWithReplicaID.first; + batchRequests.putIfAbsent( + candidate, + InternalService.PTabletBatchKeyLookupRequest.newBuilder()); + buildSubRequest(tabletID, kBuilder, beWithReplicaID.second, + batchRequests.get(candidate)); + } + } + + // Find the tabletID, backend, and replica corresponding to each keyTuple, + // and then add them to batchRequests. Each backend corresponds to a batchRequest. + private Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> + buildBatchRequest(Status status) throws TException { + Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequestBuilders = + Maps.newHashMap(); + for (int i = 0; i < keyTupleIndex2TabletID.size(); ++i) { + if (keyTupleIndex2TabletID.get(i) == null) { + continue; + } + KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); + for (String key : this.allKeyTuples.get(i)) { + kBuilder.addKeyColumnRep(key); + } + collectBatchRequests(batchRequestBuilders, kBuilder, keyTupleIndex2TabletID.get(i)); + } + return batchRequestBuilders; + } + + // Build a request about a keyTuple, that is, SubRequest + private void buildSubRequest( + Long prunedTabletIdsOfBe, KeyTuple.Builder kBuilder, Long replicaID, + InternalService.PTabletBatchKeyLookupRequest.Builder pBatchRequestBuilder) { + InternalService.PTabletKeyLookupRequest.Builder requestBuilder + = InternalService.PTabletKeyLookupRequest.newBuilder() + .setDescTbl(shortCircuitQueryContext.serializedDescTable) + .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr) + .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions) + .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE); + + // TODO: optimize me + if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) { + Long versionToSet = -1L; + for (Map.Entry<CloudPartition, Long> entry : snapshotVisibleVersions.entrySet()) { + MaterializedIndex selectedTable = + entry.getKey().getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId()); + if (selectedTable.getTabletIdsInOrder().contains(prunedTabletIdsOfBe)) { + versionToSet = entry.getValue(); + break; + } } - addKeyTuples(requestBuilder); + requestBuilder.setVersion(versionToSet); + } + // Only set cacheID for prepared statement excute phase, + // otherwise leading to many redundant cost in BE side + if (shortCircuitQueryContext.cacheID != null + && ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE) { + InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder(); + uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits()); + 
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits()); + requestBuilder.setUuid(uuidBuilder); + } + requestBuilder.addKeyTuples(kBuilder); + requestBuilder.setTabletId(tabletID); + requestBuilder.setReplicaId(replicaID); + pBatchRequestBuilder.addSubKeyLookupReq(requestBuilder); + } + + private List<byte[]> batchGetNext( + Status status, Backend backend, + InternalService.PTabletBatchKeyLookupRequest.Builder pBatchRequestBuilder) throws TException { + TResultBatch resultBatch = new TResultBatch(); + List<byte[]> result = Lists.newArrayList(); + resultBatch.setRows(Lists.newArrayList()); + + Preconditions.checkState(pBatchRequestBuilder.getSubKeyLookupReqCount() > 0); - InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); - Future<InternalService.PTabletKeyLookupResponse> futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request); + // batch fetch data + InternalService.PTabletBatchKeyLookupRequest pBatchRequest = pBatchRequestBuilder.build(); + long timeoutTs = System.currentTimeMillis() + timeoutMs; + InternalService.PTabletBatchKeyLookupResponse pBatchResult = null; + try { + Future<InternalService.PTabletBatchKeyLookupResponse> futureBatchResponse = + BackendServiceProxy.getInstance() + .batchFetchTabletDataAsync(backend.getBrpcAddress(), pBatchRequest); long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { - LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); + LOG.warn("batch fetch result timeout {}", backend.getBrpcAddress()); status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request timeout"); return null; } try { - pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + // TODO: get the result asynchronously + pBatchResult = futureBatchResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // continue to get result LOG.warn("future get interrupted Exception"); @@ -297,18 +688,18 @@ public class PointQueryExecutor implements CoordInterface { return null; } } catch (TimeoutException e) { - futureResponse.cancel(true); + futureBatchResponse.cancel(true); LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch result timeout"); return null; } } catch (RpcException e) { - LOG.warn("query fetch rpc exception {}, e {}", backend.getBrpcAddress(), e); + LOG.warn("query batch fetch rpc exception {}, e {}", backend.getBrpcAddress(), e); status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { - LOG.warn("query fetch execution exception {}, addr {}", e, backend.getBrpcAddress()); + LOG.warn("query batch fetch execution exception {}, addr {}", e, backend.getBrpcAddress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. 
status.updateStatus(TStatusCode.TIMEOUT, e.getMessage()); @@ -318,43 +709,37 @@ public class PointQueryExecutor implements CoordInterface { } return null; } - Status resultStatus = new Status(pResult.getStatus()); - if (resultStatus.getErrorCode() != TStatusCode.OK) { - status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); - return null; - } - if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) { - LOG.debug("get empty rowbatch"); - rowBatch.setEos(true); - status.updateStatus(TStatusCode.OK, ""); - return rowBatch; - } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { - byte[] serialResult = pResult.getRowBatch().toByteArray(); - TResultBatch resultBatch = new TResultBatch(); - TDeserializer deserializer = new TDeserializer( - new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); - try { - deserializer.deserialize(resultBatch, serialResult); - } catch (TException e) { - if (e.getMessage().contains("MaxMessageSize reached")) { - throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); - } else { - throw e; - } + // handle the response; check cancellation before consuming each sub-response + boolean isOK = true; + for (InternalService.PTabletKeyLookupResponse subResponse : pBatchResult.getSubKeyLookupResList()) { + if (isCancel) { + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); + isOK = false; + break; + } + Status resultStatus = new Status(subResponse.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); + return null; } - rowBatch.setBatch(resultBatch); - rowBatch.setEos(true); + if (subResponse.hasEmptyBatch() && subResponse.getEmptyBatch()) { + LOG.debug("get empty rowbatch"); + } else if (subResponse.hasRowBatch() && subResponse.getRowBatch().size() > 0) { + byte[] serialResult = subResponse.getRowBatch().toByteArray(); + result.add(serialResult); + } else { + Preconditions.checkState(false, "No row batch or empty batch found"); + } + } + if (isOK) { status.updateStatus(TStatusCode.OK, ""); - return rowBatch; - } else { - Preconditions.checkState(false, "No row batch or empty batch found"); } - if (isCancel) { - status.updateStatus(TStatusCode.CANCELLED, "cancelled"); - } - return rowBatch; + return result; } public void cancel() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 54c5e68144c..9cde1897654 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -98,6 +98,11 @@ public class BackendServiceClient { return stub.tabletFetchData(request); } + public Future<InternalService.PTabletBatchKeyLookupResponse> batchFetchTabletDataAsync( + InternalService.PTabletBatchKeyLookupRequest batchRequest) { + return stub.tabletBatchFetchData(batchRequest); + } + public InternalService.PFetchDataResult fetchDataSync(InternalService.PFetchDataRequest request) { return blockingStub.fetchData(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 053a7428b52..5fe13a82d0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -307,6 +307,18 @@ public class BackendServiceProxy { } } + 
public Future<InternalService.PTabletBatchKeyLookupResponse> batchFetchTabletDataAsync( + TNetworkAddress address, InternalService.PTabletBatchKeyLookupRequest batchRequest) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.batchFetchTabletDataAsync(batchRequest); + } catch (Throwable e) { + LOG.warn("batch fetch tablet data caught an exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public InternalService.PFetchDataResult fetchDataSync( TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException { try { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 547b2588168..867c0679dac 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -339,6 +339,11 @@ message PTabletKeyLookupRequest { optional int64 version = 7; // serilized from TQueryOptions optional bytes query_options = 8; + optional int64 replica_id = 9; +} + +message PTabletBatchKeyLookupRequest { + repeated PTabletKeyLookupRequest sub_key_lookup_req = 1; } message PTabletKeyLookupResponse { @@ -347,6 +352,11 @@ message PTabletKeyLookupResponse { optional bool empty_batch = 3; } +message PTabletBatchKeyLookupResponse { + optional PStatus status = 1; + repeated PTabletKeyLookupResponse sub_key_lookup_res = 2; +} + //Add message definition to fetch and update cache enum PCacheStatus { DEFAULT = 0; @@ -1030,6 +1040,7 @@ service PBackendService { rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse); rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns (PGetFileCacheMetaResponse); rpc tablet_fetch_data(PTabletKeyLookupRequest) returns (PTabletKeyLookupResponse); + rpc tablet_batch_fetch_data(PTabletBatchKeyLookupRequest) returns (PTabletBatchKeyLookupResponse); rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse); rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse); rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse); diff --git a/regression-test/data/point_query_p0/test_point_IN_query.out b/regression-test/data/point_query_p0/test_point_IN_query.out new file mode 100644 index 00000000000..6f1f4ae7b33 --- /dev/null +++ b/regression-test/data/point_query_p0/test_point_IN_query.out @@ -0,0 +1,27 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !1_EQ_sql -- +123 100 110 zxcd + +-- !1_EQ_sql -- +6333 2642 480 zxc + +-- !1_EQ_sql -- +1231 1220 210 zxc + +-- !2_EQ_sql -- +222 100 115 zxc + +-- !2_EQ_sql -- +1231 1220 210 zxc + +-- !2_EQ_sql -- +1231 1220 210 zxc + +-- !3_EQ_sql -- +323 49 240 zxc + +-- !0_EQ_sql -- +12 12 120 zxc +123 100 110 zxcd +222 100 115 zxc + diff --git a/regression-test/data/point_query_p0/test_point_query.out b/regression-test/data/point_query_p0/test_point_query.out index 1cc4142e39f..f22bf31942e 100644 --- a/regression-test/data/point_query_p0/test_point_query.out +++ b/regression-test/data/point_query_p0/test_point_query.out @@ -160,3 +160,182 @@ -- !sql -- -10 20 aabc update val +-- !case_1_sql -- +123 132 a + +-- !case_1_sql -- +123 132 a +123 222 b + +-- !case_1_sql -- +1 1 d +123 132 a +123 222 b + +-- !case_2_sql -- + +-- !case_2_sql -- +222 150 c + +-- !case_2_sql -- +123 120 a + +-- !case_2_sql -- +400 250 g +400 260 f + +-- !case_2_sql -- +222 150 c +400 250 g +400 260 f + +-- !case_3_sql -- +123 100 a + +-- !case_3_sql -- + +-- !case_3_sql -- +100 100 aaa +100 120 aaaaa + +-- !case_3_sql -- +123 100 a + +-- !case_3_sql -- +400 250 d +400 280 e + +-- !case_3_sql -- +123 100 a +350 200 c +400 250 d + +-- !case_4_sql -- +123 100 110 a + +-- !case_4_sql -- + +-- !case_4_sql -- +1231 1220 210 d + +-- !case_4_sql -- +123 100 110 a + +-- !case_4_sql -- +123 100 110 a +1231 1220 210 d +222 100 115 b + +-- !case_5_sql -- +123 100 110 a + +-- !case_5_sql -- + +-- !case_5_sql -- +1231 1220 210 d + +-- !case_5_sql -- +123 100 110 a + +-- !case_5_sql -- +123 100 110 a +1231 1220 210 d +222 100 115 b +633 2642 480 g +6333 2642 480 h + +-- !case_6_sql -- +123 132 a + +-- !case_6_sql -- +123 132 a +123 222 b + +-- !case_6_sql -- +1 1 d +123 132 a +123 222 b +2 2 e +3 3 f +4 4 i + +-- !case_7_sql -- +123 100 110 a + +-- !case_7_sql -- + +-- !case_7_sql -- +1231 1220 210 d + +-- !case_7_sql -- +123 100 110 a + +-- !case_7_sql -- +123 100 110 a +1231 1220 210 d +222 100 115 b +633 2642 480 g +6333 2642 480 h + +-- !case_8_sql -- +123 100 a + +-- !case_8_sql -- + +-- !case_8_sql -- +123 100 a + +-- !case_8_sql -- +400 250 d +400 280 e + +-- !case_8_sql -- +123 100 a +350 200 c +400 250 d + +-- !case_9_sql -- +123 100 110 zxcd + +-- !case_9_sql -- +222 100 115 zxc + +-- !case_9_sql -- +323 49 240 zxc + +-- !case_9_sql -- +123 100 110 zxcd + +-- !case_9_sql -- +1231 1220 210 zxc + +-- !case_9_sql -- +6333 2642 480 zxc + +-- !case_9_sql -- +633 2642 480 zxc + +-- !case_9_sql -- +123 100 110 zxcd + +-- !case_9_sql -- +1231 1220 210 zxc +222 100 115 zxc + +-- !case_9_sql -- +12 12 120 zxc +123 100 110 zxcd +222 100 115 zxc + +-- !case_9_sql -- +1231 1220 210 zxc + +-- !case_9_sql -- +123 100 110 zxcd +222 100 115 zxc + +-- !case_9_sql -- +123 100 110 zxcd +1231 1220 210 zxc +222 100 115 zxc + diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out b/regression-test/data/point_query_p0/test_point_query_partition.out index bef064984c8..5c20cde01d9 100644 --- a/regression-test/data/point_query_p0/test_point_query_partition.out +++ b/regression-test/data/point_query_p0/test_point_query_partition.out @@ -31,6 +31,15 @@ -- !point_select -- +-- !point_in_select -- +-1 c +1 a +11 d +2 b +33 f +45 g +999 h + -- !point_selectxxx -- 686612 686612 686612 \N \N \N \N \N \N \N \N @@ -46,3 +55,19 @@ -- !point_selecteee -- 686613 686613 686613 \N \N \N \N \N \N \N \N +-- !point_in_selectxxx -- +686612 686612 686612 \N \N \N \N \N \N \N \N +686613 686613 
686613 \N \N \N \N \N \N \N \N + +-- !point_in_selectyyy -- +686612 686612 686612 \N \N \N \N \N \N \N \N +686613 686613 686613 \N \N \N \N \N \N \N \N + +-- !point_in_selectmmm -- +686612 686612 686612 \N \N \N \N \N \N \N \N +686613 686613 686613 \N \N \N \N \N \N \N \N + +-- !point_in_selecteee -- +686612 686612 686612 \N \N \N \N \N \N \N \N +686613 686613 686613 \N \N \N \N \N \N \N \N + diff --git a/regression-test/data/point_query_p0/test_rowstore.out b/regression-test/data/point_query_p0/test_rowstore.out index 34e40867d6a..f0558d616c0 100644 --- a/regression-test/data/point_query_p0/test_rowstore.out +++ b/regression-test/data/point_query_p0/test_rowstore.out @@ -28,6 +28,11 @@ -- !point_select -- 33333333333333333333333333333333 3 +-- !point_in_select -- +11111111111111111111111111111111111111 3 +222222222222222222222222222222222 3 +33333333333333333333333333333333 3 + -- !point_select -- 3 @@ -37,6 +42,11 @@ -- !point_select -- 3 +-- !point_in_select -- +3 +3 +3 + -- !point_select -- 33333333333333333333333333333333 @@ -55,6 +65,11 @@ -- !point_select -- 3 +-- !point_in_select -- +3 +3 +3 + -- !point_select -- 2021-02-01T11:11:11 @@ -64,6 +79,11 @@ -- !point_select -- 2023-02-01T11:11:11 +-- !point_in_select -- +2021-02-01T11:11:11 +2022-02-01T11:11:11 +2023-02-01T11:11:11 + -- !point_select -- 2017-10-01T11:11:11.021 2017-10-01T11:11:11.170 2017-10-01T11:11:11.110111 30 @@ -88,6 +108,16 @@ -- !point_select -- 2017-10-01T11:11:11.028 \N \N 34 +-- !point_in_select -- +2017-10-01T11:11:11.021 2017-10-01T11:11:11.170 2017-10-01T11:11:11.110111 30 +2017-10-01T11:11:11.022 2017-10-01T11:11:11.160 2017-10-01T11:11:11.100111 31 +2017-10-01T11:11:11.023 2017-10-01T11:11:11.150 2017-10-01T11:11:11.130111 31 +2017-10-01T11:11:11.024 2017-10-01T11:11:11.140 2017-10-01T11:11:11.120111 32 +2017-10-01T11:11:11.025 2017-10-01T11:11:11.100 2017-10-01T11:11:11.140111 32 +2017-10-01T11:11:11.026 2017-10-01T11:11:11.110 2017-10-01T11:11:11.150111 33 +2017-10-01T11:11:11.027 \N \N 34 +2017-10-01T11:11:11.028 \N \N 34 + -- !sql -- 1 abc 1111919.123456789190000000 diff --git a/regression-test/data/point_query_p0/test_rowstore_query.out b/regression-test/data/point_query_p0/test_rowstore_query.out index b43e0263960..e4656d631a7 100644 --- a/regression-test/data/point_query_p0/test_rowstore_query.out +++ b/regression-test/data/point_query_p0/test_rowstore_query.out @@ -2,6 +2,10 @@ -- !sql -- 1 abc 1111919.123456789190000000 --- !sql -- +-- !point_sql -- +2 def 1111919.123456789190000000 + +-- !point_in_sql -- +1 abc 1111919.123456789190000000 2 def 1111919.123456789190000000 diff --git a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out index fa90a56523c..0d640ff2b06 100644 --- a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out +++ b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out @@ -127,3 +127,9 @@ 5 1304 36548425 15229335116 991129292901.111380000 dd 2120-01-02 2024-01-01T12:36:38 652.692 5022-01-01T11:30:38 5022-01-01 6 1305 56054803 18031831909 100320.111390000 haha abcd 2220-01-02 2025-01-01T12:36:38 2.7692 6022-01-01T11:30:38 6022-01-01 +-- !stmt_read11_1 -- +1 1300 55356821 +2 1301 56052706 +3 1302 55702967 +4 1303 56054326 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 4f9409308d5..85feb96380c 100644 --- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1355,9 +1355,9 @@ class Suite implements GroovyInterceptable { quickRunTest(tag, sql, isOrder) } - void quickExecute(String tag, PreparedStatement stmt) { + void quickExecute(String tag, PreparedStatement stmt, boolean isOrder = false) { logger.info("Execute tag: ${tag}, sql: ${stmt}".toString()) - quickRunTest(tag, stmt) + quickRunTest(tag, stmt, isOrder) } @Override @@ -1369,6 +1369,8 @@ class Suite implements GroovyInterceptable { return quickTest(name.substring("order_qt_".length()), (args as Object[])[0] as String, true) } else if (name.startsWith("qe_")) { return quickExecute(name.substring("qe_".length()), (args as Object[])[0] as PreparedStatement) + } else if (name.startsWith("order_qe_")) { + return quickExecute(name.substring("order_qe_".length()), (args as Object[])[0] as PreparedStatement, true) } else if (name.startsWith("assert") && name.length() > "assert".length()) { // delegate to junit Assertions dynamically return Assertions."$name"(*args) // *args: spread-dot diff --git a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy index 4be53ff17f6..4ac5b7f5ec9 100644 --- a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy +++ b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy @@ -46,7 +46,7 @@ suite ("test_row_store_page_size_cloud") { explain { sql("select * from ps_table_1 where k1=1 and k2=1;") - contains("SHORT") + contains("SHORT-CIRCUIT") } qt_select_star "select * from ps_table_1 where k1=1 and k2=1;" @@ -81,7 +81,7 @@ suite ("test_row_store_page_size_cloud") { explain { sql("select * from ps_table_2 where k1=1 and k2=1;") - contains("SHORT") + contains("SHORT-CIRCUIT") } qt_select_star "select * from ps_table_2 where k1=1 and k2=1;" diff --git a/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy new file mode 100644 index 00000000000..29cc084c3bd --- /dev/null +++ b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import java.time.LocalDate +import java.time.format.DateTimeFormatter + +suite("test_dynamic_partition_point_query") { + sql "drop table if exists dy_par_pq" + sql """ + CREATE TABLE IF NOT EXISTS dy_par_pq ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int NOT NULL ) + UNIQUE KEY(k1) + PARTITION BY RANGE(k1) ( ) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="10", + "dynamic_partition.start"="-3", + "dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1", + "replication_allocation" = "tag.location.default: 1", + "store_row_column" = "true") + """ + def result = sql "show tables like 'dy_par_pq'" + logger.info("${result}") + assertEquals(result.size(), 1) + result = sql_return_maparray "show partitions from dy_par_pq" + assertEquals(result.get(0).Buckets.toInteger(), 10) + def currentDate = LocalDate.now() + def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") + def currentDay = currentDate.format(formatter) + sql "insert into dy_par_pq values ('${currentDay}', 'a', 1);" + def previous1Day = currentDate.minusDays(1).format(formatter) + sql "insert into dy_par_pq values ('${previous1Day}', 'b', 2);" + def previous2Day = currentDate.minusDays(2).format(formatter) + sql "insert into dy_par_pq values ('${previous2Day}', 'c', 3);" + def previous3Day = currentDate.minusDays(3).format(formatter) + sql "insert into dy_par_pq values ('${previous3Day}', 'd', 4);" + def next1Day = currentDate.plusDays(1).format(formatter) + sql "insert into dy_par_pq values ('${next1Day}', 'e', 5);" + def next2Day = currentDate.plusDays(2).format(formatter) + sql "insert into dy_par_pq values ('${next2Day}', 'f', 6);" + def next3Day = currentDate.plusDays(3).format(formatter) + sql "insert into dy_par_pq values ('${next3Day}', 'g', 7);" + + result = sql """ + select + * + from + dy_par_pq + where + k1 in ( + '${currentDay}', '${previous1Day}', '${previous2Day}', '${previous3Day}', + '${next1Day}', '${next2Day}', '${next3Day}') ; + """ + assertEquals(result.size(), 7) + explain { + sql """ + select + * + from + dy_par_pq + where + k1 in ( + '${currentDay}', '${previous1Day}', '${previous2Day}', '${previous3Day}', + '${next1Day}', '${next2Day}', '${next3Day}') ; + """ + contains "SHORT-CIRCUIT" + } + + def previous4Day = currentDate.minusDays(4).format(formatter) + def next4Day = currentDate.plusDays(4).format(formatter) + result = sql """ + select + * + from + dy_par_pq + where + k1 in ( + '${currentDay}', '${previous4Day}', '${next4Day}') ; + """ + assertEquals(result.size(), 1) + + result = sql """ + select + * + from + dy_par_pq + where + k1 = '${currentDay}' ; + """ + assertEquals(result.size(), 1) + + result = sql """ + select + * + from + dy_par_pq + where + k1 = '${next4Day}' ; + """ + assertEquals(result.size(), 0) + + sql "drop table dy_par_pq" + +} diff --git a/regression-test/suites/point_query_p0/test_point_IN_query.groovy b/regression-test/suites/point_query_p0/test_point_IN_query.groovy new file mode 100644 index 00000000000..29e33787470 --- /dev/null +++ b/regression-test/suites/point_query_p0/test_point_IN_query.groovy @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.math.BigDecimal; + +suite("test_point_IN_query", "p0") { + def tableName = "rs_in_table" + sql """DROP TABLE IF EXISTS ${tableName}""" + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c int not null, + d string not null + ) + unique key(a, b, c) + partition by RANGE(a, b, c) + ( + partition p values [(1, 1, 1), (100, 100, 100)), + partition p0 values [(100, 100, 100), (200, 210, 220)), + partition p1 values [(200, 210, 220), (300, 250, 290)), + partition p2 values [(300, 250, 290), (350, 290, 310)), + partition p3 values [(350, 290, 310), (400, 350, 390)), + partition p4 values [(400, 350, 390), (800, 400, 450)), + partition p5 values [(800, 400, 450), (2000, 500, 500)), + partition p6 values [(2000, 500, 500), (5000, 600, 600)), + partition p7 values [(5000, 600, 600), (9999, 9999, 9999)) + ) + distributed by hash(a, c) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 100, 110, "zxcd"); + insert into ${tableName} values(222, 100, 115, "zxc"); + insert into ${tableName} values(12, 12, 120, "zxc"); + insert into ${tableName} values(1231, 1220, 210, "zxc"); + insert into ${tableName} values(323, 49, 240, "zxc"); + insert into ${tableName} values(843, 7342, 370, "zxcde"); + insert into ${tableName} values(633, 2642, 480, "zxc"); + insert into ${tableName} values(6333, 2642, 480, "zxc"); + """ + + order_qt_1_EQ_sql "select * from ${tableName} where a = 123 and b in (100, 12, 1220, 7342, 999, 2642) and c in (110, 115, 120, 480);" + order_qt_1_EQ_sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b = 2642 and c in (210, 110, 115, 210, 480);" + order_qt_1_EQ_sql "select * from ${tableName} where a in (123, 1, 222, 1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c = 210;" + explain { + sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b = 2642 and c in (210, 110, 115, 210, 480);" + contains "SHORT-CIRCUIT" + } + + order_qt_2_EQ_sql "select * from ${tableName} where a = 222 and b = 100 and c in (110, 115, 120, 480);" + order_qt_2_EQ_sql "select * from ${tableName} where a = 1231 and b in (100, 12, 1220, 7342, 999, 2642) and c = 210;" + order_qt_2_EQ_sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b = 1220 and c = 210;" + explain { + sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b = 1220 and c = 210;" + contains "SHORT-CIRCUIT" + } + + order_qt_3_EQ_sql "select * from ${tableName} where a = 323 and b = 49 and c = 240;" + order_qt_0_EQ_sql "select * from ${tableName} where a in (123, 222, 12) and b in (100, 12) and c in (110, 115, 120, 210);" + explain { + sql "select * from ${tableName} where a in (123, 222, 12) and b in (100, 12) and c in (110, 115, 120, 210);" + contains 
"SHORT-CIRCUIT" + } + + sql """DROP TABLE IF EXISTS ${tableName}""" +} \ No newline at end of file diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy b/regression-test/suites/point_query_p0/test_point_query.groovy index 99998a24ed6..ab71187e7e3 100644 --- a/regression-test/suites/point_query_p0/test_point_query.groovy +++ b/regression-test/suites/point_query_p0/test_point_query.groovy @@ -341,4 +341,404 @@ suite("test_point_query", "nonConcurrent") { qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';" sql "update table_3821461 set value = 'update value' where col1 = -10 or col1 = 20;" qt_sql """select * from table_3821461 where col1 = -10 and col2 = 20 and loc3 = 'aabc'""" + + tableName = "in_table_1" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 1: Default partitioning, part of the primary key is a bucket column + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c string not null + ) + unique key(a, b) + distributed by hash(a) buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 132, "a"); + insert into ${tableName} values(123, 222, "b"); + insert into ${tableName} values(22, 2, "c"); + insert into ${tableName} values(1, 1, "d"); + insert into ${tableName} values(2, 2, "e"); + insert into ${tableName} values(3, 3, "f"); + insert into ${tableName} values(4, 4, "i"); + """ + qt_case_1_sql "select * from ${tableName} where a = 123 and b = 132;" + explain { + sql("select * from ${tableName} where a = 123 and b in (132, 1, 222, 333);") + contains "SHORT-CIRCUIT" + } + order_qt_case_1_sql "select * from ${tableName} where a = 123 and b in (132, 1, 222, 333);" + explain { + sql("select * from ${tableName} where a in (123, 1, 222) and b in (132, 1, 222, 333);") + contains "SHORT-CIRCUIT" + } + order_qt_case_1_sql "select * from ${tableName} where a in (123, 1, 222) and b in (132, 1, 222, 333);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + // Case 2: Partition columns, bucket columns, and primary keys are the same + tableName = "in_table_2" + sql """DROP TABLE IF EXISTS ${tableName}""" + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c string not null + ) + unique key(a, b) + partition by RANGE(a, b) + ( + partition p0 values [(100, 100), (200, 140)), + partition p1 values [(200, 140), (300, 170)), + partition p2 values [(300, 170), (400, 250)), + partition p3 values [(400, 250), (420, 300)), + partition p4 values [(420, 300), (500, 400)) + ) + distributed by hash(a, b) buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 120, "a"); + insert into ${tableName} values(150, 120, "b"); + insert into ${tableName} values(222, 150, "c"); + insert into ${tableName} values(333, 200, "e"); + insert into ${tableName} values(400, 260, "f"); + insert into ${tableName} values(400, 250, "g"); + insert into ${tableName} values(440, 350, "h"); + insert into ${tableName} values(450, 320, "i"); + """ + + qt_case_2_sql "select * from ${tableName} where a = 123 and b = 100;" + qt_case_2_sql "select * from ${tableName} where a = 222 and b = 150;" + order_qt_case_2_sql "select * from ${tableName} where a = 123 and b in (132, 120, 222, 333);" + order_qt_case_2_sql "select * from ${tableName} where a = 400 and b in (260, 250, 300);" + order_qt_case_2_sql "select * from ${tableName} where a in (400, 222, 100) and b in 
(260, 250, 100, 150);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_3" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 3: The partition column is the same as the primary key, and the bucket column is part of the primary key. + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c string not null + ) + unique key(a, b) + partition by RANGE(a, b) + ( + partition p0 values [(100, 100), (100, 140)), + partition p1 values [(100, 140), (200, 140)), + partition p2 values [(200, 140), (300, 170)), + partition p3 values [(300, 170), (400, 250)), + partition p4 values [(400, 250), (420, 300)), + partition p5 values [(420, 300), (500, 400)) + ) + distributed by hash(a) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(100, 100, "aaa"); + insert into ${tableName} values(100, 120, "aaaaa"); + insert into ${tableName} values(123, 100, "a"); + insert into ${tableName} values(150, 100, "b"); + insert into ${tableName} values(350, 200, "c"); + insert into ${tableName} values(400, 250, "d"); + insert into ${tableName} values(400, 280, "e"); + insert into ${tableName} values(450, 350, "f"); + """ + qt_case_3_sql "select * from ${tableName} where a = 123 and b = 100;" + qt_case_3_sql "select * from ${tableName} where a = 222 and b = 100;" + order_qt_case_3_sql "select * from ${tableName} where a = 100 and b in (132, 100, 222, 120);" + order_qt_case_3_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333);" + order_qt_case_3_sql "select * from ${tableName} where a = 400 and b in (250, 280, 300);" + order_qt_case_3_sql "select * from ${tableName} where a in (123, 1, 350, 400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_4" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 4: Bucket columns and partition columns are both partial primary keys, + // and there is no overlap between bucket columns and partition columns + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c int not null, + d string not null + ) + unique key(a, b, c) + partition by RANGE(c) + ( + partition p0 values [(100), (200)), + partition p1 values [(200), (300)), + partition p2 values [(300), (400)), + partition p3 values [(400), (500)) + ) + distributed by hash(a, b) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + + sql """ + insert into ${tableName} values(123, 100, 110, "a"); + insert into ${tableName} values(222, 100, 115, "b"); + insert into ${tableName} values(12, 12, 120, "c"); + insert into ${tableName} values(1231, 1220, 210, "d"); + insert into ${tableName} values(323, 49, 240, "e"); + insert into ${tableName} values(843, 7342, 370, "f"); + insert into ${tableName} values(633, 2642, 480, "g"); + insert into ${tableName} values(6333, 2642, 480, "h"); + """ + + qt_case_4_sql "select * from ${tableName} where a = 123 and b = 100 and c = 110;" + qt_case_4_sql "select * from ${tableName} where a = 123 and b = 101 and c = 110;" + qt_case_4_sql "select * from ${tableName} where a = 1231 and b = 1220 and c = 210;" + order_qt_case_4_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333) and c in (110, 115, 120);" + order_qt_case_4_sql "select * from ${tableName} where a in (123, 1, 222, 1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c in (210, 110, 115, 210);" + + sql """DROP TABLE 
IF EXISTS ${tableName}""" + + tableName = "in_table_5" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 5: Bucket columns and partition columns are both partial primary keys, + // and bucket columns and partition columns overlap + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c int not null, + d string not null + ) + unique key(a, b, c) + partition by RANGE(a) + ( + partition p0 values [(0), (100)), + partition p1 values [(100), (200)), + partition p2 values [(200), (300)), + partition p3 values [(300), (400)), + partition p4 values [(400), (500)), + partition p5 values [(500), (900)), + partition p6 values [(900), (1200)), + partition p7 values [(1200), (9000)) + ) + distributed by hash(a, b) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + + sql """ + insert into ${tableName} values(123, 100, 110, "a"); + insert into ${tableName} values(222, 100, 115, "b"); + insert into ${tableName} values(12, 12, 120, "c"); + insert into ${tableName} values(1231, 1220, 210, "d"); + insert into ${tableName} values(323, 49, 240, "e"); + insert into ${tableName} values(843, 7342, 370, "f"); + insert into ${tableName} values(633, 2642, 480, "g"); + insert into ${tableName} values(6333, 2642, 480, "h"); + """ + + qt_case_5_sql "select * from ${tableName} where a = 123 and b = 100 and c = 110;" + qt_case_5_sql "select * from ${tableName} where a = 123 and b = 101 and c = 110;" + qt_case_5_sql "select * from ${tableName} where a = 1231 and b = 1220 and c = 210;" + order_qt_case_5_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333) and c in (110, 115, 120);" + order_qt_case_5_sql "select * from ${tableName} where a in (123, 12, 222, 1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c in (210, 110, 115, 210, 480);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_6" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 6: Default partitioning, primary keys are all bucket columns + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c string not null + ) + unique key(a, b) + distributed by hash(a, b) buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 132, "a"); + insert into ${tableName} values(123, 222, "b"); + insert into ${tableName} values(22, 2, "c"); + insert into ${tableName} values(1, 1, "d"); + insert into ${tableName} values(2, 2, "e"); + insert into ${tableName} values(3, 3, "f"); + insert into ${tableName} values(4, 4, "i"); + """ + qt_case_6_sql "select * from ${tableName} where a = 123 and b = 132;" + order_qt_case_6_sql "select * from ${tableName} where a = 123 and b in (132, 1, 222, 333);" + order_qt_case_6_sql "select * from ${tableName} where a in (123, 1, 222, 2, 3, 4) and b in (132, 1, 222, 333, 2, 3, 4);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_7" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 7: Partition and bucket columns are the same, but only part of the primary key + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c int not null, + d string not null + ) + unique key(a, b, c) + partition by RANGE(a) + ( + partition p0 values [(0), (100)), + partition p1 values [(100), (200)), + partition p2 values [(200), (300)), + partition p3 values [(300), (400)), + partition p4 values [(400), (500)), + partition p5 values [(500), (900)), + partition p6 
values [(900), (1200)), + partition p7 values [(1200), (9000)) + ) + distributed by hash(a) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + + sql """ + insert into ${tableName} values(123, 100, 110, "a"); + insert into ${tableName} values(222, 100, 115, "b"); + insert into ${tableName} values(12, 12, 120, "c"); + insert into ${tableName} values(1231, 1220, 210, "d"); + insert into ${tableName} values(323, 49, 240, "e"); + insert into ${tableName} values(843, 7342, 370, "f"); + insert into ${tableName} values(633, 2642, 480, "g"); + insert into ${tableName} values(6333, 2642, 480, "h"); + """ + + qt_case_7_sql "select * from ${tableName} where a = 123 and b = 100 and c = 110;" + qt_case_7_sql "select * from ${tableName} where a = 123 and b = 101 and c = 110;" + qt_case_7_sql "select * from ${tableName} where a = 1231 and b = 1220 and c = 210;" + order_qt_case_7_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333) and c in (110, 115, 120);" + order_qt_case_7_sql "select * from ${tableName} where a in (123, 12, 222, 1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c in (210, 110, 115, 210, 480);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_8" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 8: The bucket column is the same as the primary key, and the partition column is part of the primary key. + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c string not null + ) + unique key(a, b) + partition by RANGE(a) + ( + partition p0 values [(100), (200)), + partition p1 values [(200), (300)), + partition p2 values [(300), (400)), + partition p3 values [(400), (420)), + partition p4 values [(420), (500)) + ) + distributed by hash(a, b) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 100, "a"); + insert into ${tableName} values(150, 100, "b"); + insert into ${tableName} values(350, 200, "c"); + insert into ${tableName} values(400, 250, "d"); + insert into ${tableName} values(400, 280, "e"); + insert into ${tableName} values(450, 350, "f"); + """ + qt_case_8_sql "select * from ${tableName} where a = 123 and b = 100;" + qt_case_8_sql "select * from ${tableName} where a = 222 and b = 100;" + order_qt_case_8_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333);" + order_qt_case_8_sql "select * from ${tableName} where a = 400 and b in (250, 280, 300);" + order_qt_case_8_sql "select * from ${tableName} where a in (123, 1, 350, 400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);" + + sql """DROP TABLE IF EXISTS ${tableName}""" + + tableName = "in_table_9" + sql """DROP TABLE IF EXISTS ${tableName}""" + // Case 9: Partition leftmost match + sql """ + create table ${tableName} ( + a int not null, + b int not null, + c int not null, + d string not null + ) + unique key(a, b, c) + partition by RANGE(a, b, c) + ( + partition p values [(1, 1, 1), (100, 100, 100)), + partition p0 values [(100, 100, 100), (200, 210, 220)), + partition p1 values [(200, 210, 220), (300, 250, 290)), + partition p2 values [(300, 250, 290), (350, 290, 310)), + partition p3 values [(350, 290, 310), (400, 350, 390)), + partition p4 values [(400, 350, 390), (800, 400, 450)), + partition p5 values [(800, 400, 450), (2000, 500, 500)), + partition p6 values [(2000, 500, 500), (5000, 600, 600)), + partition p7 values [(5000, 600, 600), (9999, 9999, 9999)) 
+ ) + distributed by hash(a, c) + buckets 16 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + sql """ + insert into ${tableName} values(123, 100, 110, "zxcd"); + insert into ${tableName} values(222, 100, 115, "zxc"); + insert into ${tableName} values(12, 12, 120, "zxc"); + insert into ${tableName} values(1231, 1220, 210, "zxc"); + insert into ${tableName} values(323, 49, 240, "zxc"); + insert into ${tableName} values(843, 7342, 370, "zxcde"); + insert into ${tableName} values(633, 2642, 480, "zxc"); + insert into ${tableName} values(6333, 2642, 480, "zxc"); + """ + + order_qt_case_9_sql "select * from ${tableName} where a=123 and b=100 and c=110;" + order_qt_case_9_sql "select * from ${tableName} where a=222 and b=100 and c=115;" + order_qt_case_9_sql "select * from ${tableName} where a=323 and b=49 and c=240;" + order_qt_case_9_sql "select * from ${tableName} where b=100 and a=123 and c=110;" + order_qt_case_9_sql "select * from ${tableName} where a=1231 and b=1220 and c=210;" + order_qt_case_9_sql "select * from ${tableName} where a=6333 and b=2642 and c=480;" + order_qt_case_9_sql "select * from ${tableName} where a=633 and b=2642 and c=480;" + order_qt_case_9_sql "select * from ${tableName} where a=123 and b in (132,100,222,333) and c in (110, 115, 120);" + order_qt_case_9_sql "select * from ${tableName} where a in (222,1231) and b in (100,1220,2642) and c in (115,210,480);" + order_qt_case_9_sql "select * from ${tableName} where a in (123,222,12) and b in (100,12) and c in (110,115,120,210);" + order_qt_case_9_sql "select * from ${tableName} where a=1231 and b in (20490,1220,300) and c = 210;" + order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222, 400,420, 500) and b in (132,100,222, 200,300) and c in (110,115,210);" + order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222, 1231,420, 500) and b in (132,100,222, 1220,300) and c in (210,110,115,210);" + sql """DROP TABLE IF EXISTS ${tableName}""" } \ No newline at end of file diff --git a/regression-test/suites/point_query_p0/test_point_query_ck.groovy b/regression-test/suites/point_query_p0/test_point_query_ck.groovy index f7c53c7207e..189c0f22d1a 100644 --- a/regression-test/suites/point_query_p0/test_point_query_ck.groovy +++ b/regression-test/suites/point_query_p0/test_point_query_ck.groovy @@ -182,6 +182,32 @@ suite("test_point_query_ck") { stmt.setString(3, "dd") qe_point_select stmt + // IN query + def stmt_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 IN (?, ?, ?, ?) AND k2 IN (?, ?, ?) AND k3 IN (?, ?, ?, ?)" + assertEquals(stmt_in.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt_in.setInt(1, 1231) + stmt_in.setInt(2, 1237) + stmt_in.setInt(3, 251) + stmt_in.setInt(4, 252) + stmt_in.setBigDecimal(5, new BigDecimal("119291.11")) + stmt_in.setBigDecimal(6, new BigDecimal("120939.11130")) + stmt_in.setBigDecimal(7, new BigDecimal("12222.99121135")) + stmt_in.setString(8, "ddd") + stmt_in.setString(9, 'xxx') + stmt_in.setString(10, generateString(251)) + stmt_in.setString(11, generateString(252)) + order_qe_point_in_select stmt_in + stmt_in.close() + + stmt_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 = ? AND k2 IN (?, ?) 
AND k3 IN (?, ?)" + assertEquals(stmt_in.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt_in.setInt(1, 1235) + stmt_in.setBigDecimal(2, new BigDecimal("991129292901.11138")) + stmt_in.setBigDecimal(3, new BigDecimal("120939.11130")) + stmt_in.setString(4, "dd") + stmt_in.setString(5, "a ddd") + order_qe_point_in_select stmt_in + def stmt_fn = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) from ${tableName} where k1 = ? and k2 =? and k3 = ?" assertEquals(stmt_fn.class, com.mysql.cj.jdbc.ServerPreparedStatement); stmt_fn.setInt(1, 1231) @@ -191,6 +217,19 @@ suite("test_point_query_ck") { qe_point_select stmt_fn qe_point_select stmt_fn + // IN query + def stmt_fn_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) FROM ${tableName} WHERE k1 IN (?, ?) AND k2 IN (?, ?) AND k3 IN (?, ?)" + assertEquals(stmt_fn_in.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt_fn_in.setInt(1, 1235) + stmt_fn_in.setInt(2, 1231) + stmt_fn_in.setBigDecimal(3, new BigDecimal("119291.11")) + stmt_fn_in.setBigDecimal(4, new BigDecimal("991129292901.11138")) + stmt_fn_in.setString(5, "dd") + stmt_fn_in.setString(6, "ddd") + order_qe_point_in_select stmt_fn_in + order_qe_point_in_select stmt_fn_in + order_qe_point_in_select stmt_fn_in + nprep_sql """ ALTER table ${tableName} ADD COLUMN new_column0 INT default "0"; """ @@ -201,34 +240,53 @@ suite("test_point_query_ck") { stmt.setString(3, "a ddd") qe_point_select stmt qe_point_select stmt + order_qe_point_in_select stmt_in + order_qe_point_in_select stmt_in // invalidate cache // sql "sync" nprep_sql """ INSERT INTO ${tableName} VALUES(1235, 120939.11130, "a ddd", "xxxxxx", "2030-01-02", "2020-01-01 12:36:38", 22.822, "7022-01-01 11:30:38", 0, 1929111.1111,[119291.19291], ["111", "222", "333"], 2) """ qe_point_select stmt qe_point_select stmt qe_point_select stmt + order_qe_point_in_select stmt_in + order_qe_point_in_select stmt_in + order_qe_point_in_select stmt_in nprep_sql """ - ALTER table ${tableName} ADD COLUMN new_column1 INT default "0"; + ALTER table ${tableName} ADD COLUMN new_column1 INT default "0"; """ qe_point_select stmt qe_point_select stmt + order_qe_point_in_select stmt_in + order_qe_point_in_select stmt_in nprep_sql """ - ALTER table ${tableName} DROP COLUMN new_column1; + ALTER table ${tableName} DROP COLUMN new_column1; """ qe_point_select stmt qe_point_select stmt - - nprep_sql """ - ALTER table ${tableName} ADD COLUMN new_column1 INT default "0"; + order_qe_point_in_select stmt_in + order_qe_point_in_select stmt_in + sql """ + ALTER table ${tableName} ADD COLUMN new_column1 INT default "0"; """ - sql "select 1" - qe_point_select stmt + qe_point_select stmt + order_qe_point_in_select stmt_in } + // disable useServerPrepStmts def result2 = connect(user, password, context.config.jdbcUrl) { qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'""" qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" + order_qt_in_sql """ + SELECT + /*+ SET_VAR(enable_nereids_planner=true) */ * + FROM + ${tableName} + WHERE + k1 IN (1231, 1237) AND + k2 IN (119291.11, 120939.11130) AND + k3 IN ('ddd', 'a ddd') + """ // prepared text // sql """ prepare 
stmt1 from select * from ${tableName} where k1 = % and k2 = % and k3 = % """ // qt_sql """execute stmt1 using (1231, 119291.11, 'ddd')""" @@ -257,8 +315,14 @@ suite("test_point_query_ck") { "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "false" );""" - sql """insert into ${tableName} values (0, "1", "2", "3")""" + sql """ + insert into ${tableName} values (0, "1", "2", "3"); + insert into ${tableName} values (1, "2", "3", "4"); + insert into ${tableName} values (2, "3", "4", "5"); + insert into ${tableName} values (3, "4", "5", "6"); + """ qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where customer_key = 0""" + order_qt_in_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where customer_key in (0, 1, 2, 3, 4, 5, 6)""" } } tableName = "test_ODS_EBA_LLREPORT_ck" diff --git a/regression-test/suites/point_query_p0/test_point_query_partition.groovy b/regression-test/suites/point_query_p0/test_point_query_partition.groovy index 848729b4423..a98ccff0393 100644 --- a/regression-test/suites/point_query_p0/test_point_query_partition.groovy +++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy @@ -114,6 +114,24 @@ suite("test_point_query_partition") { qe_point_select stmt } + // IN query + def result2 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "SELECT * FROM ${tableName} WHERE k1 IN (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt.setInt(1, 1) + stmt.setInt(2, 2) + stmt.setInt(3, 11) + stmt.setInt(4, -1) + stmt.setInt(5, 12) + stmt.setInt(6, 34) + stmt.setInt(7, 33) + stmt.setInt(8, 45) + stmt.setInt(9, 666) + stmt.setInt(10, 999) + stmt.setInt(11, 1000) + order_qe_point_in_select stmt + } + sql "DROP TABLE IF EXISTS regression_test_serving_p0.customer"; sql """ CREATE TABLE regression_test_serving_p0.customer ( @@ -135,8 +153,8 @@ suite("test_point_query_partition") { PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "store_row_column" = "true" - ); - """ + ); + """ sql """insert into regression_test_serving_p0.customer(customer_key, customer_value_0, customer_value_1) values(686612, "686612", "686612")""" sql """insert into regression_test_serving_p0.customer(customer_key, customer_value_0, customer_value_1) values(686613, "686613", "686613")""" def result3 = connect(user, password, prepare_url) { @@ -149,4 +167,15 @@ suite("test_point_query_partition") { qe_point_selectmmm stmt qe_point_selecteee stmt } + + // IN query + def result4 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM regression_test_serving_p0.customer WHERE customer_key IN (?, ?)" + stmt.setInt(1, 686612) + stmt.setInt(2, 686613) + order_qe_point_in_selectxxx stmt + order_qe_point_in_selectyyy stmt + order_qe_point_in_selectmmm stmt + order_qe_point_in_selecteee stmt + } } \ No newline at end of file diff --git a/regression-test/suites/point_query_p0/test_rowstore.groovy b/regression-test/suites/point_query_p0/test_rowstore.groovy index 13279e3ce87..8d943f2cee8 100644 --- a/regression-test/suites/point_query_p0/test_rowstore.groovy +++ b/regression-test/suites/point_query_p0/test_rowstore.groovy @@ -204,14 +204,26 @@ suite("test_rowstore", "p0,nonConcurrent") { assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); qe_point_select stmt } + def prep_in_sql = { sql_str, in_list -> + def 
stmt = prepareStatement sql_str + for (int i = 0; i < in_list.size(); i++) { + stmt.setInt(i + 1, in_list[i]) + } + assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); + order_qe_point_in_select stmt + } def sql_str = "select v1, v2 from table_with_column_group where k1 = ?" + def sql_in_str = "select v1, v2 from table_with_column_group where k1 in (?, ?, ?, ?)" prep_sql sql_str, 1 prep_sql sql_str, 2 prep_sql sql_str, 3 + prep_in_sql sql_in_str, [1, 2, 3, 10] sql_str = "select v2 from table_with_column_group where k1 = ?" + sql_in_str = "select v2 from table_with_column_group where k1 in (?, ?, ?, ?)" prep_sql sql_str, 1 prep_sql sql_str, 2 prep_sql sql_str, 3 + prep_in_sql sql_in_str, [1, 2, 3, 10] sql_str = "select v1 from table_with_column_group where k1 = ?" prep_sql sql_str, 3 sql_str = "select v2, v1 from table_with_column_group where k1 = ?" @@ -222,14 +234,18 @@ suite("test_rowstore", "p0,nonConcurrent") { prep_sql sql_str, 1 sql_str = "select v2 from table_with_column_group2 where k1 = ?" + sql_in_str = "select v2 from table_with_column_group2 where k1 in (?, ?, ?, ?)" prep_sql sql_str, 1 prep_sql sql_str, 2 prep_sql sql_str, 3 + prep_in_sql sql_in_str, [1, 2, 3, 10] sql_str = "select v4 from table_with_column_group3 where k1 = ?" + sql_in_str = "select v4 from table_with_column_group3 where k1 in (?, ?, ?, ?)" prep_sql sql_str, 1 prep_sql sql_str, 2 prep_sql sql_str, 3 + prep_in_sql sql_in_str, [1, 2, 3, 10] def setPrepareStmtArgs = {stmt, user_id, date, datev2, datetimev2_1, datetimev2_2, city, age, sex -> java.text.SimpleDateFormat formater = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"); @@ -242,6 +258,39 @@ suite("test_rowstore", "p0,nonConcurrent") { stmt.setInt(7, age) stmt.setInt(8, sex) } + def setInPrepareStmtArgs = {stmt, user_id_list, date_list, datev2_list, + datetimev2_1_list, datetimev2_2_list, city_list, age_list, sex_list -> + java.text.SimpleDateFormat formater = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"); + def total_size = user_id_list.size() + date_list.size() + datev2_list.size() + + datetimev2_1_list.size() + datetimev2_2_list.size() + city_list.size() + + age_list.size() + sex_list.size() + def idx = 0 + for (int i = 0; i < user_id_list.size(); i++) { + stmt.setInt(++idx, user_id_list[i]) + } + for (int i = 0; i < date_list.size(); i++) { + stmt.setDate(++idx, java.sql.Date.valueOf(date_list[i])) + } + for (int i = 0; i < datev2_list.size(); i++) { + stmt.setDate(++idx, java.sql.Date.valueOf(datev2_list[i])) + } + for (int i = 0; i < datetimev2_1_list.size(); i++) { + stmt.setTimestamp(++idx, new java.sql.Timestamp(formater.parse(datetimev2_1_list[i]).getTime())) + } + for (int i = 0; i < datetimev2_2_list.size(); i++) { + stmt.setTimestamp(++idx, new java.sql.Timestamp(formater.parse(datetimev2_2_list[i]).getTime())) + } + for (int i = 0; i < city_list.size(); i++) { + stmt.setString(++idx, city_list[i]) + } + for (int i = 0; i < age_list.size(); i++) { + stmt.setInt(++idx, age_list[i]) + } + for (int i = 0; i < sex_list.size(); i++) { + stmt.setInt(++idx, sex_list[i]) + } + assertEquals(idx, total_size) + } def stmt = prepareStatement """ SELECT datetimev2_1,datetime_val1,datetime_val2,max_dwell_time FROM table_with_column_group_xxx t where user_id = ? and date = ? and datev2 = ? and datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? 
and sex = ?; """ + setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.21', '2017-10-01 11:11:11.11', 'Beijing', 10, 1 @@ -260,6 +309,27 @@ suite("test_rowstore", "p0,nonConcurrent") { qe_point_select stmt setPrepareStmtArgs stmt, 4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.28', '2017-10-01 11:11:11.18', 'Beijing', 10, 1 qe_point_select stmt + + def in_stmt = prepareStatement """ + SELECT + datetimev2_1,datetime_val1,datetime_val2,max_dwell_time + FROM + table_with_column_group_xxx t + WHERE + user_id IN (?, ?, ?, ?, ?) AND + date IN (?, ?) AND + datev2 IN (?, ?) AND + datetimev2_1 IN (?, ?, ?, ?, ?, ?, ?, ?) AND + datetimev2_2 IN (?, ?, ?, ?, ?, ?, ?, ?) AND + city IN (?, ?) AND + age IN (?, ?, ?, ?, ?) AND + sex IN (?, ?); + """ + setInPrepareStmtArgs in_stmt, [1, 2, 3, 4, 5], ['2017-10-01', '2017-10-02'], ['2017-10-01', '2017-10-03'], + ['2017-10-01 11:11:11.21', '2017-10-01 11:11:11.22', '2017-10-01 11:11:11.23', '2017-10-01 11:11:11.24', '2017-10-01 11:11:11.25', '2017-10-01 11:11:11.26', '2017-10-01 11:11:11.27', '2017-10-01 11:11:11.28'], + ['2017-10-01 11:11:11.11', '2017-10-01 11:11:11.12', '2017-10-01 11:11:11.13', '2017-10-01 11:11:11.14', '2017-10-01 11:11:11.15', '2017-10-01 11:11:11.16', '2017-10-01 11:11:11.17', '2017-10-01 11:11:11.18'], + ['Beijing', 'Shanghai'], [10, 11, 12, 13, 14], [0, 1] + order_qe_point_in_select in_stmt } sql "DROP TABLE IF EXISTS table_with_column_group4" diff --git a/regression-test/suites/point_query_p0/test_rowstore_query.groovy b/regression-test/suites/point_query_p0/test_rowstore_query.groovy index db5f74f3f61..bd4d51ab29b 100644 --- a/regression-test/suites/point_query_p0/test_rowstore_query.groovy +++ b/regression-test/suites/point_query_p0/test_rowstore_query.groovy @@ -39,5 +39,6 @@ suite("test_rowstore", "p0") { sql """insert into ${tableName} values (1, 'abc', 1111919.12345678919)""" qt_sql """select * from ${tableName}""" sql """insert into ${tableName} values (2, 'def', 1111919.12345678919)""" - qt_sql """select * from ${tableName} where k1 = 2""" + qt_point_sql """select * from ${tableName} where k1 = 2""" + qt_point_in_sql """select * from ${tableName} where k1 in (1, 2, 3, 4, 5)""" } \ No newline at end of file diff --git a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy index 54ec1efa4b3..e6c161a1a56 100644 --- a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy +++ b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy @@ -190,5 +190,44 @@ suite("test_prepared_stmt_in_list", "nonConcurrent") { stmt_read10.setString(4, '5022-01-01 11:30:38') stmt_read10.setString(5, '6022-01-01 11:30:38') qe_stmt_read10_2 stmt_read10 + + tableName = "tbl_prepared_stmt_in_list2" + sql """DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` tinyint NULL COMMENT "", + `k2` smallint NULL COMMENT "", + `k3` int NULL COMMENT "" + ) ENGINE=OLAP + UNIQUE KEY(`k1`, `k2`) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "light_schema_change" = "true", + "storage_format" = "V2", + "store_row_column" = "true" + ) + """ + + sql """ INSERT INTO ${tableName} VALUES(1, 1300, 55356821) """ + sql """ INSERT INTO ${tableName} VALUES(2, 1301, 56052706) """ + sql """ INSERT INTO ${tableName} VALUES(3, 1302, 55702967) """ + sql """ INSERT INTO ${tableName} VALUES(4, 1303, 56054326) """ + sql """ 
INSERT INTO ${tableName} VALUES(5, 1304, 36548425) """ + sql """ INSERT INTO ${tableName} VALUES(6, 1305, 56054803) """ + sql """ INSERT INTO ${tableName} VALUES(7, 1306, 56055112) """ + sql """sync""" + + def stmt_read11 = prepareStatement "select * from ${tableName} WHERE `k1` IN (?, ?, ?, ?, ?) AND `k2` IN (?, ?, ?, ?)" + stmt_read11.setByte(1, (byte) 1) + stmt_read11.setByte(2, (byte) 2) + stmt_read11.setByte(3, (byte) 3) + stmt_read11.setByte(4, (byte) 4) + stmt_read11.setByte(5, (byte) 5) + stmt_read11.setShort(6, (short) 1300) + stmt_read11.setShort(7, (short) 1301) + stmt_read11.setShort(8, (short) 1302) + stmt_read11.setShort(9, (short) 1303) + order_qe_stmt_read11_1 stmt_read11 } }
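A note for readers tracing the new executor path: buildBatchRequest/collectBatchRequests resolve every key tuple to a tablet and its picked candidate replica, then append one sub-request per tuple to the batch builder of the backend hosting that replica, so each backend receives exactly one RPC. Below is a minimal, self-contained sketch of that grouping pattern; Backend, SubRequest, and the three input maps are simplified stand-ins for illustration, not the Doris FE types.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchByBackend {
        // Hypothetical stand-ins for Doris' Backend and per-tablet sub-request.
        record Backend(String brpcAddress) {}
        record SubRequest(long tabletId, long replicaId, List<String> keyTuple) {}

        // Group one sub-request per (key tuple, tablet) under the backend that
        // hosts the picked replica, so every backend gets a single batch RPC.
        static Map<Backend, List<SubRequest>> groupByBackend(
                Map<Long, Backend> backendByTablet,
                Map<Long, Long> replicaByTablet,
                Map<Long, List<List<String>>> keyTuplesByTablet) {
            Map<Backend, List<SubRequest>> batches = new HashMap<>();
            for (Map.Entry<Long, List<List<String>>> e : keyTuplesByTablet.entrySet()) {
                long tabletId = e.getKey();
                Backend backend = backendByTablet.get(tabletId);
                long replicaId = replicaByTablet.get(tabletId);
                for (List<String> keyTuple : e.getValue()) {
                    batches.computeIfAbsent(backend, b -> new ArrayList<>())
                            .add(new SubRequest(tabletId, replicaId, keyTuple));
                }
            }
            return batches;
        }

        public static void main(String[] args) {
            Backend be = new Backend("127.0.0.1:8060");
            Map<Backend, List<SubRequest>> batches = groupByBackend(
                    Map.of(10L, be, 11L, be),
                    Map.of(10L, 1001L, 11L, 1002L),
                    Map.of(10L, List.of(List.of("123", "100")),
                            11L, List.of(List.of("222", "100"))));
            System.out.println(batches.get(be).size()); // 2 sub-requests, 1 batch RPC
        }
    }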
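batchGetNext keeps the same timeout discipline as the old single-tablet path: the remaining budget is computed against an absolute deadline before blocking on the future, and the future is cancelled on timeout so the RPC does not linger. A generic, runnable sketch of that pattern (the names and executor here are illustrative, not the FE code):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class DeadlineGet {
        // Wait for an async result within an absolute deadline; cancel on timeout.
        // Returning null mirrors how batchGetNext maps timeouts to a status code
        // instead of propagating an exception.
        static <T> T getWithinDeadline(Future<T> future, long deadlineMs)
                throws InterruptedException, ExecutionException {
            long remainingMs = deadlineMs - System.currentTimeMillis();
            if (remainingMs <= 0) {
                future.cancel(true);
                return null;
            }
            try {
                return future.get(remainingMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true);
                return null;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<String> fast = pool.submit(() -> "one row");
            System.out.println(getWithinDeadline(fast, System.currentTimeMillis() + 1000)); // one row
            Future<String> slow = pool.submit(() -> { Thread.sleep(5000); return "late"; });
            System.out.println(getWithinDeadline(slow, System.currentTimeMillis() + 50));   // null
            pool.shutdownNow();
        }
    }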
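On the merge side (the "maybe there is a better way" TODO), each backend returns a list of Thrift-serialized TResultBatch blobs; getNext deserializes each one and appends its rows into a single batch. A sketch of that step as a standalone method, assuming Doris' generated TResultBatch and TCustomProtocolFactory plus libthrift's TDeserializer are available (imports omitted, so this only compiles inside the FE source tree):

    // Merge per-backend serialized row batches into one TResultBatch.
    static TResultBatch mergeSerializedBatches(List<byte[]> blobs, int maxMsgSize) throws TException {
        TResultBatch merged = new TResultBatch();
        merged.setRows(Lists.newArrayList());
        TDeserializer deserializer = new TDeserializer(new TCustomProtocolFactory(maxMsgSize));
        for (byte[] blob : blobs) {
            TResultBatch tmp = new TResultBatch();
            // throws "MaxMessageSize reached" when a blob exceeds maxMsgSize
            deserializer.deserialize(tmp, blob);
            tmp.getRows().forEach(merged::addToRows);
        }
        return merged;
    }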
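Finally, the new proto messages compose in a straightforward way: a PTabletBatchKeyLookupRequest is just a list of the existing per-tablet requests, each now carrying a replica_id. A fragment showing how the FE side could assemble one, using only builder methods that correspond to fields in the diff above (the tablet and replica ids are made-up values):

    InternalService.PTabletBatchKeyLookupRequest.Builder batch =
            InternalService.PTabletBatchKeyLookupRequest.newBuilder();
    batch.addSubKeyLookupReq(
            InternalService.PTabletKeyLookupRequest.newBuilder()
                    .setTabletId(10001L)    // one sub-request per key tuple
                    .setReplicaId(2001L));  // new field: replica_id = 9
    // then, one RPC per backend:
    // Future<InternalService.PTabletBatchKeyLookupResponse> future = BackendServiceProxy
    //         .getInstance().batchFetchTabletDataAsync(brpcAddress, batch.build());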