This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch 
feature/mor_value_predicate_pushdown_control
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 89d3715b3aa8180f724a0eae14049942a6af29d6
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sun Feb 8 19:57:23 2026 -0800

    [feature](scan) Add read_mor_as_dup_tables session variable to read MOR 
tables without merge
    
    Add a per-table session variable that allows reading MOR (Merge-On-Read)
    UNIQUE tables as DUP tables, skipping storage engine merge and delete
    sign filter while still honoring delete predicates.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 be/src/pipeline/exec/olap_scan_operator.cpp        |   8 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   1 +
 be/src/vec/exec/scan/olap_scanner.cpp              |   5 +-
 .../doris/nereids/rules/analysis/BindRelation.java |   5 +-
 .../nereids/rules/rewrite/SetPreAggStatus.java     |   9 +-
 .../trees/plans/logical/LogicalOlapScan.java       |   7 +
 .../org/apache/doris/planner/OlapScanNode.java     |   4 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  28 +++
 .../nereids/rules/analysis/ReadMorAsDupTest.java   | 280 +++++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 .../unique_with_mor_p0/test_read_mor_as_dup.groovy | 119 +++++++++
 11 files changed, 464 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 07c0522a4b5..b19f0d23cfe 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -456,7 +456,13 @@ bool OlapScanLocalState::_storage_no_merge() {
     return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
             (p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
              p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
-             p._olap_scan_node.enable_unique_key_merge_on_write));
+             p._olap_scan_node.enable_unique_key_merge_on_write)) ||
+           _read_mor_as_dup();
+}
+
+bool OlapScanLocalState::_read_mor_as_dup() {
+    auto& p = _parent->cast<OlapScanOperatorX>();
+    return p._olap_scan_node.__isset.read_mor_as_dup && 
p._olap_scan_node.read_mor_as_dup;
 }
 
 Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* 
scanners) {
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 0577dd1ff2d..6517b13f271 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -105,6 +105,7 @@ private:
 
     bool _storage_no_merge() override;
 
+    bool _read_mor_as_dup();
     bool _push_down_topn(const vectorized::RuntimePredicate& predicate) 
override {
         if (!predicate.target_is_slot(_parent->node_id())) {
             return false;
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp 
b/be/src/vec/exec/scan/olap_scanner.cpp
index 7b577bdb629..2bd7b0f3269 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -334,7 +334,10 @@ Status OlapScanner::_init_tablet_reader_params(
     // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
     const bool single_version = _tablet_reader_params.has_single_version();
 
-    if (_state->skip_storage_engine_merge()) {
+    auto* olap_local_state = 
static_cast<pipeline::OlapScanLocalState*>(_local_state);
+    bool read_mor_as_dup = 
olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
+                           olap_local_state->olap_scan_node().read_mor_as_dup;
+    if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
         _tablet_reader_params.direct_mode = true;
         _tablet_reader_params.aggregation = true;
     } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 2d11e1c75db..81a54f8c67e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -371,7 +371,10 @@ public class BindRelation extends OneAnalysisRuleFactory {
     public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan 
scan, ConnectContext connectContext,
             OlapTable olapTable) {
         if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
-                && !connectContext.getSessionVariable().skipDeleteSign()) {
+                && !connectContext.getSessionVariable().skipDeleteSign()
+                && !(olapTable.isMorTable()
+                    && 
connectContext.getSessionVariable().isReadMorAsDupEnabled(
+                        olapTable.getQualifiedDbName(), olapTable.getName()))) 
{
             // table qualifier is catalog.db.table, we make db.table.column
             Slot deleteSlot = null;
             for (Slot slot : scan.getOutput()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
index 2f7743cb86f..7ae49fc6ec8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
@@ -53,6 +53,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -151,7 +152,13 @@ public class SetPreAggStatus extends 
DefaultPlanRewriter<Stack<SetPreAggStatus.P
             long selectIndexId = logicalOlapScan.getSelectedIndexId();
             MaterializedIndexMeta meta = 
logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId);
             if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType() 
== KeysType.UNIQUE_KEYS
-                    && 
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) {
+                    && 
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())
+                    || (meta.getKeysType() == KeysType.UNIQUE_KEYS
+                        && logicalOlapScan.getTable().isMorTable()
+                        && ConnectContext.get() != null
+                        && 
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+                            logicalOlapScan.getTable().getQualifiedDbName(),
+                            logicalOlapScan.getTable().getName()))) {
                 return logicalOlapScan.withPreAggStatus(PreAggStatus.on());
             } else {
                 if (context.empty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index bbcc44a38fc..d03a7d1b35b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -769,6 +769,13 @@ public class LogicalOlapScan extends 
LogicalCatalogRelation implements OlapScan,
                     && getTable().getKeysType() == KeysType.UNIQUE_KEYS) {
                 return;
             }
+            // When readMorAsDup is enabled, MOR tables are read as DUP, so 
uniqueness cannot be guaranteed.
+            if (getTable().getKeysType() == KeysType.UNIQUE_KEYS
+                    && getTable().isMorTable()
+                    && 
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+                        getTable().getQualifiedDbName(), 
getTable().getName())) {
+                return;
+            }
             ImmutableSet.Builder<Slot> uniqSlots = 
ImmutableSet.builderWithExpectedSize(outputSet.size());
             for (Slot slot : outputSet) {
                 if (!(slot instanceof SlotReference)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 9d4b6fddfc5..48614966468 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1197,6 +1197,10 @@ public class OlapScanNode extends ScanNode {
             boolean enabled = ConnectContext.get().getSessionVariable()
                     .isMorValuePredicatePushdownEnabled(dbName, tblName);
             msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
+            if (ConnectContext.get().getSessionVariable()
+                    .isReadMorAsDupEnabled(dbName, tblName)) {
+                msg.olap_scan_node.setReadMorAsDup(true);
+            }
         }
 
         msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a6e581199d5..42aef65243d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -735,6 +735,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
             = "enable_mor_value_predicate_pushdown_tables";
 
+    public static final String READ_MOR_AS_DUP_TABLES = 
"read_mor_as_dup_tables";
+
     // When set use fix replica = true, the fixed replica maybe bad, try to 
use the health one if
     // this session variable is set to true.
     public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = 
"fallback_other_replica_when_fixed_corrupt";
@@ -2232,6 +2234,14 @@ public class SessionVariable implements Serializable, 
Writable {
                 + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
     public String enableMorValuePredicatePushdownTables = "";
 
+    // Comma-separated list of MOR tables to read as DUP (skip merge, skip 
delete sign filter).
+    @VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true,
+            affectQueryResultInPlan = true, description = {
+                    "指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 * 
表示所有MOR表。",
+                    "Comma-separated list of MOR tables to read as DUP (skip 
merge, skip delete sign filter). "
+                            + "Format: db1.tbl1,db2.tbl2 or * for all MOR 
tables."})
+    public String readMorAsDupTables = "";
+
     // Whether drop table when create table as select insert data appear error.
     @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
     public boolean dropTableIfCtasFailed = true;
@@ -4809,6 +4819,24 @@ public class SessionVariable implements Serializable, 
Writable {
         return false;
     }
 
+    public boolean isReadMorAsDupEnabled(String dbName, String tableName) {
+        if (readMorAsDupTables == null || readMorAsDupTables.isEmpty()) {
+            return false;
+        }
+        String trimmed = readMorAsDupTables.trim();
+        if ("*".equals(trimmed)) {
+            return true;
+        }
+        String fullName = dbName + "." + tableName;
+        for (String table : trimmed.split(",")) {
+            if (table.trim().equalsIgnoreCase(fullName)
+                    || table.trim().equalsIgnoreCase(tableName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /** canUseNereidsDistributePlanner */
     public static boolean canUseNereidsDistributePlanner() {
         ConnectContext connectContext = ConnectContext.get();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
new file mode 100644
index 00000000000..579ae3c527f
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.nereids.pattern.GeneratedPlanPatterns;
+import org.apache.doris.nereids.rules.RulePromise;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+class ReadMorAsDupTest extends TestWithFeService implements 
GeneratedPlanPatterns {
+    private static final String DB = "test_read_mor_as_dup_db";
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase(DB);
+        connectContext.setDatabase(DEFAULT_CLUSTER_PREFIX + DB);
+
+        // MOR UNIQUE table (enable_unique_key_merge_on_write = false)
+        createTable("CREATE TABLE " + DB + ".mor_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT,\n"
+                + "  v2 VARCHAR(100)\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1', 
'enable_unique_key_merge_on_write' = 'false');");
+
+        // MOW UNIQUE table (should not be affected)
+        createTable("CREATE TABLE " + DB + ".mow_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1', 
'enable_unique_key_merge_on_write' = 'true');");
+
+        // DUP table (should not be affected)
+        createTable("CREATE TABLE " + DB + ".dup_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1');");
+
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+    }
+
+    @Test
+    void testDeleteSignFilterPresentByDefault() {
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+
+        PlanChecker.from(connectContext)
+                .analyze("select * from mor_tbl")
+                .rewrite()
+                .matches(
+                        logicalProject(
+                                logicalFilter(
+                                        logicalOlapScan()
+                                ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                        )
+                );
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithWildcard() {
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithTableName() {
+        connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithDbTableName() {
+        connectContext.getSessionVariable().readMorAsDupTables =
+                DEFAULT_CLUSTER_PREFIX + DB + ".mor_tbl";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testMowTableNotAffected() {
+        // MOW table should still have delete sign filter even with 
read_mor_as_dup = *
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            // MOW tables also have delete sign filter; read_mor_as_dup should 
NOT remove it
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mow_tbl")
+                    .rewrite()
+                    .matches(
+                            logicalProject(
+                                    logicalFilter(
+                                            logicalOlapScan()
+                                    ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                            )
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testPreAggOnForMorWithReadAsDup() {
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            Plan plan = PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .getPlan();
+
+            // Find the LogicalOlapScan and verify preAggStatus is ON
+            LogicalOlapScan scan = findOlapScan(plan);
+            Assertions.assertNotNull(scan, "Should find LogicalOlapScan in 
plan");
+            Assertions.assertTrue(scan.getPreAggStatus().isOn(),
+                    "PreAggStatus should be ON when read_mor_as_dup is 
enabled");
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testPerTableControlOnlyAffectsSpecifiedTable() {
+        // Only enable for mor_tbl, not for other tables
+        connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+        try {
+            // mor_tbl should NOT have delete sign filter
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+
+            // dup_tbl should be unaffected (no delete sign filter for DUP)
+            PlanChecker.from(connectContext)
+                    .analyze("select * from dup_tbl")
+                    .rewrite()
+                    .matches(logicalOlapScan());
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testSessionVariableHelperMethod() {
+        // Test the isReadMorAsDupEnabled helper directly
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("any_db", "any_tbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = "mydb.mytbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("otherdb", 
"othertbl"));
+        // "mydb.mytbl" entry only matches full name or exact table name 
"mydb.mytbl"
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anything", "mytbl"));
+
+        // Table name only (no db prefix) matches any db
+        connectContext.getSessionVariable().readMorAsDupTables = "mytbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "mytbl"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "othertbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = 
"db1.tbl1,db2.tbl2";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl1"));
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db2", "tbl2"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl2"));
+
+        // Case insensitive
+        connectContext.getSessionVariable().readMorAsDupTables = "MyDB.MyTbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+
+        // Cleanup
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+    }
+
+    private boolean hasDeleteSignPredicate(Set<Expression> conjuncts) {
+        return conjuncts.stream()
+                .anyMatch(expr -> expr.toSql().contains(Column.DELETE_SIGN));
+    }
+
+    private LogicalOlapScan findOlapScan(Plan plan) {
+        if (plan instanceof LogicalOlapScan) {
+            return (LogicalOlapScan) plan;
+        }
+        for (Plan child : plan.children()) {
+            LogicalOlapScan result = findOlapScan(child);
+            if (result != null) {
+                return result;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public RulePromise defaultPromise() {
+        return RulePromise.REWRITE;
+    }
+}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7fcb1912c67..9c6320c86e8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -896,6 +896,8 @@ struct TOlapScanNode {
   23: optional TScoreRangeInfo score_range_info
   // Enable value predicate pushdown for MOR tables
   24: optional bool enable_mor_value_predicate_pushdown
+  // Read MOR table as DUP table: skip merge, skip delete sign
+  25: optional bool read_mor_as_dup
 }
 
 struct TEqJoinCondition {
diff --git 
a/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy 
b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
new file mode 100644
index 00000000000..aada4292ff9
--- /dev/null
+++ b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_read_mor_as_dup") {
+    def tableName = "test_read_mor_as_dup_tbl"
+    def tableName2 = "test_read_mor_as_dup_tbl2"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql "DROP TABLE IF EXISTS ${tableName2}"
+
+    // Create a MOR (Merge-On-Read) UNIQUE table
+    sql """
+        CREATE TABLE ${tableName} (
+            `k` int NOT NULL,
+            `v1` int NULL,
+            `v2` varchar(100) NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"
+        );
+    """
+
+    // Create a second MOR table for per-table control test
+    sql """
+        CREATE TABLE ${tableName2} (
+            `k` int NOT NULL,
+            `v1` int NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"
+        );
+    """
+
+    // Insert multiple versions of the same key
+    sql "INSERT INTO ${tableName} VALUES (1, 10, 'first');"
+    sql "INSERT INTO ${tableName} VALUES (1, 20, 'second');"
+    sql "INSERT INTO ${tableName} VALUES (1, 30, 'third');"
+    sql "INSERT INTO ${tableName} VALUES (2, 100, 'only_version');"
+
+    // Insert a row and then delete it
+    sql "INSERT INTO ${tableName} VALUES (3, 50, 'to_be_deleted');"
+    sql "DELETE FROM ${tableName} WHERE k = 3;"
+
+    // Insert into second table
+    sql "INSERT INTO ${tableName2} VALUES (1, 10);"
+    sql "INSERT INTO ${tableName2} VALUES (1, 20);"
+
+    // === Test 1: Normal query returns merged result ===
+    sql "SET read_mor_as_dup_tables = '';"
+    def normalResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+    // Should see only latest version per key: (1,30,'third'), 
(2,100,'only_version')
+    // Key 3 was deleted, should not appear
+    assertTrue(normalResult.size() == 2, "Normal query should return 2 rows 
(merged), got ${normalResult.size()}")
+
+    // === Test 2: Wildcard — read all MOR tables as DUP ===
+    sql "SET read_mor_as_dup_tables = '*';"
+    def dupResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+    // Should see all row versions for key 1 (3 versions) + key 2 (1 version)
+    // Key 3: delete predicates are still applied, so deleted row should still 
be filtered
+    // But the delete sign filter is skipped, so we may see it depending on 
how delete was done.
+    // For MOR tables, DELETE adds a delete predicate in rowsets, which IS 
still honored.
+    assertTrue(dupResult.size() >= 4, "DUP mode should return at least 4 rows 
(all versions), got ${dupResult.size()}")
+
+    // Verify key 1 has multiple versions
+    def key1Rows = dupResult.findAll { it[0] == 1 }
+    assertTrue(key1Rows.size() == 3, "Key 1 should have 3 versions in DUP 
mode, got ${key1Rows.size()}")
+
+    // === Test 3: Per-table control with db.table format ===
+    def dbName = sql "SELECT DATABASE();"
+    def currentDb = dbName[0][0]
+
+    sql "SET read_mor_as_dup_tables = '${currentDb}.${tableName}';"
+
+    // tableName should be in DUP mode
+    def perTableResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+    assertTrue(perTableResult.size() >= 4, "Per-table DUP mode should return 
all versions, got ${perTableResult.size()}")
+
+    // tableName2 should still be in normal (merged) mode
+    def table2Result = sql "SELECT * FROM ${tableName2} ORDER BY k;"
+    assertTrue(table2Result.size() == 1, "Table2 should still be merged (1 
row), got ${table2Result.size()}")
+
+    // === Test 4: Per-table control with just table name ===
+    sql "SET read_mor_as_dup_tables = '${tableName2}';"
+
+    // tableName should now be in normal mode
+    def revertedResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+    assertTrue(revertedResult.size() == 2, "Table1 should be merged again (2 
rows), got ${revertedResult.size()}")
+
+    // tableName2 should be in DUP mode
+    def table2DupResult = sql "SELECT * FROM ${tableName2} ORDER BY k, v1;"
+    assertTrue(table2DupResult.size() == 2, "Table2 in DUP mode should return 
2 rows, got ${table2DupResult.size()}")
+
+    // === Cleanup ===
+    sql "SET read_mor_as_dup_tables = '';"
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql "DROP TABLE IF EXISTS ${tableName2}"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to