This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 325d32c5959 branch-4.0: [feat][iceberg] Support Iceberg Meta Procedure 
implementations #56257 (#56732)
325d32c5959 is described below

commit 325d32c5959958195e5ae29692f691379ba713ff
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 18:44:01 2025 +0800

    branch-4.0: [feat][iceberg] Support Iceberg Meta Procedure implementations 
#56257 (#56732)
    
    Cherry-picked from #56257
    
    Co-authored-by: Petrichor <[email protected]>
---
 .../action/IcebergCherrypickSnapshotAction.java    |  45 ++-
 .../iceberg/action/IcebergFastForwardAction.java   |  50 ++-
 .../action/IcebergRollbackToSnapshotAction.java    |  47 ++-
 .../action/IcebergRollbackToTimestampAction.java   |  54 ++-
 .../action/IcebergSetCurrentSnapshotAction.java    |  73 +++-
 .../action/test_iceberg_optimize_actions.out       |  67 ++++
 .../action/test_iceberg_optimize_actions.groovy    | 393 ++++++++++++++++++---
 7 files changed, 657 insertions(+), 72 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
index c0b96ab74a9..8a0e60e773d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
@@ -17,14 +17,21 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -41,8 +48,7 @@ public class IcebergCherrypickSnapshotAction extends 
BaseIcebergAction {
     public static final String SNAPSHOT_ID = "snapshot_id";
 
     public IcebergCherrypickSnapshotAction(Map<String, String> properties,
-            Optional<PartitionNamesInfo> partitionNamesInfo,
-            Optional<Expression> whereCondition,
+            Optional<PartitionNamesInfo> partitionNamesInfo, 
Optional<Expression> whereCondition,
             IcebergExternalTable icebergTable) {
         super("cherrypick_snapshot", properties, partitionNamesInfo, 
whereCondition, icebergTable);
     }
@@ -65,7 +71,38 @@ public class IcebergCherrypickSnapshotAction extends 
BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg cherrypick_snapshot procedure is not 
implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        Long sourceSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+
+        try {
+            Snapshot targetSnapshot = icebergTable.snapshot(sourceSnapshotId);
+            if (targetSnapshot == null) {
+                throw new UserException("Snapshot not found in table");
+            }
+
+            
icebergTable.manageSnapshots().cherrypick(sourceSnapshotId).commit();
+            Snapshot currentSnapshot = icebergTable.currentSnapshot();
+
+            // invalid iceberg catalog table cache.
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+            return Lists.newArrayList(
+                    String.valueOf(sourceSnapshotId),
+                    String.valueOf(currentSnapshot.snapshotId()
+                    )
+            );
+
+        } catch (Exception e) {
+            throw new UserException("Failed to cherry-pick snapshot " + 
sourceSnapshotId + ": " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(new Column("source_snapshot_id", 
Type.BIGINT, false,
+                        "ID of the snapshot whose changes were cherry-picked 
into the current table state"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the new snapshot created as a result of the 
cherry-pick operation, "
+                                + "now set as the current snapshot"));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
index 564b9858a51..93a9eccd8c6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
@@ -17,14 +17,20 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Table;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,11 +55,11 @@ public class IcebergFastForwardAction extends 
BaseIcebergAction {
     protected void registerIcebergArguments() {
         // Register required arguments for branch and to
         namedArguments.registerRequiredArgument(BRANCH,
-                "Name of the target branch to fast-forward to",
+                "Name of the  branch to fast-forward to",
                 ArgumentParsers.nonEmptyString(BRANCH));
         namedArguments.registerRequiredArgument(TO,
-                "Target snapshot ID to fast-forward to",
-                ArgumentParsers.positiveLong(TO));
+                "Target branch  to fast-forward to",
+                ArgumentParsers.nonEmptyString(TO));
     }
 
     @Override
@@ -65,7 +71,41 @@ public class IcebergFastForwardAction extends 
BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg fast_forward procedure is not 
implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        String sourceBranch = namedArguments.getString(BRANCH);
+        String desBranch = namedArguments.getString(TO);
+
+        try {
+            Long snapshotBefore =
+                    icebergTable.snapshot(sourceBranch) != null ? 
icebergTable.snapshot(sourceBranch).snapshotId()
+                            : null;
+            icebergTable.manageSnapshots().fastForwardBranch(sourceBranch, 
desBranch).commit();
+            long snapshotAfter = 
icebergTable.snapshot(sourceBranch).snapshotId();
+            // invalid iceberg catalog table cache.
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+            return Lists.newArrayList(
+                    sourceBranch.trim(),
+                    String.valueOf(snapshotBefore),
+                    String.valueOf(snapshotAfter)
+            );
+
+        } catch (Exception e) {
+            throw new UserException(
+                    "Failed to fast-forward branch " + sourceBranch + " to 
snapshot " + desBranch + ": "
+                            + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("branch_updated", Type.STRING, false,
+                        "Name of the branch that was fast-forwarded to match 
the target branch"),
+                new Column("previous_ref", Type.BIGINT, true,
+                        "Snapshot ID that the branch was pointing to before 
the fast-forward operation"),
+                new Column("updated_ref", Type.BIGINT, false,
+                        "Snapshot ID that the branch is pointing to after the 
fast-forward operation"));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
index bb788dcc27b..9cf135e46a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
@@ -17,14 +17,21 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -62,7 +69,43 @@ public class IcebergRollbackToSnapshotAction extends 
BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg rollback_to_snapshot procedure is not 
implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+
+        Snapshot targetSnapshot = icebergTable.snapshot(targetSnapshotId);
+        if (targetSnapshot == null) {
+            throw new UserException("Snapshot " + targetSnapshotId + " not 
found in table " + icebergTable.name());
+        }
+
+        try {
+            Snapshot previousSnapshot = icebergTable.currentSnapshot();
+            Long previousSnapshotId = previousSnapshot != null ? 
previousSnapshot.snapshotId() : null;
+            if (previousSnapshot != null && previousSnapshot.snapshotId() == 
targetSnapshotId) {
+                return Lists.newArrayList(
+                        String.valueOf(previousSnapshotId),
+                        String.valueOf(targetSnapshotId)
+                );
+            }
+            
icebergTable.manageSnapshots().rollbackTo(targetSnapshotId).commit();
+            // invalid iceberg catalog table cache.
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+            return Lists.newArrayList(
+                    String.valueOf(previousSnapshotId),
+                    String.valueOf(targetSnapshotId)
+            );
+
+        } catch (Exception e) {
+            throw new UserException("Failed to rollback to snapshot " + 
targetSnapshotId + ": " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("previous_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that was current before the 
rollback operation"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that is now current after rolling 
back to the specified snapshot"));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
index 5f20e5b3323..b478d078349 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
@@ -17,13 +17,22 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -34,6 +43,7 @@ import java.util.Optional;
  * at a specific timestamp.
  */
 public class IcebergRollbackToTimestampAction extends BaseIcebergAction {
+    private static final DateTimeFormatter DATETIME_MS_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
     public static final String TIMESTAMP = "timestamp";
 
     public IcebergRollbackToTimestampAction(Map<String, String> properties,
@@ -48,7 +58,7 @@ public class IcebergRollbackToTimestampAction extends 
BaseIcebergAction {
         // Create a custom timestamp parser that supports both ISO datetime and
         // millisecond formats
         namedArguments.registerRequiredArgument(TIMESTAMP,
-                "A timestamp to rollback to (ISO datetime 
'yyyy-MM-ddTHH:mm:ss' or milliseconds since epoch)",
+                "A timestamp to rollback to (formats: 'yyyy-MM-dd 
HH:mm:ss.SSS' or milliseconds since epoch)",
                 value -> {
                     if (value == null || value.trim().isEmpty()) {
                         throw new IllegalArgumentException("timestamp cannot 
be empty");
@@ -64,14 +74,13 @@ public class IcebergRollbackToTimestampAction extends 
BaseIcebergAction {
                         }
                         return trimmed;
                     } catch (NumberFormatException e) {
-                        // If not a number, try as ISO datetime format
+                        // Second attempt: Parse as ISO datetime format 
(yyyy-MM-dd HH:mm:ss.SSS)
                         try {
-                            java.time.LocalDateTime.parse(trimmed,
-                                    
java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+                            java.time.LocalDateTime.parse(trimmed, 
DATETIME_MS_FORMAT);
                             return trimmed;
                         } catch (java.time.format.DateTimeParseException dte) {
                             throw new IllegalArgumentException("Invalid 
timestamp format. Expected ISO datetime "
-                                    + "(yyyy-MM-ddTHH:mm:ss) or timestamp in 
milliseconds: " + trimmed);
+                                    + "(yyyy-MM-dd HH:mm:ss.SSS) or timestamp 
in milliseconds: " + trimmed);
                         }
                     }
                 });
@@ -87,7 +96,38 @@ public class IcebergRollbackToTimestampAction extends 
BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg rollback_to_timestamp procedure is not 
implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        String timestampStr = namedArguments.getString(TIMESTAMP);
+
+        Snapshot previousSnapshot = icebergTable.currentSnapshot();
+        Long previousSnapshotId = previousSnapshot != null ? 
previousSnapshot.snapshotId() : null;
+
+        try {
+            long targetTimestamp = TimeUtils.msTimeStringToLong(timestampStr, 
TimeUtils.getTimeZone());
+            
icebergTable.manageSnapshots().rollbackToTime(targetTimestamp).commit();
+
+            Snapshot currentSnapshot = icebergTable.currentSnapshot();
+            Long currentSnapshotId = currentSnapshot != null ? 
currentSnapshot.snapshotId() : null;
+            // invalid iceberg catalog table cache.
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+            return Lists.newArrayList(
+                    String.valueOf(previousSnapshotId),
+                    String.valueOf(currentSnapshotId)
+            );
+
+        } catch (Exception e) {
+            throw new UserException("Failed to rollback to timestamp " + 
timestampStr + ": " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("previous_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that was current before the 
rollback operation"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that was current at the specified 
timestamp and is now set as current"));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
index c356076de13..cd663c3bf24 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
@@ -17,15 +17,22 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,7 +88,69 @@ public class IcebergSetCurrentSnapshotAction extends 
BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg set_current_snapshot procedure is not 
implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        Snapshot previousSnapshot = icebergTable.currentSnapshot();
+        Long previousSnapshotId = previousSnapshot != null ? 
previousSnapshot.snapshotId() : null;
+
+        Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+        String ref = namedArguments.getString(REF);
+
+        try {
+            if (targetSnapshotId != null) {
+                Snapshot targetSnapshot = 
icebergTable.snapshot(targetSnapshotId);
+                if (targetSnapshot == null) {
+                    throw new UserException(
+                            "Snapshot " + targetSnapshotId + " not found in 
table " + icebergTable.name());
+                }
+
+                if (previousSnapshot != null && previousSnapshot.snapshotId() 
== targetSnapshotId) {
+                    return Lists.newArrayList(
+                            String.valueOf(previousSnapshotId),
+                            String.valueOf(targetSnapshotId)
+                    );
+                }
+
+                
icebergTable.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();
+
+            } else if (ref != null) {
+                Snapshot refSnapshot = icebergTable.snapshot(ref);
+                if (refSnapshot == null) {
+                    throw new UserException("Reference '" + ref + "' not found 
in table " + icebergTable.name());
+                }
+                targetSnapshotId = refSnapshot.snapshotId();
+
+                if (previousSnapshot != null && previousSnapshot.snapshotId() 
== targetSnapshotId) {
+                    return Lists.newArrayList(
+                            String.valueOf(previousSnapshotId),
+                            String.valueOf(targetSnapshotId)
+                    );
+                }
+
+                
icebergTable.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();
+            }
+
+            // invalid iceberg catalog table cache.
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+            return Lists.newArrayList(
+                    String.valueOf(previousSnapshotId),
+                    String.valueOf(targetSnapshotId)
+            );
+
+        } catch (Exception e) {
+            String target = targetSnapshotId != null ? "snapshot " + 
targetSnapshotId : "reference '" + ref + "'";
+            throw new UserException("Failed to set current snapshot to " + 
target + ": " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("previous_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that was current before setting 
the new current snapshot"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that is now set as the current 
snapshot "
+                                + "(from snapshot_id parameter or resolved 
from ref parameter)"));
     }
 
     @Override
diff --git 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
index c9633434869..ecbe09bc0d3 100644
--- 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
+++ 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
@@ -1,4 +1,71 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !before_rollback_to_snapshot --
+1      v1.0    2024-01-01T10:00
+2      v1.1    2024-01-02T11:00
+3      v1.2    2024-01-03T12:00
+
+-- !after_rollback_to_snapshot --
+1      v1.0    2024-01-01T10:00
+
+-- !before_rollback_to_timestamp --
+1      v1.0    2025-01-01T10:00
+2      v1.1    2025-01-02T11:00
+3      v1.2    2025-01-03T12:00
+
+-- !after_rollback_to_timestamp --
+1      v1.0    2025-01-01T10:00
+2      v1.1    2025-01-02T11:00
+
+-- !before_set_current_snapshot_by_snapshotid --
+1      content1
+2      content2
+3      content3
+4      content4
+
+-- !after_set_current_snapshot_by_snapshotid --
+1      content1
+
+-- !before_set_current_snapshot_by_branch --
+1      content1
+
+-- !after_set_current_snapshot_by_branch --
+1      content1
+2      content2
+
+-- !before_set_current_snapshot_by_tag --
+1      content1
+2      content2
+
+-- !after_set_current_snapshot_by_tag --
+1      content1
+2      content2
+3      content3
+
+-- !before_cherrypick_snapshot --
+1      data1   1
+2      data2   2
+3      data3   3
+
+-- !rollback_snapshot --
+1      data1   1
+
+-- !after_cherrypick_snapshot --
+1      data1   1
+3      data3   3
+
+-- !before_test_fast_forward --
+1      record1 100
+2      record2 200
+3      record3 300
+
+-- !before_fast_forword_branch --
+1      record1 100
+
+-- !after_fast_forword_branch --
+1      record1 100
+2      record2 200
+3      record3 300
+
 -- !test_rewrite_data_files_results --
 0      1       2       3
 
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
index e8ce3d62e31..0cd818fe175 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
@@ -15,7 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.ChronoField
+import java.time.LocalDateTime
+import java.time.ZoneId
+
 suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,external_docker_doris") {
+    DateTimeFormatter unifiedFormatter = new DateTimeFormatterBuilder()
+            .appendPattern("yyyy-MM-dd")
+            .optionalStart()
+            .appendLiteral('T')
+            .optionalEnd()
+            .optionalStart()
+            .appendLiteral(' ')
+            .optionalEnd()
+            .appendPattern("HH:mm:ss")
+            .optionalStart()
+            .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true)
+            .optionalEnd()
+            .toFormatter()
 
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {
@@ -41,53 +60,332 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
     );"""
 
     sql """switch ${catalog_name}"""
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
     sql """use ${db_name}"""
     def table_name = "test_iceberg_systable_partitioned"
 
-    // Test rollback_to_snapshot action
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = 
"123456789")
-        """
-        exception "Iceberg rollback_to_snapshot procedure is not implemented 
yet"
-    }
+    sql """drop table if exists ${db_name}.test_fast_forward"""
+    sql """
+        CREATE TABLE ${db_name}.test_fast_forward (
+        id BIGINT,
+        name STRING,
+        value INT
+    ) ENGINE=iceberg;
+    """
+    sql """
+    INSERT INTO ${db_name}.test_fast_forward VALUES
+    (1, 'record1', 100);
+    """
+    sql """
+    ALTER TABLE ${db_name}.test_fast_forward CREATE BRANCH feature_branch;
+    """
+    sql """INSERT INTO ${db_name}.test_fast_forward VALUES
+    (2, 'record2', 200);"""
+    sql """ALTER TABLE ${db_name}.test_fast_forward CREATE TAG feature_tag;"""
+    sql """
+    INSERT INTO ${db_name}.test_fast_forward VALUES
+    (3, 'record3', 300);
+    """
 
-    // Test rollback_to_timestamp action  
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "rollback_to_timestamp", "timestamp" = 
"2024-01-01T00:00:00")
-        """
-        exception "Iceberg rollback_to_timestamp procedure is not implemented 
yet"
-    }
+    sql """DROP TABLE IF EXISTS ${db_name}.test_cherrypick"""
 
-    // Test set_current_snapshot action
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "set_current_snapshot", "snapshot_id" = 
"987654321")
-        """
-        exception "Iceberg set_current_snapshot procedure is not implemented 
yet"
-    }
+    sql """
+        CREATE TABLE ${db_name}.test_cherrypick (
+            id BIGINT,
+            data STRING,
+            status INT
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${db_name}.test_cherrypick VALUES
+        (1, 'data1', 1)
+    """
+    sql """
+        INSERT INTO ${db_name}.test_cherrypick VALUES
+        (2, 'data2', 2)
+    """
+    sql """
+        INSERT INTO ${db_name}.test_cherrypick VALUES
+        (3, 'data3', 3)
+    """
+    logger.info("test_cherrypick table setup completed with 3 incremental 
snapshots")
 
-    // Test cherrypick_snapshot action
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "cherrypick_snapshot", "snapshot_id" = 
"555666777")
-        """
-        exception "Iceberg cherrypick_snapshot procedure is not implemented 
yet"
-    }
+    logger.info("Creating test_rollback table for rollback_to_snapshot 
testing")
+
+    sql """DROP TABLE IF EXISTS ${db_name}.test_rollback"""
+    sql """
+        CREATE TABLE ${db_name}.test_rollback (
+            id BIGINT,
+            version STRING,
+            timestamp datetime
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${db_name}.test_rollback VALUES
+        (1, 'v1.0', '2024-01-01 10:00:00')
+    """
+    sql """
+        INSERT INTO ${db_name}.test_rollback VALUES
+        (2, 'v1.1', '2024-01-02 11:00:00')
+    """
+    sql """
+        INSERT INTO ${db_name}.test_rollback VALUES
+        (3, 'v1.2', '2024-01-03 12:00:00')
+    """
+
+    logger.info("test_rollback table setup completed with 3 version snapshots")
+
+    logger.info("Creating test_rollback_timestamp table for timestamp-based 
rollback testing")
+
+    sql """DROP TABLE IF EXISTS ${db_name}.test_rollback_timestamp"""
+
+    sql """
+        CREATE TABLE ${db_name}.test_rollback_timestamp (
+            id BIGINT,
+            version STRING,
+            timestamp datetime
+        ) ENGINE=iceberg
+    """
+
+    sql """
+        INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+        (1, 'v1.0', '2025-01-01 10:00:00')
+    """
+    sql """
+        INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+        (2, 'v1.1', '2025-01-02 11:00:00')
+    """
+    sql """
+        INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+        (3, 'v1.2', '2025-01-03 12:00:00')
+    """
+    logger.info("test_rollback_timestamp table setup completed with 
future-dated snapshots")
+
+    logger.info("Creating test_current_snapshot table for set_current_snapshot 
testing")
+
+    sql """DROP TABLE IF EXISTS ${db_name}.test_current_snapshot"""
+
+    sql """
+        CREATE TABLE ${db_name}.test_current_snapshot (
+            id BIGINT,
+            content STRING
+        ) ENGINE=iceberg
+    """
+
+    sql """
+        INSERT INTO ${db_name}.test_current_snapshot VALUES
+        (1, 'content1')
+    """
+    sql """
+        INSERT INTO ${db_name}.test_current_snapshot VALUES
+        (2, 'content2')
+    """
+    sql """
+        ALTER TABLE ${db_name}.test_current_snapshot CREATE BRANCH dev_branch
+    """
+    sql """
+        INSERT INTO ${db_name}.test_current_snapshot VALUES
+        (3, 'content3')
+    """
+    sql """
+        ALTER TABLE ${db_name}.test_current_snapshot CREATE TAG dev_tag
+    """
+    // Insert final content record - latest main state
+    sql """
+        INSERT INTO ${db_name}.test_current_snapshot VALUES
+        (4, 'content4')
+    """
+
+    logger.info("test_current_snapshot table setup completed with 4 snapshots, 
1 branch, and 1 tag")
+
+    // 
=====================================================================================
+    // Test Case 1: rollback_to_snapshot action
+    // Tests the ability to rollback a table to a specific historical snapshot
+    // 
=====================================================================================
+    logger.info("Starting rollback_to_snapshot test case")
+
+    // Capture table state before rollback operation
+    qt_before_rollback_to_snapshot """SELECT * FROM test_rollback ORDER BY 
id"""
+
+    // Retrieve all available snapshots for the rollback test table
+    List<List<Object>> rollbackSnapshotList = sql """
+        SELECT committed_at, snapshot_id FROM test_rollback\$snapshots ORDER 
BY committed_at
+    """
+    logger.info("Available snapshots for rollback test: 
${rollbackSnapshotList}")
+
+    // Validate snapshot data structure and count
+    assertTrue(rollbackSnapshotList.size() == 3, "Expected exactly 3 snapshots 
for rollback test")
+    assertTrue(rollbackSnapshotList[0].size() == 2, "Invalid snapshot metadata 
structure")
+
+    // Extract snapshot IDs for test operations
+    String rollbackEarliestSnapshotId = rollbackSnapshotList[0][1]  // 
First/oldest snapshot
+    String rollbackLatestSnapshotId = rollbackSnapshotList[2][1]    // 
Last/newest snapshot
+
+    // Execute rollback to the earliest snapshot
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_rollback
+        PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = 
"${rollbackEarliestSnapshotId}")
+    """
+    qt_after_rollback_to_snapshot """SELECT * FROM test_rollback ORDER BY id"""
+
+    // 
=====================================================================================
+    // Test Case 2: rollback_to_timestamp action
+    // Tests the ability to rollback a table to a specific point in time using 
timestamps
+    // 
=====================================================================================
+    logger.info("Starting rollback_to_timestamp test case")
+
+    // Capture table state before timestamp-based rollback
+    qt_before_rollback_to_timestamp """SELECT * FROM test_rollback_timestamp 
ORDER BY id"""
+
+    // Retrieve snapshots ordered by timestamp (newest first) for timestamp 
rollback test
+    List<List<Object>> timestampSnapshotList = sql """
+        SELECT committed_at, snapshot_id FROM 
test_rollback_timestamp\$snapshots ORDER BY committed_at DESC
+    """
+    logger.info("Snapshot timeline for timestamp rollback: 
${timestampSnapshotList}")
+
+    // Validate snapshot availability for timestamp test
+    assertTrue(timestampSnapshotList.size() == 3, "Expected exactly 3 
snapshots for timestamp test")
+    assertTrue(timestampSnapshotList[0].size() == 2, "Invalid timestamp 
snapshot structure")
+
+    // Extract and format timestamp for rollback operation
+    String latestCommittedTime = timestampSnapshotList[0][0]
+
+    // Convert timestamp to required format for rollback operation
+    DateTimeFormatter outputFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+    LocalDateTime dateTime = LocalDateTime.parse(latestCommittedTime, 
unifiedFormatter)
+    String formattedSnapshotTime = 
dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter)
+
+    // Execute timestamp-based rollback
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_rollback_timestamp
+        PROPERTIES("action" = "rollback_to_timestamp", "timestamp" = 
"${formattedSnapshotTime}")
+    """
+    qt_after_rollback_to_timestamp """SELECT * FROM test_rollback_timestamp 
ORDER BY id"""
+
+
+    // 
=====================================================================================
+    // Test Case 3: set_current_snapshot action
+    // Tests setting the current snapshot using snapshot ID and 
reference-based approaches
+    // 
=====================================================================================
+    logger.info("Starting set_current_snapshot test case")
+
+    // Test setting current snapshot by snapshot ID
+    qt_before_set_current_snapshot_by_snapshotid """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+
+    // Retrieve available snapshots for current snapshot test
+    List<List<Object>> currentSnapshotList = sql """
+        SELECT committed_at, snapshot_id FROM test_current_snapshot\$snapshots 
ORDER BY committed_at
+    """
+    logger.info("Available snapshots for current snapshot test: 
${currentSnapshotList}")
+
+    // Validate snapshot data for current snapshot test
+    assertTrue(currentSnapshotList.size() == 4, "Expected exactly 4 snapshots 
for current snapshot test")
+    assertTrue(currentSnapshotList[0].size() == 2, "Invalid current snapshot 
metadata structure")
+
+    String targetCurrentSnapshotId = currentSnapshotList[0][1]  // Select 
first snapshot as target
+
+    // Execute set current snapshot by snapshot ID
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+        PROPERTIES("action" = "set_current_snapshot", "snapshot_id" = 
"${targetCurrentSnapshotId}")
+    """
+    qt_after_set_current_snapshot_by_snapshotid """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+
+    // Verify reference structure after snapshot change
+    List<List<Object>> currentSnapshotRefs = sql """
+        SELECT name, type FROM test_current_snapshot\$refs ORDER BY snapshot_id
+    """
+    logger.info("References after current snapshot change: 
${currentSnapshotRefs}")
+    assertTrue(currentSnapshotRefs.size() == 3, "Expected exactly 3 references 
after snapshot change")
+
+    // Test setting current snapshot by branch reference
+    qt_before_set_current_snapshot_by_branch """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+        PROPERTIES("action" = "set_current_snapshot", "ref" = "dev_branch")
+    """
+    qt_after_set_current_snapshot_by_branch """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+
+    // Test setting current snapshot by tag reference
+    qt_before_set_current_snapshot_by_tag """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+        PROPERTIES("action" = "set_current_snapshot", "ref" = "dev_tag")
+    """
+    qt_after_set_current_snapshot_by_tag """SELECT * FROM 
test_current_snapshot ORDER BY id"""
+
+    // 
=====================================================================================
+    // Test Case 4: cherrypick_snapshot action
+    // Tests selective application of changes from a specific snapshot 
(cherrypick operation)
+    // 
=====================================================================================
+    logger.info("Starting cherrypick_snapshot test case")
+
+    // Capture initial state before cherrypick operations
+    qt_before_cherrypick_snapshot """SELECT * FROM test_cherrypick ORDER BY 
id"""
+
+    // Retrieve snapshots for cherrypick test scenario
+    List<List<Object>> cherrypickSnapshotList = sql """
+        SELECT committed_at, snapshot_id FROM test_cherrypick\$snapshots ORDER 
BY committed_at
+    """
+    logger.info("Available snapshots for cherrypick test: 
${cherrypickSnapshotList}")
+
+    // Validate cherrypick test data structure
+    assertTrue(cherrypickSnapshotList.size() == 3, "Expected exactly 3 
snapshots for cherrypick test")
+    assertTrue(cherrypickSnapshotList[0].size() == 2, "Invalid cherrypick 
snapshot structure")
+
+    String cherrypickEarliestSnapshotId = cherrypickSnapshotList[0][1]  // 
First snapshot for rollback
+    String cherrypickLatestSnapshotId = cherrypickSnapshotList[2][1]    // 
Last snapshot for cherrypick
+
+    // Step 1: Rollback to earliest snapshot to create test scenario
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_cherrypick
+        PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = 
"${cherrypickEarliestSnapshotId}")
+    """
+    qt_rollback_snapshot """SELECT * FROM test_cherrypick ORDER BY id"""
+
+    // Step 2: Cherrypick changes from the latest snapshot
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_cherrypick
+        PROPERTIES("action" = "cherrypick_snapshot", "snapshot_id" = 
"${cherrypickLatestSnapshotId}")
+    """
+    qt_after_cherrypick_snapshot """SELECT * FROM test_cherrypick ORDER BY 
id"""
+
+
+    // 
=====================================================================================
+    // Test Case 5: fast_forward action
+    // Tests fast-forward operations for branch synchronization in Iceberg 
tables
+    // 
=====================================================================================
+    logger.info("Starting fast_forward action test case")
+
+    // Capture state before fast-forward operations
+    qt_before_test_fast_forward """SELECT * FROM test_fast_forward ORDER BY 
id"""
+
+    // Retrieve snapshot timeline for fast-forward test (newest first)
+    List<List<Object>> fastForwardSnapshotList = sql """
+        SELECT committed_at, snapshot_id FROM test_fast_forward\$snapshots 
ORDER BY committed_at DESC
+    """
+    logger.info("Snapshot timeline for fast-forward test: 
${fastForwardSnapshotList}")
+
+    // Validate fast-forward test data
+    assertTrue(fastForwardSnapshotList.size() == 3, "Expected exactly 3 
snapshots for fast-forward test")
+    assertTrue(fastForwardSnapshotList[0].size() == 2, "Invalid fast-forward 
snapshot structure")
+
+    // Verify available references for fast-forward operations
+    List<List<Object>> fastForwardRefs = sql """
+        SELECT name, type FROM test_fast_forward\$refs ORDER BY snapshot_id
+    """
+    logger.info("Available references for fast-forward: ${fastForwardRefs}")
+    assertTrue(fastForwardRefs.size() == 3, "Expected exactly 3 references for 
fast-forward test")
+
+    // Test fast-forward from feature branch to main branch
+    qt_before_fast_forword_branch """SELECT * FROM 
test_fast_forward@branch(feature_branch) ORDER BY id"""
+
+    sql """
+        OPTIMIZE TABLE ${catalog_name}.${db_name}.test_fast_forward
+        PROPERTIES("action" = "fast_forward", "branch" = "feature_branch", 
"to" = "main")
+    """
+    qt_after_fast_forword_branch """SELECT * FROM 
test_fast_forward@branch(feature_branch) ORDER BY id"""
 
-    // Test fast_forward action with branch
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "fast_forward", "branch" = "main", "to" = 
"111222333")
-        """
-        exception "Iceberg fast_forward procedure is not implemented yet"
-    }
 
     // Test expire_snapshots action
     test {
@@ -276,15 +574,6 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
         exception "Invalid target-file-size-bytes format: not-a-number"
     }
 
-    // Test set_current_snapshot with ref parameter
-    test {
-        sql """
-            OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
-            PROPERTIES("action" = "set_current_snapshot", "ref" = "main")
-        """
-        exception "Iceberg set_current_snapshot procedure is not implemented 
yet"
-    }
-
     // Test set_current_snapshot with both snapshot_id and ref
     test {
         sql """
@@ -309,7 +598,7 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
             OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
             PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = 
"9223372036854775807")
         """
-        exception "Iceberg rollback_to_snapshot procedure is not implemented 
yet"
+        exception "Snapshot 9223372036854775807 not found in table"
     }
 
     // Test snapshot_id exceeding Long.MAX_VALUE
@@ -327,7 +616,7 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
             OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
             PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = "  
123456789  ")
         """
-        exception "Iceberg rollback_to_snapshot procedure is not implemented 
yet"
+        exception "Snapshot 123456789 not found in table"
     }
 
     // Test case sensitivity in action names
@@ -336,6 +625,6 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
             OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
             PROPERTIES("action" = "ROLLBACK_TO_SNAPSHOT", "snapshot_id" = 
"123456789")
         """
-        exception "Iceberg rollback_to_snapshot procedure is not implemented 
yet"
+        exception "Snapshot 123456789 not found in table"
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to