This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 325d32c5959 branch-4.0: [feat][iceberg] Support Iceberg Meta Procedure
implementations #56257 (#56732)
325d32c5959 is described below
commit 325d32c5959958195e5ae29692f691379ba713ff
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 18:44:01 2025 +0800
branch-4.0: [feat][iceberg] Support Iceberg Meta Procedure implementations
#56257 (#56732)
Cherry-picked from #56257
Co-authored-by: Petrichor <[email protected]>
---
.../action/IcebergCherrypickSnapshotAction.java | 45 ++-
.../iceberg/action/IcebergFastForwardAction.java | 50 ++-
.../action/IcebergRollbackToSnapshotAction.java | 47 ++-
.../action/IcebergRollbackToTimestampAction.java | 54 ++-
.../action/IcebergSetCurrentSnapshotAction.java | 73 +++-
.../action/test_iceberg_optimize_actions.out | 67 ++++
.../action/test_iceberg_optimize_actions.groovy | 393 ++++++++++++++++++---
7 files changed, 657 insertions(+), 72 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
index c0b96ab74a9..8a0e60e773d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java
@@ -17,14 +17,21 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -41,8 +48,7 @@ public class IcebergCherrypickSnapshotAction extends
BaseIcebergAction {
public static final String SNAPSHOT_ID = "snapshot_id";
public IcebergCherrypickSnapshotAction(Map<String, String> properties,
- Optional<PartitionNamesInfo> partitionNamesInfo,
- Optional<Expression> whereCondition,
+ Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition,
IcebergExternalTable icebergTable) {
+        // Register this action with the base class under the procedure name "cherrypick_snapshot".
super("cherrypick_snapshot", properties, partitionNamesInfo,
whereCondition, icebergTable);
}
@@ -65,7 +71,38 @@ public class IcebergCherrypickSnapshotAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg cherrypick_snapshot procedure is not implemented yet");
+        // Apply the changes of the given snapshot on top of the current table state.
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        Long sourceSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+
+        // Validate before entering the try block: a UserException thrown inside it would be caught
+        // below and wrapped into a second UserException, duplicating the error prefix in the message.
+        Snapshot sourceSnapshot = icebergTable.snapshot(sourceSnapshotId);
+        if (sourceSnapshot == null) {
+            throw new UserException(
+                    "Snapshot " + sourceSnapshotId + " not found in table " + icebergTable.name());
+        }
+
+        try {
+            icebergTable.manageSnapshots().cherrypick(sourceSnapshotId).commit();
+            // Read the current snapshot after the commit to report the resulting table state.
+            Snapshot currentSnapshot = icebergTable.currentSnapshot();
+
+            // Invalidate the cached metadata of this table after the commit.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+            return Lists.newArrayList(
+                    String.valueOf(sourceSnapshotId),
+                    String.valueOf(currentSnapshot.snapshotId()));
+        } catch (Exception e) {
+            throw new UserException(
+                    "Failed to cherry-pick snapshot " + sourceSnapshotId + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Result columns: the cherry-picked snapshot id and the table's current snapshot id afterwards.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("source_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot whose changes were cherry-picked into the current table state"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the new snapshot created as a result of the cherry-pick operation, "
+                                + "now set as the current snapshot"));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
index 564b9858a51..93a9eccd8c6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java
@@ -17,14 +17,20 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Table;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,11 +55,11 @@ public class IcebergFastForwardAction extends
BaseIcebergAction {
protected void registerIcebergArguments() {
// Register required arguments for branch and to
+        // `branch` is the ref that gets moved; `to` is the branch whose head it is moved to
+        // (passed to Iceberg as fastForwardBranch(branch, to)).
namedArguments.registerRequiredArgument(BRANCH,
-                "Name of the target branch to fast-forward to",
+                "Name of the branch to fast-forward",
ArgumentParsers.nonEmptyString(BRANCH));
namedArguments.registerRequiredArgument(TO,
-                "Target snapshot ID to fast-forward to",
-                ArgumentParsers.positiveLong(TO));
+                "Name of the branch whose current snapshot the branch is fast-forwarded to",
+                ArgumentParsers.nonEmptyString(TO));
}
@Override
@@ -65,7 +71,41 @@ public class IcebergFastForwardAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg fast_forward procedure is not implemented yet");
+        // Move the head of `branch` forward to the head of `to`.
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        String sourceBranch = namedArguments.getString(BRANCH);
+        String desBranch = namedArguments.getString(TO);
+
+        try {
+            // The ref may not resolve to a snapshot before the operation, so the previous id may be null.
+            Long snapshotBefore = Optional.ofNullable(icebergTable.snapshot(sourceBranch))
+                    .map(snapshot -> snapshot.snapshotId())
+                    .orElse(null);
+            icebergTable.manageSnapshots().fastForwardBranch(sourceBranch, desBranch).commit();
+            long snapshotAfter = icebergTable.snapshot(sourceBranch).snapshotId();
+            // Invalidate the cached metadata of this table after the commit.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+            return Lists.newArrayList(
+                    sourceBranch.trim(),
+                    String.valueOf(snapshotBefore),
+                    String.valueOf(snapshotAfter));
+        } catch (Exception e) {
+            // `to` is a branch name, not a snapshot id.
+            throw new UserException(
+                    "Failed to fast-forward branch " + sourceBranch + " to branch " + desBranch + ": "
+                            + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Result columns: the moved branch plus the snapshot ids it pointed to before and after.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("branch_updated", Type.STRING, false,
+                        "Name of the branch that was fast-forwarded to match the target branch"),
+                new Column("previous_ref", Type.BIGINT, true,
+                        "Snapshot ID that the branch was pointing to before the fast-forward operation"),
+                new Column("updated_ref", Type.BIGINT, false,
+                        "Snapshot ID that the branch is pointing to after the fast-forward operation"));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
index bb788dcc27b..9cf135e46a7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java
@@ -17,14 +17,21 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -62,7 +69,43 @@ public class IcebergRollbackToSnapshotAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg rollback_to_snapshot procedure is not implemented yet");
+        // Make the given historical snapshot the table's current snapshot.
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+
+        Snapshot targetSnapshot = icebergTable.snapshot(targetSnapshotId);
+        if (targetSnapshot == null) {
+            throw new UserException("Snapshot " + targetSnapshotId + " not found in table " + icebergTable.name());
+        }
+
+        try {
+            Snapshot previousSnapshot = icebergTable.currentSnapshot();
+            Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
+            List<String> result = Lists.newArrayList(
+                    String.valueOf(previousSnapshotId),
+                    String.valueOf(targetSnapshotId));
+            // Target is already current: nothing to commit, and the cache stays valid.
+            if (targetSnapshotId.equals(previousSnapshotId)) {
+                return result;
+            }
+
+            icebergTable.manageSnapshots().rollbackTo(targetSnapshotId).commit();
+            // Invalidate the cached metadata of this table after the commit.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+            return result;
+        } catch (Exception e) {
+            throw new UserException("Failed to rollback to snapshot " + targetSnapshotId + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Result columns: the snapshot id that was current before the rollback and the one current after it.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                // Nullable: executeAction reports no previous id when the table had no current snapshot.
+                new Column("previous_snapshot_id", Type.BIGINT, true,
+                        "ID of the snapshot that was current before the rollback operation"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that is now current after rolling back to the specified snapshot"));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
index 5f20e5b3323..b478d078349 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java
@@ -17,13 +17,22 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -34,6 +43,7 @@ import java.util.Optional;
* at a specific timestamp.
*/
public class IcebergRollbackToTimestampAction extends BaseIcebergAction {
+ private static final DateTimeFormatter DATETIME_MS_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static final String TIMESTAMP = "timestamp";
public IcebergRollbackToTimestampAction(Map<String, String> properties,
@@ -48,7 +58,7 @@ public class IcebergRollbackToTimestampAction extends
BaseIcebergAction {
// Create a custom timestamp parser that supports both ISO datetime and
// millisecond formats
namedArguments.registerRequiredArgument(TIMESTAMP,
- "A timestamp to rollback to (ISO datetime
'yyyy-MM-ddTHH:mm:ss' or milliseconds since epoch)",
+ "A timestamp to rollback to (formats: 'yyyy-MM-dd
HH:mm:ss.SSS' or milliseconds since epoch)",
value -> {
if (value == null || value.trim().isEmpty()) {
throw new IllegalArgumentException("timestamp cannot
be empty");
@@ -64,14 +74,13 @@ public class IcebergRollbackToTimestampAction extends
BaseIcebergAction {
}
return trimmed;
} catch (NumberFormatException e) {
- // If not a number, try as ISO datetime format
+ // Second attempt: Parse as ISO datetime format
(yyyy-MM-dd HH:mm:ss.SSS)
try {
- java.time.LocalDateTime.parse(trimmed,
-
java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+ java.time.LocalDateTime.parse(trimmed,
DATETIME_MS_FORMAT);
return trimmed;
} catch (java.time.format.DateTimeParseException dte) {
throw new IllegalArgumentException("Invalid
timestamp format. Expected ISO datetime "
- + "(yyyy-MM-ddTHH:mm:ss) or timestamp in
milliseconds: " + trimmed);
+ + "(yyyy-MM-dd HH:mm:ss.SSS) or timestamp
in milliseconds: " + trimmed);
}
}
});
@@ -87,7 +96,38 @@ public class IcebergRollbackToTimestampAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg rollback_to_timestamp procedure is not implemented yet");
+        // Roll the table back to the snapshot Iceberg selects for the given point in time.
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        String timestampStr = namedArguments.getString(TIMESTAMP);
+        // Resolve the argument before the try block so a malformed value is reported directly
+        // instead of being wrapped into a second UserException.
+        long targetTimestamp = parseTargetTimestampMillis(timestampStr);
+
+        Snapshot previousSnapshot = icebergTable.currentSnapshot();
+        Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
+
+        try {
+            icebergTable.manageSnapshots().rollbackToTime(targetTimestamp).commit();
+
+            Snapshot currentSnapshot = icebergTable.currentSnapshot();
+            Long currentSnapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+            // Invalidate the cached metadata of this table after the commit.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+            return Lists.newArrayList(
+                    String.valueOf(previousSnapshotId),
+                    String.valueOf(currentSnapshotId));
+        } catch (Exception e) {
+            throw new UserException("Failed to rollback to timestamp " + timestampStr + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Converts the validated timestamp argument to epoch milliseconds.
+     *
+     * <p>The argument parser accepts either milliseconds since epoch or a 'yyyy-MM-dd HH:mm:ss.SSS'
+     * datetime; the latter is resolved in the time zone returned by {@code TimeUtils.getTimeZone()}.
+     *
+     * @param timestampStr the raw timestamp argument
+     * @return the timestamp in milliseconds since epoch
+     * @throws UserException if the datetime form cannot be parsed
+     */
+    private static long parseTargetTimestampMillis(String timestampStr) throws UserException {
+        String trimmed = timestampStr.trim();
+        try {
+            // Numeric input is already milliseconds since epoch; the datetime parser would reject it.
+            return Long.parseLong(trimmed);
+        } catch (NumberFormatException ignored) {
+            // Not a number: fall through to the datetime format.
+        }
+        long millis = TimeUtils.msTimeStringToLong(trimmed, TimeUtils.getTimeZone());
+        // A negative result means the datetime string could not be parsed.
+        if (millis < 0) {
+            throw new UserException("Invalid timestamp: " + timestampStr);
+        }
+        return millis;
+    }
+
+    /**
+     * Result columns: the snapshot id that was current before the rollback and the one current after it.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                // Nullable: executeAction reports no previous id when the table had no current snapshot.
+                new Column("previous_snapshot_id", Type.BIGINT, true,
+                        "ID of the snapshot that was current before the rollback operation"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that was current at the specified timestamp and is now set as current"));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
index c356076de13..cd663c3bf24 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergSetCurrentSnapshotAction.java
@@ -17,15 +17,22 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -81,7 +88,69 @@ public class IcebergSetCurrentSnapshotAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg set_current_snapshot procedure is not implemented yet");
+        // Set the table's current snapshot from either `snapshot_id` or the snapshot a `ref` points to.
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        Snapshot previousSnapshot = icebergTable.currentSnapshot();
+        Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
+
+        Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
+        String ref = namedArguments.getString(REF);
+
+        // Resolve and validate the target before the try block: a UserException thrown inside it
+        // would be caught below and wrapped again, duplicating the error prefix in the message.
+        if (targetSnapshotId != null) {
+            if (icebergTable.snapshot(targetSnapshotId) == null) {
+                throw new UserException(
+                        "Snapshot " + targetSnapshotId + " not found in table " + icebergTable.name());
+            }
+        } else if (ref != null) {
+            Snapshot refSnapshot = icebergTable.snapshot(ref);
+            if (refSnapshot == null) {
+                throw new UserException("Reference '" + ref + "' not found in table " + icebergTable.name());
+            }
+            targetSnapshotId = refSnapshot.snapshotId();
+        }
+
+        List<String> result = Lists.newArrayList(
+                String.valueOf(previousSnapshotId),
+                String.valueOf(targetSnapshotId));
+        try {
+            if (targetSnapshotId != null) {
+                // Target is already current: nothing to commit, and the cache stays valid.
+                if (targetSnapshotId.equals(previousSnapshotId)) {
+                    return result;
+                }
+                icebergTable.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();
+            }
+
+            // Invalidate the cached metadata of this table.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+            return result;
+        } catch (Exception e) {
+            String target = targetSnapshotId != null ? "snapshot " + targetSnapshotId : "reference '" + ref + "'";
+            throw new UserException("Failed to set current snapshot to " + target + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Result columns: the snapshot id that was current before the change and the one current after it.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                // Nullable: executeAction reports no previous id when the table had no current snapshot.
+                new Column("previous_snapshot_id", Type.BIGINT, true,
+                        "ID of the snapshot that was current before setting the new current snapshot"),
+                new Column("current_snapshot_id", Type.BIGINT, false,
+                        "ID of the snapshot that is now set as the current snapshot "
+                                + "(from snapshot_id parameter or resolved from ref parameter)"));
}
@Override
diff --git
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
index c9633434869..ecbe09bc0d3 100644
---
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
+++
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_optimize_actions.out
@@ -1,4 +1,71 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !before_rollback_to_snapshot --
+1 v1.0 2024-01-01T10:00
+2 v1.1 2024-01-02T11:00
+3 v1.2 2024-01-03T12:00
+
+-- !after_rollback_to_snapshot --
+1 v1.0 2024-01-01T10:00
+
+-- !before_rollback_to_timestamp --
+1 v1.0 2025-01-01T10:00
+2 v1.1 2025-01-02T11:00
+3 v1.2 2025-01-03T12:00
+
+-- !after_rollback_to_timestamp --
+1 v1.0 2025-01-01T10:00
+2 v1.1 2025-01-02T11:00
+
+-- !before_set_current_snapshot_by_snapshotid --
+1 content1
+2 content2
+3 content3
+4 content4
+
+-- !after_set_current_snapshot_by_snapshotid --
+1 content1
+
+-- !before_set_current_snapshot_by_branch --
+1 content1
+
+-- !after_set_current_snapshot_by_branch --
+1 content1
+2 content2
+
+-- !before_set_current_snapshot_by_tag --
+1 content1
+2 content2
+
+-- !after_set_current_snapshot_by_tag --
+1 content1
+2 content2
+3 content3
+
+-- !before_cherrypick_snapshot --
+1 data1 1
+2 data2 2
+3 data3 3
+
+-- !rollback_snapshot --
+1 data1 1
+
+-- !after_cherrypick_snapshot --
+1 data1 1
+3 data3 3
+
+-- !before_test_fast_forward --
+1 record1 100
+2 record2 200
+3 record3 300
+
+-- !before_fast_forword_branch --
+1 record1 100
+
+-- !after_fast_forword_branch --
+1 record1 100
+2 record2 200
+3 record3 300
+
-- !test_rewrite_data_files_results --
0 1 2 3
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
index e8ce3d62e31..0cd818fe175 100644
---
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_optimize_actions.groovy
@@ -15,7 +15,26 @@
// specific language governing permissions and limitations
// under the License.
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.ChronoField
+import java.time.LocalDateTime
+import java.time.ZoneId
+
suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,external_docker_doris") {
+ DateTimeFormatter unifiedFormatter = new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd")
+ .optionalStart()
+ .appendLiteral('T')
+ .optionalEnd()
+ .optionalStart()
+ .appendLiteral(' ')
+ .optionalEnd()
+ .appendPattern("HH:mm:ss")
+ .optionalStart()
+ .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true)
+ .optionalEnd()
+ .toFormatter()
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
@@ -41,53 +60,332 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
);"""
sql """switch ${catalog_name}"""
+ sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
sql """use ${db_name}"""
def table_name = "test_iceberg_systable_partitioned"
- // Test rollback_to_snapshot action
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" =
"123456789")
- """
- exception "Iceberg rollback_to_snapshot procedure is not implemented
yet"
- }
+ sql """drop table if exists ${db_name}.test_fast_forward"""
+ sql """
+ CREATE TABLE ${db_name}.test_fast_forward (
+ id BIGINT,
+ name STRING,
+ value INT
+ ) ENGINE=iceberg;
+ """
+ sql """
+ INSERT INTO ${db_name}.test_fast_forward VALUES
+ (1, 'record1', 100);
+ """
+ sql """
+ ALTER TABLE ${db_name}.test_fast_forward CREATE BRANCH feature_branch;
+ """
+ sql """INSERT INTO ${db_name}.test_fast_forward VALUES
+ (2, 'record2', 200);"""
+ sql """ALTER TABLE ${db_name}.test_fast_forward CREATE TAG feature_tag;"""
+ sql """
+ INSERT INTO ${db_name}.test_fast_forward VALUES
+ (3, 'record3', 300);
+ """
- // Test rollback_to_timestamp action
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "rollback_to_timestamp", "timestamp" =
"2024-01-01T00:00:00")
- """
- exception "Iceberg rollback_to_timestamp procedure is not implemented
yet"
- }
+ sql """DROP TABLE IF EXISTS ${db_name}.test_cherrypick"""
- // Test set_current_snapshot action
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "set_current_snapshot", "snapshot_id" =
"987654321")
- """
- exception "Iceberg set_current_snapshot procedure is not implemented
yet"
- }
+ sql """
+ CREATE TABLE ${db_name}.test_cherrypick (
+ id BIGINT,
+ data STRING,
+ status INT
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${db_name}.test_cherrypick VALUES
+ (1, 'data1', 1)
+ """
+ sql """
+ INSERT INTO ${db_name}.test_cherrypick VALUES
+ (2, 'data2', 2)
+ """
+ sql """
+ INSERT INTO ${db_name}.test_cherrypick VALUES
+ (3, 'data3', 3)
+ """
+ logger.info("test_cherrypick table setup completed with 3 incremental
snapshots")
- // Test cherrypick_snapshot action
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "cherrypick_snapshot", "snapshot_id" =
"555666777")
- """
- exception "Iceberg cherrypick_snapshot procedure is not implemented
yet"
- }
+ logger.info("Creating test_rollback table for rollback_to_snapshot
testing")
+
+ sql """DROP TABLE IF EXISTS ${db_name}.test_rollback"""
+ sql """
+ CREATE TABLE ${db_name}.test_rollback (
+ id BIGINT,
+ version STRING,
+ timestamp datetime
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${db_name}.test_rollback VALUES
+ (1, 'v1.0', '2024-01-01 10:00:00')
+ """
+ sql """
+ INSERT INTO ${db_name}.test_rollback VALUES
+ (2, 'v1.1', '2024-01-02 11:00:00')
+ """
+ sql """
+ INSERT INTO ${db_name}.test_rollback VALUES
+ (3, 'v1.2', '2024-01-03 12:00:00')
+ """
+
+ logger.info("test_rollback table setup completed with 3 version snapshots")
+
+ logger.info("Creating test_rollback_timestamp table for timestamp-based
rollback testing")
+
+ sql """DROP TABLE IF EXISTS ${db_name}.test_rollback_timestamp"""
+
+ sql """
+ CREATE TABLE ${db_name}.test_rollback_timestamp (
+ id BIGINT,
+ version STRING,
+ timestamp datetime
+ ) ENGINE=iceberg
+ """
+
+ sql """
+ INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+ (1, 'v1.0', '2025-01-01 10:00:00')
+ """
+ sql """
+ INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+ (2, 'v1.1', '2025-01-02 11:00:00')
+ """
+ sql """
+ INSERT INTO ${db_name}.test_rollback_timestamp VALUES
+ (3, 'v1.2', '2025-01-03 12:00:00')
+ """
+ logger.info("test_rollback_timestamp table setup completed with
future-dated snapshots")
+
+ logger.info("Creating test_current_snapshot table for set_current_snapshot
testing")
+
+ sql """DROP TABLE IF EXISTS ${db_name}.test_current_snapshot"""
+
+ sql """
+ CREATE TABLE ${db_name}.test_current_snapshot (
+ id BIGINT,
+ content STRING
+ ) ENGINE=iceberg
+ """
+
+ sql """
+ INSERT INTO ${db_name}.test_current_snapshot VALUES
+ (1, 'content1')
+ """
+ sql """
+ INSERT INTO ${db_name}.test_current_snapshot VALUES
+ (2, 'content2')
+ """
+ sql """
+ ALTER TABLE ${db_name}.test_current_snapshot CREATE BRANCH dev_branch
+ """
+ sql """
+ INSERT INTO ${db_name}.test_current_snapshot VALUES
+ (3, 'content3')
+ """
+ sql """
+ ALTER TABLE ${db_name}.test_current_snapshot CREATE TAG dev_tag
+ """
+ // Insert final content record - latest main state
+ sql """
+ INSERT INTO ${db_name}.test_current_snapshot VALUES
+ (4, 'content4')
+ """
+
+ logger.info("test_current_snapshot table setup completed with 4 snapshots,
1 branch, and 1 tag")
+
+ //
=====================================================================================
+ // Test Case 1: rollback_to_snapshot action
+ // Tests the ability to rollback a table to a specific historical snapshot
+ //
=====================================================================================
+ logger.info("Starting rollback_to_snapshot test case")
+
+ // Capture table state before rollback operation
+ qt_before_rollback_to_snapshot """SELECT * FROM test_rollback ORDER BY
id"""
+
+ // Retrieve all available snapshots for the rollback test table
+ List<List<Object>> rollbackSnapshotList = sql """
+ SELECT committed_at, snapshot_id FROM test_rollback\$snapshots ORDER
BY committed_at
+ """
+ logger.info("Available snapshots for rollback test:
${rollbackSnapshotList}")
+
+ // Validate snapshot data structure and count
+ assertTrue(rollbackSnapshotList.size() == 3, "Expected exactly 3 snapshots
for rollback test")
+ assertTrue(rollbackSnapshotList[0].size() == 2, "Invalid snapshot metadata
structure")
+
+ // Extract snapshot IDs for test operations
+ String rollbackEarliestSnapshotId = rollbackSnapshotList[0][1] //
First/oldest snapshot
+ String rollbackLatestSnapshotId = rollbackSnapshotList[2][1] //
Last/newest snapshot
+
+ // Execute rollback to the earliest snapshot
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_rollback
+ PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" =
"${rollbackEarliestSnapshotId}")
+ """
+ qt_after_rollback_to_snapshot """SELECT * FROM test_rollback ORDER BY id"""
+
+ //
=====================================================================================
+ // Test Case 2: rollback_to_timestamp action
+ // Tests the ability to rollback a table to a specific point in time using
timestamps
+ //
=====================================================================================
+ logger.info("Starting rollback_to_timestamp test case")
+
+ // Capture table state before timestamp-based rollback
+ qt_before_rollback_to_timestamp """SELECT * FROM test_rollback_timestamp
ORDER BY id"""
+
+ // Retrieve snapshots ordered by timestamp (newest first) for timestamp
rollback test
+ List<List<Object>> timestampSnapshotList = sql """
+ SELECT committed_at, snapshot_id FROM
test_rollback_timestamp\$snapshots ORDER BY committed_at DESC
+ """
+ logger.info("Snapshot timeline for timestamp rollback:
${timestampSnapshotList}")
+
+ // Validate snapshot availability for timestamp test
+ assertTrue(timestampSnapshotList.size() == 3, "Expected exactly 3
snapshots for timestamp test")
+ assertTrue(timestampSnapshotList[0].size() == 2, "Invalid timestamp
snapshot structure")
+
+ // Extract and format timestamp for rollback operation
+ String latestCommittedTime = timestampSnapshotList[0][0]
+
+ // Convert timestamp to required format for rollback operation
+ DateTimeFormatter outputFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ LocalDateTime dateTime = LocalDateTime.parse(latestCommittedTime,
unifiedFormatter)
+ String formattedSnapshotTime =
dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter)
+
+ // Execute timestamp-based rollback
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_rollback_timestamp
+ PROPERTIES("action" = "rollback_to_timestamp", "timestamp" =
"${formattedSnapshotTime}")
+ """
+ qt_after_rollback_to_timestamp """SELECT * FROM test_rollback_timestamp
ORDER BY id"""
+
+
+ //
=====================================================================================
+ // Test Case 3: set_current_snapshot action
+ // Tests setting the current snapshot using snapshot ID and
reference-based approaches
+ //
=====================================================================================
+ logger.info("Starting set_current_snapshot test case")
+
+ // Test setting current snapshot by snapshot ID
+ qt_before_set_current_snapshot_by_snapshotid """SELECT * FROM
test_current_snapshot ORDER BY id"""
+
+ // Retrieve available snapshots for current snapshot test
+ List<List<Object>> currentSnapshotList = sql """
+ SELECT committed_at, snapshot_id FROM test_current_snapshot\$snapshots
ORDER BY committed_at
+ """
+ logger.info("Available snapshots for current snapshot test:
${currentSnapshotList}")
+
+ // Validate snapshot data for current snapshot test
+ assertTrue(currentSnapshotList.size() == 4, "Expected exactly 4 snapshots
for current snapshot test")
+ assertTrue(currentSnapshotList[0].size() == 2, "Invalid current snapshot
metadata structure")
+
+ String targetCurrentSnapshotId = currentSnapshotList[0][1] // Select
first snapshot as target
+
+ // Execute set current snapshot by snapshot ID
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+ PROPERTIES("action" = "set_current_snapshot", "snapshot_id" =
"${targetCurrentSnapshotId}")
+ """
+ qt_after_set_current_snapshot_by_snapshotid """SELECT * FROM
test_current_snapshot ORDER BY id"""
+
+ // Verify reference structure after snapshot change
+ List<List<Object>> currentSnapshotRefs = sql """
+ SELECT name, type FROM test_current_snapshot\$refs ORDER BY snapshot_id
+ """
+ logger.info("References after current snapshot change:
${currentSnapshotRefs}")
+ assertTrue(currentSnapshotRefs.size() == 3, "Expected exactly 3 references
after snapshot change")
+
+ // Test setting current snapshot by branch reference
+ qt_before_set_current_snapshot_by_branch """SELECT * FROM
test_current_snapshot ORDER BY id"""
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+ PROPERTIES("action" = "set_current_snapshot", "ref" = "dev_branch")
+ """
+ qt_after_set_current_snapshot_by_branch """SELECT * FROM
test_current_snapshot ORDER BY id"""
+
+ // Test setting current snapshot by tag reference
+ qt_before_set_current_snapshot_by_tag """SELECT * FROM
test_current_snapshot ORDER BY id"""
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_current_snapshot
+ PROPERTIES("action" = "set_current_snapshot", "ref" = "dev_tag")
+ """
+ qt_after_set_current_snapshot_by_tag """SELECT * FROM
test_current_snapshot ORDER BY id"""
+
+ //
=====================================================================================
+ // Test Case 4: cherrypick_snapshot action
+ // Tests selective application of changes from a specific snapshot
(cherrypick operation)
+ //
=====================================================================================
+ logger.info("Starting cherrypick_snapshot test case")
+
+ // Capture initial state before cherrypick operations
+ qt_before_cherrypick_snapshot """SELECT * FROM test_cherrypick ORDER BY
id"""
+
+ // Retrieve snapshots for cherrypick test scenario
+ List<List<Object>> cherrypickSnapshotList = sql """
+ SELECT committed_at, snapshot_id FROM test_cherrypick\$snapshots ORDER
BY committed_at
+ """
+ logger.info("Available snapshots for cherrypick test:
${cherrypickSnapshotList}")
+
+ // Validate cherrypick test data structure
+ assertTrue(cherrypickSnapshotList.size() == 3, "Expected exactly 3
snapshots for cherrypick test")
+ assertTrue(cherrypickSnapshotList[0].size() == 2, "Invalid cherrypick
snapshot structure")
+
+ String cherrypickEarliestSnapshotId = cherrypickSnapshotList[0][1] //
First snapshot for rollback
+ String cherrypickLatestSnapshotId = cherrypickSnapshotList[2][1] //
Last snapshot for cherrypick
+
+ // Step 1: Rollback to earliest snapshot to create test scenario
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_cherrypick
+ PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" =
"${cherrypickEarliestSnapshotId}")
+ """
+ qt_rollback_snapshot """SELECT * FROM test_cherrypick ORDER BY id"""
+
+ // Step 2: Cherrypick changes from the latest snapshot
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_cherrypick
+ PROPERTIES("action" = "cherrypick_snapshot", "snapshot_id" =
"${cherrypickLatestSnapshotId}")
+ """
+ qt_after_cherrypick_snapshot """SELECT * FROM test_cherrypick ORDER BY
id"""
+
+
+ //
=====================================================================================
+ // Test Case 5: fast_forward action
+ // Tests fast-forward operations for branch synchronization in Iceberg
tables
+ //
=====================================================================================
+ logger.info("Starting fast_forward action test case")
+
+ // Capture state before fast-forward operations
+ qt_before_test_fast_forward """SELECT * FROM test_fast_forward ORDER BY
id"""
+
+ // Retrieve snapshot timeline for fast-forward test (newest first)
+ List<List<Object>> fastForwardSnapshotList = sql """
+ SELECT committed_at, snapshot_id FROM test_fast_forward\$snapshots
ORDER BY committed_at DESC
+ """
+ logger.info("Snapshot timeline for fast-forward test:
${fastForwardSnapshotList}")
+
+ // Validate fast-forward test data
+ assertTrue(fastForwardSnapshotList.size() == 3, "Expected exactly 3
snapshots for fast-forward test")
+ assertTrue(fastForwardSnapshotList[0].size() == 2, "Invalid fast-forward
snapshot structure")
+
+ // Verify available references for fast-forward operations
+ List<List<Object>> fastForwardRefs = sql """
+ SELECT name, type FROM test_fast_forward\$refs ORDER BY snapshot_id
+ """
+ logger.info("Available references for fast-forward: ${fastForwardRefs}")
+ assertTrue(fastForwardRefs.size() == 3, "Expected exactly 3 references for
fast-forward test")
+
+ // Test fast-forward from feature branch to main branch
+ qt_before_fast_forword_branch """SELECT * FROM
test_fast_forward@branch(feature_branch) ORDER BY id"""
+
+ sql """
+ OPTIMIZE TABLE ${catalog_name}.${db_name}.test_fast_forward
+ PROPERTIES("action" = "fast_forward", "branch" = "feature_branch",
"to" = "main")
+ """
+ qt_after_fast_forword_branch """SELECT * FROM
test_fast_forward@branch(feature_branch) ORDER BY id"""
- // Test fast_forward action with branch
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "fast_forward", "branch" = "main", "to" =
"111222333")
- """
- exception "Iceberg fast_forward procedure is not implemented yet"
- }
// Test expire_snapshots action
test {
@@ -276,15 +574,6 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
exception "Invalid target-file-size-bytes format: not-a-number"
}
- // Test set_current_snapshot with ref parameter
- test {
- sql """
- OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
- PROPERTIES("action" = "set_current_snapshot", "ref" = "main")
- """
- exception "Iceberg set_current_snapshot procedure is not implemented
yet"
- }
-
// Test set_current_snapshot with both snapshot_id and ref
test {
sql """
@@ -309,7 +598,7 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" =
"9223372036854775807")
"""
- exception "Iceberg rollback_to_snapshot procedure is not implemented
yet"
+ exception "Snapshot 9223372036854775807 not found in table"
}
// Test snapshot_id exceeding Long.MAX_VALUE
@@ -327,7 +616,7 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = "
123456789 ")
"""
- exception "Iceberg rollback_to_snapshot procedure is not implemented
yet"
+ exception "Snapshot 123456789 not found in table"
}
// Test case sensitivity in action names
@@ -336,6 +625,6 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
OPTIMIZE TABLE ${catalog_name}.${db_name}.${table_name}
PROPERTIES("action" = "ROLLBACK_TO_SNAPSHOT", "snapshot_id" =
"123456789")
"""
- exception "Iceberg rollback_to_snapshot procedure is not implemented
yet"
+ exception "Snapshot 123456789 not found in table"
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]