This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5be3b92690c branch-3.1: [feat](paimon) Support paimon time travel and branch/tag #53327 (#54975)
5be3b92690c is described below

commit 5be3b92690c07627f65c25dd9b5884cd33b3efc4
Author: Petrichor <[email protected]>
AuthorDate: Wed Aug 20 08:09:25 2025 +0800

    branch-3.1: [feat](paimon) Support paimon time travel and branch/tag #53327 (#54975)
    
    bp #53327
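    
    Example query shapes exercised by this change (a rough sketch drawn from the
    regression test added below; the table name is a placeholder):
    
        -- time travel by snapshot id or by tag name
        SELECT * FROM tbl_time_travel FOR VERSION AS OF 1;
        SELECT * FROM tbl_time_travel FOR VERSION AS OF 't_1';
        -- time travel by time string or by millisecond timestamp
        SELECT * FROM tbl_time_travel FOR TIME AS OF '2024-01-15 10:00:00.000';
        SELECT * FROM tbl_time_travel FOR TIME AS OF 1705284000000;
        -- read a specific branch or tag
        SELECT * FROM tbl_time_travel@branch(b_1);
        SELECT * FROM tbl_time_travel@tag('name'='t_1');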
---
 .../create_preinstalled_scripts/paimon/run09.sql   |  88 +++++++
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   2 +-
 .../org/apache/doris/analysis/TableScanParams.java |   1 +
 .../datasource/paimon/PaimonExternalCatalog.java   |  38 +--
 .../apache/doris/datasource/paimon/PaimonUtil.java | 291 +++++++++++++++++++--
 .../datasource/paimon/source/PaimonScanNode.java   |  48 +++-
 .../tablefunction/PaimonTableValuedFunction.java   |   4 +-
 .../paimon/paimon_time_travel.out                  | Bin 0 -> 40401 bytes
 .../paimon/paimon_incr_read.groovy                 |   4 -
 .../paimon/paimon_time_travel.groovy               | 278 ++++++++++++++++++++
 10 files changed, 693 insertions(+), 61 deletions(-)

diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
new file mode 100644
index 00000000000..058bbfd7e19
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
@@ -0,0 +1,88 @@
+use paimon;
+
+create database if not exists test_paimon_time_travel_db;
+
+use test_paimon_time_travel_db;
+drop table if exists tbl_time_travel;
+CREATE TABLE test_paimon_time_travel_db.tbl_time_travel (
+  order_id BIGINT NOT NULL ,
+  customer_id BIGINT NOT NULL,
+  order_date DATE,
+  total_amount DECIMAL(12,2),
+  shipping_address STRING,
+  order_status STRING,
+  is_paid BOOLEAN)
+USING paimon
+TBLPROPERTIES (
+  'bucket' = '3',
+  'primary-key' = 'order_id',
+  'file.format' = 'parquet'
+ );
+
+-- insert data for snapshot id 1
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(1001, 2001, '2024-01-15', 299.99, '123 Maple Street, Springfield, IL 62701', 'COMPLETED', true),
+(1002, 2002, '2024-01-16', 156.50, '456 Oak Avenue, Riverside, CA 92507', 'PROCESSING', false),
+(1003, 2003, '2024-01-17', 89.00, '789 Pine Boulevard, Greenfield, TX 75001', 'SHIPPED', true);
+
+-- insert data for snapshot id 2
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(2001, 3001, '2024-01-18', 445.75, '321 Cedar Lane, Millbrook, NY 12545', 'PENDING', false),
+(2002, 3002, '2024-01-19', 67.25, '654 Birch Drive, Lakewood, CO 80226', 'COMPLETED', true),
+(2003, 3003, '2024-01-20', 188.90, '987 Elm Court, Fairview, OR 97024', 'CANCELLED', false);
+
+-- insert data for snapshot id 3
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(3001, 4001, '2024-01-21', 325.40, '159 Willow Street, Brookdale, FL 33602', 'SHIPPED', true),
+(3002, 4002, '2024-01-22', 99.85, '753 Aspen Road, Clearwater, WA 98012', 'PROCESSING', true),
+(3003, 4003, '2024-01-23', 512.30, '264 Chestnut Avenue, Westfield, MI 48097', 'COMPLETED', false);
+
+-- create tag t_1 based on snapshot id 1
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_1', snapshot => 1);
+-- create tag t_2 based on snapshot id 2
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_2', snapshot => 2);
+-- create tag t_3 based on snapshot id 3
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_3', snapshot => 3);
+
+-- insert data for snapshot id 4
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(5001, 6001, '2024-01-24', 278.60, '842 Hickory Lane, Stonewood, GA 30309', 'PENDING', true),
+(5002, 6002, '2024-01-25', 134.75, '417 Poplar Street, Ridgefield, NV 89109', 'SHIPPED', false),
+(5003, 6003, '2024-01-26', 389.20, '695 Sycamore Drive, Maplewood, AZ 85001', 'COMPLETED', true);
+
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_4', snapshot => 4);
+
+
+-- create branch 1
+CALL sys.create_branch('test_paimon_time_travel_db.tbl_time_travel', 'b_1');
+-- create branch 2
+CALL sys.create_branch('test_paimon_time_travel_db.tbl_time_travel', 'b_2');
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_1` VALUES
+(10001, 7001, '2024-01-27', 156.30, '108 Magnolia Street, Riverside, KY 40475', 'PROCESSING', true),
+(10002, 7002, '2024-01-28', 423.80, '572 Dogwood Avenue, Pleasantville, NJ 08232', 'COMPLETED', false),
+(10003, 7003, '2024-01-29', 89.45, '946 Redwood Boulevard, Hillcrest, SC 29526', 'SHIPPED', true),
+(10004, 7004, '2024-01-30', 267.90, '213 Spruce Lane, Oceanview, ME 04401', 'PENDING', false),
+(10005, 7005, '2024-01-31', 345.15, '687 Walnut Drive, Parkside, VT 05672', 'CANCELLED', true);
+
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_1` VALUES
+(10006, 7001, '2024-01-27', 156.30, '108 Magnolia Street, Riverside, KY 40475', 'PROCESSING', true),
+(10007, 7002, '2024-01-28', 423.80, '572 Dogwood Avenue, Pleasantville, NJ 08232', 'COMPLETED', false),
+(10008, 7003, '2024-01-29', 89.45, '946 Redwood Boulevard, Hillcrest, SC 29526', 'SHIPPED', true),
+(10009, 7004, '2024-01-30', 267.90, '213 Spruce Lane, Oceanview, ME 04401', 'PENDING', false),
+(10010, 7005, '2024-01-31', 345.15, '687 Walnut Drive, Parkside, VT 05672', 'CANCELLED', true);
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
+(20001, 8001, '2024-02-01', 198.75, '741 Rosewood Court, Summerville, AL 35148', 'COMPLETED', true),
+(20002, 8002, '2024-02-02', 456.20, '329 Cypress Street, Brookhaven, MS 39601', 'PROCESSING', false),
+(20003, 8003, '2024-02-03', 67.85, '852 Juniper Lane, Fairfield, OH 45014', 'SHIPPED', true),
+(20004, 8004, '2024-02-04', 312.40, '614 Peach Avenue, Greenville, TN 37743', 'PENDING', true),
+(20005, 8005, '2024-02-05', 129.90, '507 Cherry Drive, Lakeside, UT 84040', 'CANCELLED', false);
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
+(20006, 8006, '2024-02-06', 234.60, '183 Bluebell Street, Millfield, ID 83262', 'COMPLETED', true),
+(20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY 82201', 'PROCESSING', false),
+(20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987', 'SHIPPED', true),
+(20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND 58301', 'PENDING', false),
+(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true);
\ No newline at end of file
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 354c026432a..9c51a8a353d 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -1774,7 +1774,7 @@ sampleMethod
 
 tableSnapshot
     : FOR VERSION AS OF version=(INTEGER_VALUE | STRING_LITERAL)
-    | FOR TIME AS OF time=STRING_LITERAL
+    | FOR TIME AS OF time=(STRING_LITERAL | INTEGER_VALUE)
     ;
 
 // this rule is used for explicitly capturing wrong identifiers such as test-table, which should actually be `test-table`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
index ed306f6402d..77f00af9fa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 public class TableScanParams {
+    public static final String PARAMS_NAME = "name";
     public static String INCREMENTAL_READ = "incr";
     public static String BRANCH = "branch";
     public static String TAG = "tag";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 3e58e654cda..7cd6fa0f2c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -128,17 +128,6 @@ public class PaimonExternalCatalog extends ExternalCatalog {
         }
     }
 
-    public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping) {
-        makeSureInitialized();
-        try {
-            return executionAuthenticator.execute(() -> catalog.getTable(Identifier.create(nameMapping
-                    .getRemoteDbName(), nameMapping.getRemoteTblName())));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to get Paimon table:" + getName() + "." + nameMapping.getLocalDbName()
-                    + "." + nameMapping.getLocalTblName() + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
-        }
-    }
-
     public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
         makeSureInitialized();
         try {
@@ -159,24 +148,35 @@ public class PaimonExternalCatalog extends ExternalCatalog {
         }
     }
 
-    public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping nameMapping, String queryType) {
-        return getPaimonSystemTable(nameMapping, null, queryType);
+    public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping) {
+        return getPaimonTable(nameMapping, null, null);
     }
 
-    public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping nameMapping,
-                                                              String branch, String queryType) {
+    public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping, String branch,
+            String queryType) {
         makeSureInitialized();
         try {
-            return executionAuthenticator.execute(() -> catalog.getTable(new Identifier(nameMapping.getRemoteDbName(),
-                    nameMapping.getRemoteTblName(), branch, queryType)));
+            Identifier identifier;
+            if (branch != null && queryType != null) {
+                identifier = new Identifier(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(),
+                        branch, queryType);
+            } else if (branch != null) {
+                identifier = new Identifier(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(),
+                        branch);
+            } else if (queryType != null) {
+                identifier = new Identifier(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(),
+                        "main", queryType);
+            } else {
+                identifier = new Identifier(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName());
+            }
+            return executionAuthenticator.execute(() -> catalog.getTable(identifier));
         } catch (Exception e) {
-            throw new RuntimeException("Failed to get Paimon system table:" + getName() + "."
+            throw new RuntimeException("Failed to get Paimon table:" + getName() + "."
                     + nameMapping.getRemoteDbName() + "." + nameMapping.getRemoteTblName() + "$" + queryType
                     + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
-
     protected Catalog createCatalog() {
         try {
             return paimonProperties.initializeCatalog(getName(), new ArrayList<>(catalogProperty
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 610cde051d8..efa899acfda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -18,6 +18,8 @@
 package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
@@ -25,7 +27,10 @@ import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.hive.HiveUtil;
+import org.apache.doris.datasource.paimon.source.PaimonSource;
 import org.apache.doris.thrift.TColumnType;
 import org.apache.doris.thrift.TPrimitiveType;
 import org.apache.doris.thrift.schema.external.TArrayField;
@@ -42,6 +47,8 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StartupMode;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.options.ConfigOption;
@@ -49,6 +56,7 @@ import org.apache.paimon.partition.Partition;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.ArrayType;
@@ -66,16 +74,35 @@ import org.apache.paimon.utils.Projection;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 public class PaimonUtil {
     private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
     private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding();
+    private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");
+
+    private static final List<ConfigOption<?>> PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList(
+            CoreOptions.SCAN_SNAPSHOT_ID,
+            CoreOptions.SCAN_TAG_NAME,
+            CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+            CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
+            CoreOptions.INCREMENTAL_BETWEEN,
+            CoreOptions.INCREMENTAL_TO_AUTO_TAG);
+
+    private static final List<ConfigOption<?>> PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList(
+            CoreOptions.SCAN_TIMESTAMP_MILLIS,
+            CoreOptions.SCAN_TIMESTAMP,
+            CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+            CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
+            CoreOptions.INCREMENTAL_BETWEEN,
+            CoreOptions.INCREMENTAL_TO_AUTO_TAG);
 
     public static List<InternalRow> read(
             Table table, @Nullable int[] projection, @Nullable Predicate predicate,
@@ -249,32 +276,6 @@ public class PaimonUtil {
         return paimonPrimitiveTypeToDorisType(type);
     }
 
-    public static List<Column> parseSchema(Table table) {
-        List<String> primaryKeys = table.primaryKeys();
-        return parseSchema(table.rowType(), primaryKeys);
-    }
-
-    public static List<Column> parseSchema(RowType rowType, List<String> primaryKeys) {
-        List<Column> resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size());
-        rowType.getFields().forEach(field -> {
-            resSchema.add(new Column(field.name().toLowerCase(),
-                    PaimonUtil.paimonTypeToDorisType(field.type()), primaryKeys.contains(field.name()), null,
-                    field.type().isNullable(),
-                    field.description(), true,
-                    field.id()));
-        });
-        return resSchema;
-    }
-
-    public static <T> String encodeObjectToString(T t) {
-        try {
-            byte[] bytes = InstantiationUtil.serializeObject(t);
-            return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public static void updatePaimonColumnUniqueId(Column column, DataType dataType) {
         List<Column> columns = column.getChildren();
         switch (dataType.getTypeRoot()) {
@@ -376,4 +377,242 @@ public class PaimonUtil {
         tSchema.setRootField(getSchemaInfo(paimonTableSchema.fields()));
         return tSchema;
     }
+
+    public static List<Column> parseSchema(Table table) {
+        List<String> primaryKeys = table.primaryKeys();
+        return parseSchema(table.rowType(), primaryKeys);
+    }
+
+    public static List<Column> parseSchema(RowType rowType, List<String> primaryKeys) {
+        List<Column> resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size());
+        rowType.getFields().forEach(field -> {
+            resSchema.add(new Column(field.name().toLowerCase(),
+                    PaimonUtil.paimonTypeToDorisType(field.type()),
+                    primaryKeys.contains(field.name()),
+                    null,
+                    field.type().isNullable(),
+                    field.description(),
+                    true,
+                    field.id()));
+        });
+        return resSchema;
+    }
+
+    public static <T> String encodeObjectToString(T t) {
+        try {
+            byte[] bytes = InstantiationUtil.serializeObject(t);
+            return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds a snapshot-specific table for time travel queries.
+     *
+     * @param baseTable the base Paimon table to copy configuration from
+     * @param tableSnapshot the snapshot specification (type + value)
+     * @return a Table instance configured for the specified time travel query
+     * @throws UserException if snapshot configuration is invalid
+     */
+    public static Table getTableBySnapshot(Table baseTable, TableSnapshot tableSnapshot)
+            throws UserException {
+        final String value = tableSnapshot.getValue();
+        final TableSnapshot.VersionType type = tableSnapshot.getType();
+        final boolean isDigital = DIGITAL_REGEX.matcher(value).matches();
+
+        switch (type) {
+            case TIME:
+                return isDigital
+                        ? getTableBySnapshotTimestampMillis(baseTable, value)
+                        : getTableBySnapshotTime(baseTable, value);
+
+            case VERSION:
+                if (isDigital) {
+                    return getTableBySnapshotId(baseTable, value);
+                }
+                return getTableByTag(baseTable, value);
+
+            default:
+                throw new UserException(String.format("Unsupported version type: %s", type));
+        }
+    }
+
+    /**
+     * Builds a table configured to read from a specific snapshot ID.
+     *
+     * @param baseTable the base Paimon table to copy configuration from
+     * @param snapshotId the snapshot ID as a string
+     * @return a Table instance configured to read from the specified snapshot ID
+     */
+    private static Table getTableBySnapshotId(Table baseTable, String snapshotId) {
+        Map<String, String> options = new HashMap<>(
+                PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
+
+        // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
+        // [scan_tag_name, scan_watermark, scan_snapshot_id]
+        options.put(CoreOptions.SCAN_TAG_NAME.key(), null);
+        options.put(CoreOptions.SCAN_WATERMARK.key(), null);
+        options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId);
+        options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
+
+        return baseTable.copy(options);
+    }
+
+    /**
+     * Builds a table configured to read from a specific timestamp.
+     *
+     * @param baseTable the base Paimon table to copy configuration from
+     * @param timestampStr the timestamp as a string
+     * @return a Table instance configured to read from the specified timestamp
+     */
+    private static Table getTableBySnapshotTime(Table baseTable, String timestampStr) {
+        Map<String, String> options = new HashMap<>(
+                PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
+
+        // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
+        // [scan_timestamp, scan_timestamp_millis]
+        options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
+        options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr);
+        options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+        options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
+
+        return baseTable.copy(options);
+    }
+
+    /**
+     * Builds a table configured to read from a specific timestamp in milliseconds.
+     *
+     * @param baseTable the base Paimon table to copy configuration from
+     * @param timestampStr the timestamp in milliseconds as a string
+     * @return a Table instance configured to read from the specified timestamp
+     */
+    private static Table getTableBySnapshotTimestampMillis(Table baseTable, String timestampStr) {
+        Map<String, String> options = new HashMap<>(
+                PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
+
+        // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
+        // [scan_timestamp, scan_timestamp_millis]
+        options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
+        options.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
+        options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr);
+        options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
+
+        return baseTable.copy(options);
+    }
+
+    /**
+     * Extracts the reference name (branch or tag name) from table scan parameters.
+     *
+     * @param scanParams the scan parameters containing reference name information
+     * @return the extracted reference name
+     * @throws IllegalArgumentException if the reference name is not properly specified
+     */
+    public static String extractBranchOrTagName(TableScanParams scanParams) {
+        if (!scanParams.getMapParams().isEmpty()) {
+            if (!scanParams.getMapParams().containsKey(TableScanParams.PARAMS_NAME)) {
+                throw new IllegalArgumentException("must contain key 'name' in params");
+            }
+            return scanParams.getMapParams().get(TableScanParams.PARAMS_NAME);
+        } else {
+            if (scanParams.getListParams().isEmpty() || scanParams.getListParams().get(0) == null) {
+                throw new IllegalArgumentException("must contain a branch/tag name in params");
+            }
+            return scanParams.getListParams().get(0);
+        }
+    }
+
+
+    /**
+     * Builds a branch-specific table for time travel queries.
+     *
+     * @param source the Paimon source containing catalog and table information
+     * @param baseTable the base Paimon table
+     * @param branchName the branch name
+     * @return a Table instance configured to read from the specified branch
+     * @throws UserException if branch does not exist
+     */
+    public static Table getTableByBranch(PaimonSource source, Table baseTable, String branchName) throws UserException {
+
+        if (!checkBranchExists(baseTable, branchName)) {
+            throw new UserException(String.format("Branch '%s' does not exist", branchName));
+        }
+
+        PaimonExternalCatalog catalog = (PaimonExternalCatalog) source.getCatalog();
+        ExternalTable externalTable = (ExternalTable) source.getTargetTable();
+        return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(), branchName, null);
+    }
+
+    /**
+     * Builds a tag-specific table for time travel queries.
+     *
+     * @param baseTable the base Paimon table to copy configuration from
+     * @param tagName the tag name
+     * @return a Table instance configured to read from the specified tag
+     * @throws UserException if tag does not exist
+     */
+    public static Table getTableByTag(Table baseTable, String tagName) throws UserException {
+        if (!checkTagsExists(baseTable, tagName)) {
+            throw new UserException(String.format("Tag '%s' does not exist", tagName));
+        }
+
+        Map<String, String> options = new HashMap<>(
+                PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
+
+        // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
+        // [scan_tag_name, scan_watermark, scan_snapshot_id]
+        options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
+        options.put(CoreOptions.SCAN_WATERMARK.key(), null);
+        options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+        options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
+
+        return baseTable.copy(options);
+    }
+
+    /**
+     * Creates a map of conflicting Paimon options with null values for exclusion.
+     *
+     * @param illegalOptions the list of ConfigOptions that should be set to null
+     * @return a HashMap containing the illegal options as keys with null values
+     */
+    public static Map<String, String> excludePaimonConflictOptions(List<ConfigOption<?>> illegalOptions) {
+        return illegalOptions.stream()
+                .collect(HashMap::new,
+                        (m, option) -> m.put(option.key(), null),
+                        HashMap::putAll);
+    }
+
+    /**
+     * Checks if a tag exists in the given table.
+     *
+     * @param baseTable the Paimon table
+     * @param tagName the tag name to check
+     * @return true if tag exists, false otherwise
+     * @throws UserException if table is not a FileStoreTable
+     */
+    public static boolean checkTagsExists(Table baseTable, String tagName) throws UserException {
+        if (!(baseTable instanceof FileStoreTable)) {
+            throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
+        }
+
+        final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
+        return fileStoreTable.tagManager().tagExists(tagName);
+    }
+
+    /**
+     * Checks if a branch exists in the given table.
+     *
+     * @param baseTable the Paimon table
+     * @param branchName the branch name to check
+     * @return true if branch exists, false otherwise
+     * @throws UserException if table is not a FileStoreTable
+     */
+    public static boolean checkBranchExists(Table baseTable, String branchName) throws UserException {
+        if (!(baseTable instanceof FileStoreTable)) {
+            throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
+        }
+
+        final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
+        return fileStoreTable.branchManager().branchExists(branchName);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5dc5eb02abe..1751d1733e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource.paimon.source;
 
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
@@ -374,16 +376,8 @@ public class PaimonScanNode extends FileQueryScanNode {
                                 .collect(Collectors.toList())
                                 .indexOf(slot.getColumn().getName()))
                 .toArray();
-        Table paimonTable = source.getPaimonTable();
 
-        if (getScanParams() != null && getQueryTableSnapshot() != null) {
-            throw new UserException("Can not specify scan params and table snapshot at same time.");
-        }
-        if (getQueryTableSnapshot() != null) {
-            throw new UserException("Paimon table does not support table snapshot query yet.");
-        }
-        Map<String, String> incrReadParams = getIncrReadParams();
-        paimonTable = paimonTable.copy(incrReadParams);
+        Table paimonTable = getProcessedTable();
         ReadBuilder readBuilder = paimonTable.newReadBuilder();
         return readBuilder.withFilter(predicates)
                 .withProjection(projected)
@@ -662,6 +656,42 @@ public class PaimonScanNode extends FileQueryScanNode {
 
         return paimonScanParams;
     }
+
+    /**
+     * Processes and returns the appropriate Paimon table object based on scan parameters or table snapshot.
+     * <p>
+     * This method handles different scan modes including incremental reads and system tables,
+     * applying the necessary transformations to the base Paimon table.
+     *
+     * @return processed Paimon table object configured according to scan parameters
+     * @throws UserException when system table configuration is incorrect
+     */
+    private Table getProcessedTable() throws UserException {
+        Table baseTable = source.getPaimonTable();
+        if (getScanParams() != null && getQueryTableSnapshot() != null) {
+            throw new UserException("Can not specify scan params and table snapshot at same time.");
+        }
+        TableScanParams theScanParams = getScanParams();
+        if (theScanParams != null) {
+            if (theScanParams.incrementalRead()) {
+                return baseTable.copy(getIncrReadParams());
+            }
+
+            if (theScanParams.isBranch()) {
+                return PaimonUtil.getTableByBranch(source, baseTable, PaimonUtil.extractBranchOrTagName(theScanParams));
+            }
+            if (theScanParams.isTag()) {
+                return PaimonUtil.getTableByTag(baseTable, PaimonUtil.extractBranchOrTagName(theScanParams));
+            }
+        }
+
+        TableSnapshot theTableSnapshot = getQueryTableSnapshot();
+        if (theTableSnapshot != null) {
+            return PaimonUtil.getTableBySnapshot(baseTable, theTableSnapshot);
+        }
+
+        return baseTable;
+    }
 }
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
index 91c131603b6..6d5e1fba09d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -105,8 +105,8 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
         NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
         this.tblId = externalTable.getId();
 
-        this.paimonSysTable = paimonExternalCatalog.getPaimonSystemTable(buildNameMapping,
-                queryType);
+        this.paimonSysTable = paimonExternalCatalog.getPaimonTable(buildNameMapping,
+                "main", queryType);
         this.schema = PaimonUtil.parseSchema(paimonSysTable);
 
     }
diff --git a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
new file mode 100644
index 00000000000..3527cc4604e
Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out differ
diff --git a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
index f32120f2ea7..bfc2457c6a3 100644
--- a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -96,10 +96,6 @@ suite("test_paimon_incr_read", "p0,external,doris,external_docke
                 sql """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2) for version as of 1"""
                 exception "Can not specify scan params and table snapshot"
             }
-            test {
-                sql """select * from paimon_incr for version as of 1"""
-                exception "Paimon table does not support table snapshot query yet"
-            }
         }
 
         test_incr_read("false")
diff --git a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
new file mode 100644
index 00000000000..b6d3caddb8c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.time.format.DateTimeFormatter
+import java.time.LocalDateTime
+import java.time.ZoneId
+
+
+
+suite("paimon_time_travel", 
"p0,external,doris,external_docker,external_docker_doris") {
+    logger.info("start paimon test")
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable paimon test.")
+        return
+    }
+    // Create date time formatter
+
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String catalog_name = "test_paimon_time_travel_catalog"
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    DateTimeFormatter iso_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
+    DateTimeFormatter standard_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+    String db_name = "test_paimon_time_travel_db"
+    String tableName = "tbl_time_travel"
+    try {
+        sql """drop catalog if exists ${catalog_name}"""
+
+        sql """
+                CREATE CATALOG ${catalog_name} PROPERTIES (
+                        'type' = 'paimon',
+                        'warehouse' = 's3://warehouse/wh',
+                        's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+                        's3.access_key' = 'admin',
+                        's3.secret_key' = 'password',
+                        's3.path.style.access' = 'true'
+                );
+            """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch `${catalog_name}`"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use ${db_name}"""
+        //system table snapshots to get create time.
+        List<List<Object>> snapshotRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id;"""
+        logger.info("Query result from ${tableName}\$snapshots: ${snapshotRes}")
+        assertTrue(snapshotRes.size()==4)
+        assertTrue(snapshotRes[0].size()==2)
+
+        snapshotRes.eachWithIndex { snapshotRow, index ->
+            int snapshotId = snapshotRow[0] as int
+            String commitTime = snapshotRow[1] as String
+            String tagName = "t_${snapshotId}"
+
+            logger.info("Processing snapshot ${index + 1}: ID=${snapshotId}, 
commit_time=${commitTime}")
+
+            try {
+                LocalDateTime dateTime;
+                if (commitTime.contains("T")){
+                    dateTime = LocalDateTime.parse(commitTime, iso_formatter)
+                }else {
+                    dateTime = LocalDateTime.parse(commitTime, standard_formatter)
+                }
+
+                String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(standard_formatter);
+                long timestamp = dateTime.atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli()
+
+                // Execute various types of time travel queries
+                String baseQueryName = "qt_time_travel_snapshot_${index + 1}"
+
+                // 1. Time travel by snapshot ID
+                "${baseQueryName}_version_count" """select count(*) from 
${tableName} FOR VERSION AS OF ${snapshotId} ;"""
+                "${baseQueryName}_version" """select * from ${tableName} FOR 
VERSION AS OF ${snapshotId} order by order_id;"""
+                "${baseQueryName}_version_select_columns" """select 
is_paid,total_amount,order_id,customer_id from ${tableName} FOR VERSION AS OF 
${snapshotId} order by order_id;"""
+                "${baseQueryName}_version_filter_order_status" """select * 
from ${tableName} FOR VERSION AS OF ${snapshotId} where order_status = 
'COMPLETED' order by order_id;"""
+                "${baseQueryName}_version_agg" """select 
+                                                    cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                                    cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                                    cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                                    COUNT(*) AS total_count,
+                                                    cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName} FOR VERSION AS OF ${snapshotId} 
;"""
+                "${baseQueryName}_version_group_by_is_paid" """select is_paid, 
count(*) as cnt from ${tableName} FOR VERSION AS OF ${snapshotId} GROUP BY 
is_paid order by cnt,is_paid;"""
+
+
+
+                // 2. Time travel by tag
+                "${baseQueryName}_tag_count" """select count(*) from 
${tableName} FOR VERSION AS OF '${tagName}';"""
+                "${baseQueryName}_tag" """select * from ${tableName} FOR 
VERSION AS OF '${tagName}' order by order_id;"""
+                "${baseQueryName}_tag_select_columns" """select 
is_paid,total_amount,order_id,customer_id from ${tableName} FOR VERSION AS OF 
'${tagName}' order by order_id;"""
+                "${baseQueryName}_tag_filter_order_status" """select * from 
${tableName} FOR VERSION AS OF '${tagName}' where order_status = 'COMPLETED' 
order by order_id;"""
+                "${baseQueryName}_tag_agg" """select 
+                                       cast(MIN(total_amount) as decimal(9,3)) 
AS min_price,
+                                       cast(MAX(total_amount) as decimal(9,3)) 
AS max_price,
+                                       cast(AVG(total_amount) as decimal(9,3)) 
AS avg_price,
+                                       COUNT(*) AS total_count,
+                                       cast(SUM(total_amount) as decimal(9,3)) 
AS total_price from ${tableName} FOR VERSION AS OF '${tagName}' ;"""
+                "${baseQueryName}_tag_group_by_is_paid" """select is_paid, 
count(*) as cnt from ${tableName} FOR VERSION AS OF '${tagName}' GROUP BY 
is_paid order by cnt,is_paid;"""
+
+                // 3. Time travel by time string
+                "${baseQueryName}_time_string_count" """select count(*) from 
${tableName} FOR TIME AS OF \"${snapshotTime}\" ;"""
+                "${baseQueryName}_time_string" """select * from ${tableName} 
FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+                "${baseQueryName}_time_string_select_columns" """select 
is_paid,total_amount,order_id,customer_id from ${tableName} FOR TIME AS OF 
\"${snapshotTime}\" order by order_id"""
+                "${baseQueryName}_time_string_filter_order_status" """select * 
from ${tableName} FOR TIME AS OF \"${snapshotTime}\" where order_status = 
'COMPLETED' order by order_id;"""
+                "${baseQueryName}_time_string_agg" """select 
+                                              cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                              cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                              cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                              COUNT(*) AS total_count,
+                                              cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName} FOR TIME AS OF 
\"${snapshotTime}\" ;"""
+                "${baseQueryName}_time_string_group_by_is_paid" """select 
is_paid, count(*) as cnt from ${tableName} FOR TIME AS OF \"${snapshotTime}\" 
GROUP BY is_paid order by cnt,is_paid;"""
+
+                // 4. Time travel by millisecond timestamp
+                "${baseQueryName}_time_millis_count" """select count(*) from 
${tableName} FOR TIME AS OF ${timestamp};"""
+                "${baseQueryName}_time_millis" """select * from ${tableName} 
FOR TIME AS OF ${timestamp} order by order_id"""
+                "${baseQueryName}_time_millis_select_columns" """select 
is_paid,total_amount,order_id,customer_id from ${tableName} FOR TIME AS OF 
${timestamp} order by order_id"""
+                "${baseQueryName}_time_millis_filter_order_status" """select * 
from ${tableName} FOR TIME AS OF ${timestamp} where order_status = 'COMPLETED' 
order by order_id;"""
+                "${baseQueryName}_time_millis_agg" """select 
+                                             cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                             cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                             cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                             COUNT(*) AS total_count,
+                                             cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName} FOR TIME AS OF ${timestamp} ;"""
+                "${baseQueryName}_time_millis_group_by_is_paid" """select 
is_paid, count(*) as cnt from ${tableName} FOR TIME AS OF ${timestamp} GROUP BY 
is_paid order by cnt,is_paid;"""
+
+
+                logger.info("Completed queries for snapshot ${snapshotId} with 
timestamp ${timestamp}")
+
+            } catch (Exception e) {
+                logger.error("Failed to process snapshot ${snapshotId}: 
${e.message}")
+                throw e
+            }
+        }
+
+        List<List<Object>> branchesResult = sql """ select branch_name from ${tableName}\$branches order by branch_name;"""
+        logger.info("Query result from ${tableName}\$branches: ${branchesResult}")
+        assertTrue(branchesResult.size()==2)
+        assertTrue(branchesResult[0].size()==1)
+
+        branchesResult.eachWithIndex { branchRow, index ->
+            String branchName = branchRow[0] as String
+            logger.info("Processing branch ${index + 1}: ${branchName}")
+            String baseQueryName = "qt_branch_${index + 1}"
+
+            try {
+                "${baseQueryName}_count_list" """select count(*) from 
${tableName}@branch(${branchName});"""
+                "${baseQueryName}_count_map" """select count(*) from 
${tableName}@branch(\"name\"="${branchName}");"""
+
+                "${baseQueryName}_count_list" """select * from 
${tableName}@branch(${branchName}) order by order_id;"""
+                "${baseQueryName}_count_map" """select * from 
${tableName}@branch(\"name\"="${branchName}") order by order_id;"""
+
+                "${baseQueryName}_select_columns_list" """select 
customer_id,is_paid,order_status from ${tableName}@branch(${branchName}) order 
by order_id;"""
+                "${baseQueryName}_select_columns_map" """select 
customer_id,is_paid,order_status from 
${tableName}@branch(\"name\"="${branchName}") order by order_id;"""
+
+                "${baseQueryName}_filter_amount_list" """select * from 
${tableName}@branch(${branchName}) where total_amount > 100.00 order by 
order_id;"""
+                "${baseQueryName}_filter_amount_map" """select * from 
${tableName}@branch(\"name\"="${branchName}") where total_amount > 100.00 order 
by order_id;"""
+
+                "${baseQueryName}_agg_list" """select 
+                                                    cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                                    cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                                    cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                                    COUNT(*) AS total_count,
+                                                    cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from  ${tableName}@branch(${branchName}) ;"""
+                "${baseQueryName}_agg_map" """select 
+                                                    cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                                    cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                                    cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                                    COUNT(*) AS total_count,
+                                                    cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName}@branch(\"name\"="${branchName}") 
;"""
+                "${baseQueryName}_group_by_is_paid_list" """select is_paid, 
count(*) as cnt from ${tableName}@branch(${branchName}) GROUP BY is_paid order 
by cnt,is_paid;"""
+                "${baseQueryName}_group_by_is_paid_map" """select is_paid, 
count(*) as cnt from ${tableName}@branch(\"name\"="${branchName}") GROUP BY 
is_paid order by cnt,is_paid;"""
+                logger.info("Completed queries for branch: ${branchName}")
+
+            } catch (Exception e) {
+                logger.error("Failed to process branch ${branchName}: 
${e.message}")
+                throw e
+            }
+        }
+
+
+        List<List<Object>> tagsResult = sql """ select snapshot_id,tag_name from ${tableName}\$tags order by snapshot_id;"""
+        logger.info("Query result from ${tableName}\$tags: ${tagsResult}")
+        assertTrue(tagsResult.size()==4)
+        assertTrue(tagsResult[0].size()==2)
+
+        tagsResult.eachWithIndex { tagRow, index ->
+            String snapshotId = tagRow[0] as String
+            String tagName = tagRow[1] as String
+            logger.info("Processing tag ${index + 1}: ${tagName} (snapshot: 
${snapshotId})")
+            String baseQueryName = "qt_tag_${index + 1}"
+
+            try {
+                "${baseQueryName}_count_list" """select count(*) from 
${tableName}@tag(${tagName});"""
+                "${baseQueryName}_count_map" """select count(*) from 
${tableName}@tag(\"name\"="${tagName}");"""
+
+                "${baseQueryName}_select_all_list" """select * from 
${tableName}@tag(${tagName}) order by order_id;"""
+                "${baseQueryName}_select_all_map" """select * from 
${tableName}@tag(\"name\"="${tagName}") order by order_id;"""
+
+                "${baseQueryName}_select_columns_list" """select 
customer_id,is_paid,order_status from ${tableName}@tag(${tagName}) order by 
order_id;"""
+                "${baseQueryName}_select_columns_map" """select 
customer_id,is_paid,order_status from ${tableName}@tag(\"name\"="${tagName}") 
order by order_id;"""
+
+                "${baseQueryName}_filter_amount_list" """select * from 
${tableName}@tag(${tagName}) where total_amount > 100.00 order by order_id;"""
+                "${baseQueryName}_filter_amount_map" """select * from 
${tableName}@tag(\"name\"="${tagName}") where total_amount > 100.00 order by 
order_id;"""
+
+                "${baseQueryName}_agg_list" """select 
+                                        cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                        cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                        cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                        COUNT(*) AS total_count,
+                                        cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName}@tag(${tagName});"""
+                "${baseQueryName}_agg_map" """select 
+                                        cast(MIN(total_amount) as 
decimal(9,3)) AS min_price,
+                                        cast(MAX(total_amount) as 
decimal(9,3)) AS max_price,
+                                        cast(AVG(total_amount) as 
decimal(9,3)) AS avg_price,
+                                        COUNT(*) AS total_count,
+                                        cast(SUM(total_amount) as 
decimal(9,3)) AS total_price from ${tableName}@tag(\"name\"="${tagName}");"""
+                "${baseQueryName}_group_by_is_paid_list" """select is_paid, 
count(*) as cnt from ${tableName}@tag(${tagName}) GROUP BY is_paid order by 
cnt,is_paid;"""
+                "${baseQueryName}_group_by_is_paid_map" """select is_paid, 
count(*) as cnt from ${tableName}@tag(\"name\"="${tagName}") GROUP BY is_paid 
order by cnt,is_paid;"""
+
+                logger.info("Completed queries for tag: ${tagName} (snapshot: 
${snapshotId})")
+
+            } catch (Exception e) {
+                logger.error("Failed to process tag ${tagName}: ${e.message}")
+                throw e
+            }
+        }
+
+        test {
+            sql """ select * from ${tableName}@branch('name'='not_exists_branch'); """
+            exception "Branch 'not_exists_branch' does not exist"
+        }
+        test {
+            sql """ select * from ${tableName}@branch(not_exists_branch); """
+            exception "Branch 'not_exists_branch' does not exist"
+        }
+        test {
+            sql """ select * from ${tableName}@tag('name'='not_exists_tag'); """
+            exception "Tag 'not_exists_tag' does not exist"
+        }
+        test {
+            sql """ select * from ${tableName}@tag(not_exists_tag); """
+            exception "Tag 'not_exists_tag' does not exist"
+        }
+        test {
+            sql """ select * from ${tableName} for version as of 'not_exists_tag'; """
+            exception "Tag 'not_exists_tag' does not exist"
+        }
+
+        // branch/tag params that use a wrong key name instead of 'name'
+        test {
+            sql """ select * from ${tableName}@tag('na'='not_exists_tag'); """
+            exception "must contain key 'name' in params"
+        }
+        test {
+            sql """ select * from ${tableName}@branch('nme'='not_exists_branch'); """
+            exception "must contain key 'name' in params"
+        }
+
+    } finally {
+         // sql """drop catalog if exists ${catalog_name}"""
+    }
+}
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
