This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch short-circuit-in
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/short-circuit-in by this push:
     new e00b9400562 [Enhancement](Short Circuit) short circuit query supports `IN` (#39468)
e00b9400562 is described below

commit e00b9400562a23bdf8ee416f47366b53534ce2c0
Author: Xr Ling <63634816+lxr...@users.noreply.github.com>
AuthorDate: Wed Jan 1 22:19:08 2025 +0800

    [Enhancement](Short Circuit) short circuit query supports `IN` (#39468)
    
    ## Proposed changes
    
    Short circuit query now supports `IN` predicates.
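
    For illustration, a minimal JDBC sketch of the query shape this enables
    (connection details, table, and column names are hypothetical; every key
    column must still be bound by `=` or `IN` for the short-circuit plan to
    apply):

    ```java
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class PointInQueryDemo {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://127.0.0.1:9030/demo_db", "root", "")) {
                // Both key columns are constrained, so the plan can short
                // circuit and fan the IN list out into per-tablet key lookups.
                String sql = "SELECT k1, k2, v1 FROM demo_table"
                        + " WHERE k1 = ? AND k2 IN (?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                    stmt.setInt(1, 1);
                    stmt.setInt(2, 10);
                    stmt.setInt(3, 20);
                    stmt.setInt(4, 30);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            System.out.println(rs.getInt("k1") + " / " + rs.getInt("k2"));
                        }
                    }
                }
            }
        }
    }
    ```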
    
    ---------
    
    Co-authored-by: lihangyu <15605149...@163.com>
    Co-authored-by: Liuyushiii <liuyushi...@163.com>
    Co-authored-by: eldenmoon <lihan...@selectdb.com>
---
 be/src/olap/base_tablet.cpp                        |   1 -
 be/src/service/internal_service.cpp                |  29 +
 be/src/service/internal_service.h                  |   5 +
 be/src/service/point_query_executor.cpp            |   5 +
 .../org/apache/doris/catalog/PartitionKey.java     |  23 +
 .../LogicalResultSinkToShortCircuitPointQuery.java |   3 +-
 .../doris/planner/HashDistributionPruner.java      |  30 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  81 ++-
 .../PartitionPruneV2ForShortCircuitPlan.java       |  41 +-
 .../doris/planner/PartitionPrunerV2Base.java       |  52 +-
 .../doris/planner/RangePartitionPrunerV2.java      |  66 ++-
 .../org/apache/doris/qe/PointQueryExecutor.java    | 599 +++++++++++++++++----
 .../org/apache/doris/rpc/BackendServiceClient.java |   5 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  12 +
 gensrc/proto/internal_service.proto                |  11 +
 .../data/point_query_p0/test_point_IN_query.out    |  27 +
 .../data/point_query_p0/test_point_query.out       | 179 ++++++
 .../point_query_p0/test_point_query_partition.out  |  25 +
 .../data/point_query_p0/test_rowstore.out          |  30 ++
 .../data/point_query_p0/test_rowstore_query.out    |   6 +-
 .../prepared_stmt_p0/prepared_stmt_in_list.out     |   6 +
 .../org/apache/doris/regression/suite/Suite.groovy |   6 +-
 .../test_row_store_page_size.groovy                |   4 +-
 .../test_dynamic_partition_point_query.groovy      | 122 +++++
 .../point_query_p0/test_point_IN_query.groovy      |  85 +++
 .../suites/point_query_p0/test_point_query.groovy  | 400 ++++++++++++++
 .../point_query_p0/test_point_query_ck.groovy      |  80 ++-
 .../test_point_query_partition.groovy              |  33 +-
 .../suites/point_query_p0/test_rowstore.groovy     |  70 +++
 .../point_query_p0/test_rowstore_query.groovy      |   3 +-
 .../prepared_stmt_p0/prepared_stmt_in_list.groovy  |  39 ++
 31 files changed, 1910 insertions(+), 168 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 40c5443477d..dc3b50688b6 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -495,7 +495,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
         }
         auto& segments = segment_caches[i]->get_segments();
         DCHECK_EQ(segments.size(), num_segments);
-
         for (auto id : picked_segments) {
             Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
                                                     &loc, stats, encoded_seq_value);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index fb0b2f090bc..b58b697036d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -948,6 +948,35 @@ void PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro
     }
 }
 
+void PInternalService::tablet_batch_fetch_data(google::protobuf::RpcController* controller,
+                                               const PTabletBatchKeyLookupRequest* batchRequest,
+                                               PTabletBatchKeyLookupResponse* batchResponse,
+                                               google::protobuf::Closure* done) {
+    int request_count = batchRequest->sub_key_lookup_req_size();
+    batchResponse->mutable_sub_key_lookup_res()->Reserve(request_count);
+    [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller);
+    bool ret =
+            _light_work_pool.try_offer([this, batchRequest, batchResponse, done, request_count]() {
+                Status st = Status::OK();
+                brpc::ClosureGuard guard(done);
+                for (int i = 0; i < request_count; ++i) {
+                    batchResponse->add_sub_key_lookup_res();
+                    const PTabletKeyLookupRequest* request = &batchRequest->sub_key_lookup_req(i);
+                    PTabletKeyLookupResponse* response =
+                            batchResponse->mutable_sub_key_lookup_res(i);
+                    Status status = _tablet_fetch_data(request, response);
+                    status.to_protobuf(response->mutable_status());
+                    if (!status.ok()) {
+                        st = status;
+                    }
+                }
+                st.to_protobuf(batchResponse->mutable_status());
+            });
+    if (!ret) {
+        offer_failed(batchResponse, done, _light_work_pool);
+    }
+}
+
 void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller,
                                             const PJdbcTestConnectionRequest* request,
                                             PJdbcTestConnectionResult* result,
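
The new `tablet_batch_fetch_data` endpoint above packs many single-key lookups
into one RPC: each sub-request gets its own status, and the batch-level status
is folded to the last failing sub-status. A minimal standalone sketch of that
folding loop (the record types below are hypothetical stand-ins for the
generated protobuf messages):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for PTabletKeyLookupRequest / PTabletKeyLookupResponse.
record SubRequest(long tabletId, String encodedKey) {}
record SubResult(int statusCode, String rowBatch) {} // statusCode 0 == OK

class BatchFetchSketch {
    // Mirrors the BE loop: answer every sub-request, record a per-sub status,
    // and let the batch status reflect the last non-OK sub-status.
    static int handleBatch(List<SubRequest> requests,
                           Function<SubRequest, SubResult> fetchOne,
                           List<SubResult> out) {
        int batchStatus = 0;
        for (SubRequest req : requests) {
            SubResult res = fetchOne.apply(req);
            out.add(res); // a failed sub-request does not abort the batch
            if (res.statusCode() != 0) {
                batchStatus = res.statusCode();
            }
        }
        return batchStatus;
    }

    public static void main(String[] args) {
        List<SubResult> out = new ArrayList<>();
        int status = handleBatch(
                List.of(new SubRequest(1L, "k1"), new SubRequest(2L, "k2")),
                req -> new SubResult(req.tabletId() == 2L ? 7 : 0, "row"),
                out);
        System.out.println(status + " / " + out.size()); // prints "7 / 2"
    }
}
```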
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 66a0f867393..0c683ccc3ff 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -226,6 +226,11 @@ public:
                            PTabletKeyLookupResponse* response,
                            google::protobuf::Closure* done) override;
 
+    void tablet_batch_fetch_data(google::protobuf::RpcController* controller,
+                                 const PTabletBatchKeyLookupRequest* batchRequest,
+                                 PTabletBatchKeyLookupResponse* batchResponse,
+                                 google::protobuf::Closure* done) override;
+
     void test_jdbc_connection(google::protobuf::RpcController* controller,
                               const PJdbcTestConnectionRequest* request,
                               PJdbcTestConnectionResult* result,
diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp
index ea991e158a1..3f77a9f2d2b 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -262,6 +262,11 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
     auto cache_handle = LookupConnectionCache::instance()->get(uuid);
     _binary_row_format = request->is_binary_row();
     _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id()));
+
+    if (_tablet->tablet_meta()->replica_id() != request->replica_id()) {
+        return Status::OK();
+    }
+
     if (cache_handle != nullptr) {
         _reusable = cache_handle;
         _profile_metrics.hit_lookup_cache = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 29bfda8b201..854cdd27f50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.zip.CRC32;
@@ -79,6 +80,24 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
         types = Lists.newArrayList();
     }
 
+    private PartitionKey(PartitionKey other) {
+        this.keys = new ArrayList<>(other.keys.size());
+        for (LiteralExpr expr : other.keys) {
+            try {
+                String value = expr.getStringValue();
+                if ("null".equalsIgnoreCase(value)) {
+                    this.keys.add(NullLiteral.create(expr.getType()));
+                } else {
+                    this.keys.add(LiteralExpr.create(value, expr.getType()));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("Create partition key failed: " + e.getMessage());
+            }
+        }
+        this.originHiveKeys = new ArrayList<>(other.originHiveKeys);
+        this.types = new ArrayList<>(other.types);
+    }
+
     public void setDefaultListPartition(boolean isDefaultListPartitionKey) {
         this.isDefaultListPartitionKey = isDefaultListPartitionKey;
     }
@@ -205,6 +224,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
         return createListPartitionKeyWithTypes(values, types, false);
     }
 
+    public static PartitionKey clone(PartitionKey other) {
+        return new PartitionKey(other);
+    }
+
     public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) {
         keys.add(keyValue);
         types.add(keyType);
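
The private copy constructor above deep-copies a PartitionKey by recreating
each literal from its string value and mapping the string "null" back to a
NullLiteral. A minimal sketch of that round-trip pattern, with a hypothetical
LiteralLike standing in for LiteralExpr:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for LiteralExpr: only the string form matters here.
final class LiteralLike {
    final String value; // "null" encodes a NULL key value, as in the patch

    LiteralLike(String value) {
        this.value = value;
    }

    static LiteralLike fromString(String s) {
        // Round-trip through the string form; "null" in any case becomes NULL.
        return new LiteralLike("null".equalsIgnoreCase(s) ? "null" : s);
    }
}

class PartitionKeyCopySketch {
    // Deep copy: the clone owns fresh literal objects, so later mutation of
    // the original key cannot change the copy.
    static List<LiteralLike> deepCopy(List<LiteralLike> keys) {
        List<LiteralLike> copy = new ArrayList<>(keys.size());
        for (LiteralLike k : keys) {
            copy.add(LiteralLike.fromString(k.value));
        }
        return copy;
    }
}
```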
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
index c087dcbb37b..56103f863d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InPredicate;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -53,7 +54,7 @@ public class LogicalResultSinkToShortCircuitPointQuery implements RewriteRuleFac
     private boolean filterMatchShortCircuitCondition(LogicalFilter<LogicalOlapScan> filter) {
         return filter.getConjuncts().stream().allMatch(
                 // all conjuncts match with pattern `key = ?`
-                expression -> (expression instanceof EqualTo)
+                expression -> ((expression instanceof EqualTo) || expression instanceof InPredicate)
                         && (removeCast(expression.child(0)).isKeyColumnFromTable()
                         || (expression.child(0) instanceof SlotReference
                         && ((SlotReference) expression.child(0)).getName().equals(Column.DELETE_SIGN)))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
index 2a60e4029a6..38bfe6820fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
@@ -26,9 +26,8 @@ import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.common.Config;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.Collection;
 import java.util.List;
@@ -49,8 +48,6 @@ import java.util.Set;
  * If depth is larger than 'max_distribution_pruner_recursion_depth', all buckets will be returned without pruning.
  */
 public class HashDistributionPruner implements DistributionPruner {
-    private static final Logger LOG = LogManager.getLogger(HashDistributionPruner.class);
-
     // partition list, sort by the hash code
     private List<Long> bucketsList;
     // partition columns
@@ -61,6 +58,18 @@ public class HashDistributionPruner implements DistributionPruner {
 
     private boolean isBaseIndexSelected;
 
+    /*
+     * This map maintains a relationship between distribution keys and their corresponding tablet IDs.
+     * For example, if the distribution columns are (k1, k2, k3),
+     * and the tuple (1, 2, 3) is hashed into bucket 1001,
+     * then the `distributionKey2TabletIDs` would map the key (1, 2, 3) to the tablet ID 1001.
+     * (1, 2, 3) -> 1001
+     * Map structure:
+     * - Key: PartitionKey, representing a specific combination of distribution columns (e.g., k1, k2, k3).
+     * - Value: Set<Long>, containing the tablet IDs associated with the corresponding distribution key.
+     */
+    private Map<PartitionKey, Set<Long>> distributionKey2TabletIDs = Maps.newHashMap();
+
     public HashDistributionPruner(List<Long> bucketsList, List<Column> columns,
                            Map<String, PartitionColumnFilter> filters, int hashMod, boolean isBaseIndexSelected) {
         this.bucketsList = bucketsList;
@@ -70,13 +79,21 @@ public class HashDistributionPruner implements DistributionPruner {
         this.isBaseIndexSelected = isBaseIndexSelected;
     }
 
+    public Map<PartitionKey, Set<Long>> getDistributionKeysTabletIDs() {
+        return distributionKey2TabletIDs;
+    }
+
     // columnId: which column to compute
     // hashKey: the key which to compute hash value
     public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
         if (columnId == distributionColumns.size()) {
             // compute Hash Key
             long hashValue = hashKey.getHashValue();
-            return Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod)));
+            List<Long> result =
+                    Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod)));
+            distributionKey2TabletIDs.computeIfAbsent(PartitionKey.clone(hashKey),
+                                                      k -> Sets.newHashSet(result)).addAll(result);
+            return result;
         }
         Column keyColumn = distributionColumns.get(columnId);
         String columnName = isBaseIndexSelected ? keyColumn.getName()
@@ -119,9 +136,6 @@ public class HashDistributionPruner implements DistributionPruner {
             Collection<Long> subList = prune(columnId + 1, hashKey, newComplex);
             resultSet.addAll(subList);
             hashKey.popColumn();
-            if (resultSet.size() >= bucketsList.size()) {
-                break;
-            }
         }
         return resultSet;
     }
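
Why `PartitionKey.clone(hashKey)` in `prune()` above: the recursion mutates
`hashKey` in place (`pushColumn`/`popColumn`), and a mutable object must be
snapshotted before it is used as a hash-map key. A minimal sketch of the
hazard, with a plain List standing in for the mutable key:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MutableKeySketch {
    public static void main(String[] args) {
        Map<List<Integer>, Long> keyToTablet = new HashMap<>();
        List<Integer> hashKey = new ArrayList<>(List.of(1, 2, 3));

        // Snapshot before insertion; storing hashKey itself would let the
        // popColumn-style mutation below strand the entry in a stale bucket.
        keyToTablet.put(new ArrayList<>(hashKey), 1001L);

        hashKey.remove(hashKey.size() - 1); // popColumn analogue
        System.out.println(keyToTablet.get(List.of(1, 2, 3))); // still 1001
    }
}
```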
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 92175523f22..3756dc985c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
-import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
@@ -52,6 +51,7 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ScalarType;
@@ -98,9 +98,13 @@ import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
 import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -192,7 +196,21 @@ public class OlapScanNode extends ScanNode {
     private Set<Long> sampleTabletIds = Sets.newHashSet();
     private TableSample tableSample;
 
+    private HashSet<Long> scanBackendIds = new HashSet<>();
+
+    private PartitionPruner partitionPruner = null;
+
     private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
+
+    // Maps partition column names to a RangeMap that associates ColumnBound ranges with lists of partition IDs,
+    // similar to the implementation in PartitionPrunerV2Base.
+    private Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = Maps.newHashMap();
+
+    private Map<PartitionKey, Set<Long>> distributionKeys2TabletID = Maps.newHashMap();
+
+    /// tablet id -> (backend id -> replica)
+    private Table<Long, Long, Replica> scanBackendReplicaTable = HashBasedTable.create();
+
     // a bucket seq may map to many tablets, and each tablet has a
     // TScanRangeLocations.
     public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
@@ -257,6 +275,14 @@ public class OlapScanNode extends ScanNode {
         return scanBackendIds;
     }
 
+    public Map<String, RangeMap<ColumnBound, List<Long>>> getPartitionCol2PartitionID() {
+        return partitionCol2PartitionID;
+    }
+
+    public Map<PartitionKey, Set<Long>> getDistributionKeys2TabletID() {
+        return distributionKeys2TabletID;
+    }
+
     public void setSampleTabletIds(List<Long> sampleTablets) {
         if (sampleTablets != null) {
             this.sampleTabletIds.addAll(sampleTablets);
@@ -292,6 +318,10 @@ public class OlapScanNode extends ScanNode {
         return scanTabletIds;
     }
 
+    public Table<Long, Long, Replica> getScanBackendReplicaTable() {
+        return scanBackendReplicaTable;
+    }
+
     public void setForceOpenPreAgg(boolean forceOpenPreAgg) {
         this.forceOpenPreAgg = forceOpenPreAgg;
     }
@@ -656,9 +686,9 @@ public class OlapScanNode extends ScanNode {
         cardinality = (long) statsDeriveResult.getRowCount();
     }
 
+    // get the pruned partition IDs
     private Collection<Long> partitionPrune(PartitionInfo partitionInfo,
             PartitionNames partitionNames) throws AnalysisException {
-        PartitionPruner partitionPruner = null;
         Map<Long, PartitionItem> keyItemMap;
         if (partitionNames != null) {
             keyItemMap = Maps.newHashMap();
@@ -675,13 +705,12 @@ public class OlapScanNode extends ScanNode {
         if (partitionInfo.getType() == PartitionType.RANGE) {
             if (isPointQuery() && partitionInfo.getPartitionColumns().size() == 1) {
                 // short circuit, a quick path to find partition
-                ColumnRange filterRange = columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName());
-                LiteralExpr lowerBound = filterRange.getRangeSet().get().asRanges().stream()
-                        .findFirst().get().lowerEndpoint().getValue();
-                LiteralExpr upperBound = filterRange.getRangeSet().get().asRanges().stream()
-                        .findFirst().get().upperEndpoint().getValue();
+                Column col = partitionInfo.getPartitionColumns().get(0);
+                // todo: support range query
+                Set<Range<ColumnBound>> filterRanges =
+                        columnNameToRange.get(col.getName()).getRangeSet().get().asRanges();
                 cachedPartitionPruner.update(keyItemMap);
-                return cachedPartitionPruner.prune(lowerBound, upperBound);
+                return cachedPartitionPruner.prune(filterRanges, col.getName(), partitionCol2PartitionID);
             }
             partitionPruner = new RangePartitionPrunerV2(keyItemMap,
                     partitionInfo.getPartitionColumns(), columnNameToRange);
@@ -699,12 +728,22 @@ public class OlapScanNode extends ScanNode {
         switch (distributionInfo.getType()) {
             case HASH: {
                 HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
-                distributionPruner = new HashDistributionPruner(table.getTabletIdsInOrder(),
+                distributionPruner =
+                        new HashDistributionPruner(table.getTabletIdsInOrder(),
                         info.getDistributionColumns(),
                         columnFilters,
                         info.getBucketNum(),
                         getSelectedIndexId() == olapTable.getBaseIndexId());
-                return distributionPruner.prune();
+                HashDistributionPruner hashPruner = (HashDistributionPruner) distributionPruner;
+                Collection<Long> resultIDs = hashPruner.prune();
+                Map<PartitionKey, Set<Long>> newPrunedIDs = hashPruner.getDistributionKeysTabletIDs();
+                for (Map.Entry<PartitionKey, Set<Long>> entry : newPrunedIDs.entrySet()) {
+                    distributionKeys2TabletID.merge(entry.getKey(), entry.getValue(), (existingSet, newSet) -> {
+                        existingSet.addAll(newSet);
+                        return existingSet;
+                    });
+                }
+                return resultIDs;
             }
             case RANDOM: {
                 return null;
@@ -960,6 +999,7 @@ public class OlapScanNode extends ScanNode {
                     collectedStat = true;
                 }
                 scanBackendIds.add(backend.getId());
+                scanBackendReplicaTable.put(tabletId, backend.getId(), replica);
                 // For skipping missing version of tablet, we only select the backend with the highest last
                 // success version replica to save as much data as possible.
                 if (skipMissingVersion) {
@@ -1030,10 +1070,20 @@ public class OlapScanNode extends ScanNode {
         // Step1: compute partition ids
         PartitionNames partitionNames = ((BaseTableRef) desc.getRef()).getPartitionNames();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-        if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
-            selectedPartitionIds = partitionPrune(partitionInfo, partitionNames);
-        } else {
-            selectedPartitionIds = olapTable.getPartitionIds();
+        switch (partitionInfo.getType()) {
+            case RANGE:
+                selectedPartitionIds = partitionPrune(partitionInfo, partitionNames);
+                if (isPointQuery() && partitionPruner instanceof RangePartitionPrunerV2) {
+                    RangePartitionPrunerV2 rangePartitionPruner = (RangePartitionPrunerV2) partitionPruner;
+                    this.partitionCol2PartitionID = rangePartitionPruner.getPartitionCol2PartitionID();
+                }
+                break;
+            case LIST:
+                selectedPartitionIds = partitionPrune(partitionInfo, partitionNames);
+                break;
+            default:
+                selectedPartitionIds = olapTable.getPartitionIds();
+                break;
         }
         selectedPartitionIds = olapTable.selectNonEmptyPartitionIds(selectedPartitionIds);
         selectedPartitionNum = selectedPartitionIds.size();
@@ -1336,6 +1386,8 @@ public class OlapScanNode extends ScanNode {
         // Lazy evaluation
         selectedIndexId = olapTable.getBaseIndexId();
         // Only key columns
+        distributionKeys2TabletID.clear();
+        partitionCol2PartitionID.clear();
         computeColumnsFilter(olapTable.getBaseSchemaKeyColumns(), olapTable.getPartitionInfo());
         computePartitionInfo();
         scanBackendIds.clear();
@@ -1343,6 +1395,7 @@ public class OlapScanNode extends ScanNode {
         bucketSeq2locations.clear();
         scanReplicaIds.clear();
         sampleTabletIds.clear();
+        scanBackendReplicaTable.clear();
         try {
             createScanRangeLocations();
         } catch (AnalysisException e) {
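
`scanBackendReplicaTable` above is a Guava Table keyed by (tablet id, backend
id); `roundRobinPickReplica` later reads a whole row of it per tablet. A
minimal usage sketch (all IDs are made up, and a Long replica id stands in
for the Replica object):

```java
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

import java.util.Map;

class ReplicaTableSketch {
    public static void main(String[] args) {
        // row key: tablet id, column key: backend id, value: replica id.
        Table<Long, Long, Long> table = HashBasedTable.create();
        table.put(10001L, 1L, 9001L);
        table.put(10001L, 2L, 9002L);

        // All replicas of one tablet, keyed by backend id:
        Map<Long, Long> replicasOfTablet = table.row(10001L);
        System.out.println(replicasOfTablet); // {1=9001, 2=9002}
    }
}
```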
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
index 0b98cac9724..fe6c7474bc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
@@ -23,18 +23,24 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.common.AnalysisException;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base {
     private static final Logger LOG = LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class);
     // map to record literal range to find specific partition
-    private RangeMap<LiteralExpr, Long> partitionRangeMapByLiteral = new RangeMap<>();
+    private RangeMap<ColumnBound, List<Long>> partitionRangeMap = TreeRangeMap.create();
     // last timestamp partitionRangeMapByLiteral updated
     private long lastPartitionRangeMapUpdateTimestampMs = 0;
 
@@ -42,19 +48,36 @@ public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base {
         super();
     }
 
+    public static <C extends Comparable<C>, V> Set<V>
+            getOverlappingRangeValues(RangeMap<C, List<V>> partRangeMap, Set<Range<C>> ranges) {
+        Set<V> partitionIds = Sets.newHashSet();
+        for (Range<C> range : ranges) {
+            Map<Range<C>, List<V>> overlappingRanges = partRangeMap.subRangeMap(range).asMapOfRanges();
+            for (Map.Entry<Range<C>, List<V>> entry : overlappingRanges.entrySet()) {
+                partitionIds.addAll(entry.getValue());
+            }
+        }
+        return partitionIds;
+    }
+
+    public RangeMap<ColumnBound, List<Long>> getPartitionColValue2PartitionID() {
+        return partitionRangeMap;
+    }
+
     public boolean update(Map<Long, PartitionItem> keyItemMap) {
         // interval to update partitionRangeMapByLiteral
         long partitionRangeMapUpdateIntervalS = 10;
         if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs
                     > partitionRangeMapUpdateIntervalS * 1000) {
-            partitionRangeMapByLiteral = new RangeMap<>();
+            partitionRangeMap = TreeRangeMap.create();
             // recalculate map
             for (Entry<Long, PartitionItem> entry : keyItemMap.entrySet()) {
                 Range<PartitionKey> range = entry.getValue().getItems();
                 LiteralExpr partitionLowerBound = (LiteralExpr) range.lowerEndpoint().getKeys().get(0);
                 LiteralExpr partitionUpperBound = (LiteralExpr) range.upperEndpoint().getKeys().get(0);
-                Range<LiteralExpr> partitionRange = Range.closedOpen(partitionLowerBound, partitionUpperBound);
-                partitionRangeMapByLiteral.put(partitionRange, entry.getKey());
+                Range<ColumnBound> partitionRange =
+                        Range.closedOpen(ColumnBound.of(partitionLowerBound), ColumnBound.of(partitionUpperBound));
+                partitionRangeMap.put(partitionRange, Lists.newArrayList(entry.getKey()));
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("update partitionRangeMapByLiteral");
@@ -65,9 +88,13 @@ public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base {
         return false;
     }
 
-    public Collection<Long> prune(LiteralExpr lowerBound, LiteralExpr upperBound) throws AnalysisException {
-        Range<LiteralExpr> filterRangeValue = Range.closed(lowerBound, upperBound);
-        return partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue);
+    public Collection<Long> prune(Set<Range<ColumnBound>> partitionColumnRange,
+                                  String partitionColName,
+                                  Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID) {
+        Set<Long> overlappingRangeValues = getOverlappingRangeValues(partitionRangeMap, partitionColumnRange);
+        partitionCol2PartitionID.putIfAbsent(
+                        partitionColName, partitionRangeMap);
+        return overlappingRangeValues;
     }
 
     @Override
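
`getOverlappingRangeValues` above is a thin wrapper over Guava's
`RangeMap.subRangeMap`: every filter range is intersected with the partition
range map and the partition ids of the overlapping entries are collected. A
standalone sketch with integers in place of ColumnBound (partition layout and
filters are made up):

```java
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;

import java.util.List;
import java.util.Map;
import java.util.Set;

class OverlapSketch {
    public static void main(String[] args) {
        // Analogous to partitionRangeMap: [0,10) -> p1, [10,20) -> p2.
        RangeMap<Integer, List<Long>> rangeMap = TreeRangeMap.create();
        rangeMap.put(Range.closedOpen(0, 10), Lists.newArrayList(1L));
        rangeMap.put(Range.closedOpen(10, 20), Lists.newArrayList(2L));

        // A point filter such as k = 5 arrives as the degenerate range [5, 5].
        Set<Long> hit = Sets.newHashSet();
        for (Range<Integer> filter : Set.of(Range.closed(5, 5), Range.closed(12, 12))) {
            Map<Range<Integer>, List<Long>> overlaps =
                    rangeMap.subRangeMap(filter).asMapOfRanges();
            overlaps.values().forEach(hit::addAll);
        }
        System.out.println(hit); // [1, 2]
    }
}
```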
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
index 1d9f163ca80..60fcc523c38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -48,6 +50,19 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
     // currently only used for list partition
     private Map.Entry<Long, PartitionItem> defaultPartition;
 
+    /*
+     * This map maintains the relationship between partition columns and their corresponding partition IDs.
+     * For example, if the partition columns are (k1, k2, k3), and there is a partition `p0` with the range
+     * [(1, 5, 10), (5, 10, 20)), then the `partitionCol2PartitionID` map should be:
+     * k1 -> [1, 5) -> p0
+     * Map structure:
+     * - Key: String, representing the name of a partition column (e.g., k1).
+     * - Value: RangeMap<ColumnBound, List<Long>>, where each range of column bounds is mapped to a list of
+     *          partition IDs. For instance, the range [1, 5) for column `k1` would map to partition ID `p0`.
+     */
+    // todo: `List<Long>` is not necessary, `Long` is enough
+    protected Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = Maps.newHashMap();
+
     // Only called in PartitionPruneV2ByShortCircuitPlan constructor
     PartitionPrunerV2Base() {
         this.idToPartitionItem = null;
@@ -76,6 +91,10 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
         findDefaultPartition(idToPartitionItem);
     }
 
+    public Map<String, RangeMap<ColumnBound, List<Long>>> getPartitionCol2PartitionID() {
+        return partitionCol2PartitionID;
+    }
+
     private Collection<Long> handleDefaultPartition(Collection<Long> result) {
         if (this.defaultPartition != null) {
             Set<Long> r = result.stream().collect(Collectors.toSet());
@@ -107,11 +126,12 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
     @Override
     public Collection<Long> prune() throws AnalysisException {
         Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();
-        for (Column column : partitionColumns) {
+        for (Column column : partitionColumns) {    // partition col is key
             ColumnRange columnRange = columnNameToRange.get(column.getName());
             if (columnRange == null) {
                 columnToFilters.put(column, FinalFilters.noFilters());
             } else {
+                // record the final filter for this partition key column
                 columnToFilters.put(column, getFinalFilters(columnRange, column));
             }
         }
@@ -160,22 +180,32 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
      * partitions.
      */
     private Collection<Long> pruneSingleColumnPartition(Map<Column, FinalFilters> columnToFilters) {
-        FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(0));
+        Column partitionCol = partitionColumns.get(0);
+        FinalFilters finalFilters = columnToFilters.get(partitionCol);
         switch (finalFilters.type) {
             case CONSTANT_FALSE_FILTERS:
                 return Collections.emptySet();
             case HAVE_FILTERS:
                 genSingleColumnRangeMap();
                 Preconditions.checkNotNull(singleColumnRangeMap);
-                return finalFilters.filters.stream()
-                        .map(filter -> {
-                            RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter);
-                            return filtered.asMapOfRanges().values().stream()
-                                    .map(UniqueId::getPartitionId)
-                                    .collect(Collectors.toSet());
-                        })
-                        .flatMap(Set::stream)
-                        .collect(Collectors.toSet());
+                partitionCol2PartitionID.put(partitionCol.getName(), TreeRangeMap.create());
+                Set<Long> resultPartID = Sets.newHashSet();
+                finalFilters.filters.forEach(filter -> {
+                    RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter);
+
+                    filtered.asMapOfRanges().forEach((range, partID) -> {
+                        RangeMap<ColumnBound, List<Long>> rangeMap =
+                                partitionCol2PartitionID.get(partitionCol.getName());
+                        List<Long> partitionIds = rangeMap.get(range.lowerEndpoint());
+                        if (partitionIds == null) {
+                            partitionIds = Lists.newArrayList();
+                            rangeMap.put(range, partitionIds);
+                        }
+                        partitionIds.add(partID.getPartitionId());
+                        resultPartID.add(partID.getPartitionId());
+                    });
+                });
+                return resultPartID;
             case NO_FILTERS:
             default:
                 return idToPartitionItem.keySet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
index 8acba72f156..5d523b01bfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
@@ -57,13 +57,43 @@ public class RangePartitionPrunerV2 extends PartitionPrunerV2Base {
 
     public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
         RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
-        idToPartitionItem.forEach((id, item) -> {
+        idToPartitionItem.forEach((id, item) -> {   // partition id , key range
             Range<PartitionKey> range = item.getItems();
             candidate.put(mapPartitionKeyRange(range, 0), new RangePartitionUniqueId(id));
         });
         return candidate;
     }
 
+    public static <C extends Comparable<C>> Range<C> buildRange(C lowerBound, BoundType lowerBoundType,
+                                                                C upperBound, BoundType upperBoundType) {
+        if (lowerBound.compareTo(upperBound) > 0) {
+            // swap
+            C temp = lowerBound;
+            lowerBound = upperBound;
+            upperBound = temp;
+
+            BoundType tempBoundType = lowerBoundType;
+            lowerBoundType = upperBoundType;
+            upperBoundType = tempBoundType;
+        }
+        if (lowerBoundType == BoundType.CLOSED && upperBoundType == BoundType.OPEN) {
+            // [lowerBound, upperBound)
+            return Range.closedOpen(lowerBound, upperBound);
+        } else if (lowerBoundType == BoundType.OPEN && upperBoundType == BoundType.CLOSED) {
+            // (lowerBound, upperBound]
+            return Range.openClosed(lowerBound, upperBound);
+        } else if (lowerBoundType == BoundType.CLOSED && upperBoundType == BoundType.CLOSED) {
+            // [lowerBound, upperBound]
+            return Range.closed(lowerBound, upperBound);
+        } else if (lowerBoundType == BoundType.OPEN && upperBoundType == BoundType.OPEN) {
+            // (lowerBound, upperBound)
+            return Range.open(lowerBound, upperBound);
+        } else {
+            throw new IllegalArgumentException("Unsupported BoundType combination: "
+                + lowerBoundType + " and " + upperBoundType);
+        }
+    }
+
     /**
      * This is just like the logic in v1 version, but we support disjunctive predicates here.
      */
@@ -154,8 +184,38 @@ public class RangePartitionPrunerV2 extends PartitionPrunerV2Base {
                             && filter.hasUpperBound() && filter.upperBoundType() == BoundType.CLOSED
                             && filter.lowerEndpoint() == filter.upperEndpoint()) {
                         // Equal to predicate, e.g., col=1, the filter range is [1, 1].
-                        minKey.pushColumn(filter.lowerEndpoint().getValue(), column.getDataType());
-                        maxKey.pushColumn(filter.upperEndpoint().getValue(), column.getDataType());
+                        ColumnBound lowerFilter = filter.lowerEndpoint();
+                        ColumnBound upperFilter = filter.upperEndpoint();
+                        minKey.pushColumn(lowerFilter.getValue(), column.getDataType());
+                        maxKey.pushColumn(upperFilter.getValue(), column.getDataType());
+
+                        // Locate the partition to which the filter belongs
+                        List<Long> partID = Lists.newArrayList();
+                        for (Map.Entry<Range<PartitionKey>, Long> rangeMapEntry : rangeMap.asMapOfRanges().entrySet()) {
+                            Range<PartitionKey> partitionColRange = rangeMapEntry.getKey();
+                            PartitionKey upperPartitionKeys = partitionColRange.upperEndpoint();
+                            int partitionLess = upperPartitionKeys.getKeys()
+                                                  .get(columnIdx).compareTo(lowerFilter.getValue());
+                            if (partitionLess < 0) {
+                                continue;
+                            }
+                            PartitionKey lowerPartitionKeys = partitionColRange.lowerEndpoint();
+                            int partitionGreater = lowerPartitionKeys.getKeys()
+                                                     .get(columnIdx).compareTo(upperFilter.getValue());
+                            if (partitionGreater > 0) {
+                                break;
+                            }
+                            Range<ColumnBound> keyColRange = buildRange(
+                                    ColumnBound.of(lowerPartitionKeys.getKeys().get(columnIdx)),
+                                    partitionColRange.lowerBoundType(),
+                                    ColumnBound.of(upperPartitionKeys.getKeys().get(columnIdx)),
+                                    partitionColRange.upperBoundType());
+                            if (filter.isConnected(keyColRange)) {
+                                partID.add(rangeMapEntry.getValue());
+                            }
+                        }
+                        partitionCol2PartitionID.putIfAbsent(column.getName(), TreeRangeMap.create());
+                        partitionCol2PartitionID.get(column.getName()).put(filter, partID);
                         result.addAll(doPruneMulti(columnToFilters, rangeMap, columnIdx + 1, minKey, maxKey));
                         minKey.popColumn();
                         maxKey.popColumn();
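
`buildRange` above normalizes a possibly reversed bound pair (swapping bounds
together with their bound types) before constructing a Guava Range. A small
usage sketch; note that Guava's `Range.range(lower, lowerType, upper,
upperType)` can replace the four-way if/else once the bounds are ordered:

```java
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;

class BuildRangeSketch {
    // Same normalization idea as buildRange in the patch.
    static Range<Integer> buildRange(Integer lo, BoundType loType, Integer hi, BoundType hiType) {
        if (lo.compareTo(hi) > 0) {
            Integer tmp = lo;
            lo = hi;
            hi = tmp;
            BoundType tmpType = loType;
            loType = hiType;
            hiType = tmpType;
        }
        // Range.range covers all four open/closed combinations.
        return Range.range(lo, loType, hi, hiType);
    }

    public static void main(String[] args) {
        // Reversed input [10 .. 3) is normalized to (3 .. 10].
        System.out.println(buildRange(10, BoundType.CLOSED, 3, BoundType.OPEN)); // (3..10]
    }
}
```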
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
index 9e4030b768b..d65c6032b86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -19,19 +19,26 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.KeyTuple;
@@ -48,6 +55,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TDeserializer;
@@ -55,31 +65,78 @@ import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class PointQueryExecutor implements CoordInterface {
     private static final Logger LOG = 
LogManager.getLogger(PointQueryExecutor.class);
-    private long tabletID = 0;
+    private static final ConcurrentHashMap<Class<? extends Expr>, 
ConjunctHandler> handlers = new ConcurrentHashMap<>();
+
+    private class RoundRobinScheduler<T> {
+        private final List<T> list;
+        private AtomicInteger currentIndex;
+
+        public RoundRobinScheduler(List<T> list) {
+            this.list = list;
+            this.currentIndex = new AtomicInteger(0);
+        }
+
+        public T next() {
+            if (list.isEmpty()) {
+                return null;
+            }
+            int index = currentIndex.getAndUpdate(i -> (i + 1) % list.size());
+            return list.get(index);
+        }
+    }
+
+    private List<Long> scanTabletIDs = new ArrayList<>();
     private long timeoutMs = Config.point_query_timeout_ms; // default 10s
 
     private boolean isCancel = false;
     private List<Backend> candidateBackends;
+
+    // key: tablet id, value: backend and replica id
+    private HashMap<Long, Pair<Backend, Long>> pickedCandidateReplicas = Maps.newHashMap();
+
+    RoundRobinScheduler<Backend> roundRobinscheduler = null;
+
+    // tablet id -> backend id -> replica
+    Table<Long, Long, Replica> replicaMetaTable = null;
+
     private final int maxMsgSizeOfResultReceiver;
 
     // used for snapshot read in cloud mode
-    private List<Long> snapshotVisibleVersions;
+    // Key: cloud partition, Value: snapshot visible version
+    private HashMap<CloudPartition, Long> snapshotVisibleVersions;
 
     private final ShortCircuitQueryContext shortCircuitQueryContext;
 
+    Map<PartitionKey, Set<Long>> distributionKeys2TabletID = null;
+
+    // Maps partition column names to a RangeMap that associates ColumnBound ranges with lists of partition IDs,
+    // similar to the implementation in PartitionPrunerV2Base.
+    Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID = null;
+
+    List<Set<Long>> keyTupleIndex2TabletID = null;
+
+    List<List<String>> allKeyTuples = null;
+
+    private List<Integer> distributionKeyColumns = Lists.newArrayList();
+
+    private List<Integer> partitionKeyColumns = Lists.newArrayList();
+
     public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) {
         ctx.sanitize();
         this.shortCircuitQueryContext = ctx;
@@ -97,10 +154,13 @@ public class PointQueryExecutor implements CoordInterface {
                 partitions.add((CloudPartition) table.getPartition(id));
             }
         }
-        snapshotVisibleVersions = CloudPartition.getSnapshotVisibleVersion(partitions);
-        // Only support single partition at present
-        Preconditions.checkState(snapshotVisibleVersions.size() == 1);
-        LOG.debug("set cloud version {}", snapshotVisibleVersions.get(0));
+        snapshotVisibleVersions = (snapshotVisibleVersions == null) ? Maps.newHashMap() : snapshotVisibleVersions;
+        List<Long> versionList = CloudPartition.getSnapshotVisibleVersion(partitions);
+        for (int i = 0; i < versionList.size(); ++i) {
+            snapshotVisibleVersions.put(partitions.get(i), versionList.get(i));
+        }
+
+        LOG.debug("set cloud version {}", snapshotVisibleVersions);
     }
 
     void setScanRangeLocations() throws Exception {
@@ -111,8 +171,9 @@ public class PointQueryExecutor implements CoordInterface {
         if (scanNode.getScanTabletIds().isEmpty()) {
             return;
         }
-        Preconditions.checkState(scanNode.getScanTabletIds().size() == 1);
-        this.tabletID = scanNode.getScanTabletIds().get(0);
+        this.distributionKeys2TabletID = scanNode.getDistributionKeys2TabletID();
+        this.partitionCol2PartitionID = scanNode.getPartitionCol2PartitionID();
+        this.scanTabletIDs = scanNode.getScanTabletIds();
 
         // update partition version if cloud mode
         if (Config.isCloudMode()
@@ -128,10 +189,9 @@ public class PointQueryExecutor implements CoordInterface {
                 candidateBackends.add(backend);
             }
         }
-        // Random read replicas
-        Collections.shuffle(this.candidateBackends);
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID);
+            LOG.debug("set scan locations, backend ids {}, tablet ids {}", candidateBackends, scanTabletIDs);
         }
     }
 
@@ -159,40 +219,260 @@ public class PointQueryExecutor implements CoordInterface {
                         .getMysqlChannel(), null, null);
     }
 
-    private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) {
-        for (int i = 0; i < conjunctVals.size(); ++i) {
-            BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i);
-            if (binaryPredicate.getChild(0) instanceof LiteralExpr) {
-                binaryPredicate.setChild(0, conjunctVals.get(i));
-            } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) {
-                binaryPredicate.setChild(1, conjunctVals.get(i));
+    /*
+     * Interface for handling different conjunct types
+     */
+    private interface ConjunctHandler {
+        int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException;
+    }
+
+    private static class InPredicateHandler implements ConjunctHandler {
+        public static final InPredicateHandler INSTANCE = new InPredicateHandler();
+
+        private InPredicateHandler() {
+        }
+
+        @Override
+        public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException {
+            InPredicate inPredicate = (InPredicate) expr;
+            if (inPredicate.isNotIn()) {
+                throw new AnalysisException("Not support NOT IN predicate in point query");
+            }
+            for (int j = 1; j < inPredicate.getChildren().size(); ++j) {
+                if (inPredicate.getChild(j) instanceof LiteralExpr) {
+                    inPredicate.setChild(j, conjunctVals.get(handledConjunctVals++));
+                } else {
+                    Preconditions.checkState(false, "Should contains literal in " + inPredicate.toSqlImpl());
+                }
+            }
+            return handledConjunctVals;
+        }
+    }
+
+    private static class BinaryPredicateHandler implements ConjunctHandler {
+        public static final BinaryPredicateHandler INSTANCE = new BinaryPredicateHandler();
+
+        private BinaryPredicateHandler() {
+        }
+
+        @Override
+        public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException {
+            BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+            Expr left = binaryPredicate.getChild(0);
+            Expr right = binaryPredicate.getChild(1);
+
+            if (isDeleteSign(left) || isDeleteSign(right)) {
+                return handledConjunctVals;
+            }
+
+            if (isLiteralExpr(left)) {
+                binaryPredicate.setChild(0, conjunctVals.get(handledConjunctVals++));
+            } else if (isLiteralExpr(right)) {
+                binaryPredicate.setChild(1, conjunctVals.get(handledConjunctVals++));
             } else {
                 Preconditions.checkState(false, "Should contains literal in " + binaryPredicate.toSqlImpl());
             }
+            return handledConjunctVals;
+        }
+
+        private boolean isLiteralExpr(Expr expr) {
+            return expr instanceof LiteralExpr;
+        }
+
+        private boolean isDeleteSign(Expr expr) {
+            return expr instanceof SlotRef && ((SlotRef) expr).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN);
+        }
+    }
+
+    private static void initHandler() {
+        handlers.put(InPredicate.class, InPredicateHandler.INSTANCE);
+        handlers.put(BinaryPredicate.class, BinaryPredicateHandler.INSTANCE);
+    }
+
+    private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) {
+        List<Expr> conjuncts = scanNode.getConjuncts();
+        if (handlers.isEmpty()) {
+            initHandler();
+        }
+        int handledConjunctVals = 0;
+        for (Expr expr : conjuncts) {
+            ConjunctHandler handler = handlers.get(expr.getClass());
+            if (handler == null) {
+                throw new AnalysisException("Not support conjunct type " + expr.getClass().getName());
+            }
+            handledConjunctVals = handler.handle(expr, conjunctVals, handledConjunctVals);
         }
+
+        Preconditions.checkState(handledConjunctVals == conjunctVals.size());
     }
 
     public void setTimeout(long timeoutMs) {
         this.timeoutMs = timeoutMs;
     }
 
-    void addKeyTuples(
-            InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
-        // TODO handle IN predicates
-        Map<String, Expr> columnExpr = Maps.newHashMap();
-        KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
-        for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) {
+    /*
+     * For the current ordered key tuple, use leftmost-prefix matching on the
+     * partition columns to find the IDs of the partitions it belongs to.
+     */
+    private Set<Long> getLeftMostPartitionIDs(List<String> orderedKeyTuple,
+                                            List<Column> keyColumns) {
+        Set<Long> leftMostPartitionIDs = Sets.newHashSet();
+        for (int i = 0; i < partitionKeyColumns.size(); ++i) {
+            int colIdx = partitionKeyColumns.get(i);
+            String partitionColName = keyColumns.get(colIdx).getName();
+            try {
+                ColumnBound partitionKey = ColumnBound.of(LiteralExpr.create(orderedKeyTuple.get(colIdx),
+                        keyColumns.get(colIdx).getType()));
+                List<Long> partitionIDs = Lists.newArrayList(
+                        Optional.ofNullable(
+                                partitionCol2PartitionID.get(partitionColName).get(partitionKey))
+                                .orElse(Collections.emptyList()));
+                // Add the first partition column directly
+                if (i == 0) {
+                    leftMostPartitionIDs.addAll(partitionIDs);
+                    continue;
+                }
+                if (leftMostPartitionIDs.isEmpty() || partitionIDs == null) {
+                    break;
+                }
+                partitionIDs.retainAll(leftMostPartitionIDs);
+                if (partitionIDs.isEmpty()) {
+                    break;
+                }
+            } catch (Exception e) {
+                throw new AnalysisException("Failed to create partition key for key tuple: " + orderedKeyTuple);
+            }
+        }
+        return leftMostPartitionIDs;
+    }
+
+    private void pickCandidateBackends() {
+        for (Long tabletID : scanTabletIDs) {
+            roundRobinPickReplica(tabletID);
+        }
+    }
+
+    private void roundRobinPickReplica(Long tabletID) {
+        while (true) {
+            Backend backend = roundRobinscheduler.next();
+            if (backend == null) {
+                break;
+            }
+            Map<Long, Replica> beWithReplica = replicaMetaTable.row(tabletID);
+            if (!beWithReplica.containsKey(backend.getId())) {
+                continue;
+            }
+            pickedCandidateReplicas
+                    .putIfAbsent(tabletID,
+                                 Pair.of(backend, beWithReplica.get(backend.getId()).getId()));
+            break;
+        }
+    }
+
+    // Use the leftmost matching partitions to filter out tablets that do not belong to these partitions
+    private void addTabletIDsForKeyTuple(List<String> orderedKeyTuple, List<Column> keyColumns,
+                                         OlapTable olapTable, Set<Long> leftMostPartitionIDs) {
+        // get part of the key tuple using distribution columns
+        List<String> keyTupleForDistributionPrune = Lists.newArrayList();
+        for (Integer idx : distributionKeyColumns) {
+            keyTupleForDistributionPrune.add(orderedKeyTuple.get(idx));
+        }
+        Set<Long> tabletIDs = Sets.newHashSet();
+        for (PartitionKey key : distributionKeys2TabletID.keySet()) {
+            List<String> distributionKeys = Lists.newArrayList();
+            for (LiteralExpr expr : key.getKeys()) {
+                distributionKeys.add(expr.getStringValue());
+            }
+            if (distributionKeys.equals(keyTupleForDistributionPrune)) {
+                Set<Long> originTabletIDs = Sets.newHashSet(distributionKeys2TabletID.get(key));
+                // If partitions are not explicitly created, this condition holds true
+                if (leftMostPartitionIDs.isEmpty()) {
+                    tabletIDs.addAll(originTabletIDs);
+                } else {
+                    Set<Long> prunedTabletIDs = Sets.newHashSet();
+                    for (Long partitionID : leftMostPartitionIDs) {
+                        Partition partition = olapTable.getPartition(partitionID);
+                        MaterializedIndex selectedTable =
+                                partition.getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId());
+                        // filter out tablets that do not belong to this partition
+                        selectedTable.getTablets().forEach(tablet -> {
+                            if (originTabletIDs.contains(tablet.getId())) {
+                                prunedTabletIDs.add(tablet.getId());
+                            }
+                        });
+                        tabletIDs.addAll(prunedTabletIDs);
+                    }
+                }
+                break;
+            }
+        }
+        keyTupleIndex2TabletID.add(tabletIDs.isEmpty() ? null : tabletIDs);
+    }
+
+    // Get all possible key tuple combinations
+    void getAllKeyTupleCombination(List<Expr> conjuncts, int index,
+                        List<String> currentKeyTuple,
+                        List<List<String>> result,
+                        List<String> columnExpr,
+                        List<Column> keyColumns) {
+        if (index == conjuncts.size()) {
+            List<String> orderedKeyTuple = new ArrayList<>(currentKeyTuple.size());
+            OlapTable olapTable = shortCircuitQueryContext.scanNode.getOlapTable();
+
+            // add key tuple in keys order
+            for (Column column : keyColumns) {
+                int colIdx = columnExpr.indexOf(column.getName());
+                String currentKey = currentKeyTuple.get(colIdx);
+                orderedKeyTuple.add(currentKey);
+            }
+            result.add(Lists.newArrayList(orderedKeyTuple));
+            Set<Long> leftMostPartitionIDs = getLeftMostPartitionIDs(orderedKeyTuple, keyColumns);
+            addTabletIDsForKeyTuple(orderedKeyTuple, keyColumns, olapTable, leftMostPartitionIDs);
+            return;
+        }
+
+        Expr expr = conjuncts.get(index);
+        if (expr instanceof BinaryPredicate) {
             BinaryPredicate predicate = (BinaryPredicate) expr;
             Expr left = predicate.getChild(0);
             Expr right = predicate.getChild(1);
             SlotRef columnSlot = left.unwrapSlotRef();
-            columnExpr.put(columnSlot.getColumnName(), right);
+            if (left instanceof SlotRef && ((SlotRef) left).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN)) {
+                getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns);
+                return;
+            }
+            columnExpr.add(columnSlot.getColumnName());
+            currentKeyTuple.add(right.getStringValue());
+            getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns);
+            currentKeyTuple.remove(currentKeyTuple.size() - 1);
+            columnExpr.remove(columnExpr.size() - 1);
+        } else if (expr instanceof InPredicate) {
+            InPredicate inPredicate = (InPredicate) expr;
+            if (inPredicate.isNotIn()) {
+                throw new AnalysisException("Not support NOT IN predicate in 
point query");
+            }
+            SlotRef columnSlot = inPredicate.getChild(0).unwrapSlotRef();
+            columnExpr.add(columnSlot.getColumnName());
+            for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
+                currentKeyTuple.add(inPredicate.getChild(i).getStringValue());
+                getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple, result, columnExpr, keyColumns);
+                currentKeyTuple.remove(currentKeyTuple.size() - 1);
+            }
+            columnExpr.remove(columnExpr.size() - 1);
+        } else {
+            throw new AnalysisException("Not support conjunct type " + 
expr.getClass().getName());
         }
-        // add key tuple in keys order
-        for (Column column : shortCircuitQueryContext.scanNode.getOlapTable().getBaseSchemaKeyColumns()) {
-            kBuilder.addKeyColumnRep(columnExpr.get(column.getName()).getStringValue());
+    }
+
+    List<List<String>> addAllKeyTuples(List<Column> keyColumns) {
+        List<Expr> conjuncts = shortCircuitQueryContext.scanNode.getConjuncts();
+        List<List<String>> keyTuples = Lists.newArrayList();
+        if (keyTupleIndex2TabletID == null) {
+            keyTupleIndex2TabletID = Lists.newArrayList();
         }
-        requestBuilder.addKeyTuples(kBuilder);
+        getAllKeyTupleCombination(conjuncts, 0, new ArrayList<>(), keyTuples, Lists.newArrayList(), keyColumns);
+        Preconditions.checkState(keyTuples.size() == keyTupleIndex2TabletID.size());
+        return keyTuples;
     }
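addAllKeyTuples drives a backtracking Cartesian expansion: each equality conjunct contributes
a single value, each IN conjunct fans out into one recursive branch per list element, and the
resulting key tuples are the cross product of all IN lists. A minimal standalone sketch of the
same expansion over hypothetical plain string lists instead of Expr conjuncts:

    // Expands {a=[1], b=[2,3]} into the tuples [1,2] and [1,3].
    static void expand(List<List<String>> valuesPerColumn, int idx,
                       List<String> current, List<List<String>> out) {
        if (idx == valuesPerColumn.size()) {
            out.add(new ArrayList<>(current));       // one complete key tuple
            return;
        }
        for (String v : valuesPerColumn.get(idx)) {
            current.add(v);                          // choose one value of this IN list
            expand(valuesPerColumn, idx + 1, current, out);
            current.remove(current.size() - 1);      // backtrack, as the patch does
        }
    }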
 
     @Override
@@ -208,21 +488,71 @@ public class PointQueryExecutor implements CoordInterface {
         if (candidateBackends == null || candidateBackends.isEmpty()) {
             return new RowBatch();
         }
-        Iterator<Backend> backendIter = candidateBackends.iterator();
-        RowBatch rowBatch = null;
-        int tryCount = 0;
-        int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size());
+
+        // pick candidate backends
+        OlapScanNode scanNode = shortCircuitQueryContext.scanNode;
+        replicaMetaTable = scanNode.getScanBackendReplicaTable();
+        roundRobinscheduler = new RoundRobinScheduler<>(candidateBackends);
+        pickCandidateBackends();
+
+        OlapTable olapTable = scanNode.getOlapTable();
+        List<Column> keyColumns = olapTable.getBaseSchemaKeyColumns();
+        for (int i = 0; i < keyColumns.size(); ++i) {
+            Column column = keyColumns.get(i);
+            if (olapTable.isPartitionColumn(column.getName())) {
+                partitionKeyColumns.add(i);
+            }
+            if (olapTable.isDistributionColumn(column.getName())) {
+                distributionKeyColumns.add(i);
+            }
+        }
+        RowBatch rowBatch = new RowBatch();
         Status status = new Status();
-        do {
-            Backend backend = backendIter.next();
-            rowBatch = getNextInternal(status, backend);
-            if (rowBatch != null) {
+        this.allKeyTuples = addAllKeyTuples(keyColumns);
+        TResultBatch resultBatch = new TResultBatch();
+        resultBatch.setRows(Lists.newArrayList());
+        List<byte[]> batchSerialResult = Lists.newArrayList();
+        Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequestBuilders =
+                buildBatchRequest(status);
+        // send batch request
+        if (batchRequestBuilders.isEmpty()) {
+            status.updateStatus(TStatusCode.OK, "");
+            rowBatch.setEos(true);
+            return rowBatch;
+        }
+        for (Map.Entry<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> entry :
+                    batchRequestBuilders.entrySet()) {
+            List<byte[]> subSerialResult = batchGetNext(status, entry.getKey(), entry.getValue());
+
+            if (!status.ok()) {
                 break;
             }
-            if (++tryCount >= maxTry) {
-                break;
+            batchSerialResult.addAll(subSerialResult);
+        }
+
+        // TODO: avoid buffering every serialized sub-batch before deserializing
+        if (!batchSerialResult.isEmpty()) {
+            TDeserializer deserializer = new TDeserializer(
+                        new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+            for (byte[] serialResult : batchSerialResult) {
+                TResultBatch tmpResultBatch = new TResultBatch();
+                try {
+                    deserializer.deserialize(tmpResultBatch, serialResult);
+                    tmpResultBatch.getRows().forEach(resultBatch::addToRows);
+                } catch (TException e) {
+                    if (e.getMessage().contains("MaxMessageSize reached")) {
+                        throw new TException("MaxMessageSize reached, try increasing max_msg_size_of_result_receiver");
+                    } else {
+                        throw e;
+                    }
+                }
             }
-        } while (true);
+            rowBatch.setBatch(resultBatch);
+        }
+        rowBatch.setEos(true);
+
         // handle status code
         if (!status.ok()) {
             if (Strings.isNullOrEmpty(status.getErrorMsg())) {
@@ -250,45 +580,106 @@ public class PointQueryExecutor implements CoordInterface {
         // only handles in getNext()
     }
 
-    private RowBatch getNextInternal(Status status, Backend backend) throws TException {
-        long timeoutTs = System.currentTimeMillis() + timeoutMs;
-        RowBatch rowBatch = new RowBatch();
-        InternalService.PTabletKeyLookupResponse pResult = null;
-        try {
-            Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable);
-
-            InternalService.PTabletKeyLookupRequest.Builder requestBuilder
-                    = InternalService.PTabletKeyLookupRequest.newBuilder()
-                    .setTabletId(tabletID)
-                    .setDescTbl(shortCircuitQueryContext.serializedDescTable)
-                    .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
-                    .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
-                    .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE);
-            if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) {
-                requestBuilder.setVersion(snapshotVisibleVersions.get(0));
-            }
-            // Only set cacheID for prepared statement excute phase,
-            // otherwise leading to many redundant cost in BE side
-            if (shortCircuitQueryContext.cacheID != null
-                        && ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE) {
-                InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder();
-                uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
-                uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
-                requestBuilder.setUuid(uuidBuilder);
+    private void collectBatchRequests(
+            Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequests,
+            KeyTuple.Builder kBuilder, Set<Long> tabletIDsOfKeyTuple) {
+        // every tablet of this key tuple must already have a picked replica
+        for (Long tabletID : tabletIDsOfKeyTuple) {
+            Preconditions.checkState(pickedCandidateReplicas.containsKey(tabletID));
+            Pair<Backend, Long> beWithReplicaID = pickedCandidateReplicas.get(tabletID);
+            Backend candidate = beWithReplicaID.first;
+            batchRequests.putIfAbsent(candidate,
+                    InternalService.PTabletBatchKeyLookupRequest.newBuilder());
+            buildSubRequest(tabletID, kBuilder, beWithReplicaID.second,
+                            batchRequests.get(candidate));
+        }
+    }
+
+    // Find the tabletID, backend, and replica corresponding to each keyTuple,
+    // and then add them to batchRequests. Each backend corresponds to one batchRequest.
+    private Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder>
+            buildBatchRequest(Status status) throws TException {
+        Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder> batchRequestBuilders =
+                Maps.newHashMap();
+        for (int i = 0; i < keyTupleIndex2TabletID.size(); ++i) {
+            if (keyTupleIndex2TabletID.get(i) == null) {
+                continue;
+            }
+            KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
+            for (String key : this.allKeyTuples.get(i)) {
+                kBuilder.addKeyColumnRep(key);
+            }
+            collectBatchRequests(batchRequestBuilders, kBuilder, keyTupleIndex2TabletID.get(i));
+        }
+        return batchRequestBuilders;
+    }
+
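A small style note on the grouping in collectBatchRequests: the putIfAbsent-then-get pair can
also be expressed with computeIfAbsent, which resolves the per-backend builder in one lookup.
A sketch of the equivalent form (an alternative, not what this patch does):

    // Equivalent per-backend grouping with a single map lookup.
    InternalService.PTabletBatchKeyLookupRequest.Builder builder =
            batchRequests.computeIfAbsent(candidate,
                    be -> InternalService.PTabletBatchKeyLookupRequest.newBuilder());
    buildSubRequest(tabletID, kBuilder, beWithReplicaID.second, builder);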
+    // Build the sub-request for a single key tuple
+    private void buildSubRequest(
+            Long tabletID, KeyTuple.Builder kBuilder, Long replicaID,
+            InternalService.PTabletBatchKeyLookupRequest.Builder pBatchRequestBuilder) {
+        InternalService.PTabletKeyLookupRequest.Builder requestBuilder
+                = InternalService.PTabletKeyLookupRequest.newBuilder()
+                .setDescTbl(shortCircuitQueryContext.serializedDescTable)
+                .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
+                .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
+                .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE);
+
+        // TODO: precompute a tabletID -> version map instead of scanning all partitions per request
+        if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) {
+            Long versionToSet = -1L;
+            for (Map.Entry<CloudPartition, Long> entry : snapshotVisibleVersions.entrySet()) {
+                MaterializedIndex selectedTable =
+                        entry.getKey().getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId());
+                if (selectedTable.getTabletIdsInOrder().contains(tabletID)) {
+                    versionToSet = entry.getValue();
+                    break;
+                }
             }
-            addKeyTuples(requestBuilder);
+            requestBuilder.setVersion(versionToSet);
+        }
+        // Only set cacheID for the prepared statement execute phase,
+        // otherwise it leads to redundant cost on the BE side
+        if (shortCircuitQueryContext.cacheID != null
+                        && ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE) {
+            InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder();
+            uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
+            uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
+            requestBuilder.setUuid(uuidBuilder);
+        }
+        requestBuilder.addKeyTuples(kBuilder);
+        requestBuilder.setTabletId(tabletID);
+        requestBuilder.setReplicaId(replicaID);
+        pBatchRequestBuilder.addSubKeyLookupReq(requestBuilder);
+    }
+
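One way to discharge the TODO in buildSubRequest is to precompute a tablet-to-version map once
per query instead of scanning every CloudPartition for every sub-request. A hedged sketch
reusing names from this patch (selectedIndexId stands in for
shortCircuitQueryContext.scanNode.getSelectedIndexId()):

    // Hypothetical one-time precomputation before any sub-request is built.
    Map<Long, Long> tabletVersion = Maps.newHashMap();
    for (Map.Entry<CloudPartition, Long> e : snapshotVisibleVersions.entrySet()) {
        MaterializedIndex index = e.getKey().getIndex(selectedIndexId);
        for (Long tid : index.getTabletIdsInOrder()) {
            tabletVersion.put(tid, e.getValue());
        }
    }
    // Each sub-request then becomes a single lookup:
    // requestBuilder.setVersion(tabletVersion.getOrDefault(tabletID, -1L));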
+    private List<byte[]> batchGetNext(
+            Status status, Backend backend,
+            InternalService.PTabletBatchKeyLookupRequest.Builder pBatchRequestBuilder) throws TException {
+        List<byte[]> result = Lists.newArrayList();
+
+        Preconditions.checkState(pBatchRequestBuilder.getSubKeyLookupReqCount() > 0);
 
-            InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
-            Future<InternalService.PTabletKeyLookupResponse> futureResponse =
-                    BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request);
+        // batch fetch data
+        InternalService.PTabletBatchKeyLookupRequest pBatchRequest = pBatchRequestBuilder.build();
+        long timeoutTs = System.currentTimeMillis() + timeoutMs;
+        InternalService.PTabletBatchKeyLookupResponse pBatchResult = null;
+        try {
+            Future<InternalService.PTabletBatchKeyLookupResponse> futureBatchResponse =
+                    BackendServiceProxy.getInstance()
+                            .batchFetchTabletDataAsync(backend.getBrpcAddress(), pBatchRequest);
             long currentTs = System.currentTimeMillis();
             if (currentTs >= timeoutTs) {
-                LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
+                LOG.warn("batch fetch result timeout {}", 
backend.getBrpcAddress());
                 status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request 
timeout");
                 return null;
             }
             try {
-                pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
+                // TODO: get the result asynchronously
+                pBatchResult = futureBatchResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 // continue to get result
                 LOG.warn("future get interrupted Exception");
@@ -297,18 +688,18 @@ public class PointQueryExecutor implements CoordInterface {
                     return null;
                 }
             } catch (TimeoutException e) {
-                futureResponse.cancel(true);
+                futureBatchResponse.cancel(true);
                 LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAddress());
                 status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch 
result timeout");
                 return null;
             }
         } catch (RpcException e) {
-            LOG.warn("query fetch rpc exception {}, e {}", 
backend.getBrpcAddress(), e);
+            LOG.warn("query batch fetch rpc exception {}, e {}", 
backend.getBrpcAddress(), e);
             status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             return null;
         } catch (ExecutionException e) {
-            LOG.warn("query fetch execution exception {}, addr {}", e, 
backend.getBrpcAddress());
+            LOG.warn("query batch fetch execution exception {}, addr {}", e, 
backend.getBrpcAddress());
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not retry querying.
                 status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
@@ -318,43 +709,37 @@ public class PointQueryExecutor implements CoordInterface {
             }
             return null;
         }
-        Status resultStatus = new Status(pResult.getStatus());
-        if (resultStatus.getErrorCode() != TStatusCode.OK) {
-            status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
-            return null;
-        }
 
-        if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
-            LOG.debug("get empty rowbatch");
-            rowBatch.setEos(true);
-            status.updateStatus(TStatusCode.OK, "");
-            return rowBatch;
-        } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
-            byte[] serialResult = pResult.getRowBatch().toByteArray();
-            TResultBatch resultBatch = new TResultBatch();
-            TDeserializer deserializer = new TDeserializer(
-                    new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
-            try {
-                deserializer.deserialize(resultBatch, serialResult);
-            } catch (TException e) {
-                if (e.getMessage().contains("MaxMessageSize reached")) {
-                    throw new TException("MaxMessageSize reached, try increase 
max_msg_size_of_result_receiver");
-                } else {
-                    throw e;
-                }
+        // handle the response
+        boolean isOK = true;
+        for (InternalService.PTabletKeyLookupResponse subResponse : pBatchResult.getSubKeyLookupResList()) {
+            Status resultStatus = new Status(subResponse.getStatus());
+            if (resultStatus.getErrorCode() != TStatusCode.OK) {
+                status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
+                return null;
             }
-            rowBatch.setBatch(resultBatch);
-            rowBatch.setEos(true);
+            if (isCancel) {
+                status.updateStatus(TStatusCode.CANCELLED, "cancelled");
+                isOK = false;
+                break;
+            }
+            if (subResponse.hasEmptyBatch() && subResponse.getEmptyBatch()) {
+                LOG.debug("get empty rowbatch");
+            } else if (subResponse.hasRowBatch() && subResponse.getRowBatch().size() > 0) {
+                result.add(subResponse.getRowBatch().toByteArray());
+            } else {
+                Preconditions.checkState(false, "No row batch or empty batch found");
+            }
+        }
+        if (isOK) {
             status.updateStatus(TStatusCode.OK, "");
-            return rowBatch;
-        } else {
-            Preconditions.checkState(false, "No row batch or empty batch 
found");
         }
 
-        if (isCancel) {
-            status.updateStatus(TStatusCode.CANCELLED, "cancelled");
-        }
-        return rowBatch;
+        return result;
     }
 
     public void cancel() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 54c5e68144c..9cde1897654 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -98,6 +98,11 @@ public class BackendServiceClient {
         return stub.tabletFetchData(request);
     }
 
+    public Future<InternalService.PTabletBatchKeyLookupResponse> batchFetchTabletDataAsync(
+            InternalService.PTabletBatchKeyLookupRequest batchRequest) {
+        return stub.tabletBatchFetchData(batchRequest);
+    }
+
     public InternalService.PFetchDataResult fetchDataSync(InternalService.PFetchDataRequest request) {
         return blockingStub.fetchData(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 053a7428b52..5fe13a82d0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -307,6 +307,18 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.PTabletBatchKeyLookupResponse> batchFetchTabletDataAsync(
+            TNetworkAddress address, InternalService.PTabletBatchKeyLookupRequest batchRequest) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.batchFetchTabletDataAsync(batchRequest);
+        } catch (Throwable e) {
+            LOG.warn("batch fetch tablet data catch a exception, 
address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public InternalService.PFetchDataResult fetchDataSync(
             TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 547b2588168..867c0679dac 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -339,6 +339,11 @@ message PTabletKeyLookupRequest {
     optional int64 version = 7;
     // serialized from TQueryOptions
     optional bytes query_options = 8;
+    optional int64 replica_id = 9;
+}
+
+message PTabletBatchKeyLookupRequest {
+    repeated PTabletKeyLookupRequest sub_key_lookup_req = 1;
 }
 
 message PTabletKeyLookupResponse {
@@ -347,6 +352,11 @@ message PTabletKeyLookupResponse {
     optional bool empty_batch = 3;
 }
 
+message PTabletBatchKeyLookupResponse {
+    optional PStatus status = 1;
+    repeated PTabletKeyLookupResponse sub_key_lookup_res = 2;
+}
+
 //Add message definition to fetch and update cache
 enum PCacheStatus {    
     DEFAULT = 0;
@@ -1030,6 +1040,7 @@ service PBackendService {
     rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
     rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns (PGetFileCacheMetaResponse);
     rpc tablet_fetch_data(PTabletKeyLookupRequest) returns (PTabletKeyLookupResponse);
+    rpc tablet_batch_fetch_data(PTabletBatchKeyLookupRequest) returns (PTabletBatchKeyLookupResponse);
     rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse);
     rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse);
     rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse);
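Taken together, the proto additions are a thin batching envelope: PTabletBatchKeyLookupRequest
is a repeated list of the existing single-tablet requests and the response mirrors it
sub-response by sub-response. A hedged sketch of the FE-side call path using the helpers added
in this patch (exception and timeout handling elided; brpcAddress and subRequestBuilder are
assumed to be in scope):

    // Build one batch per backend, then fan the sub-responses back out.
    InternalService.PTabletBatchKeyLookupRequest.Builder batch =
            InternalService.PTabletBatchKeyLookupRequest.newBuilder();
    batch.addSubKeyLookupReq(subRequestBuilder);   // one PTabletKeyLookupRequest per key tuple
    Future<InternalService.PTabletBatchKeyLookupResponse> future =
            BackendServiceProxy.getInstance().batchFetchTabletDataAsync(brpcAddress, batch.build());
    for (InternalService.PTabletKeyLookupResponse sub : future.get().getSubKeyLookupResList()) {
        // each sub-response carries its own PStatus and, optionally, a serialized row batch
    }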
diff --git a/regression-test/data/point_query_p0/test_point_IN_query.out b/regression-test/data/point_query_p0/test_point_IN_query.out
new file mode 100644
index 00000000000..6f1f4ae7b33
--- /dev/null
+++ b/regression-test/data/point_query_p0/test_point_IN_query.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1_EQ_sql --
+123    100     110     zxcd
+
+-- !1_EQ_sql --
+6333   2642    480     zxc
+
+-- !1_EQ_sql --
+1231   1220    210     zxc
+
+-- !2_EQ_sql --
+222    100     115     zxc
+
+-- !2_EQ_sql --
+1231   1220    210     zxc
+
+-- !2_EQ_sql --
+1231   1220    210     zxc
+
+-- !3_EQ_sql --
+323    49      240     zxc
+
+-- !0_EQ_sql --
+12     12      120     zxc
+123    100     110     zxcd
+222    100     115     zxc
+
diff --git a/regression-test/data/point_query_p0/test_point_query.out b/regression-test/data/point_query_p0/test_point_query.out
index 1cc4142e39f..f22bf31942e 100644
--- a/regression-test/data/point_query_p0/test_point_query.out
+++ b/regression-test/data/point_query_p0/test_point_query.out
@@ -160,3 +160,182 @@
 -- !sql --
 -10    20      aabc    update val
 
+-- !case_1_sql --
+123    132     a
+
+-- !case_1_sql --
+123    132     a
+123    222     b
+
+-- !case_1_sql --
+1      1       d
+123    132     a
+123    222     b
+
+-- !case_2_sql --
+
+-- !case_2_sql --
+222    150     c
+
+-- !case_2_sql --
+123    120     a
+
+-- !case_2_sql --
+400    250     g
+400    260     f
+
+-- !case_2_sql --
+222    150     c
+400    250     g
+400    260     f
+
+-- !case_3_sql --
+123    100     a
+
+-- !case_3_sql --
+
+-- !case_3_sql --
+100    100     aaa
+100    120     aaaaa
+
+-- !case_3_sql --
+123    100     a
+
+-- !case_3_sql --
+400    250     d
+400    280     e
+
+-- !case_3_sql --
+123    100     a
+350    200     c
+400    250     d
+
+-- !case_4_sql --
+123    100     110     a
+
+-- !case_4_sql --
+
+-- !case_4_sql --
+1231   1220    210     d
+
+-- !case_4_sql --
+123    100     110     a
+
+-- !case_4_sql --
+123    100     110     a
+1231   1220    210     d
+222    100     115     b
+
+-- !case_5_sql --
+123    100     110     a
+
+-- !case_5_sql --
+
+-- !case_5_sql --
+1231   1220    210     d
+
+-- !case_5_sql --
+123    100     110     a
+
+-- !case_5_sql --
+123    100     110     a
+1231   1220    210     d
+222    100     115     b
+633    2642    480     g
+6333   2642    480     h
+
+-- !case_6_sql --
+123    132     a
+
+-- !case_6_sql --
+123    132     a
+123    222     b
+
+-- !case_6_sql --
+1      1       d
+123    132     a
+123    222     b
+2      2       e
+3      3       f
+4      4       i
+
+-- !case_7_sql --
+123    100     110     a
+
+-- !case_7_sql --
+
+-- !case_7_sql --
+1231   1220    210     d
+
+-- !case_7_sql --
+123    100     110     a
+
+-- !case_7_sql --
+123    100     110     a
+1231   1220    210     d
+222    100     115     b
+633    2642    480     g
+6333   2642    480     h
+
+-- !case_8_sql --
+123    100     a
+
+-- !case_8_sql --
+
+-- !case_8_sql --
+123    100     a
+
+-- !case_8_sql --
+400    250     d
+400    280     e
+
+-- !case_8_sql --
+123    100     a
+350    200     c
+400    250     d
+
+-- !case_9_sql --
+123    100     110     zxcd
+
+-- !case_9_sql --
+222    100     115     zxc
+
+-- !case_9_sql --
+323    49      240     zxc
+
+-- !case_9_sql --
+123    100     110     zxcd
+
+-- !case_9_sql --
+1231   1220    210     zxc
+
+-- !case_9_sql --
+6333   2642    480     zxc
+
+-- !case_9_sql --
+633    2642    480     zxc
+
+-- !case_9_sql --
+123    100     110     zxcd
+
+-- !case_9_sql --
+1231   1220    210     zxc
+222    100     115     zxc
+
+-- !case_9_sql --
+12     12      120     zxc
+123    100     110     zxcd
+222    100     115     zxc
+
+-- !case_9_sql --
+1231   1220    210     zxc
+
+-- !case_9_sql --
+123    100     110     zxcd
+222    100     115     zxc
+
+-- !case_9_sql --
+123    100     110     zxcd
+1231   1220    210     zxc
+222    100     115     zxc
+
diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out b/regression-test/data/point_query_p0/test_point_query_partition.out
index bef064984c8..5c20cde01d9 100644
--- a/regression-test/data/point_query_p0/test_point_query_partition.out
+++ b/regression-test/data/point_query_p0/test_point_query_partition.out
@@ -31,6 +31,15 @@
 
 -- !point_select --
 
+-- !point_in_select --
+-1     c
+1      a
+11     d
+2      b
+33     f
+45     g
+999    h
+
 -- !point_selectxxx --
 686612	686612	686612	\N	\N	\N	\N	\N	\N	\N	\N
 
@@ -46,3 +55,19 @@
 -- !point_selecteee --
 686613	686613	686613	\N	\N	\N	\N	\N	\N	\N	\N
 
+-- !point_in_selectxxx --
+686612	686612	686612	\N	\N	\N	\N	\N	\N	\N	\N
+686613	686613	686613	\N	\N	\N	\N	\N	\N	\N	\N
+
+-- !point_in_selectyyy --
+686612	686612	686612	\N	\N	\N	\N	\N	\N	\N	\N
+686613	686613	686613	\N	\N	\N	\N	\N	\N	\N	\N
+
+-- !point_in_selectmmm --
+686612	686612	686612	\N	\N	\N	\N	\N	\N	\N	\N
+686613	686613	686613	\N	\N	\N	\N	\N	\N	\N	\N
+
+-- !point_in_selecteee --
+686612	686612	686612	\N	\N	\N	\N	\N	\N	\N	\N
+686613	686613	686613	\N	\N	\N	\N	\N	\N	\N	\N
+
diff --git a/regression-test/data/point_query_p0/test_rowstore.out b/regression-test/data/point_query_p0/test_rowstore.out
index 34e40867d6a..f0558d616c0 100644
--- a/regression-test/data/point_query_p0/test_rowstore.out
+++ b/regression-test/data/point_query_p0/test_rowstore.out
@@ -28,6 +28,11 @@
 -- !point_select --
 33333333333333333333333333333333       3
 
+-- !point_in_select --
+11111111111111111111111111111111111111 3
+222222222222222222222222222222222      3
+33333333333333333333333333333333       3
+
 -- !point_select --
 3
 
@@ -37,6 +42,11 @@
 -- !point_select --
 3
 
+-- !point_in_select --
+3
+3
+3
+
 -- !point_select --
 33333333333333333333333333333333
 
@@ -55,6 +65,11 @@
 -- !point_select --
 3
 
+-- !point_in_select --
+3
+3
+3
+
 -- !point_select --
 2021-02-01T11:11:11
 
@@ -64,6 +79,11 @@
 -- !point_select --
 2023-02-01T11:11:11
 
+-- !point_in_select --
+2021-02-01T11:11:11
+2022-02-01T11:11:11
+2023-02-01T11:11:11
+
 -- !point_select --
 2017-10-01T11:11:11.021	2017-10-01T11:11:11.170	2017-10-01T11:11:11.110111	30
 
@@ -88,6 +108,16 @@
 -- !point_select --
 2017-10-01T11:11:11.028        \N      \N      34
 
+-- !point_in_select --
+2017-10-01T11:11:11.021	2017-10-01T11:11:11.170	2017-10-01T11:11:11.110111	30
+2017-10-01T11:11:11.022	2017-10-01T11:11:11.160	2017-10-01T11:11:11.100111	31
+2017-10-01T11:11:11.023	2017-10-01T11:11:11.150	2017-10-01T11:11:11.130111	31
+2017-10-01T11:11:11.024	2017-10-01T11:11:11.140	2017-10-01T11:11:11.120111	32
+2017-10-01T11:11:11.025	2017-10-01T11:11:11.100	2017-10-01T11:11:11.140111	32
+2017-10-01T11:11:11.026	2017-10-01T11:11:11.110	2017-10-01T11:11:11.150111	33
+2017-10-01T11:11:11.027        \N      \N      34
+2017-10-01T11:11:11.028        \N      \N      34
+
 -- !sql --
 1      abc     1111919.123456789190000000
 
diff --git a/regression-test/data/point_query_p0/test_rowstore_query.out b/regression-test/data/point_query_p0/test_rowstore_query.out
index b43e0263960..e4656d631a7 100644
--- a/regression-test/data/point_query_p0/test_rowstore_query.out
+++ b/regression-test/data/point_query_p0/test_rowstore_query.out
@@ -2,6 +2,10 @@
 -- !sql --
 1      abc     1111919.123456789190000000
 
--- !sql --
+-- !point_sql --
+2      def     1111919.123456789190000000
+
+-- !point_in_sql --
+1      abc     1111919.123456789190000000
 2      def     1111919.123456789190000000
 
diff --git a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
index fa90a56523c..0d640ff2b06 100644
--- a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
+++ b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
@@ -127,3 +127,9 @@
 5	1304	36548425	15229335116	991129292901.111380000	dd	2120-01-02	2024-01-01T12:36:38	652.692	5022-01-01T11:30:38	5022-01-01
 6	1305	56054803	18031831909	100320.111390000	haha abcd	2220-01-02	2025-01-01T12:36:38	2.7692	6022-01-01T11:30:38	6022-01-01
 
+-- !stmt_read11_1 --
+1      1300    55356821
+2      1301    56052706
+3      1302    55702967
+4      1303    56054326
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 4f9409308d5..85feb96380c 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1355,9 +1355,9 @@ class Suite implements GroovyInterceptable {
         quickRunTest(tag, sql, isOrder)
     }
 
-    void quickExecute(String tag, PreparedStatement stmt) {
+    void quickExecute(String tag, PreparedStatement stmt, boolean isOrder = false) {
         logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
-        quickRunTest(tag, stmt)
+        quickRunTest(tag, stmt, isOrder)
     }
 
     @Override
@@ -1369,6 +1369,8 @@ class Suite implements GroovyInterceptable {
             return quickTest(name.substring("order_qt_".length()), (args as 
Object[])[0] as String, true)
         } else if (name.startsWith("qe_")) {
             return quickExecute(name.substring("qe_".length()), (args as 
Object[])[0] as PreparedStatement)
+        } else if (name.startsWith("order_qe_")) {
+            return quickExecute(name.substring("order_qe_".length()), (args as 
Object[])[0] as PreparedStatement, true)
         } else if (name.startsWith("assert") && name.length() > 
"assert".length()) {
             // delegate to junit Assertions dynamically
             return Assertions."$name"(*args) // *args: spread-dot
diff --git a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
index 4be53ff17f6..4ac5b7f5ec9 100644
--- a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
+++ b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
@@ -46,7 +46,7 @@ suite ("test_row_store_page_size_cloud") {
 
     explain {
         sql("select * from ps_table_1 where k1=1 and k2=1;")
-        contains("SHORT")
+        contains("SHORT-CIRCUIT")
     }
 
     qt_select_star "select * from ps_table_1 where k1=1 and k2=1;"
@@ -81,7 +81,7 @@ suite ("test_row_store_page_size_cloud") {
 
     explain {
         sql("select * from ps_table_2 where k1=1 and k2=1;")
-        contains("SHORT")
+        contains("SHORT-CIRCUIT")
     }
 
     qt_select_star "select * from ps_table_2 where k1=1 and k2=1;"
diff --git a/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy
new file mode 100644
index 00000000000..29cc084c3bd
--- /dev/null
+++ b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+suite("test_dynamic_partition_point_query") {
+    sql "drop table if exists dy_par_pq"
+    sql """
+        CREATE TABLE IF NOT EXISTS dy_par_pq ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int NOT NULL )
+        UNIQUE KEY(k1)
+        PARTITION BY RANGE(k1) ( )
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "dynamic_partition.enable"="true",
+            "dynamic_partition.end"="3",
+            "dynamic_partition.buckets"="10",
+            "dynamic_partition.start"="-3",
+            "dynamic_partition.prefix"="p",
+            "dynamic_partition.time_unit"="DAY",
+            "dynamic_partition.create_history_partition"="true",
+            "dynamic_partition.replication_allocation" = 
"tag.location.default: 1",
+            "replication_allocation" = "tag.location.default: 1",
+            "store_row_column" = "true")
+        """
+    def result  = sql "show tables like 'dy_par_pq'"
+    logger.info("${result}")
+    assertEquals(result.size(), 1)
+    result = sql_return_maparray "show partitions from dy_par_pq"
+    assertEquals(result.get(0).Buckets.toInteger(), 10)
+    def currentDate = LocalDate.now()
+    def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
+    def currentDay = currentDate.format(formatter)
+    sql "insert into dy_par_pq values ('${currentDay}', 'a', 1);"
+    def previous1Day = currentDate.minusDays(1).format(formatter)
+    sql "insert into dy_par_pq values ('${previous1Day}', 'b', 2);"
+    def previous2Day = currentDate.minusDays(2).format(formatter)
+    sql "insert into dy_par_pq values ('${previous2Day}', 'c', 3);"
+    def previous3Day = currentDate.minusDays(3).format(formatter)
+    sql "insert into dy_par_pq values ('${previous3Day}', 'd', 4);"
+    def next1Day = currentDate.plusDays(1).format(formatter)
+    sql "insert into dy_par_pq values ('${next1Day}', 'e', 5);"
+    def next2Day = currentDate.plusDays(2).format(formatter)
+    sql "insert into dy_par_pq values ('${next2Day}', 'f', 6);"
+    def next3Day = currentDate.plusDays(3).format(formatter)
+    sql "insert into dy_par_pq values ('${next3Day}', 'g', 7);"
+
+    result = sql """
+        select 
+            * 
+        from 
+            dy_par_pq 
+        where 
+            k1 in (
+            '${currentDay}', '${previous1Day}', '${previous2Day}', '${previous3Day}',
+            '${next1Day}', '${next2Day}', '${next3Day}') ;
+    """
+    assertEquals(result.size(), 7)
+    explain {
+        sql """
+            select 
+                * 
+            from 
+                dy_par_pq 
+            where 
+                k1 in (
+                '${currentDay}', '${previous1Day}', '${previous2Day}', '${previous3Day}',
+                '${next1Day}', '${next2Day}', '${next3Day}') ;
+        """
+        contains "SHORT-CIRCUIT"
+    }
+
+    def previous4Day = currentDate.minusDays(4).format(formatter)
+    def next4Day = currentDate.plusDays(4).format(formatter)
+    result = sql """
+        select 
+            * 
+        from 
+            dy_par_pq 
+        where 
+            k1 in (
+            '${currentDay}', '${previous4Day}', '${next4Day}') ;
+    """
+    assertEquals(result.size(), 1)
+
+    result = sql """
+        select 
+            * 
+        from 
+            dy_par_pq 
+        where 
+            k1 = '${currentDay}' ;
+    """
+    assertEquals(result.size(), 1)
+
+    result = sql """
+        select 
+            * 
+        from 
+            dy_par_pq 
+        where 
+            k1 = '${next4Day}' ;
+    """
+    assertEquals(result.size(), 0)
+    
+    sql "drop table dy_par_pq"
+
+}
diff --git a/regression-test/suites/point_query_p0/test_point_IN_query.groovy b/regression-test/suites/point_query_p0/test_point_IN_query.groovy
new file mode 100644
index 00000000000..29e33787470
--- /dev/null
+++ b/regression-test/suites/point_query_p0/test_point_IN_query.groovy
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.math.BigDecimal;
+
+suite("test_point_IN_query", "p0") {
+    def tableName = "rs_in_table"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    sql """    
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c int not null,
+                d string not null
+            )
+            unique key(a, b, c)
+            partition by RANGE(a, b, c)
+            (
+                partition p values [(1, 1, 1), (100, 100, 100)),
+                partition p0 values [(100, 100, 100), (200, 210, 220)),
+                partition p1 values [(200, 210, 220), (300, 250, 290)),
+                partition p2 values [(300, 250, 290), (350, 290, 310)),
+                partition p3 values [(350, 290, 310), (400, 350, 390)),
+                partition p4 values [(400, 350, 390), (800, 400, 450)),
+                partition p5 values [(800, 400, 450), (2000, 500, 500)),
+                partition p6 values [(2000, 500, 500), (5000, 600, 600)),
+                partition p7 values [(5000, 600, 600), (9999, 9999, 9999))
+            )
+            distributed by hash(a, c)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(123, 100, 110, "zxcd");
+            insert into ${tableName} values(222, 100, 115, "zxc");
+            insert into ${tableName} values(12, 12, 120, "zxc");
+            insert into ${tableName} values(1231, 1220, 210, "zxc");
+            insert into ${tableName} values(323, 49, 240, "zxc");
+            insert into ${tableName} values(843, 7342, 370, "zxcde");
+            insert into ${tableName} values(633, 2642, 480, "zxc");
+            insert into ${tableName} values(6333, 2642, 480, "zxc");
+        """
+    
+    order_qt_1_EQ_sql "select * from ${tableName} where a = 123 and b in (100, 
12, 1220, 7342, 999, 2642) and  c in (110, 115, 120, 480);"
+    order_qt_1_EQ_sql "select * from ${tableName} where a in (12, 222, 1231, 
6333) and b = 2642 and c in (210, 110, 115, 210, 480);"
+    order_qt_1_EQ_sql "select * from ${tableName} where a in (123, 1, 222, 
1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c = 210;"
+    explain {
+        sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b 
= 2642 and c in (210, 110, 115, 210, 480);"
+        contains "SHORT-CIRCUIT"
+    }
+
+    order_qt_2_EQ_sql "select * from ${tableName} where a = 222 and b = 100 
and c in (110, 115, 120, 480);"
+    order_qt_2_EQ_sql "select * from ${tableName} where a = 1231 and  b in 
(100, 12, 1220, 7342, 999, 2642) and c = 210;"
+    order_qt_2_EQ_sql "select * from ${tableName} where a in (12, 222, 1231, 
6333) and  b = 1220 and c = 210;"
+    explain {
+        sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and  
b = 1220 and c = 210;"
+        contains "SHORT-CIRCUIT"
+    }
+
+    order_qt_3_EQ_sql "select * from ${tableName} where a = 323 and b = 49 and 
c = 240;"
+    order_qt_0_EQ_sql "select * from ${tableName} where a in (123, 222, 12) 
and b in (100, 12) and c in (110, 115, 120, 210);"
+    explain {
+        sql "select * from ${tableName} where a in (123, 222, 12) and b in 
(100, 12) and c in (110, 115, 120, 210);"
+        contains "SHORT-CIRCUIT"
+    }
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+}
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy b/regression-test/suites/point_query_p0/test_point_query.groovy
index 99998a24ed6..ab71187e7e3 100644
--- a/regression-test/suites/point_query_p0/test_point_query.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query.groovy
@@ -341,4 +341,404 @@ suite("test_point_query", "nonConcurrent") {
     qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 
= 'aabc';"
     sql "update table_3821461 set value = 'update value' where col1 = -10 or 
col1 = 20;"
     qt_sql """select * from table_3821461 where col1 = -10 and col2 = 20 and 
loc3 = 'aabc'"""
+
+    tableName = "in_table_1"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 1: Default partitioning, part of the primary key is a bucket column
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c string not null
+            )
+            unique key(a, b)
+            distributed by hash(a) buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(123, 132, "a");
+            insert into ${tableName} values(123, 222, "b");
+            insert into ${tableName} values(22, 2, "c");
+            insert into ${tableName} values(1, 1, "d");
+            insert into ${tableName} values(2, 2, "e");
+            insert into ${tableName} values(3, 3, "f");
+            insert into ${tableName} values(4, 4, "i");
+        """
+    qt_case_1_sql "select * from ${tableName} where a = 123 and b = 132;"
+    explain {
+        sql("select * from ${tableName} where a = 123 and b in (132, 1, 222, 
333);")
+        contains "SHORT-CIRCUIT"
+    }
+    order_qt_case_1_sql "select * from ${tableName} where a = 123 and b in 
(132, 1, 222, 333);"
+    explain {
+        sql("select * from ${tableName} where a in (123, 1, 222) and b in 
(132, 1, 222, 333);")
+        contains "SHORT-CIRCUIT"
+    }
+    order_qt_case_1_sql "select * from ${tableName} where a in (123, 1, 222) 
and b in (132, 1, 222, 333);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    // Case 2: Partition columns, bucket columns, and primary keys are the same
+    tableName = "in_table_2"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c string not null
+            )
+            unique key(a, b)
+            partition by RANGE(a, b)
+            (
+                partition p0 values [(100, 100), (200, 140)),
+                partition p1 values [(200, 140), (300, 170)),
+                partition p2 values [(300, 170), (400, 250)),
+                partition p3 values [(400, 250), (420, 300)),
+                partition p4 values [(420, 300), (500, 400))
+            )
+            distributed by hash(a, b) buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+            """
+    sql """
+            insert into ${tableName} values(123, 120, "a");
+            insert into ${tableName} values(150, 120, "b");
+            insert into ${tableName} values(222, 150, "c");
+            insert into ${tableName} values(333, 200, "e");
+            insert into ${tableName} values(400, 260, "f");
+            insert into ${tableName} values(400, 250, "g");
+            insert into ${tableName} values(440, 350, "h");
+            insert into ${tableName} values(450, 320, "i");
+        """
+
+    qt_case_2_sql "select * from ${tableName} where a = 123 and b = 100;"
+    qt_case_2_sql "select * from ${tableName} where a = 222 and b = 150;"
+    order_qt_case_2_sql "select * from ${tableName} where a = 123 and b in 
(132, 120, 222, 333);"
+    order_qt_case_2_sql "select * from ${tableName} where a = 400 and b in 
(260, 250, 300);"
+    order_qt_case_2_sql "select * from ${tableName} where a in (400, 222, 100) 
and b in (260, 250, 100, 150);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_3"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 3: The partition column is the same as the primary key, and the bucket column is part of the primary key.
+    sql """    
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c string not null
+            )
+            unique key(a, b)
+            partition by RANGE(a, b)
+            (
+                partition p0 values [(100, 100), (100, 140)),
+                partition p1 values [(100, 140), (200, 140)),
+                partition p2 values [(200, 140), (300, 170)),
+                partition p3 values [(300, 170), (400, 250)),
+                partition p4 values [(400, 250), (420, 300)),
+                partition p5 values [(420, 300), (500, 400))
+            )
+            distributed by hash(a)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(100, 100, "aaa");
+            insert into ${tableName} values(100, 120, "aaaaa");
+            insert into ${tableName} values(123, 100, "a");
+            insert into ${tableName} values(150, 100, "b");
+            insert into ${tableName} values(350, 200, "c");
+            insert into ${tableName} values(400, 250, "d");
+            insert into ${tableName} values(400, 280, "e");
+            insert into ${tableName} values(450, 350, "f");
+        """
+    qt_case_3_sql "select * from ${tableName} where a = 123 and b = 100;"
+    qt_case_3_sql "select * from ${tableName} where a = 222 and b = 100;"
+    order_qt_case_3_sql "select * from ${tableName} where a = 100 and b in 
(132, 100, 222, 120);"
+    order_qt_case_3_sql "select * from ${tableName} where a = 123 and b in 
(132, 100, 222, 333);"
+    order_qt_case_3_sql "select * from ${tableName} where a = 400 and b in 
(250, 280, 300);"
+    order_qt_case_3_sql "select * from ${tableName} where a in (123, 1, 350, 
400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_4"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 4: Bucket columns and partition columns are both partial primary keys,
+    // and there is no overlap between bucket columns and partition columns
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c int not null,
+                d string not null
+            )
+            unique key(a, b, c)
+            partition by RANGE(c)
+            (
+                partition p0 values [(100), (200)),
+                partition p1 values [(200), (300)),
+                partition p2 values [(300), (400)),
+                partition p3 values [(400), (500))
+            )
+            distributed by hash(a, b)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+
+    sql """
+            insert into ${tableName} values(123, 100, 110, "a");
+            insert into ${tableName} values(222, 100, 115, "b");
+            insert into ${tableName} values(12, 12, 120, "c");
+            insert into ${tableName} values(1231, 1220, 210, "d");
+            insert into ${tableName} values(323, 49, 240, "e");
+            insert into ${tableName} values(843, 7342, 370, "f");
+            insert into ${tableName} values(633, 2642, 480, "g");
+            insert into ${tableName} values(6333, 2642, 480, "h");
+        """
+
+    qt_case_4_sql "select * from ${tableName} where a = 123 and b = 100 and c 
= 110;"
+    qt_case_4_sql "select * from ${tableName} where a = 123 and b = 101 and c 
= 110;"
+    qt_case_4_sql "select * from ${tableName} where a = 1231 and b = 1220 and 
c = 210;"
+    order_qt_case_4_sql "select * from ${tableName} where a = 123 and b in 
(132, 100, 222, 333) and c in (110, 115, 120);"
+    order_qt_case_4_sql "select * from ${tableName} where a in (123, 1, 222, 
1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c in (210, 110, 115, 
210);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_5"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 5: Bucket columns and partition columns are both partial primary keys,
+    // and bucket columns and partition columns overlap
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c int not null,
+                d string not null
+            )
+            unique key(a, b, c)
+            partition by RANGE(a)
+            (
+                partition p0 values [(0), (100)),
+                partition p1 values [(100), (200)),
+                partition p2 values [(200), (300)),
+                partition p3 values [(300), (400)),
+                partition p4 values [(400), (500)),
+                partition p5 values [(500), (900)),
+                partition p6 values [(900), (1200)),
+                partition p7 values [(1200), (9000))
+            )
+            distributed by hash(a, b)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+
+    sql """
+            insert into ${tableName} values(123, 100, 110, "a");
+            insert into ${tableName} values(222, 100, 115, "b");
+            insert into ${tableName} values(12, 12, 120, "c");
+            insert into ${tableName} values(1231, 1220, 210, "d");
+            insert into ${tableName} values(323, 49, 240, "e");
+            insert into ${tableName} values(843, 7342, 370, "f");
+            insert into ${tableName} values(633, 2642, 480, "g");
+            insert into ${tableName} values(6333, 2642, 480, "h");
+        """
+
+    qt_case_5_sql "select * from ${tableName} where a = 123 and b = 100 and c 
= 110;"
+    qt_case_5_sql "select * from ${tableName} where a = 123 and b = 101 and c 
= 110;"
+    qt_case_5_sql "select * from ${tableName} where a = 1231 and b = 1220 and 
c = 210;"
+    order_qt_case_5_sql "select * from ${tableName} where a = 123 and b in 
(132, 100, 222, 333) and c in (110, 115, 120);"
+    order_qt_case_5_sql "select * from ${tableName} where a in (123, 12, 222, 
1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c 
in (210, 110, 115, 210, 480);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    
+    tableName = "in_table_6"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 6: Default partitioning, primary keys are all bucket columns
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c string not null
+            )
+            unique key(a, b)
+            distributed by hash(a, b) buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(123, 132, "a");
+            insert into ${tableName} values(123, 222, "b");
+            insert into ${tableName} values(22, 2, "c");
+            insert into ${tableName} values(1, 1, "d");
+            insert into ${tableName} values(2, 2, "e");
+            insert into ${tableName} values(3, 3, "f");
+            insert into ${tableName} values(4, 4, "i");
+        """
+    qt_case_6_sql "select * from ${tableName} where a = 123 and b = 132;"
+    order_qt_case_6_sql "select * from ${tableName} where a = 123 and b in (132, 1, 222, 333);"
+    order_qt_case_6_sql "select * from ${tableName} where a in (123, 1, 222, 2, 3, 4) and b in (132, 1, 222, 333, 2, 3, 4);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_7"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 7: Partition and bucket columns are the same, but cover only part of the primary key
+    sql """
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c int not null,
+                d string not null
+            )
+            unique key(a, b, c)
+            partition by RANGE(a)
+            (
+                partition p0 values [(0), (100)),
+                partition p1 values [(100), (200)),
+                partition p2 values [(200), (300)),
+                partition p3 values [(300), (400)),
+                partition p4 values [(400), (500)),
+                partition p5 values [(500), (900)),
+                partition p6 values [(900), (1200)),
+                partition p7 values [(1200), (9000))
+            )
+            distributed by hash(a)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+
+    sql """
+            insert into ${tableName} values(123, 100, 110, "a");
+            insert into ${tableName} values(222, 100, 115, "b");
+            insert into ${tableName} values(12, 12, 120, "c");
+            insert into ${tableName} values(1231, 1220, 210, "d");
+            insert into ${tableName} values(323, 49, 240, "e");
+            insert into ${tableName} values(843, 7342, 370, "f");
+            insert into ${tableName} values(633, 2642, 480, "g");
+            insert into ${tableName} values(6333, 2642, 480, "h");
+        """
+
+    qt_case_7_sql "select * from ${tableName} where a = 123 and b = 100 and c = 110;"
+    qt_case_7_sql "select * from ${tableName} where a = 123 and b = 101 and c = 110;"
+    qt_case_7_sql "select * from ${tableName} where a = 1231 and b = 1220 and c = 210;"
+    order_qt_case_7_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333) and c in (110, 115, 120);"
+    order_qt_case_7_sql "select * from ${tableName} where a in (123, 12, 222, 1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c in (210, 110, 115, 210, 480);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_8"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 8: The bucket columns are the same as the primary key, and the partition column is part of the primary key.
+    sql """    
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c string not null
+            )
+            unique key(a, b)
+            partition by RANGE(a)
+            (
+                partition p0 values [(100), (200)),
+                partition p1 values [(200), (300)),
+                partition p2 values [(300), (400)),
+                partition p3 values [(400), (420)),
+                partition p4 values [(420), (500))
+            )
+            distributed by hash(a, b)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(123, 100, "a");
+            insert into ${tableName} values(150, 100, "b");
+            insert into ${tableName} values(350, 200, "c");
+            insert into ${tableName} values(400, 250, "d");
+            insert into ${tableName} values(400, 280, "e");
+            insert into ${tableName} values(450, 350, "f");
+        """
+    qt_case_8_sql "select * from ${tableName} where a = 123 and b = 100;"
+    qt_case_8_sql "select * from ${tableName} where a = 222 and b = 100;"
+    order_qt_case_8_sql "select * from ${tableName} where a = 123 and b in (132, 100, 222, 333);"
+    order_qt_case_8_sql "select * from ${tableName} where a = 400 and b in (250, 280, 300);"
+    order_qt_case_8_sql "select * from ${tableName} where a in (123, 1, 350, 400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);"
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    tableName = "in_table_9"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    // Case 9: Leftmost-prefix matching on multi-column range partitions
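+    // Keys of RANGE(a, b, c) partitions compare lexicographically, so pruning
+    // should still narrow the partition set when only a leftmost prefix of the
+    // partition columns is bound to exact values.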
+    sql """    
+            create table ${tableName} (
+                a int not null,
+                b int not null,
+                c int not null,
+                d string not null
+            )
+            unique key(a, b, c)
+            partition by RANGE(a, b, c)
+            (
+                partition p values [(1, 1, 1), (100, 100, 100)),
+                partition p0 values [(100, 100, 100), (200, 210, 220)),
+                partition p1 values [(200, 210, 220), (300, 250, 290)),
+                partition p2 values [(300, 250, 290), (350, 290, 310)),
+                partition p3 values [(350, 290, 310), (400, 350, 390)),
+                partition p4 values [(400, 350, 390), (800, 400, 450)),
+                partition p5 values [(800, 400, 450), (2000, 500, 500)),
+                partition p6 values [(2000, 500, 500), (5000, 600, 600)),
+                partition p7 values [(5000, 600, 600), (9999, 9999, 9999))
+            )
+            distributed by hash(a, c)
+            buckets 16
+            PROPERTIES(
+                "replication_num" = "1",
+                "store_row_column" = "true"
+            );
+        """
+    sql """
+            insert into ${tableName} values(123, 100, 110, "zxcd");
+            insert into ${tableName} values(222, 100, 115, "zxc");
+            insert into ${tableName} values(12, 12, 120, "zxc");
+            insert into ${tableName} values(1231, 1220, 210, "zxc");
+            insert into ${tableName} values(323, 49, 240, "zxc");
+            insert into ${tableName} values(843, 7342, 370, "zxcde");
+            insert into ${tableName} values(633, 2642, 480, "zxc");
+            insert into ${tableName} values(6333, 2642, 480, "zxc");
+        """
+    
+    order_qt_case_9_sql "select * from ${tableName} where a=123 and b=100 and c=110;"
+    order_qt_case_9_sql "select * from ${tableName} where a=222 and b=100 and c=115;"
+    order_qt_case_9_sql "select * from ${tableName} where a=323 and b=49 and c=240;"
+    order_qt_case_9_sql "select * from ${tableName} where b=100 and a=123 and c=110;"
+    order_qt_case_9_sql "select * from ${tableName} where a=1231 and b=1220 and c=210;"
+    order_qt_case_9_sql "select * from ${tableName} where a=6333 and b=2642 and c=480;"
+    order_qt_case_9_sql "select * from ${tableName} where a=633 and b=2642 and c=480;"
+    order_qt_case_9_sql "select * from ${tableName} where a=123 and b in (132,100,222,333) and c in (110, 115, 120);"
+    order_qt_case_9_sql "select * from ${tableName} where a in (222,1231) and b in (100,1220,2642) and c in (115,210,480);"
+    order_qt_case_9_sql "select * from ${tableName} where a in (123,222,12) and b in (100,12) and c in (110,115,120,210);"
+    order_qt_case_9_sql "select * from ${tableName} where a=1231 and b in (20490,1220,300) and c = 210;"
+    order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222, 400,420, 500) and b in (132,100,222, 200,300) and c in (110,115,210);"
+    order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222, 1231,420, 500) and b in (132,100,222, 1220,300) and c in (210,110,115,210);"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
 } 
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_point_query_ck.groovy b/regression-test/suites/point_query_p0/test_point_query_ck.groovy
index f7c53c7207e..189c0f22d1a 100644
--- a/regression-test/suites/point_query_p0/test_point_query_ck.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query_ck.groovy
@@ -182,6 +182,32 @@ suite("test_point_query_ck") {
                 stmt.setString(3, "dd")
                 qe_point_select stmt
 
+                // IN query
+                def stmt_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 IN (?, ?, ?, ?) AND k2 IN (?, ?, ?) AND k3 IN (?, ?, ?, ?)"
+                assertEquals(stmt_in.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+                stmt_in.setInt(1, 1231)
+                stmt_in.setInt(2, 1237)
+                stmt_in.setInt(3, 251)
+                stmt_in.setInt(4, 252)
+                stmt_in.setBigDecimal(5, new BigDecimal("119291.11"))
+                stmt_in.setBigDecimal(6, new BigDecimal("120939.11130"))
+                stmt_in.setBigDecimal(7, new BigDecimal("12222.99121135"))
+                stmt_in.setString(8, "ddd")
+                stmt_in.setString(9, 'xxx')
+                stmt_in.setString(10, generateString(251))
+                stmt_in.setString(11, generateString(252))
+                order_qe_point_in_select stmt_in
+                stmt_in.close()
+
+                stmt_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 = ? AND k2 IN (?, ?) AND k3 IN (?, ?)"
+                assertEquals(stmt_in.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+                stmt_in.setInt(1, 1235)
+                stmt_in.setBigDecimal(2, new BigDecimal("991129292901.11138"))
+                stmt_in.setBigDecimal(3, new BigDecimal("120939.11130"))
+                stmt_in.setString(4, "dd")
+                stmt_in.setString(5, "a    ddd")
+                order_qe_point_in_select stmt_in
+
                 def stmt_fn = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) from ${tableName} where k1 = ? and k2 =? and k3 = ?"
                 assertEquals(stmt_fn.class, com.mysql.cj.jdbc.ServerPreparedStatement);
                 stmt_fn.setInt(1, 1231)
@@ -191,6 +217,19 @@ suite("test_point_query_ck") {
                 qe_point_select stmt_fn
                 qe_point_select stmt_fn
 
+                // IN query
+                def stmt_fn_in = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) FROM ${tableName} WHERE k1 IN (?, ?) AND k2 IN (?, ?) AND k3 IN (?, ?)"
+                assertEquals(stmt_fn_in.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+                stmt_fn_in.setInt(1, 1235)
+                stmt_fn_in.setInt(2, 1231)
+                stmt_fn_in.setBigDecimal(3, new BigDecimal("119291.11"))
+                stmt_fn_in.setBigDecimal(4, new BigDecimal("991129292901.11138"))
+                stmt_fn_in.setString(5, "dd")
+                stmt_fn_in.setString(6, "ddd")
+                order_qe_point_in_select stmt_fn_in
+                order_qe_point_in_select stmt_fn_in
+                order_qe_point_in_select stmt_fn_in
+
                 nprep_sql """
                 ALTER table ${tableName} ADD COLUMN new_column0 INT default "0";
                 """
@@ -201,34 +240,53 @@ suite("test_point_query_ck") {
                 stmt.setString(3, "a    ddd")
                 qe_point_select stmt
                 qe_point_select stmt
+                order_qe_point_in_select stmt_in
+                order_qe_point_in_select stmt_in
                 // invalidate cache
                 // sql "sync"
                 nprep_sql """ INSERT INTO ${tableName} VALUES(1235, 
120939.11130, "a    ddd", "xxxxxx", "2030-01-02", "2020-01-01 12:36:38", 
22.822, "7022-01-01 11:30:38", 0, 1929111.1111,[119291.19291], ["111", "222", 
"333"], 2) """
                 qe_point_select stmt
                 qe_point_select stmt
                 qe_point_select stmt
+                order_qe_point_in_select stmt_in
+                order_qe_point_in_select stmt_in
+                order_qe_point_in_select stmt_in
                 nprep_sql """
-                ALTER table ${tableName} ADD COLUMN new_column1 INT default 
"0";
+                    ALTER table ${tableName} ADD COLUMN new_column1 INT 
default "0";
                 """
                 qe_point_select stmt
                 qe_point_select stmt
+                order_qe_point_in_select stmt_in
+                order_qe_point_in_select stmt_in
                 nprep_sql """
-                ALTER table ${tableName} DROP COLUMN new_column1;
+                    ALTER table ${tableName} DROP COLUMN new_column1;
                 """
                 qe_point_select stmt
                 qe_point_select stmt
-
-                nprep_sql """
-                  ALTER table ${tableName} ADD COLUMN new_column1 INT default "0";
+                order_qe_point_in_select stmt_in
+                order_qe_point_in_select stmt_in
+                sql """
+                    ALTER table ${tableName} ADD COLUMN new_column1 INT 
default "0";
                 """
-                sql "select 1"
-                qe_point_select stmt
+                qe_point_select stmt        
+                order_qe_point_in_select stmt_in
             }
+
             // disable useServerPrepStmts
             def result2 = connect(user, password, context.config.jdbcUrl) {
                 qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * 
from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'"""
                 qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * 
from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a    ddd'"""
                 qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ 
hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 = 
120939.11130 and k3 = 'a    ddd'"""
+                order_qt_in_sql """
+                        SELECT 
+                            /*+ SET_VAR(enable_nereids_planner=true) */ * 
+                        FROM 
+                            ${tableName} 
+                        WHERE 
+                            k1 IN (1231, 1237) AND 
+                            k2 IN (119291.11, 120939.11130) AND 
+                            k3 IN ('ddd', 'a    ddd')
+                """
                 // prepared text
                 // sql """ prepare stmt1 from  select * from ${tableName} 
where k1 = % and k2 = % and k3 = % """
                 // qt_sql """execute stmt1 using (1231, 119291.11, 'ddd')"""
@@ -257,8 +315,14 @@ suite("test_point_query_ck") {
                     "enable_unique_key_merge_on_write" = "true",
                     "disable_auto_compaction" = "false"
                     );"""
-                sql """insert into ${tableName} values (0, "1", "2", "3")"""
+                sql """
+                    insert into ${tableName} values (0, "1", "2", "3");
+                    insert into ${tableName} values (1, "2", "3", "4");
+                    insert into ${tableName} values (2, "3", "4", "5");
+                    insert into ${tableName} values (3, "4", "5", "6");
+                    """
                 qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * 
from ${tableName} where customer_key = 0"""
+                order_qt_in_sql """select /*+ 
SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where customer_key 
in (0, 1, 2, 3, 4, 5, 6)"""
             }
         }
         tableName = "test_ODS_EBA_LLREPORT_ck"
diff --git a/regression-test/suites/point_query_p0/test_point_query_partition.groovy b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
index 848729b4423..a98ccff0393 100644
--- a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
@@ -114,6 +114,24 @@ suite("test_point_query_partition") {
         qe_point_select stmt
     }
 
+    // IN query
+    def result2 = connect(user, password, prepare_url) {
+        def stmt = prepareStatement "SELECT * FROM ${tableName} WHERE k1 IN (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+        assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        stmt.setInt(1, 1)
+        stmt.setInt(2, 2)
+        stmt.setInt(3, 11)
+        stmt.setInt(4, -1)
+        stmt.setInt(5, 12)
+        stmt.setInt(6, 34)
+        stmt.setInt(7, 33)
+        stmt.setInt(8, 45)
+        stmt.setInt(9, 666)
+        stmt.setInt(10, 999)
+        stmt.setInt(11, 1000)
+        order_qe_point_in_select stmt
+    }
+
     sql "DROP TABLE IF EXISTS regression_test_serving_p0.customer";
     sql """
         CREATE TABLE regression_test_serving_p0.customer (
@@ -135,8 +153,8 @@ suite("test_point_query_partition") {
         PROPERTIES (
         "replication_allocation" = "tag.location.default: 1",
         "store_row_column" = "true"
-        ); 
-    """  
+        );
+    """
     sql """insert into regression_test_serving_p0.customer(customer_key, 
customer_value_0, customer_value_1) values(686612, "686612", "686612")"""
     sql """insert into regression_test_serving_p0.customer(customer_key, 
customer_value_0, customer_value_1) values(686613, "686613", "686613")"""
     def result3 = connect(user, password, prepare_url) {
@@ -149,4 +167,15 @@ suite("test_point_query_partition") {
         qe_point_selectmmm stmt 
         qe_point_selecteee stmt 
     }
+
+    // IN query
+    def result4 = connect(user, password, prepare_url) {
+        def stmt = prepareStatement "SELECT /*+ SET_VAR(enable_nereids_planner=true) */ * FROM regression_test_serving_p0.customer WHERE customer_key IN (?, ?)"
+        stmt.setInt(1, 686612)
+        stmt.setInt(2, 686613)
+        order_qe_point_in_selectxxx stmt 
+        order_qe_point_in_selectyyy stmt 
+        order_qe_point_in_selectmmm stmt 
+        order_qe_point_in_selecteee stmt 
+    }
 } 
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_rowstore.groovy b/regression-test/suites/point_query_p0/test_rowstore.groovy
index 13279e3ce87..8d943f2cee8 100644
--- a/regression-test/suites/point_query_p0/test_rowstore.groovy
+++ b/regression-test/suites/point_query_p0/test_rowstore.groovy
@@ -204,14 +204,26 @@ suite("test_rowstore", "p0,nonConcurrent") {
             assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
             qe_point_select stmt
         }
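+        // Helper mirroring prep_sql above: binds each value of in_list to the
+        // next `?` placeholder and runs the IN-list variant with an
+        // order-insensitive result check.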
+        def prep_in_sql = { sql_str, in_list ->
+            def stmt = prepareStatement sql_str
+            for (int i = 0; i < in_list.size(); i++) {
+                stmt.setInt(i + 1, in_list[i])
+            }
+            assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+            order_qe_point_in_select stmt
+        }
         def sql_str = "select v1, v2 from table_with_column_group where k1 = ?"
+        def sql_in_str = "select v1, v2 from table_with_column_group where k1 
in (?, ?, ?, ?)"
         prep_sql sql_str, 1
         prep_sql sql_str, 2 
         prep_sql sql_str, 3
+        prep_in_sql sql_in_str, [1, 2, 3, 10]
         sql_str = "select v2 from table_with_column_group where k1 = ?"
+        sql_in_str = "select v2 from table_with_column_group where k1 in (?, 
?, ?, ?)"
         prep_sql sql_str, 1
         prep_sql sql_str, 2 
         prep_sql sql_str, 3
+        prep_in_sql sql_in_str, [1, 2, 3, 10]
         sql_str = "select v1 from table_with_column_group where k1 = ?"
         prep_sql sql_str, 3
         sql_str = "select v2, v1 from table_with_column_group where k1 = ?"
@@ -222,14 +234,18 @@ suite("test_rowstore", "p0,nonConcurrent") {
         prep_sql sql_str, 1
 
         sql_str = "select v2 from table_with_column_group2 where k1 = ?"
+        sql_in_str = "select v2 from table_with_column_group2 where k1 in (?, 
?, ?, ?)"
         prep_sql sql_str, 1
         prep_sql sql_str, 2
         prep_sql sql_str, 3
+        prep_in_sql sql_in_str, [1, 2, 3, 10]
 
         sql_str = "select v4 from table_with_column_group3 where k1 = ?"
+        sql_in_str = "select v4 from table_with_column_group3 where k1 in (?, 
?, ?, ?)"
         prep_sql sql_str, 1
         prep_sql sql_str, 2
         prep_sql sql_str, 3
+        prep_in_sql sql_in_str, [1, 2, 3, 10]
 
         def setPrepareStmtArgs = {stmt, user_id, date, datev2, datetimev2_1, datetimev2_2, city, age, sex ->
             java.text.SimpleDateFormat formater = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS");
@@ -242,6 +258,39 @@ suite("test_rowstore", "p0,nonConcurrent") {
             stmt.setInt(7, age)
             stmt.setInt(8, sex)
         }
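+        // Helper mirroring setPrepareStmtArgs above: binds the list-valued
+        // arguments in placeholder order and asserts that every `?` was bound
+        // exactly once (idx must equal total_size).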
+        def setInPrepareStmtArgs = {stmt, user_id_list, date_list, datev2_list,
+                                    datetimev2_1_list, datetimev2_2_list, city_list, age_list, sex_list ->
+            java.text.SimpleDateFormat formater = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS");
+            def total_size = user_id_list.size() + date_list.size() + datev2_list.size() +
+                             datetimev2_1_list.size() + datetimev2_2_list.size() + city_list.size() +
+                             age_list.size() + sex_list.size()
+            def idx = 0
+            for (int i = 0; i < user_id_list.size(); i++) {
+                stmt.setInt(++idx, user_id_list[i])
+            }
+            for (int i = 0; i < date_list.size(); i++) {
+                stmt.setDate(++idx, java.sql.Date.valueOf(date_list[i]))
+            }
+            for (int i = 0; i < datev2_list.size(); i++) {
+                stmt.setDate(++idx, java.sql.Date.valueOf(datev2_list[i]))
+            }
+            for (int i = 0; i < datetimev2_1_list.size(); i++) {
+                stmt.setTimestamp(++idx, new java.sql.Timestamp(formater.parse(datetimev2_1_list[i]).getTime()))
+            }
+            for (int i = 0; i < datetimev2_2_list.size(); i++) {
+                stmt.setTimestamp(++idx, new java.sql.Timestamp(formater.parse(datetimev2_2_list[i]).getTime()))
+            }
+            for (int i = 0; i < city_list.size(); i++) {
+                stmt.setString(++idx, city_list[i])
+            }
+            for (int i = 0; i < age_list.size(); i++) {
+                stmt.setInt(++idx, age_list[i])
+            }
+            for (int i = 0; i < sex_list.size(); i++) {
+                stmt.setInt(++idx, sex_list[i])
+            }
+            assertEquals(idx, total_size)
+        }
 
         def stmt = prepareStatement """ SELECT 
datetimev2_1,datetime_val1,datetime_val2,max_dwell_time FROM 
table_with_column_group_xxx t where user_id = ? and date = ? and datev2 = ? and 
datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? and sex = ?; """
         setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01', '2017-10-01 
11:11:11.21', '2017-10-01 11:11:11.11', 'Beijing', 10, 1
@@ -260,6 +309,27 @@ suite("test_rowstore", "p0,nonConcurrent") {
         qe_point_select stmt
         setPrepareStmtArgs stmt, 4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.28', '2017-10-01 11:11:11.18', 'Beijing', 10, 1
         qe_point_select stmt
+
+        def in_stmt = prepareStatement """ 
+            SELECT 
+                datetimev2_1,datetime_val1,datetime_val2,max_dwell_time 
+            FROM 
+                table_with_column_group_xxx t 
+            WHERE 
+                user_id IN (?, ?, ?, ?, ?) AND 
+                date IN (?, ?) AND 
+                datev2 IN (?, ?) AND 
+                datetimev2_1 IN (?, ?, ?, ?, ?, ?, ?, ?) AND 
+                datetimev2_2 IN (?, ?, ?, ?, ?, ?, ?, ?) AND 
+                city IN (?, ?) AND 
+                age IN (?, ?, ?, ?, ?) AND 
+                sex IN (?, ?); 
+        """
+        setInPrepareStmtArgs in_stmt, [1, 2, 3, 4, 5], ['2017-10-01', '2017-10-02'], ['2017-10-01', '2017-10-03'],
+            ['2017-10-01 11:11:11.21', '2017-10-01 11:11:11.22', '2017-10-01 11:11:11.23', '2017-10-01 11:11:11.24', '2017-10-01 11:11:11.25', '2017-10-01 11:11:11.26', '2017-10-01 11:11:11.27', '2017-10-01 11:11:11.28'],
+            ['2017-10-01 11:11:11.11', '2017-10-01 11:11:11.12', '2017-10-01 11:11:11.13', '2017-10-01 11:11:11.14', '2017-10-01 11:11:11.15', '2017-10-01 11:11:11.16', '2017-10-01 11:11:11.17', '2017-10-01 11:11:11.18'],
+            ['Beijing', 'Shanghai'], [10, 11, 12, 13, 14], [0, 1]
+        order_qe_point_in_select in_stmt
     }
 
     sql "DROP TABLE IF EXISTS table_with_column_group4"
diff --git a/regression-test/suites/point_query_p0/test_rowstore_query.groovy b/regression-test/suites/point_query_p0/test_rowstore_query.groovy
index db5f74f3f61..bd4d51ab29b 100644
--- a/regression-test/suites/point_query_p0/test_rowstore_query.groovy
+++ b/regression-test/suites/point_query_p0/test_rowstore_query.groovy
@@ -39,5 +39,6 @@ suite("test_rowstore", "p0") {
     sql """insert into ${tableName} values (1, 'abc', 1111919.12345678919)"""
     qt_sql """select * from ${tableName}"""
     sql """insert into ${tableName} values (2, 'def', 1111919.12345678919)"""
-    qt_sql """select * from ${tableName} where k1 = 2"""
+    qt_point_sql """select * from ${tableName} where k1 = 2"""
+    qt_point_in_sql """select * from ${tableName} where k1 in (1, 2, 3, 4, 
5)"""
 }
\ No newline at end of file
diff --git a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
index 54ec1efa4b3..e6c161a1a56 100644
--- a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
+++ b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
@@ -190,5 +190,44 @@ suite("test_prepared_stmt_in_list", "nonConcurrent") {
         stmt_read10.setString(4, '5022-01-01 11:30:38')
         stmt_read10.setString(5, '6022-01-01 11:30:38')
         qe_stmt_read10_2 stmt_read10
+
+        table_name = "tbl_prepared_stmt_in_list2"
+        sql """DROP TABLE IF EXISTS ${tableName} """
+        sql """
+             CREATE TABLE IF NOT EXISTS ${tableName} (
+                     `k1` tinyint NULL COMMENT "",
+                     `k2` smallint NULL COMMENT "",
+                     `k3` int NULL COMMENT ""
+                   ) ENGINE=OLAP
+                   UNIQUE KEY(`k1`, `k2`)
+                   DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 16
+                   PROPERTIES (
+                   "replication_allocation" = "tag.location.default: 1",
+                   "light_schema_change" = "true",
+                   "storage_format" = "V2",
+                   "store_row_column" = "true"
+                   )
+               """
+
+        sql """ INSERT INTO ${tableName} VALUES(1, 1300, 55356821) """
+        sql """ INSERT INTO ${tableName} VALUES(2, 1301, 56052706) """
+        sql """ INSERT INTO ${tableName} VALUES(3, 1302, 55702967) """
+        sql """ INSERT INTO ${tableName} VALUES(4, 1303, 56054326) """
+        sql """ INSERT INTO ${tableName} VALUES(5, 1304, 36548425) """
+        sql """ INSERT INTO ${tableName} VALUES(6, 1305, 56054803) """
+        sql """ INSERT INTO ${tableName} VALUES(7, 1306, 56055112) """
+        sql """sync"""
+
+        def stmt_read11 = prepareStatement "select * from ${tableName} WHERE `k1` IN (?, ?, ?, ?, ?) AND `k2` IN (?, ?, ?, ?)"
+        stmt_read11.setByte(1, (byte) 1)
+        stmt_read11.setByte(2, (byte) 2)
+        stmt_read11.setByte(3, (byte) 3)
+        stmt_read11.setByte(4, (byte) 4)
+        stmt_read11.setByte(5, (byte) 5)
+        stmt_read11.setShort(6, (short) 1300)
+        stmt_read11.setShort(7, (short) 1301)
+        stmt_read11.setShort(8, (short) 1302)
+        stmt_read11.setShort(9, (short) 1303)
+        order_qe_stmt_read11_1 stmt_read11
    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

