This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5be3b92690c branch-3.1: [feat](paimon) Support paimon time travel and
branch/tag #53327 (#54975)
5be3b92690c is described below
commit 5be3b92690c07627f65c25dd9b5884cd33b3efc4
Author: Petrichor <[email protected]>
AuthorDate: Wed Aug 20 08:09:25 2025 +0800
branch-3.1: [feat](paimon) Support paimon time travel and branch/tag #53327
(#54975)
bp #53327
---
.../create_preinstalled_scripts/paimon/run09.sql | 88 +++++++
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +-
.../org/apache/doris/analysis/TableScanParams.java | 1 +
.../datasource/paimon/PaimonExternalCatalog.java | 38 +--
.../apache/doris/datasource/paimon/PaimonUtil.java | 291 +++++++++++++++++++--
.../datasource/paimon/source/PaimonScanNode.java | 48 +++-
.../tablefunction/PaimonTableValuedFunction.java | 4 +-
.../paimon/paimon_time_travel.out | Bin 0 -> 40401 bytes
.../paimon/paimon_incr_read.groovy | 4 -
.../paimon/paimon_time_travel.groovy | 278 ++++++++++++++++++++
10 files changed, 693 insertions(+), 61 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
new file mode 100644
index 00000000000..058bbfd7e19
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
@@ -0,0 +1,88 @@
+use paimon;
+
+create database if not exists test_paimon_time_travel_db;
+
+use test_paimon_time_travel_db;
+drop table if exists tbl_time_travel;
+CREATE TABLE test_paimon_time_travel_db.tbl_time_travel (
+ order_id BIGINT NOT NULL ,
+ customer_id BIGINT NOT NULL,
+ order_date DATE,
+ total_amount DECIMAL(12,2),
+ shipping_address STRING,
+ order_status STRING,
+ is_paid BOOLEAN)
+USING paimon
+TBLPROPERTIES (
+ 'bucket' = '3',
+ 'primary-key' = 'order_id',
+ 'file.format' = 'parquet'
+ );
+
+ -- insert into data snapshotid 1
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(1001, 2001, '2024-01-15', 299.99, '123 Maple Street, Springfield, IL 62701',
'COMPLETED', true),
+(1002, 2002, '2024-01-16', 156.50, '456 Oak Avenue, Riverside, CA 92507',
'PROCESSING', false),
+(1003, 2003, '2024-01-17', 89.00, '789 Pine Boulevard, Greenfield, TX 75001',
'SHIPPED', true);
+
+-- insert into data snpashotid 2
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(2001, 3001, '2024-01-18', 445.75, '321 Cedar Lane, Millbrook, NY 12545',
'PENDING', false),
+(2002, 3002, '2024-01-19', 67.25, '654 Birch Drive, Lakewood, CO 80226',
'COMPLETED', true),
+(2003, 3003, '2024-01-20', 188.90, '987 Elm Court, Fairview, OR 97024',
'CANCELLED', false);
+
+-- insert into data snpashotid 3
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(3001, 4001, '2024-01-21', 325.40, '159 Willow Street, Brookdale, FL 33602',
'SHIPPED', true),
+(3002, 4002, '2024-01-22', 99.85, '753 Aspen Road, Clearwater, WA 98012',
'PROCESSING', true),
+(3003, 4003, '2024-01-23', 512.30, '264 Chestnut Avenue, Westfield, MI 48097',
'COMPLETED', false);
+
+-- create a tag based on the latest snapshot id, run the following sql
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_1',snapshot => 1);
+-- create a tag based on the snapshot id 1, run the following sql
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_2', snapshot => 2);
+-- create a tag based on the snapshot id 2, run the following sql
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_3', snapshot => 3);
+
+-- insert into data snpashotid 4
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(5001, 6001, '2024-01-24', 278.60, '842 Hickory Lane, Stonewood, GA 30309',
'PENDING', true),
+(5002, 6002, '2024-01-25', 134.75, '417 Poplar Street, Ridgefield, NV 89109',
'SHIPPED', false),
+(5003, 6003, '2024-01-26', 389.20, '695 Sycamore Drive, Maplewood, AZ 85001',
'COMPLETED', true);
+
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_4', snapshot => 4);
+
+
+-- create branch 1
+CALL sys.create_branch('test_paimon_time_travel_db.tbl_time_travel', 'b_1');
+-- create branch 2
+CALL sys.create_branch('test_paimon_time_travel_db.tbl_time_travel', 'b_2');
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_1` VALUES
+(10001, 7001, '2024-01-27', 156.30, '108 Magnolia Street, Riverside, KY
40475', 'PROCESSING', true),
+(10002, 7002, '2024-01-28', 423.80, '572 Dogwood Avenue, Pleasantville, NJ
08232', 'COMPLETED', false),
+(10003, 7003, '2024-01-29', 89.45, '946 Redwood Boulevard, Hillcrest, SC
29526', 'SHIPPED', true),
+(10004, 7004, '2024-01-30', 267.90, '213 Spruce Lane, Oceanview, ME 04401',
'PENDING', false),
+(10005, 7005, '2024-01-31', 345.15, '687 Walnut Drive, Parkside, VT 05672',
'CANCELLED', true);
+
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_1` VALUES
+(10006, 7001, '2024-01-27', 156.30, '108 Magnolia Street, Riverside, KY
40475', 'PROCESSING', true),
+(10007, 7002, '2024-01-28', 423.80, '572 Dogwood Avenue, Pleasantville, NJ
08232', 'COMPLETED', false),
+(10008, 7003, '2024-01-29', 89.45, '946 Redwood Boulevard, Hillcrest, SC
29526', 'SHIPPED', true),
+(10009, 7004, '2024-01-30', 267.90, '213 Spruce Lane, Oceanview, ME 04401',
'PENDING', false),
+(10010, 7005, '2024-01-31', 345.15, '687 Walnut Drive, Parkside, VT 05672',
'CANCELLED', true);
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
+(20001, 8001, '2024-02-01', 198.75, '741 Rosewood Court, Summerville, AL
35148', 'COMPLETED', true),
+(20002, 8002, '2024-02-02', 456.20, '329 Cypress Street, Brookhaven, MS
39601', 'PROCESSING', false),
+(20003, 8003, '2024-02-03', 67.85, '852 Juniper Lane, Fairfield, OH 45014',
'SHIPPED', true),
+(20004, 8004, '2024-02-04', 312.40, '614 Peach Avenue, Greenville, TN 37743',
'PENDING', true),
+(20005, 8005, '2024-02-05', 129.90, '507 Cherry Drive, Lakeside, UT 84040',
'CANCELLED', false);
+
+INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
+(20006, 8006, '2024-02-06', 234.60, '183 Bluebell Street, Millfield, ID
83262', 'COMPLETED', true),
+(20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY
82201', 'PROCESSING', false),
+(20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987',
'SHIPPED', true),
+(20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND
58301', 'PENDING', false),
+(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401',
'CANCELLED', true);
\ No newline at end of file
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 354c026432a..9c51a8a353d 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -1774,7 +1774,7 @@ sampleMethod
tableSnapshot
: FOR VERSION AS OF version=(INTEGER_VALUE | STRING_LITERAL)
- | FOR TIME AS OF time=STRING_LITERAL
+ | FOR TIME AS OF time=(STRING_LITERAL | INTEGER_VALUE)
;
// this rule is used for explicitly capturing wrong identifiers such as
test-table, which should actually be `test-table`
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
index ed306f6402d..77f00af9fa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
public class TableScanParams {
+ public static final String PARAMS_NAME = "name";
public static String INCREMENTAL_READ = "incr";
public static String BRANCH = "branch";
public static String TAG = "tag";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 3e58e654cda..7cd6fa0f2c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -128,17 +128,6 @@ public class PaimonExternalCatalog extends ExternalCatalog
{
}
}
- public org.apache.paimon.table.Table getPaimonTable(NameMapping
nameMapping) {
- makeSureInitialized();
- try {
- return executionAuthenticator.execute(() ->
catalog.getTable(Identifier.create(nameMapping
- .getRemoteDbName(), nameMapping.getRemoteTblName())));
- } catch (Exception e) {
- throw new RuntimeException("Failed to get Paimon table:" +
getName() + "." + nameMapping.getLocalDbName()
- + "." + nameMapping.getLocalTblName() + ", because " +
ExceptionUtils.getRootCauseMessage(e), e);
- }
- }
-
public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
makeSureInitialized();
try {
@@ -159,24 +148,35 @@ public class PaimonExternalCatalog extends
ExternalCatalog {
}
}
- public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping, String queryType) {
- return getPaimonSystemTable(nameMapping, null, queryType);
+ public org.apache.paimon.table.Table getPaimonTable(NameMapping
nameMapping) {
+ return getPaimonTable(nameMapping, null, null);
}
- public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping,
- String branch,
String queryType) {
+ public org.apache.paimon.table.Table getPaimonTable(NameMapping
nameMapping, String branch,
+ String queryType) {
makeSureInitialized();
try {
- return executionAuthenticator.execute(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
- nameMapping.getRemoteTblName(), branch, queryType)));
+ Identifier identifier;
+ if (branch != null && queryType != null) {
+ identifier = new Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(),
+ branch, queryType);
+ } else if (branch != null) {
+ identifier = new Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(),
+ branch);
+ } else if (queryType != null) {
+ identifier = new Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(),
+ "main", queryType);
+ } else {
+ identifier = new Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName());
+ }
+ return executionAuthenticator.execute(() ->
catalog.getTable(identifier));
} catch (Exception e) {
- throw new RuntimeException("Failed to get Paimon system table:" +
getName() + "."
+ throw new RuntimeException("Failed to get Paimon table:" +
getName() + "."
+ nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + "$" + queryType
+ ", because " + ExceptionUtils.getRootCauseMessage(e), e);
}
}
-
protected Catalog createCatalog() {
try {
return paimonProperties.initializeCatalog(getName(), new
ArrayList<>(catalogProperty
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 610cde051d8..efa899acfda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -18,6 +18,8 @@
package org.apache.doris.datasource.paimon;
import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
@@ -25,7 +27,10 @@ import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
+import org.apache.doris.datasource.paimon.source.PaimonSource;
import org.apache.doris.thrift.TColumnType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.schema.external.TArrayField;
@@ -42,6 +47,8 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.ConfigOption;
@@ -49,6 +56,7 @@ import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.ArrayType;
@@ -66,16 +74,35 @@ import org.apache.paimon.utils.Projection;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
public class PaimonUtil {
private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
private static final Base64.Encoder BASE64_ENCODER =
java.util.Base64.getUrlEncoder().withoutPadding();
+ private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");
+
+ private static final List<ConfigOption<?>>
PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList(
+ CoreOptions.SCAN_SNAPSHOT_ID,
+ CoreOptions.SCAN_TAG_NAME,
+ CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
+ CoreOptions.INCREMENTAL_BETWEEN,
+ CoreOptions.INCREMENTAL_TO_AUTO_TAG);
+
+ private static final List<ConfigOption<?>>
PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS,
+ CoreOptions.SCAN_TIMESTAMP,
+ CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
+ CoreOptions.INCREMENTAL_BETWEEN,
+ CoreOptions.INCREMENTAL_TO_AUTO_TAG);
public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate
predicate,
@@ -249,32 +276,6 @@ public class PaimonUtil {
return paimonPrimitiveTypeToDorisType(type);
}
- public static List<Column> parseSchema(Table table) {
- List<String> primaryKeys = table.primaryKeys();
- return parseSchema(table.rowType(), primaryKeys);
- }
-
- public static List<Column> parseSchema(RowType rowType, List<String>
primaryKeys) {
- List<Column> resSchema =
Lists.newArrayListWithCapacity(rowType.getFields().size());
- rowType.getFields().forEach(field -> {
- resSchema.add(new Column(field.name().toLowerCase(),
- PaimonUtil.paimonTypeToDorisType(field.type()),
primaryKeys.contains(field.name()), null,
- field.type().isNullable(),
- field.description(), true,
- field.id()));
- });
- return resSchema;
- }
-
- public static <T> String encodeObjectToString(T t) {
- try {
- byte[] bytes = InstantiationUtil.serializeObject(t);
- return new String(BASE64_ENCODER.encode(bytes),
java.nio.charset.StandardCharsets.UTF_8);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public static void updatePaimonColumnUniqueId(Column column, DataType
dataType) {
List<Column> columns = column.getChildren();
switch (dataType.getTypeRoot()) {
@@ -376,4 +377,242 @@ public class PaimonUtil {
tSchema.setRootField(getSchemaInfo(paimonTableSchema.fields()));
return tSchema;
}
+
+ public static List<Column> parseSchema(Table table) {
+ List<String> primaryKeys = table.primaryKeys();
+ return parseSchema(table.rowType(), primaryKeys);
+ }
+
+ public static List<Column> parseSchema(RowType rowType, List<String>
primaryKeys) {
+ List<Column> resSchema =
Lists.newArrayListWithCapacity(rowType.getFields().size());
+ rowType.getFields().forEach(field -> {
+ resSchema.add(new Column(field.name().toLowerCase(),
+ PaimonUtil.paimonTypeToDorisType(field.type()),
+ primaryKeys.contains(field.name()),
+ null,
+ field.type().isNullable(),
+ field.description(),
+ true,
+ field.id()));
+ });
+ return resSchema;
+ }
+
+ public static <T> String encodeObjectToString(T t) {
+ try {
+ byte[] bytes = InstantiationUtil.serializeObject(t);
+ return new String(BASE64_ENCODER.encode(bytes),
java.nio.charset.StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Builds a snapshot-specific table for time travel queries.
+ *
+ * @param baseTable the base Paimon table to copy configuration from
+ * @param tableSnapshot the snapshot specification (type + value)
+ * @return a Table instance configured for the specified time travel query
+ * @throws UserException if snapshot configuration is invalid
+ */
+ public static Table getTableBySnapshot(Table baseTable, TableSnapshot
tableSnapshot)
+ throws UserException {
+ final String value = tableSnapshot.getValue();
+ final TableSnapshot.VersionType type = tableSnapshot.getType();
+ final boolean isDigital = DIGITAL_REGEX.matcher(value).matches();
+
+ switch (type) {
+ case TIME:
+ return isDigital
+ ? getTableBySnapshotTimestampMillis(baseTable, value)
+ : getTableBySnapshotTime(baseTable, value);
+
+ case VERSION:
+ if (isDigital) {
+ return getTableBySnapshotId(baseTable, value);
+ }
+ return getTableByTag(baseTable, value);
+
+ default:
+ throw new UserException(String.format("Unsupported version
type: %s", type));
+ }
+ }
+
+ /**
+ * Builds a table configured to read from a specific snapshot ID.
+ *
+ * @param baseTable the base Paimon table to copy configuration from
+ * @param snapshotId the snapshot ID as a string
+ * @return a Table instance configured to read from the specified snapshot
ID
+ */
+ private static Table getTableBySnapshotId(Table baseTable, String
snapshotId) {
+ Map<String, String> options = new HashMap<>(
+ PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
+
+ // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
+ // [scan_tag_name, scan_watermark, scan_snapshot_id]
+ options.put(CoreOptions.SCAN_TAG_NAME.key(), null);
+ options.put(CoreOptions.SCAN_WATERMARK.key(), null);
+ options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId);
+
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
+
+ return baseTable.copy(options);
+ }
+
+ /**
+ * Builds a table configured to read from a specific timestamp.
+ *
+ * @param baseTable the base Paimon table to copy configuration from
+ * @param timestampStr the timestamp as a string
+ * @return a Table instance configured to read from the specified timestamp
+ */
+ private static Table getTableBySnapshotTime(Table baseTable, String
timestampStr) {
+ Map<String, String> options = new HashMap<>(
+ PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
+
+ // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
+ // [scan_timestamp, scan_timestamp_millis]
+ options.put(CoreOptions.SCAN_MODE.key(),
StartupMode.FROM_TIMESTAMP.toString());
+ options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr);
+ options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
+
+ return baseTable.copy(options);
+ }
+
+ /**
+ * Builds a table configured to read from a specific timestamp in
milliseconds.
+ *
+ * @param baseTable the base Paimon table to copy configuration from
+ * @param timestampStr the timestamp in milliseconds as a string
+ * @return a Table instance configured to read from the specified timestamp
+ */
+ private static Table getTableBySnapshotTimestampMillis(Table baseTable,
String timestampStr) {
+ Map<String, String> options = new HashMap<>(
+ PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
+
+ // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
+ // [scan_timestamp, scan_timestamp_millis]
+ options.put(CoreOptions.SCAN_MODE.key(),
StartupMode.FROM_TIMESTAMP.toString());
+ options.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
+ options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr);
+
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
+
+ return baseTable.copy(options);
+ }
+
+ /**
+ * Extracts the reference name (branch or tag name) from table scan
parameters.
+ *
+ * @param scanParams the scan parameters containing reference name
information
+ * @return the extracted reference name
+ * @throws IllegalArgumentException if the reference name is not properly
specified
+ */
+ public static String extractBranchOrTagName(TableScanParams scanParams) {
+ if (!scanParams.getMapParams().isEmpty()) {
+ if
(!scanParams.getMapParams().containsKey(TableScanParams.PARAMS_NAME)) {
+ throw new IllegalArgumentException("must contain key 'name' in
params");
+ }
+ return scanParams.getMapParams().get(TableScanParams.PARAMS_NAME);
+ } else {
+ if (scanParams.getListParams().isEmpty() ||
scanParams.getListParams().get(0) == null) {
+ throw new IllegalArgumentException("must contain a branch/tag
name in params");
+ }
+ return scanParams.getListParams().get(0);
+ }
+ }
+
+
+ /**
+ * Builds a branch-specific table for time travel queries.
+ *
+ * @param source the Paimon source containing catalog and table information
+ * @param baseTable the base Paimon table
+ * @param branchName the branch name
+ * @return a Table instance configured to read from the specified branch
+ * @throws UserException if branch does not exist
+ */
+ public static Table getTableByBranch(PaimonSource source, Table baseTable,
String branchName) throws UserException {
+
+ if (!checkBranchExists(baseTable, branchName)) {
+ throw new UserException(String.format("Branch '%s' does not
exist", branchName));
+ }
+
+ PaimonExternalCatalog catalog = (PaimonExternalCatalog)
source.getCatalog();
+ ExternalTable externalTable = (ExternalTable) source.getTargetTable();
+ return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(),
branchName, null);
+ }
+
+ /**
+ * Builds a tag-specific table for time travel queries.
+ *
+ * @param baseTable the base Paimon table to copy configuration from
+ * @param tagName the tag name
+ * @return a Table instance configured to read from the specified tag
+ * @throws UserException if tag does not exist
+ */
+ public static Table getTableByTag(Table baseTable, String tagName) throws
UserException {
+ if (!checkTagsExists(baseTable, tagName)) {
+ throw new UserException(String.format("Tag '%s' does not exist",
tagName));
+ }
+
+ Map<String, String> options = new HashMap<>(
+ PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
+
+ // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
+ // [scan_tag_name, scan_watermark, scan_snapshot_id]
+ options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
+ options.put(CoreOptions.SCAN_WATERMARK.key(), null);
+ options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
+
+ return baseTable.copy(options);
+ }
+
+ /**
+ * Creates a map of conflicting Paimon options with null values for
exclusion.
+ *
+ * @param illegalOptions the list of ConfigOptions that should be set to
null
+ * @return a HashMap containing the illegal options as keys with null
values
+ */
+ public static Map<String, String>
excludePaimonConflictOptions(List<ConfigOption<?>> illegalOptions) {
+ return illegalOptions.stream()
+ .collect(HashMap::new,
+ (m, option) -> m.put(option.key(), null),
+ HashMap::putAll);
+ }
+
+ /**
+ * Checks if a tag exists in the given table.
+ *
+ * @param baseTable the Paimon table
+ * @param tagName the tag name to check
+ * @return true if tag exists, false otherwise
+ * @throws UserException if table is not a FileStoreTable
+ */
+ public static boolean checkTagsExists(Table baseTable, String tagName)
throws UserException {
+ if (!(baseTable instanceof FileStoreTable)) {
+ throw new UserException("Table type should be FileStoreTable but
got: " + baseTable.getClass().getName());
+ }
+
+ final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
+ return fileStoreTable.tagManager().tagExists(tagName);
+ }
+
+ /**
+ * Checks if a branch exists in the given table.
+ *
+ * @param baseTable the Paimon table
+ * @param branchName the branch name to check
+ * @return true if branch exists, false otherwise
+ * @throws UserException if table is not a FileStoreTable
+ */
+ public static boolean checkBranchExists(Table baseTable, String
branchName) throws UserException {
+ if (!(baseTable instanceof FileStoreTable)) {
+ throw new UserException("Table type should be FileStoreTable but
got: " + baseTable.getClass().getName());
+ }
+
+ final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
+ return fileStoreTable.branchManager().branchExists(branchName);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5dc5eb02abe..1751d1733e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -17,6 +17,8 @@
package org.apache.doris.datasource.paimon.source;
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
@@ -374,16 +376,8 @@ public class PaimonScanNode extends FileQueryScanNode {
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
.toArray();
- Table paimonTable = source.getPaimonTable();
- if (getScanParams() != null && getQueryTableSnapshot() != null) {
- throw new UserException("Can not specify scan params and table
snapshot at same time.");
- }
- if (getQueryTableSnapshot() != null) {
- throw new UserException("Paimon table does not support table
snapshot query yet.");
- }
- Map<String, String> incrReadParams = getIncrReadParams();
- paimonTable = paimonTable.copy(incrReadParams);
+ Table paimonTable = getProcessedTable();
ReadBuilder readBuilder = paimonTable.newReadBuilder();
return readBuilder.withFilter(predicates)
.withProjection(projected)
@@ -662,6 +656,42 @@ public class PaimonScanNode extends FileQueryScanNode {
return paimonScanParams;
}
+
+ /**
+ * Processes and returns the appropriate Paimon table object based on scan
parameters or table snapshot.
+ * <p>
+ * This method handles different scan modes including incremental reads
and system tables,
+ * applying the necessary transformations to the base Paimon table.
+ *
+ * @return processed Paimon table object configured according to scan
parameters
+ * @throws UserException when system table configuration is incorrect
+ */
+ private Table getProcessedTable() throws UserException {
+ Table baseTable = source.getPaimonTable();
+ if (getScanParams() != null && getQueryTableSnapshot() != null) {
+ throw new UserException("Can not specify scan params and table
snapshot at same time.");
+ }
+ TableScanParams theScanParams = getScanParams();
+ if (theScanParams != null) {
+ if (theScanParams.incrementalRead()) {
+ return baseTable.copy(getIncrReadParams());
+ }
+
+ if (theScanParams.isBranch()) {
+ return PaimonUtil.getTableByBranch(source, baseTable,
PaimonUtil.extractBranchOrTagName(theScanParams));
+ }
+ if (theScanParams.isTag()) {
+ return PaimonUtil.getTableByTag(baseTable,
PaimonUtil.extractBranchOrTagName(theScanParams));
+ }
+ }
+
+ TableSnapshot theTableSnapshot = getQueryTableSnapshot();
+ if (theTableSnapshot != null) {
+ return PaimonUtil.getTableBySnapshot(baseTable, theTableSnapshot);
+ }
+
+ return baseTable;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
index 91c131603b6..6d5e1fba09d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -105,8 +105,8 @@ public class PaimonTableValuedFunction extends
MetadataTableValuedFunction {
NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
this.tblId = externalTable.getId();
- this.paimonSysTable =
paimonExternalCatalog.getPaimonSystemTable(buildNameMapping,
- queryType);
+ this.paimonSysTable =
paimonExternalCatalog.getPaimonTable(buildNameMapping,
+ "main", queryType);
this.schema = PaimonUtil.parseSchema(paimonSysTable);
}
diff --git
a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
new file mode 100644
index 00000000000..3527cc4604e
Binary files /dev/null and
b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out differ
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
index f32120f2ea7..bfc2457c6a3 100644
--- a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -96,10 +96,6 @@ suite("test_paimon_incr_read",
"p0,external,doris,external_docker,external_docke
sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=2) for version as of 1"""
exception "Can not specify scan params and table snapshot"
}
- test {
- sql """select * from paimon_incr for version as of 1"""
- exception "Paimon table does not support table snapshot query
yet"
- }
}
test_incr_read("false")
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
new file mode 100644
index 00000000000..b6d3caddb8c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.time.format.DateTimeFormatter
+import java.time.LocalDateTime
+import java.time.ZoneId
+
+
+
+suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_doris") {
+ logger.info("start paimon test")
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable paimon test.")
+ return
+ }
+ // Create date time formatter
+
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String catalog_name = "test_paimon_time_travel_catalog"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ DateTimeFormatter iso_formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
+ DateTimeFormatter standard_formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ String db_name = "test_paimon_time_travel_db"
+ String tableName = "tbl_time_travel"
+ try {
+ sql """drop catalog if exists ${catalog_name}"""
+
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' =
'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch `${catalog_name}`"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """use ${db_name}"""
+ //system table snapshots to get create time.
+ List<List<Object>> snapshotRes = sql """ select
snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id;"""
+ logger.info("Query result from ${tableName}\$snapshots:
${snapshotRes}")
+ assertTrue(snapshotRes.size()==4)
+ assertTrue(snapshotRes[0].size()==2)
+
+ snapshotRes.eachWithIndex { snapshotRow, index ->
+ int snapshotId = snapshotRow[0] as int
+ String commitTime = snapshotRow[1] as String
+ String tagName = "t_${snapshotId}"
+
+ logger.info("Processing snapshot ${index + 1}: ID=${snapshotId},
commit_time=${commitTime}")
+
+ try {
+ LocalDateTime dateTime;
+ if (commitTime.contains("T")){
+ dateTime = LocalDateTime.parse(commitTime, iso_formatter)
+ }else {
+ dateTime = LocalDateTime.parse(commitTime,
standard_formatter)
+ }
+
+ String snapshotTime =
dateTime.atZone(ZoneId.systemDefault()).format(standard_formatter);
+ long timestamp = dateTime.atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli()
+
+ // Execute various types of time travel queries
+ String baseQueryName = "qt_time_travel_snapshot_${index + 1}"
+
+ // 1. Time travel by snapshot ID
+ "${baseQueryName}_version_count" """select count(*) from
${tableName} FOR VERSION AS OF ${snapshotId} ;"""
+ "${baseQueryName}_version" """select * from ${tableName} FOR
VERSION AS OF ${snapshotId} order by order_id;"""
+ "${baseQueryName}_version_select_columns" """select
is_paid,total_amount,order_id,customer_id from ${tableName} FOR VERSION AS OF
${snapshotId} order by order_id;"""
+ "${baseQueryName}_version_filter_order_status" """select *
from ${tableName} FOR VERSION AS OF ${snapshotId} where order_status =
'COMPLETED' order by order_id;"""
+ "${baseQueryName}_version_agg" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName} FOR VERSION AS OF ${snapshotId}
;"""
+ "${baseQueryName}_version_group_by_is_paid" """select is_paid,
count(*) as cnt from ${tableName} FOR VERSION AS OF ${snapshotId} GROUP BY
is_paid order by cnt,is_paid;"""
+
+
+
+ // 2. Time travel by tag
+ "${baseQueryName}_tag_count" """select count(*) from
${tableName} FOR VERSION AS OF '${tagName}';"""
+ "${baseQueryName}_tag" """select * from ${tableName} FOR
VERSION AS OF '${tagName}' order by order_id;"""
+ "${baseQueryName}_tag_select_columns" """select
is_paid,total_amount,order_id,customer_id from ${tableName} FOR VERSION AS OF
'${tagName}' order by order_id;"""
+ "${baseQueryName}_tag_filter_order_status" """select * from
${tableName} FOR VERSION AS OF '${tagName}' where order_status = 'COMPLETED'
order by order_id;"""
+ "${baseQueryName}_tag_agg" """select
+ cast(MIN(total_amount) as decimal(9,3))
AS min_price,
+ cast(MAX(total_amount) as decimal(9,3))
AS max_price,
+ cast(AVG(total_amount) as decimal(9,3))
AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as decimal(9,3))
AS total_price from ${tableName} FOR VERSION AS OF '${tagName}' ;"""
+ "${baseQueryName}_tag_group_by_is_paid" """select is_paid,
count(*) as cnt from ${tableName} FOR VERSION AS OF '${tagName}' GROUP BY
is_paid order by cnt,is_paid;"""
+
+ // 3. Time travel by time string
+ "${baseQueryName}_time_string_count" """select count(*) from
${tableName} FOR TIME AS OF \"${snapshotTime}\" ;"""
+ "${baseQueryName}_time_string" """select * from ${tableName}
FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+ "${baseQueryName}_time_string_select_columns" """select
is_paid,total_amount,order_id,customer_id from ${tableName} FOR TIME AS OF
\"${snapshotTime}\" order by order_id"""
+ "${baseQueryName}_time_string_filter_order_status" """select *
from ${tableName} FOR TIME AS OF \"${snapshotTime}\" where order_status =
'COMPLETED' order by order_id;"""
+ "${baseQueryName}_time_string_agg" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName} FOR TIME AS OF
\"${snapshotTime}\" ;"""
+ "${baseQueryName}_time_string_group_by_is_paid" """select
is_paid, count(*) as cnt from ${tableName} FOR TIME AS OF \"${snapshotTime}\"
GROUP BY is_paid order by cnt,is_paid;"""
+
+ // 4. Time travel by millisecond timestamp
+ "${baseQueryName}_time_millis_count" """select count(*) from
${tableName} FOR TIME AS OF ${timestamp};"""
+ "${baseQueryName}_time_millis" """select * from ${tableName}
FOR TIME AS OF ${timestamp} order by order_id"""
+ "${baseQueryName}_time_millis_select_columns" """select
is_paid,total_amount,order_id,customer_id from ${tableName} FOR TIME AS OF
${timestamp} order by order_id"""
+ "${baseQueryName}_time_millis_filter_order_status" """select *
from ${tableName} FOR TIME AS OF ${timestamp} where order_status = 'COMPLETED'
order by order_id;"""
+ "${baseQueryName}_time_millis_agg" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName} FOR TIME AS OF ${timestamp} ;"""
+ "${baseQueryName}_time_millis_group_by_is_paid" """select
is_paid, count(*) as cnt from ${tableName} FOR TIME AS OF ${timestamp} GROUP BY
is_paid order by cnt,is_paid;"""
+
+
+ logger.info("Completed queries for snapshot ${snapshotId} with
timestamp ${timestamp}")
+
+ } catch (Exception e) {
+ logger.error("Failed to process snapshot ${snapshotId}:
${e.message}")
+ throw e
+ }
+ }
+
+ List<List<Object>> branchesResult = sql """ select branch_name from
${tableName}\$branches order by branch_name;"""
+ logger.info("Query result from ${tableName}\$branches:
${branchesResult}")
+ assertTrue(branchesResult.size()==2)
+ assertTrue(branchesResult[0].size()==1)
+
+ branchesResult.eachWithIndex { branchRow, index ->
+ String branchName = branchRow[0] as String
+ logger.info("Processing branch ${index + 1}: ${branchName}")
+ String baseQueryName = "qt_branch_${index + 1}"
+
+ try {
+ "${baseQueryName}_count_list" """select count(*) from
${tableName}@branch(${branchName});"""
+ "${baseQueryName}_count_map" """select count(*) from
${tableName}@branch(\"name\"="${branchName}");"""
+
+ "${baseQueryName}_count_list" """select * from
${tableName}@branch(${branchName}) order by order_id;"""
+ "${baseQueryName}_count_map" """select * from
${tableName}@branch(\"name\"="${branchName}") order by order_id;"""
+
+ "${baseQueryName}_select_columns_list" """select
customer_id,is_paid,order_status from ${tableName}@branch(${branchName}) order
by order_id;"""
+ "${baseQueryName}_select_columns_map" """select
customer_id,is_paid,order_status from
${tableName}@branch(\"name\"="${branchName}") order by order_id;"""
+
+ "${baseQueryName}_filter_amount_list" """select * from
${tableName}@branch(${branchName}) where total_amount > 100.00 order by
order_id;"""
+ "${baseQueryName}_filter_amount_map" """select * from
${tableName}@branch(\"name\"="${branchName}") where total_amount > 100.00 order
by order_id;"""
+
+ "${baseQueryName}_agg_list" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName}@branch(${branchName}) ;"""
+ "${baseQueryName}_agg_map" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName}@branch(\"name\"="${branchName}")
;"""
+ "${baseQueryName}_group_by_is_paid_list" """select is_paid,
count(*) as cnt from ${tableName}@branch(${branchName}) GROUP BY is_paid order
by cnt,is_paid;"""
+ "${baseQueryName}_group_by_is_paid_map" """select is_paid,
count(*) as cnt from ${tableName}@branch(\"name\"="${branchName}") GROUP BY
is_paid order by cnt,is_paid;"""
+ logger.info("Completed queries for branch: ${branchName}")
+
+ } catch (Exception e) {
+ logger.error("Failed to process branch ${branchName}:
${e.message}")
+ throw e
+ }
+ }
+
+
+ List<List<Object>> tagsResult = sql """ select snapshot_id,tag_name
from ${tableName}\$tags order by snapshot_id;"""
+ logger.info("Query result from ${tableName}\$tags: ${tagsResult}")
+ assertTrue(tagsResult.size()==4)
+ assertTrue(tagsResult[0].size()==2)
+
+ tagsResult.eachWithIndex { tagRow, index ->
+ String snapshotId = tagRow[0] as String
+ String tagName = tagRow[1] as String
+ logger.info("Processing tag ${index + 1}: ${tagName} (snapshot:
${snapshotId})")
+ String baseQueryName = "qt_tag_${index + 1}"
+
+ try {
+ "${baseQueryName}_count_list" """select count(*) from
${tableName}@tag(${tagName});"""
+ "${baseQueryName}_count_map" """select count(*) from
${tableName}@tag(\"name\"="${tagName}");"""
+
+ "${baseQueryName}_select_all_list" """select * from
${tableName}@tag(${tagName}) order by order_id;"""
+ "${baseQueryName}_select_all_map" """select * from
${tableName}@tag(\"name\"="${tagName}") order by order_id;"""
+
+ "${baseQueryName}_select_columns_list" """select
customer_id,is_paid,order_status from ${tableName}@tag(${tagName}) order by
order_id;"""
+ "${baseQueryName}_select_columns_map" """select
customer_id,is_paid,order_status from ${tableName}@tag(\"name\"="${tagName}")
order by order_id;"""
+
+ "${baseQueryName}_filter_amount_list" """select * from
${tableName}@tag(${tagName}) where total_amount > 100.00 order by order_id;"""
+ "${baseQueryName}_filter_amount_map" """select * from
${tableName}@tag(\"name\"="${tagName}") where total_amount > 100.00 order by
order_id;"""
+
+ "${baseQueryName}_agg_list" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName}@tag(${tagName});"""
+ "${baseQueryName}_agg_map" """select
+ cast(MIN(total_amount) as
decimal(9,3)) AS min_price,
+ cast(MAX(total_amount) as
decimal(9,3)) AS max_price,
+ cast(AVG(total_amount) as
decimal(9,3)) AS avg_price,
+ COUNT(*) AS total_count,
+ cast(SUM(total_amount) as
decimal(9,3)) AS total_price from ${tableName}@tag(\"name\"="${tagName}");"""
+ "${baseQueryName}_group_by_is_paid_list" """select is_paid,
count(*) as cnt from ${tableName}@tag(${tagName}) GROUP BY is_paid order by
cnt,is_paid;"""
+ "${baseQueryName}_group_by_is_paid_map" """select is_paid,
count(*) as cnt from ${tableName}@tag(\"name\"="${tagName}") GROUP BY is_paid
order by cnt,is_paid;"""
+
+ logger.info("Completed queries for tag: ${tagName} (snapshot:
${snapshotId})")
+
+ } catch (Exception e) {
+ logger.error("Failed to process tag ${tagName}: ${e.message}")
+ throw e
+ }
+ }
+
+ test {
+ sql """ select * from
${tableName}@branch('name'='not_exists_branch'); """
+ exception "Branch 'not_exists_branch' does not exist"
+ }
+ test {
+ sql """ select * from ${tableName}@branch(not_exists_branch); """
+ exception "Branch 'not_exists_branch' does not exist"
+ }
+ test {
+ sql """ select * from ${tableName}@tag('name'='not_exists_tag');
"""
+ exception "Tag 'not_exists_tag' does not exist"
+ }
+ test {
+ sql """ select * from ${tableName}@tag(not_exists_tag); """
+ exception "Tag 'not_exists_tag' does not exist"
+ }
+ test {
+ sql """ select * from ${tableName} for version as of
'not_exists_tag'; """
+ exception "Tag 'not_exists_tag' does not exist"
+ }
+
+ // Use branch function to query tags
+ test {
+ sql """ select * from ${tableName}@tag('na'='not_exists_tag'); """
+ exception "must contain key 'name' in params"
+ }
+ test {
+ sql """ select * from
${tableName}@branch('nme'='not_exists_branch'); """
+ exception "must contain key 'name' in params"
+ }
+
+ } finally {
+ // sql """drop catalog if exists ${catalog_name}"""
+ }
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]