This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0b19a54d58c branch-3.1: [opt](iceberg) support create branch/tag for 
iceberg #51727 (#52852)
0b19a54d58c is described below

commit 0b19a54d58c50d8f9ed0b8fc0c6b9ab2a378fd85
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 8 17:42:41 2025 +0800

    branch-3.1: [opt](iceberg) support create branch/tag for iceberg #51727 
(#52852)
    
    bp #51727
    
    ---------
    
    Co-authored-by: wuwenchi <[email protected]>
---
 .../create_preinstalled_scripts/iceberg/run15.sql  |   8 +
 docker/thirdparties/run-thirdparties-docker.sh     |   2 +-
 .../antlr4/org/apache/doris/nereids/DorisLexer.g4  |   8 +
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  46 +++
 fe/fe-core/src/main/cup/sql_parser.cup             | 132 ++++++++
 .../main/java/org/apache/doris/alter/Alter.java    |  24 +-
 .../java/org/apache/doris/alter/AlterOpType.java   |   2 +
 .../org/apache/doris/analysis/AlterTableStmt.java  |   4 +-
 .../analysis/CreateOrReplaceBranchClause.java      |  55 ++++
 .../doris/analysis/CreateOrReplaceTagClause.java   |  55 ++++
 .../org/apache/doris/datasource/CatalogIf.java     |  17 +
 .../apache/doris/datasource/ExternalCatalog.java   |  46 +++
 .../org/apache/doris/datasource/ExternalTable.java |   2 +-
 .../doris/datasource/hive/HiveMetadataOps.java     |  14 +
 .../datasource/iceberg/IcebergMetadataOps.java     | 118 +++++++
 .../datasource/operations/ExternalMetadataOps.java |  40 +++
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../trees/plans/commands/info/BranchOptions.java   |  89 +++++
 .../commands/info/CreateOrReplaceBranchInfo.java   |  87 +++++
 .../commands/info/CreateOrReplaceTagInfo.java      |  87 +++++
 .../plans/commands/info/RetentionSnapshots.java    |  42 +++
 .../trees/plans/commands/info/TagOptions.java      |  64 ++++
 .../java/org/apache/doris/persist/EditLog.java     |  13 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../apache/doris/persist/TableBranchOrTagInfo.java |  82 +++++
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   8 +
 .../IcebergExternalTableBranchAndTagTest.java      | 357 +++++++++++++++++++++
 .../info/CreateOrReplaceBranchOrTagInfoTest.java   | 281 ++++++++++++++++
 .../iceberg/iceberg_branch_tag_operate.out         | Bin 0 -> 786 bytes
 .../iceberg/iceberg_branch_tag_operate.groovy      | 194 +++++++++++
 30 files changed, 1880 insertions(+), 4 deletions(-)

diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run15.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run15.sql
new file mode 100644
index 00000000000..e65c777e545
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run15.sql
@@ -0,0 +1,8 @@
+use demo.test_db;
+CREATE TABLE tmp_schema_change_branch (id bigint, data string, col float);
+INSERT INTO tmp_schema_change_branch VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 
'c', 3.0);
+ALTER TABLE tmp_schema_change_branch CREATE BRANCH test_branch;
+ALTER TABLE tmp_schema_change_branch CREATE TAG test_tag;
+ALTER TABLE tmp_schema_change_branch DROP COLUMN col;
+ALTER TABLE tmp_schema_change_branch ADD COLUMN new_col date;
+INSERT INTO tmp_schema_change_branch VALUES (4, 'd', date('2024-04-04')), (5, 
'e', date('2024-05-05'));
diff --git a/docker/thirdparties/run-thirdparties-docker.sh 
b/docker/thirdparties/run-thirdparties-docker.sh
index 26580301dd9..bebeb703069 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -739,7 +739,7 @@ if [[ "${RUN_SPARK}" -eq 1 ]]; then
 fi
 
 if [[ "${RUN_ICEBERG}" -eq 1 ]]; then
-    start_iceberg > start_icerberg.log 2>&1 &
+    start_iceberg > start_iceberg.log 2>&1 &
     pids["iceberg"]=$!
 fi
 
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
index 7d2f90e2ccf..7419b51e717 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
@@ -119,6 +119,7 @@ BITOR: 'BITOR';
 BITXOR: 'BITXOR';
 BLOB: 'BLOB';
 BOOLEAN: 'BOOLEAN';
+BRANCH: 'BRANCH';
 BRIEF: 'BRIEF';
 BROKER: 'BROKER';
 BUCKETS: 'BUCKETS';
@@ -186,6 +187,7 @@ DATEV2: 'DATEV2';
 DATETIMEV1: 'DATETIMEV1';
 DATEV1: 'DATEV1';
 DAY: 'DAY';
+DAYS: 'DAYS';
 DECIMAL: 'DECIMAL';
 DECIMALV2: 'DECIMALV2';
 DECIMALV3: 'DECIMALV3';
@@ -279,6 +281,7 @@ HLL_UNION: 'HLL_UNION';
 HOSTNAME: 'HOSTNAME';
 HOTSPOT: 'HOTSPOT';
 HOUR: 'HOUR';
+HOURS:  'HOURS';
 HUB: 'HUB';
 IDENTIFIED: 'IDENTIFIED';
 IF: 'IF';
@@ -357,6 +360,7 @@ MIGRATIONS: 'MIGRATIONS';
 MIN: 'MIN';
 MINUS: 'MINUS';
 MINUTE: 'MINUTE';
+MINUTES: 'MINUTES';
 MODIFY: 'MODIFY';
 MONTH: 'MONTH';
 MTMV: 'MTMV';
@@ -448,6 +452,8 @@ RESOURCES: 'RESOURCES';
 RESTORE: 'RESTORE';
 RESTRICTIVE: 'RESTRICTIVE';
 RESUME: 'RESUME';
+RETAIN: 'RETAIN';
+RETENTION: 'RETENTION';
 RETURNS: 'RETURNS';
 REVOKE: 'REVOKE';
 REWRITTEN: 'REWRITTEN';
@@ -481,6 +487,7 @@ SIGNED: 'SIGNED';
 SKEW: 'SKEW';
 SMALLINT: 'SMALLINT';
 SNAPSHOT: 'SNAPSHOT';
+SNAPSHOTS: 'SNAPSHOTS';
 SONAME: 'SONAME';
 SPLIT: 'SPLIT';
 SQL: 'SQL';
@@ -507,6 +514,7 @@ TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
 TABLET: 'TABLET';
 TABLETS: 'TABLETS';
+TAG: 'TAG';
 TASK: 'TASK';
 TASKS: 'TASKS';
 TEMPORARY: 'TEMPORARY';
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 7185145b1af..fb53ae82c80 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -640,6 +640,44 @@ alterTableClause
     | ADD TEMPORARY? PARTITIONS
         FROM from=partitionValueList TO to=partitionValueList
         INTERVAL INTEGER_VALUE unit=identifier? properties=propertyClause?     
     #alterMultiPartitionClause
+    | createOrReplaceTagClause                                                 
     #createOrReplaceTagClauses
+    | createOrReplaceBranchClause                                              
     #createOrReplaceBranchClauses
+    ;
+
+createOrReplaceTagClause
+    : CREATE TAG (IF NOT EXISTS)? name=identifier ops=tagOptions
+    | (CREATE OR)? REPLACE TAG name=identifier ops=tagOptions
+    ;
+
+createOrReplaceBranchClause
+    : CREATE BRANCH (IF NOT EXISTS)? name=identifier ops=branchOptions
+    | (CREATE OR)? REPLACE BRANCH name=identifier ops=branchOptions
+    ;
+
+tagOptions
+    : (AS OF VERSION version=INTEGER_VALUE)? (retainTime)?
+    ;
+
+branchOptions
+    : (AS OF VERSION version=INTEGER_VALUE)? (retainTime)? (retentionSnapshot)?
+    ;
+
+retainTime
+    : RETAIN timeValueWithUnit
+    ;
+
+retentionSnapshot
+    : WITH SNAPSHOT RETENTION minSnapshotsToKeep
+    | WITH SNAPSHOT RETENTION timeValueWithUnit
+    | WITH SNAPSHOT RETENTION minSnapshotsToKeep timeValueWithUnit
+    ;
+
+minSnapshotsToKeep
+    : value=INTEGER_VALUE SNAPSHOTS
+    ;
+
+timeValueWithUnit
+    : timeValue=INTEGER_VALUE  timeUnit=(DAYS | HOURS | MINUTES)
     ;
 
 columnPosition
@@ -1768,6 +1806,7 @@ nonReserved
     | BITXOR
     | BLOB
     | BOOLEAN
+    | BRANCH
     | BRIEF
     | BROKER
     | BUCKETS
@@ -1822,6 +1861,7 @@ nonReserved
     | DATEV1
     | DATEV2
     | DAY
+    | DAYS
     | DECIMAL
     | DECIMALV2
     | DECIMALV3
@@ -1877,6 +1917,7 @@ nonReserved
     | HOSTNAME
     | HOTSPOT
     | HOUR
+    | HOURS
     | HUB
     | IDENTIFIED
     | IGNORE
@@ -1925,6 +1966,7 @@ nonReserved
     | MIGRATIONS
     | MIN
     | MINUTE
+    | MINUTES
     | MODIFY
     | MONTH
     | MTMV
@@ -1988,6 +2030,8 @@ nonReserved
     | RESTORE
     | RESTRICTIVE
     | RESUME
+    | RETAIN
+    | RETENTION
     | RETURNS
     | REWRITTEN
     | RIGHT_BRACE
@@ -2008,6 +2052,7 @@ nonReserved
     | SHAPE
     | SKEW
     | SNAPSHOT
+    | SNAPSHOTS
     | SONAME
     | SPLIT
     | SQL
@@ -2025,6 +2070,7 @@ nonReserved
     | STRUCT
     | SUM
     | TABLES
+    | TAG
     | TASK
     | TASKS
     | TEMPORARY
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index fd837e2b0b9..95d77384a81 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -59,6 +59,11 @@ import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
 import org.apache.doris.cloud.proto.Cloud.StagePB;
 import org.apache.doris.indexpolicy.IndexPolicyTypeEnum;
 import org.apache.doris.mysql.MysqlPassword;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.BranchOptions;
+import org.apache.doris.nereids.trees.plans.commands.info.TagOptions;
+import org.apache.doris.nereids.trees.plans.commands.info.RetentionSnapshots;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.policy.PolicyTypeEnum;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionMeta;
@@ -293,6 +298,7 @@ terminal String
     KW_BITMAP_UNION,
     KW_BLOB,
     KW_BOOLEAN,
+    KW_BRANCH,
     KW_BRIEF,
     KW_BROKER,
     KW_BUCKETS,
@@ -350,6 +356,7 @@ terminal String
     KW_DATETIMEV1,
     KW_DATEV1,
     KW_DAY,
+    KW_DAYS,
     KW_DECIMAL,
     KW_DECIMALV2,
     KW_DECIMALV3,
@@ -436,6 +443,7 @@ terminal String
     KW_HOTSPOT,
     KW_HOSTNAME,
     KW_HOUR,
+    KW_HOURS,
     KW_HUB,
     KW_IDENTIFIED,
     KW_IF,
@@ -496,6 +504,7 @@ terminal String
     KW_MIN,
     KW_MINUS,
     KW_MINUTE,
+    KW_MINUTES,
     KW_MODIFY,
     KW_MONTH,
     KW_MATCH,
@@ -582,6 +591,8 @@ terminal String
     KW_RESOURCES,
     KW_RESTORE,
     KW_RESUME,
+    KW_RETAIN,
+    KW_RETENTION,
     KW_RETURNS,
     KW_REVOKE,
     KW_RIGHT,
@@ -609,6 +620,7 @@ terminal String
     KW_SKEW,
     KW_SMALLINT,
     KW_SNAPSHOT,
+    KW_SNAPSHOTS,
     KW_SONAME,
     KW_SPLIT,
     KW_SQL,
@@ -635,6 +647,7 @@ terminal String
     KW_TABLESAMPLE,
     KW_TABLET,
     KW_TABLETS,
+    KW_TAG,
     KW_TASK,
     KW_TASKS,
     KW_TEMPORARY,
@@ -1016,6 +1029,12 @@ nonterminal Boolean opt_generated_always;
 
 nonterminal Boolean opt_detailed;
 
+// branch & tag
+nonterminal BranchOptions branch_options;
+nonterminal TagOptions tag_options;
+nonterminal Optional<Long> opt_as_of_version, opt_retain_time, 
time_value_with_unit;
+nonterminal RetentionSnapshots opt_retention_snapshots;
+
 precedence nonassoc COMMA;
 precedence nonassoc STRING_LITERAL;
 precedence nonassoc KW_COLUMNS;
@@ -1763,6 +1782,103 @@ alter_table_clause ::=
     {:
         RESULT = new 
AlterMultiPartitionClause(PartitionKeyDesc.createMultiFixed(lower, upper, 
num_interval), properties, isTempPartition);
     :}
+    | KW_CREATE KW_BRANCH opt_if_not_exists:ifNotExists ident:branchName 
branch_options:branchOptions
+    {:
+        CreateOrReplaceBranchInfo branchInfo = new 
CreateOrReplaceBranchInfo(branchName, true, false, ifNotExists, branchOptions);
+        RESULT = new CreateOrReplaceBranchClause(branchInfo);
+    :}
+    | KW_REPLACE KW_BRANCH ident:branchName branch_options:branchOptions
+    {:
+        CreateOrReplaceBranchInfo branchInfo = new 
CreateOrReplaceBranchInfo(branchName, false, true, false, branchOptions);
+        RESULT = new CreateOrReplaceBranchClause(branchInfo);
+    :}
+    | KW_CREATE KW_OR KW_REPLACE KW_BRANCH ident:branchName 
branch_options:branchOptions
+    {:
+        CreateOrReplaceBranchInfo branchInfo = new 
CreateOrReplaceBranchInfo(branchName, true, true, false, branchOptions);
+        RESULT = new CreateOrReplaceBranchClause(branchInfo);
+    :}
+    | KW_CREATE KW_TAG opt_if_not_exists:ifNotExists ident:tagName 
tag_options:tagOptions
+    {:
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(tagName, 
true, false, ifNotExists, tagOptions);
+        RESULT = new CreateOrReplaceTagClause(tagInfo);
+    :}
+    | KW_REPLACE KW_TAG ident:tagName tag_options:tagOptions
+    {:
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(tagName, 
false, true, false, tagOptions);
+        RESULT = new CreateOrReplaceTagClause(tagInfo);
+    :}
+    | KW_CREATE KW_OR KW_REPLACE KW_TAG ident:tagName tag_options:tagOptions
+    {:
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(tagName, 
true, true, false, tagOptions);
+        RESULT = new CreateOrReplaceTagClause(tagInfo);
+    :}
+    ;
+
+branch_options ::=
+    opt_as_of_version:asOfVersion opt_retain_time:retainTime 
opt_retention_snapshots:retentionSnapshots
+    {:
+        RESULT = new BranchOptions(asOfVersion, retainTime, 
retentionSnapshots.getNumSnapshots(), retentionSnapshots.getRetain());
+    :}
+    ;
+
+opt_as_of_version ::=
+    {:
+        RESULT = Optional.empty();
+    :}
+    | KW_AS KW_OF KW_VERSION INTEGER_LITERAL:v
+    {:
+        RESULT = Optional.of(v);
+    :}
+    ;
+
+opt_retain_time ::=
+    {:
+        RESULT = Optional.empty();
+    :}
+    | KW_RETAIN time_value_with_unit:timeValueWithUnit
+    {:
+        RESULT = timeValueWithUnit;
+    :}
+    ;
+
+time_value_with_unit ::=
+    INTEGER_LITERAL:t KW_DAYS
+    {:
+        RESULT = Optional.of(t * 86400 * 1000L);
+    :}
+    | INTEGER_LITERAL:t KW_HOURS
+    {:
+        RESULT = Optional.of(t * 3600 * 1000L);
+    :}
+    | INTEGER_LITERAL:t KW_MINUTES
+    {:
+        RESULT = Optional.of(t * 60 * 1000L);
+    :}
+    ;
+
+opt_retention_snapshots ::=
+    {:
+        RESULT = new RetentionSnapshots(Optional.empty(), Optional.empty());
+    :}
+    | KW_WITH KW_SNAPSHOT KW_RETENTION INTEGER_LITERAL:s KW_SNAPSHOTS
+    {:
+        RESULT = new RetentionSnapshots(Optional.of(s.intValue()), 
Optional.empty());
+    :}
+    | KW_WITH KW_SNAPSHOT KW_RETENTION time_value_with_unit:timeValueWithUnit
+    {:
+        RESULT = new RetentionSnapshots(Optional.empty(), timeValueWithUnit);
+    :}
+    | KW_WITH KW_SNAPSHOT KW_RETENTION INTEGER_LITERAL:s KW_SNAPSHOTS 
time_value_with_unit:timeValueWithUnit
+    {:
+        RESULT = new RetentionSnapshots(Optional.of(s.intValue()), 
timeValueWithUnit);
+    :}
+    ;
+
+tag_options ::=
+    opt_as_of_version:asOfVersion opt_retain_time:retainTime
+    {:
+        RESULT = new TagOptions(asOfVersion, retainTime);
+    :}
     ;
 
 opt_enable_feature_properties ::=
@@ -8139,6 +8255,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_BOOLEAN:id
     {: RESULT = id; :}
+    | KW_BRANCH:id
+    {: RESULT = id; :}
     | KW_BRIEF:id
     {: RESULT = id; :}
     | KW_BROKER:id
@@ -8205,6 +8323,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_DATETIMEV1:id
     {: RESULT = id; :}
+    | KW_DAYS:id
+    {: RESULT = id; :}
     | KW_DECIMAL:id
     {: RESULT = id; :}
     | KW_DEFERRED:id
@@ -8279,6 +8399,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_HOSTNAME:id
     {: RESULT = id; :}
+    | KW_HOURS:id
+    {: RESULT = id; :}
     | KW_HUB:id
     {: RESULT = id; :}
     | KW_IDENTIFIED:id
@@ -8323,6 +8445,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_MERGE:id
     {: RESULT = id; :}
+    | KW_MINUTES:id
+    {: RESULT = id; :}
     | KW_MODIFY:id
     {: RESULT = id; :}
     | KW_NAME:id
@@ -8409,6 +8533,10 @@ keyword ::=
     {: RESULT = id; :}
     | KW_RESTORE:id
     {: RESULT = id; :}
+    | KW_RETAIN:id
+    {: RESULT = id; :}
+    | KW_RETENTION:id
+    {: RESULT = id; :}
     | KW_RETURNS:id
     {: RESULT = id; :}
     | KW_ROLLBACK:id
@@ -8425,6 +8553,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_SNAPSHOT:id
     {: RESULT = id; :}
+    | KW_SNAPSHOTS:id
+    {: RESULT = id; :}
     | KW_SONAME:id
     {: RESULT = id; :}
     | KW_SPLIT:id
@@ -8513,6 +8643,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_FREE:id
     {: RESULT = id; :}
+    | KW_TAG:id
+    {: RESULT = id; :}
     | KW_TASK:id
     {: RESULT = id; :}
     | KW_TASKS:id    
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index b22e301e98a..d0b3909ae11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -26,6 +26,8 @@ import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterViewStmt;
 import org.apache.doris.analysis.ColumnRenameClause;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.CreateOrReplaceBranchClause;
+import org.apache.doris.analysis.CreateOrReplaceTagClause;
 import org.apache.doris.analysis.DropMaterializedViewStmt;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.DropPartitionFromIndexClause;
@@ -334,6 +336,26 @@ public class Alter {
         Env.getCurrentEnv().getEditLog().logModifyTableProperties(info);
     }
 
+    private void processAlterTableForExternalTable(
+            ExternalTable table,
+            List<AlterClause> alterClauses) throws UserException {
+        for (AlterClause alterClause : alterClauses) {
+            if (alterClause instanceof ModifyTablePropertiesClause) {
+                setExternalTableAutoAnalyzePolicy(table, alterClauses);
+            } else if (alterClause instanceof CreateOrReplaceBranchClause) {
+                table.getCatalog().createOrReplaceBranch(
+                        table.getDbName(), table.getName(),
+                        ((CreateOrReplaceBranchClause) 
alterClause).getBranchInfo());
+            } else if (alterClause instanceof CreateOrReplaceTagClause) {
+                table.getCatalog().createOrReplaceTag(
+                        table.getDbName(), table.getName(),
+                        ((CreateOrReplaceTagClause) alterClause).getTagInfo());
+            } else {
+                throw new UserException("Invalid alter operations for external 
table: " + alterClauses);
+            }
+        }
+    }
+
     private boolean needChangeMTMVState(List<AlterClause> alterClauses) {
         for (AlterClause alterClause : alterClauses) {
             if (alterClause.needChangeMTMVState()) {
@@ -532,7 +554,7 @@ public class Alter {
             case HUDI_EXTERNAL_TABLE:
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
                 alterClauses.addAll(stmt.getOps());
-                setExternalTableAutoAnalyzePolicy((ExternalTable) tableIf, 
alterClauses);
+                processAlterTableForExternalTable((ExternalTable) tableIf, 
alterClauses);
                 return;
             default:
                 throw new DdlException("Do not support alter "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
index 818ceda2ceb..06777976ff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
@@ -42,6 +42,8 @@ public enum AlterOpType {
     MODIFY_TABLE_COMMENT,
     MODIFY_COLUMN_COMMENT,
     MODIFY_ENGINE,
+    ALTER_BRANCH,
+    ALTER_TAG,
     INVALID_OP; // INVALID_OP must be the last one
 
     // true means 2 operations have no conflict.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java
index 7e48c33b5b2..443421ff83f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java
@@ -169,7 +169,9 @@ public class AlterTableStmt extends DdlStmt implements 
NotFallbackInParser {
                     || alterClause instanceof DropColumnClause
                     || alterClause instanceof ModifyColumnClause
                     || alterClause instanceof ReorderColumnsClause
-                    || alterClause instanceof ModifyEngineClause) {
+                    || alterClause instanceof ModifyEngineClause
+                    || alterClause instanceof CreateOrReplaceBranchClause
+                    || alterClause instanceof CreateOrReplaceTagClause) {
                 clauses.add(alterClause);
             } else {
                 throw new AnalysisException(table.getType().toString() + " [" 
+ table.getName() + "] "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceBranchClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceBranchClause.java
new file mode 100644
index 00000000000..29700cb2ab0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceBranchClause.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.common.UserException;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+
+public class CreateOrReplaceBranchClause extends AlterTableClause {
+    private final CreateOrReplaceBranchInfo branchInfo;
+
+    public CreateOrReplaceBranchClause(CreateOrReplaceBranchInfo branchInfo) {
+        super(AlterOpType.ALTER_BRANCH);
+        this.branchInfo = branchInfo;
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+
+    }
+
+    @Override
+    public String toSql() {
+        return branchInfo.toSql();
+    }
+
+    public CreateOrReplaceBranchInfo getBranchInfo() {
+        return branchInfo;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceTagClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceTagClause.java
new file mode 100644
index 00000000000..048fbb127c7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateOrReplaceTagClause.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.common.UserException;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+
+public class CreateOrReplaceTagClause extends AlterTableClause {
+    private final CreateOrReplaceTagInfo tagInfo;
+
+    public CreateOrReplaceTagClause(CreateOrReplaceTagInfo tagInfo) {
+        super(AlterOpType.ALTER_TAG);
+        this.tagInfo = tagInfo;
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+
+    }
+
+    @Override
+    public String toSql() {
+        return tagInfo.toSql();
+    }
+
+    public CreateOrReplaceTagInfo getTagInfo() {
+        return tagInfo;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index 3a5fbb970ec..1a3e0a0073e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -31,6 +31,8 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -207,4 +209,19 @@ public interface CatalogIf<T extends DatabaseIf> {
     default String fromRemoteTableName(String remoteDatabaseName, String 
remoteTableName) {
         return remoteTableName;
     }
+
+    // Create or replace branch operations, overridden by subclass if necessary
+    default void createOrReplaceBranch(String db, String tbl, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        throw new UserException("Not support create or replace branch 
operation");
+    }
+
+    // Create or replace tag operation, overridden by subclass if necessary
+    default void createOrReplaceTag(String db, String tbl, 
CreateOrReplaceTagInfo tagInfo) throws UserException {
+        throw new UserException("Not support create or replace tag operation");
+    }
+
+    default void replayCreateOrReplaceBranchOrTag(String dbName, String 
tblName) {
+
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bed32c62b85..d360c7089fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -60,10 +60,13 @@ import org.apache.doris.datasource.test.TestExternalCatalog;
 import org.apache.doris.datasource.test.TestExternalDatabase;
 import 
org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
 import org.apache.doris.persist.CreateDbInfo;
 import org.apache.doris.persist.CreateTableInfo;
 import org.apache.doris.persist.DropDbInfo;
 import org.apache.doris.persist.DropInfo;
+import org.apache.doris.persist.TableBranchOrTagInfo;
 import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -1293,4 +1296,47 @@ public abstract class ExternalCatalog
         throw new UnsupportedOperationException("View is not supported.");
     }
 
+    @Override
+    public void createOrReplaceBranch(String db, String tbl, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new DdlException("branching operation is not supported for 
catalog: " + getName());
+        }
+        try {
+            metadataOps.createOrReplaceBranch(db, tbl, branchInfo);
+            TableBranchOrTagInfo info = new TableBranchOrTagInfo(getName(), 
db, tbl);
+            Env.getCurrentEnv().getEditLog().logBranchOrTag(info);
+        } catch (Exception e) {
+            LOG.warn("Failed to create or replace branch for table {}.{} in 
catalog {}",
+                    db, tbl, getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void createOrReplaceTag(String db, String tbl, 
CreateOrReplaceTagInfo tagInfo)
+            throws UserException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new DdlException("Tagging operation is not supported for 
catalog: " + getName());
+        }
+        try {
+            metadataOps.createOrReplaceTag(db, tbl, tagInfo);
+            TableBranchOrTagInfo info = new TableBranchOrTagInfo(getName(), 
db, tbl);
+            Env.getCurrentEnv().getEditLog().logBranchOrTag(info);
+        } catch (Exception e) {
+            LOG.warn("Failed to create or replace tag for table {}.{} in 
catalog {}",
+                    db, tbl, getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void replayCreateOrReplaceBranchOrTag(String dbName, String 
tblName) {
+        if (metadataOps != null) {
+            metadataOps.afterCreateOrReplaceBranchOrTag(dbName, tblName);
+        }
+    }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 8e999df2aef..d6b41544201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -135,7 +135,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
         return false;
     }
 
-    protected void makeSureInitialized() {
+    protected synchronized void makeSureInitialized() {
         try {
             // getDbOrAnalysisException will call makeSureInitialized in 
ExternalCatalog.
             ExternalDatabase db = catalog.getDbOrAnalysisException(dbName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 2ff09f75789..aa88fb8abf3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
 import org.apache.doris.datasource.property.constants.HMSProperties;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -365,6 +367,18 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
         }
     }
 
+    @Override
+    public void createOrReplaceBranchImpl(String dbName, String tblName, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        throw new UserException("Not support create or replace branch in hive 
catalog.");
+    }
+
+    @Override
+    public void createOrReplaceTagImpl(String dbName, String tblName, 
CreateOrReplaceTagInfo tagInfo)
+            throws UserException {
+        throw new UserException("Not support create or replace tag in hive 
catalog.");
+    }
+
     @Override
     public List<String> listTableNames(String dbName) {
         return client.getAllTables(dbName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 60e8d4343f5..6ff4df612af 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -33,10 +33,17 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.DorisTypeVisitor;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.nereids.trees.plans.commands.info.BranchOptions;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.TagOptions;
 
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -337,6 +344,117 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         throw new UnsupportedOperationException("Truncate Iceberg table is not 
supported.");
     }
 
+    @Override
+    public void createOrReplaceBranchImpl(String dbName, String tblName, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisCatalog, 
dbName, tblName);
+        BranchOptions branchOptions = branchInfo.getBranchOptions();
+
+        Long snapshotId = branchOptions.getSnapshotId()
+                .orElse(
+                        // use current snapshot
+                        
Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId).orElse(null));
+
+        ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
+        String branchName = branchInfo.getBranchName();
+        boolean refExists = null != icebergTable.refs().get(branchName);
+        boolean create = branchInfo.getCreate();
+        boolean replace = branchInfo.getReplace();
+        boolean ifNotExists = branchInfo.getIfNotExists();
+
+        Runnable safeCreateBranch = () -> {
+            if (snapshotId == null) {
+                manageSnapshots.createBranch(branchName);
+            } else {
+                manageSnapshots.createBranch(branchName, snapshotId);
+            }
+        };
+
+        if (create && replace && !refExists) {
+            safeCreateBranch.run();
+        } else if (replace) {
+            if (snapshotId == null) {
+                // Cannot perform a replace operation on an empty table
+                throw new UserException(
+                        "Cannot complete replace branch operation on " + 
icebergTable.name()
+                                + " , main has no snapshot");
+            }
+            manageSnapshots.replaceBranch(branchName, snapshotId);
+        } else {
+            if (refExists && ifNotExists) {
+                return;
+            }
+            safeCreateBranch.run();
+        }
+
+        branchOptions.getRetain().ifPresent(n -> 
manageSnapshots.setMaxSnapshotAgeMs(branchName, n));
+        branchOptions.getNumSnapshots().ifPresent(n -> 
manageSnapshots.setMinSnapshotsToKeep(branchName, n));
+        branchOptions.getRetention().ifPresent(n -> 
manageSnapshots.setMaxRefAgeMs(branchName, n));
+
+        try {
+            preExecutionAuthenticator.execute(() -> manageSnapshots.commit());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create or replace branch: " + branchName + " in 
table: " + icebergTable.name()
+                            + ", error message is: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void afterCreateOrReplaceBranchOrTag(String dbName, String tblName) 
{
+        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
+        if (db != null) {
+            ExternalTable tbl = db.getTableNullable(tblName);
+            if (tbl != null) {
+                tbl.unsetObjectCreated();
+            }
+        }
+    }
+
+    @Override
+    public void createOrReplaceTagImpl(String dbName, String tblName, 
CreateOrReplaceTagInfo tagInfo)
+            throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisCatalog, 
dbName, tblName);
+        TagOptions tagOptions = tagInfo.getTagOptions();
+        Long snapshotId = tagOptions.getSnapshotId()
+                .orElse(
+                        // use current snapshot
+                        
Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId).orElse(null));
+
+        if (snapshotId == null) {
+            // Creating tag for empty tables is not allowed
+            throw new UserException(
+                    "Cannot complete replace branch operation on " + 
icebergTable.name() + " , main has no snapshot");
+        }
+
+        String tagName = tagInfo.getTagName();
+        boolean create = tagInfo.getCreate();
+        boolean replace = tagInfo.getReplace();
+        boolean ifNotExists = tagInfo.getIfNotExists();
+        boolean refExists = null != icebergTable.refs().get(tagName);
+
+        ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
+        if (create && replace && !refExists) {
+            manageSnapshots.createTag(tagName, snapshotId);
+        } else if (replace) {
+            manageSnapshots.replaceTag(tagName, snapshotId);
+        } else {
+            if (refExists && ifNotExists) {
+                return;
+            }
+            manageSnapshots.createTag(tagName, snapshotId);
+        }
+
+        tagOptions.getRetain().ifPresent(n -> 
manageSnapshots.setMaxRefAgeMs(tagName, n));
+        try {
+            preExecutionAuthenticator.execute(() -> manageSnapshots.commit());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create or replace tag: " + tagName + " in 
table: " + icebergTable.name()
+                            + ", error message is: " + e.getMessage(), e);
+        }
+    }
+
     public PreExecutionAuthenticator getPreExecutionAuthenticator() {
         return preExecutionAuthenticator;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index c8e9c63d7b9..d790a8cd085 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -23,6 +23,8 @@ import org.apache.doris.analysis.DropDbStmt;
 import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
 
 import org.apache.iceberg.view.View;
 
@@ -98,6 +100,7 @@ public interface ExternalMetadataOps {
     }
 
     /**
+     * truncate table in external metastore
      *
      * @param dbName
      * @param tblName
@@ -113,6 +116,43 @@ public interface ExternalMetadataOps {
     default void afterTruncateTable(String dbName, String tblName) {
     }
 
+    /**
+     * create or replace branch in external metastore
+     *
+     * @param dbName
+     * @param tblName
+     * @param branchInfo
+     * @throws UserException
+     */
+    default void createOrReplaceBranch(String dbName, String tblName, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        createOrReplaceBranchImpl(dbName, tblName, branchInfo);
+        afterCreateOrReplaceBranchOrTag(dbName, tblName);
+    }
+
+    void createOrReplaceBranchImpl(String dbName, String tblName, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException;
+
+    default void afterCreateOrReplaceBranchOrTag(String dbName, String 
tblName) {
+    }
+
+    /**
+     * create or replace tag in external metastore
+     *
+     * @param dbName
+     * @param tblName
+     * @param tagInfo
+     * @throws UserException
+     */
+    default void createOrReplaceTag(String dbName, String tblName, 
CreateOrReplaceTagInfo tagInfo)
+            throws UserException {
+        createOrReplaceTagImpl(dbName, tblName, tagInfo);
+        afterCreateOrReplaceBranchOrTag(dbName, tblName);
+    }
+
+    void createOrReplaceTagImpl(String dbName, String tblName, 
CreateOrReplaceTagInfo tagInfo)
+            throws UserException;
+
     /**
      *
      * @return
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 20f4191f80e..21a3a6f849f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -117,6 +117,7 @@ import 
org.apache.doris.persist.SetReplicaVersionOperationLog;
 import org.apache.doris.persist.SetTableStatusOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
+import org.apache.doris.persist.TableBranchOrTagInfo;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.persist.TableRenameColumnInfo;
@@ -973,6 +974,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_BRANCH_OR_TAG: {
+                data = TableBranchOrTagInfo.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BranchOptions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BranchOptions.java
new file mode 100644
index 00000000000..8b3a8725f75
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BranchOptions.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import java.util.Optional;
+
+/**
+ * Represents options that can be specified for a branch operation in the 
Nereids module.
+ * <p>
+ * This class encapsulates optional parameters that control the behavior of 
branch operations,
+ * such as specifying a snapshot ID, retention policy, number of snapshots to 
keep, and retention period.
+ */
+public class BranchOptions {
+    public static final BranchOptions EMPTY = new 
BranchOptions(Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    private final Optional<Long> snapshotId;
+    // retain time in milliseconds
+    private final Optional<Long> retain;
+    private final Optional<Integer> numSnapshots;
+    private final Optional<Long> retention;
+
+    public BranchOptions(Optional<Long> snapshotId,
+                         Optional<Long> retain,
+                         Optional<Integer> numSnapshots,
+                         Optional<Long> retention) {
+        this.snapshotId = snapshotId;
+        this.retain = retain;
+        this.numSnapshots = numSnapshots;
+        this.retention = retention;
+    }
+
+    public Optional<Long> getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Optional<Long> getRetain() {
+        return retain;
+    }
+
+    public Optional<Integer> getNumSnapshots() {
+        return numSnapshots;
+    }
+
+    public Optional<Long> getRetention() {
+        return retention;
+    }
+
+    /**
+     * Generates the SQL representation of the branch options.
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (snapshotId.isPresent()) {
+            sb.append(" AS OF VERSION ").append(snapshotId.get());
+        }
+        if (retain.isPresent()) {
+            // "RETAIN", and convert retain time to MINUTES
+            sb.append(" RETAIN ").append(retain.get() / 1000 / 60).append(" 
MINUTES");
+        }
+        if (numSnapshots.isPresent() || retention.isPresent()) {
+            sb.append(" WITH SNAPSHOT RETENTION");
+            if (numSnapshots.isPresent()) {
+                sb.append(" ").append(numSnapshots.get()).append(" SNAPSHOTS");
+            }
+            if (retention.isPresent()) {
+                sb.append(" ").append(retention.get() / 1000 / 60).append(" 
MINUTES");
+            }
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchInfo.java
new file mode 100644
index 00000000000..0b1dd71757f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchInfo.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+/**
+ * Represents the information required to create or replace a branch in the 
Nereids module.
+ * <p>
+ * This class encapsulates the branch name, operation flags (create, replace, 
ifNotExists),
+ * and associated branch options that control the behavior of the branch 
operation.
+ */
+public class CreateOrReplaceBranchInfo {
+
+    private final String branchName;
+    private final BranchOptions branchOptions;
+    private final Boolean create;
+    private final Boolean replace;
+    private final Boolean ifNotExists;
+
+    public CreateOrReplaceBranchInfo(String branchName,
+                                     boolean create,
+                                     boolean replace,
+                                     boolean ifNotExists,
+                                     BranchOptions branchOptions) {
+        this.branchName = branchName;
+        this.create = create;
+        this.replace = replace;
+        this.ifNotExists = ifNotExists;
+        this.branchOptions = branchOptions;
+    }
+
+    public String getBranchName() {
+        return branchName;
+    }
+
+    public BranchOptions getBranchOptions() {
+        return branchOptions;
+    }
+
+    public Boolean getCreate() {
+        return create;
+    }
+
+    public Boolean getReplace() {
+        return replace;
+    }
+
+    public Boolean getIfNotExists() {
+        return ifNotExists;
+    }
+
+    /**
+     * Generates the SQL representation of the create or replace branch 
command.
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (create && replace) {
+            sb.append("CREATE OR REPLACE BRANCH");
+        } else if (create) {
+            sb.append("CREATE BRANCH");
+        } else if (replace) {
+            sb.append("REPLACE BRANCH");
+        }
+        if (ifNotExists) {
+            sb.append(" IF NOT EXISTS");
+        }
+        sb.append(" ").append(branchName);
+        if (branchOptions != null) {
+            sb.append(branchOptions.toSql());
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceTagInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceTagInfo.java
new file mode 100644
index 00000000000..e5506946f28
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceTagInfo.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+/**
+ * Represents the information needed to create or replace a tag in the system.
+ *
+ */
+public class CreateOrReplaceTagInfo {
+
+    private final String tagName;
+    private final TagOptions tagOptions;
+    private final Boolean create;
+    private final Boolean replace;
+    private final Boolean ifNotExists;
+
+    public CreateOrReplaceTagInfo(String tagName,
+                                  boolean create,
+                                  boolean replace,
+                                  boolean ifNotExists,
+                                  TagOptions tagOptions) {
+        this.tagName = tagName;
+        this.create = create;
+        this.replace = replace;
+        this.ifNotExists = ifNotExists;
+        this.tagOptions = tagOptions;
+    }
+
+    public String getTagName() {
+        return tagName;
+    }
+
+    public TagOptions getTagOptions() {
+        return tagOptions;
+    }
+
+    public Boolean getCreate() {
+        return create;
+    }
+
+    public Boolean getReplace() {
+        return replace;
+    }
+
+    public Boolean getIfNotExists() {
+        return ifNotExists;
+    }
+
+    /**
+     * Generates the SQL representation of the create or replace tag command.
+     *
+     * @return SQL string for creating or replacing a tag
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (create && replace) {
+            sb.append("CREATE OR REPLACE TAG");
+        } else if (create) {
+            sb.append("CREATE TAG");
+        } else if (replace) {
+            sb.append("REPLACE TAG");
+        }
+        if (ifNotExists) {
+            sb.append(" IF NOT EXISTS");
+        }
+        sb.append(" ").append(tagName);
+        if (tagOptions != null) {
+            sb.append(tagOptions.toSql());
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RetentionSnapshots.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RetentionSnapshots.java
new file mode 100644
index 00000000000..468b0f9c678
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RetentionSnapshots.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import java.util.Optional;
+
+/**
+ * Represents retention options for branch in create branch statement
+ */
+public class RetentionSnapshots {
+
+    private final Optional<Integer> numSnapshots;
+    private final Optional<Long> retain;
+
+    public RetentionSnapshots(Optional<Integer> numSnapshots, Optional<Long> 
retain) {
+        this.numSnapshots = numSnapshots;
+        this.retain = retain;
+    }
+
+    public Optional<Integer> getNumSnapshots() {
+        return numSnapshots;
+    }
+
+    public Optional<Long> getRetain() {
+        return retain;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/TagOptions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/TagOptions.java
new file mode 100644
index 00000000000..291c87bff45
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/TagOptions.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import java.util.Optional;
+
+/**
+ * Represents the options available for managing tags in Iceberg through Doris.
+ * This class encapsulates optional parameters that can be specified when 
creating or manipulating tags.
+ *
+ * <p>{@code TagOptions} is typically used in conjunction with commands that 
interact with Iceberg tables,
+ * such as creating a tag or specifying retention policies.</p>
+ */
+public class TagOptions {
+    public static final TagOptions EMPTY = new TagOptions(Optional.empty(), 
Optional.empty());
+
+    private final Optional<Long> snapshotId;
+
+    private final Optional<Long> retain;
+
+    public TagOptions(Optional<Long> snapshotId,
+                      Optional<Long> retain) {
+        this.snapshotId = snapshotId;
+        this.retain = retain;
+    }
+
+    public Optional<Long> getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Optional<Long> getRetain() {
+        return retain;
+    }
+
+    /**
+     * Generates the SQL representation of the tag options.
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (snapshotId.isPresent()) {
+            sb.append(" AS OF VERSION ").append(snapshotId.get());
+        }
+        if (retain.isPresent()) {
+            // "RETAIN", and convert retain time to MINUTES
+            sb.append(" RETAIN ").append(retain.get() / 1000 / 60).append(" 
MINUTES");
+        }
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9edd99484dc..4d119d0f5cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.cooldown.CooldownConfHandler;
 import org.apache.doris.cooldown.CooldownConfList;
 import org.apache.doris.cooldown.CooldownDelete;
+import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalObjectLog;
@@ -1272,6 +1273,14 @@ public class EditLog {
                     env.getIndexPolicyMgr().replayDropIndexPolicy(log);
                     break;
                 }
+                case OperationType.OP_BRANCH_OR_TAG: {
+                    TableBranchOrTagInfo info = (TableBranchOrTagInfo) 
journal.getData();
+                    CatalogIf ctl = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(info.getCtlName());
+                    if (ctl != null) {
+                        ctl.replayCreateOrReplaceBranchOrTag(info.getDbName(), 
info.getTblName());
+                    }
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode, 
logId, e);
@@ -2246,4 +2255,8 @@ public class EditLog {
     private boolean exceedMaxJournalSize(short op, Writable writable) throws 
IOException {
         return journal.exceedMaxJournalSize(op, writable);
     }
+
+    public void logBranchOrTag(TableBranchOrTagInfo info) {
+        logEdit(OperationType.OP_BRANCH_OR_TAG, info);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index fc4aff23757..78da9ec14a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -56,6 +56,7 @@ public class OperationType {
     public static final short OP_REPLACE_TEMP_PARTITION = 210;
     public static final short OP_BATCH_MODIFY_PARTITION = 211;
     public static final short OP_REPLACE_TABLE = 212;
+    public static final short OP_BRANCH_OR_TAG = 213;
 
     // 20~29 120~129 220~229 ...
     @Deprecated
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/TableBranchOrTagInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/TableBranchOrTagInfo.java
new file mode 100644
index 00000000000..ea1f54607f4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/TableBranchOrTagInfo.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents the information needed for table's branch or tag operation.
+ * This class is used for serialization and deserialization of table branch or 
tag information.
+ */
+public class TableBranchOrTagInfo implements Writable {
+    @SerializedName(value = "ctl")
+    private String ctlName;
+    @SerializedName(value = "db")
+    private String dbName;
+    @SerializedName(value = "tbl")
+    private String tblName;
+
+    public TableBranchOrTagInfo(String ctlName, String dbName, String tblName) 
{
+        this.ctlName = ctlName;
+        this.dbName = dbName;
+        this.tblName = tblName;
+    }
+
+    public String getCtlName() {
+        return ctlName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTblName() {
+        return tblName;
+    }
+
+    public static TableBranchOrTagInfo read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, TableBranchOrTagInfo.class);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof TableBranchOrTagInfo)) {
+            return false;
+        }
+        TableBranchOrTagInfo other = (TableBranchOrTagInfo) obj;
+        return ctlName.equals(other.ctlName) && dbName.equals(other.dbName) && 
tblName.equals(other.tblName);
+    }
+}
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index c087edc6175..9533ee7665a 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -123,6 +123,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("bin", new Integer(SqlParserSymbols.KW_BIN));
         keywordMap.put("binlog", new Integer(SqlParserSymbols.KW_BINLOG));
         keywordMap.put("bitmap", new Integer(SqlParserSymbols.KW_BITMAP));
+        keywordMap.put("branch", new Integer(SqlParserSymbols.KW_BRANCH));
         keywordMap.put("inverted", new Integer(SqlParserSymbols.KW_INVERTED));
         keywordMap.put("bitmap_empty", new 
Integer(SqlParserSymbols.KW_BITMAP_EMPTY));
         keywordMap.put("bitmap_union", new 
Integer(SqlParserSymbols.KW_BITMAP_UNION));
@@ -187,6 +188,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("datetimev2", new 
Integer(SqlParserSymbols.KW_DATETIMEV2));
         keywordMap.put("time", new Integer(SqlParserSymbols.KW_TIME));
         keywordMap.put("day", new Integer(SqlParserSymbols.KW_DAY));
+        keywordMap.put("days", new Integer(SqlParserSymbols.KW_DAYS));
         keywordMap.put("decimal", new Integer(SqlParserSymbols.KW_DECIMAL));
         keywordMap.put("decimalv2", new 
Integer(SqlParserSymbols.KW_DECIMALV2));
         keywordMap.put("decimalv3", new 
Integer(SqlParserSymbols.KW_DECIMALV3));
@@ -274,6 +276,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("hll_union", new 
Integer(SqlParserSymbols.KW_HLL_UNION));
         keywordMap.put("hostname", new Integer(SqlParserSymbols.KW_HOSTNAME));
         keywordMap.put("hour", new Integer(SqlParserSymbols.KW_HOUR));
+        keywordMap.put("hours", new Integer(SqlParserSymbols.KW_HOURS));
         keywordMap.put("hotspot", new Integer(SqlParserSymbols.KW_HOTSPOT));
         keywordMap.put("hub", new Integer(SqlParserSymbols.KW_HUB));
         keywordMap.put("identified", new 
Integer(SqlParserSymbols.KW_IDENTIFIED));
@@ -342,6 +345,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("min", new Integer(SqlParserSymbols.KW_MIN));
         keywordMap.put("minus", new Integer(SqlParserSymbols.KW_MINUS));
         keywordMap.put("minute", new Integer(SqlParserSymbols.KW_MINUTE));
+        keywordMap.put("minutes", new Integer(SqlParserSymbols.KW_MINUTES));
         keywordMap.put("modify", new Integer(SqlParserSymbols.KW_MODIFY));
         keywordMap.put("month", new Integer(SqlParserSymbols.KW_MONTH));
         keywordMap.put("name", new Integer(SqlParserSymbols.KW_NAME));
@@ -420,6 +424,8 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("resources", new 
Integer(SqlParserSymbols.KW_RESOURCES));
         keywordMap.put("restore", new Integer(SqlParserSymbols.KW_RESTORE));
         keywordMap.put("resume", new Integer(SqlParserSymbols.KW_RESUME));
+        keywordMap.put("retain", new Integer(SqlParserSymbols.KW_RETAIN));
+        keywordMap.put("retention", new 
Integer(SqlParserSymbols.KW_RETENTION));
         keywordMap.put("returns", new Integer(SqlParserSymbols.KW_RETURNS));
         keywordMap.put("revoke", new Integer(SqlParserSymbols.KW_REVOKE));
         keywordMap.put("right", new Integer(SqlParserSymbols.KW_RIGHT));
@@ -447,6 +453,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("skew", new Integer(SqlParserSymbols.KW_SKEW));
         keywordMap.put("smallint", new Integer(SqlParserSymbols.KW_SMALLINT));
         keywordMap.put("snapshot", new Integer(SqlParserSymbols.KW_SNAPSHOT));
+        keywordMap.put("snapshots", new 
Integer(SqlParserSymbols.KW_SNAPSHOTS));
         keywordMap.put("soname", new Integer(SqlParserSymbols.KW_SONAME));
         keywordMap.put("split", new Integer(SqlParserSymbols.KW_SPLIT));
         keywordMap.put("sql", new Integer(SqlParserSymbols.KW_SQL));
@@ -474,6 +481,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("tablesample", new 
Integer(SqlParserSymbols.KW_TABLESAMPLE));
         keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET));
         keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
+        keywordMap.put("tag", new Integer(SqlParserSymbols.KW_TAG));
         keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
         keywordMap.put("tasks", new Integer(SqlParserSymbols.KW_TASKS));       
 
         keywordMap.put("temporary", new 
Integer(SqlParserSymbols.KW_TEMPORARY));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableBranchAndTagTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableBranchAndTagTest.java
new file mode 100644
index 00000000000..4626b2e7b2d
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableBranchAndTagTest.java
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.nereids.trees.plans.commands.info.BranchOptions;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.TagOptions;
+import org.apache.doris.persist.EditLog;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+public class IcebergExternalTableBranchAndTagTest {
+
+    Path tempDirectory;
+    Table icebergTable;
+    IcebergExternalCatalog catalog;
+    IcebergExternalDatabase db;
+    IcebergExternalTable dorisTable;
+    HadoopCatalog icebergCatalog;
+    MockedStatic<IcebergUtils> mockedIcebergUtils;
+    MockedStatic<Env> mockedEnv;
+    String dbName = "db";
+    String tblName = "tbl";
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        HashMap<String, String> map = new HashMap<>();
+        tempDirectory = Files.createTempDirectory("");
+        map.put("warehouse", "file://" + tempDirectory.toString());
+        map.put("type", "hadoop");
+        System.out.println(tempDirectory);
+        icebergCatalog =
+                (HadoopCatalog) 
CatalogUtil.buildIcebergCatalog("iceberg_catalog", map, new Configuration());
+
+        // init iceberg table
+        icebergCatalog.createNamespace(Namespace.of(dbName));
+        icebergTable = icebergCatalog.createTable(
+                TableIdentifier.of(dbName, tblName),
+            new Schema(Types.NestedField.required(1, "level", 
Types.StringType.get())));
+
+        // init external table
+        catalog = Mockito.spy(new IcebergHadoopExternalCatalog(1L, "iceberg", 
null, map, null));
+        catalog.setInitializedForTest(true);
+        // db = new IcebergExternalDatabase(catalog, 1L, dbName, dbName);
+        db = Mockito.spy(new IcebergExternalDatabase(catalog, 1L, dbName, 
dbName));
+        dorisTable = Mockito.spy(new IcebergExternalTable(1, tblName, tblName, 
catalog, db));
+        Mockito.doReturn(db).when(catalog).getDbNullable(Mockito.any());
+        Mockito.doReturn(dorisTable).when(db).getTableNullable(Mockito.any());
+
+        // mock IcebergUtils.getIcebergTable to return our test icebergTable
+        mockedIcebergUtils = Mockito.mockStatic(IcebergUtils.class);
+        mockedIcebergUtils.when(() -> 
IcebergUtils.getIcebergTable(Mockito.any(), Mockito.any(), Mockito.any()))
+                .thenReturn(icebergTable);
+
+        // mock Env.getCurrentEnv().getEditLog().logBranchOrTag(info) to do 
nothing
+        Env mockEnv = Mockito.mock(Env.class);
+        EditLog mockEditLog = Mockito.mock(EditLog.class);
+        mockedEnv = Mockito.mockStatic(Env.class);
+        mockedEnv.when(Env::getCurrentEnv).thenReturn(mockEnv);
+        Mockito.when(mockEnv.getEditLog()).thenReturn(mockEditLog);
+        Mockito.doNothing().when(mockEditLog).logBranchOrTag(Mockito.any());
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (icebergCatalog != null) {
+            icebergCatalog.dropTable(TableIdentifier.of("db", "tbl"));
+            icebergCatalog.dropNamespace(Namespace.of("db"));
+        }
+        Files.deleteIfExists(tempDirectory);
+
+        // close the static mock
+        if (mockedIcebergUtils != null) {
+            mockedIcebergUtils.close();
+        }
+        if (mockedEnv != null) {
+            mockedEnv.close();
+        }
+    }
+
+    @Test
+    public void testCreateTagWithTable() throws UserException, IOException {
+        String tag1 = "tag1";
+        String tag2 = "tag2";
+        String tag3 = "tag3";
+
+        // create a new tag: tag1
+        // will fail
+        CreateOrReplaceTagInfo info =
+                new CreateOrReplaceTagInfo(tag1, true, false, false, 
TagOptions.EMPTY);
+        Assertions.assertThrows(
+                UserException.class,
+                () -> catalog.createOrReplaceTag(dbName, tblName, info));
+
+        // add some data
+        addSomeDataIntoIcebergTable();
+        List<Snapshot> snapshots = 
Lists.newArrayList(icebergTable.snapshots());
+        Assertions.assertEquals(1, snapshots.size());
+
+        // create a new tag: tag1
+        catalog.createOrReplaceTag(dbName, tblName, info);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag1),
+                icebergTable.currentSnapshot().snapshotId(),
+                false, null, null, null);
+
+        // create an existed tag: tag1
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> catalog.createOrReplaceTag(dbName, tblName, info));
+
+        // create an existed tag with replace
+        CreateOrReplaceTagInfo info2 =
+                new CreateOrReplaceTagInfo(tag1, true, true, false, 
TagOptions.EMPTY);
+        catalog.createOrReplaceTag(dbName, tblName, info2);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag1),
+                icebergTable.currentSnapshot().snapshotId(),
+                false, null, null, null);
+
+        // create an existed tag with if not exists
+        CreateOrReplaceTagInfo info3 =
+                new CreateOrReplaceTagInfo(tag1, true, false, true, 
TagOptions.EMPTY);
+        catalog.createOrReplaceTag(dbName, tblName, info3);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag1),
+                icebergTable.currentSnapshot().snapshotId(),
+                false, null, null, null);
+
+        // add some data
+        addSomeDataIntoIcebergTable();
+        addSomeDataIntoIcebergTable();
+        snapshots = Lists.newArrayList(icebergTable.snapshots());
+        Assertions.assertEquals(3, snapshots.size());
+
+        // create new tag: tag2 with snapshotId
+        TagOptions tagOps = new TagOptions(
+                Optional.of(snapshots.get(1).snapshotId()),
+                Optional.empty());
+        CreateOrReplaceTagInfo info4 =
+                new CreateOrReplaceTagInfo(tag2, true, false, false, tagOps);
+        catalog.createOrReplaceTag(dbName, tblName, info4);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag2),
+                snapshots.get(1).snapshotId(),
+                false, null, null, null);
+
+        // update tag2
+        TagOptions tagOps2 = new TagOptions(
+                Optional.empty(),
+                Optional.of(2L));
+        CreateOrReplaceTagInfo info5 =
+                new CreateOrReplaceTagInfo(tag2, true, true, false, tagOps2);
+        catalog.createOrReplaceTag(dbName, tblName, info5);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag2),
+                icebergTable.currentSnapshot().snapshotId(),
+                false, null, null, 2L);
+
+        // create new tag: tag3
+        CreateOrReplaceTagInfo info6 =
+                new CreateOrReplaceTagInfo(tag3, true, false, false, tagOps2);
+        catalog.createOrReplaceTag(dbName, tblName, info6);
+        assertSnapshotRef(
+                icebergTable.refs().get(tag3),
+                icebergTable.currentSnapshot().snapshotId(),
+                false, null, null, 2L);
+
+        Assertions.assertEquals(4, icebergTable.refs().size());
+    }
+
+    @Test
+    public void testCreateBranchWithNotEmptyTable() throws UserException, 
IOException {
+
+        String branch1 = "branch1";
+        String branch2 = "branch2";
+        String branch3 = "branch3";
+
+        // create a new branch: branch1
+        CreateOrReplaceBranchInfo info =
+                new CreateOrReplaceBranchInfo(branch1, true, false, false, 
BranchOptions.EMPTY);
+        catalog.createOrReplaceBranch(dbName, tblName, info);
+        List<Snapshot> snapshots = 
Lists.newArrayList(icebergTable.snapshots());
+        Assertions.assertEquals(1, snapshots.size());
+        assertSnapshotRef(
+                icebergTable.refs().get(branch1),
+                snapshots.get(0).snapshotId(),
+                true, null, null, null);
+
+        // create an existed branch, failed
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> catalog.createOrReplaceBranch(dbName, tblName, info));
+
+        // create or replace an empty branch, will fail
+        // because cannot perform a replace operation on an empty branch.
+        CreateOrReplaceBranchInfo info2 =
+                new CreateOrReplaceBranchInfo(branch1, true, true, false, 
BranchOptions.EMPTY);
+        Assertions.assertThrows(
+                UserException.class,
+                () -> catalog.createOrReplaceBranch(dbName, tblName, info2));
+
+        // create an existed branch with ifNotExists
+        CreateOrReplaceBranchInfo info4 =
+                new CreateOrReplaceBranchInfo(branch1, true, false, true, 
BranchOptions.EMPTY);
+        catalog.createOrReplaceBranch(dbName, tblName, info4);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch1),
+                snapshots.get(0).snapshotId(),
+                true, null, null, null);
+
+        // add some data
+        addSomeDataIntoIcebergTable();
+        snapshots = Lists.newArrayList(icebergTable.snapshots());
+        Assertions.assertEquals(2, snapshots.size());
+
+        // update branch1
+        catalog.createOrReplaceBranch(dbName, tblName, info2);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch1),
+                icebergTable.currentSnapshot().snapshotId(),
+                true, null, null, null);
+
+        // create or replace a new branch: branch2
+        CreateOrReplaceBranchInfo info3 =
+                new CreateOrReplaceBranchInfo(branch2, true, true, false, 
BranchOptions.EMPTY);
+        catalog.createOrReplaceBranch(dbName, tblName, info3);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch2),
+                icebergTable.currentSnapshot().snapshotId(),
+                true, null, null, null);
+
+        // update branch2
+        BranchOptions brOps = new BranchOptions(
+                Optional.empty(),
+                Optional.of(1L),
+                Optional.of(2),
+                Optional.of(3L));
+        CreateOrReplaceBranchInfo info5 =
+                new CreateOrReplaceBranchInfo(branch2, true, true, false, 
brOps);
+        catalog.createOrReplaceBranch(dbName, tblName, info5);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch2),
+                icebergTable.currentSnapshot().snapshotId(),
+                true, 1L, 2, 3L);
+
+        // total branch:
+        //   'main','branch1','branch2'
+        Assertions.assertEquals(3, icebergTable.refs().size());
+
+        // insert some data
+        addSomeDataIntoIcebergTable();
+        addSomeDataIntoIcebergTable();
+        addSomeDataIntoIcebergTable();
+        addSomeDataIntoIcebergTable();
+        snapshots = Lists.newArrayList(icebergTable.snapshots());
+        Assertions.assertEquals(6, snapshots.size());
+
+        // create a new branch: branch3
+        BranchOptions brOps2 = new BranchOptions(
+                Optional.of(snapshots.get(4).snapshotId()),
+                Optional.of(1L),
+                Optional.of(2),
+                Optional.of(3L));
+        CreateOrReplaceBranchInfo info6 =
+                new CreateOrReplaceBranchInfo(branch3, true, true, false, 
brOps2);
+        catalog.createOrReplaceBranch(dbName, tblName, info6);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch3),
+                snapshots.get(4).snapshotId(),
+                true, 1L, 2, 3L);
+
+        // update branch1
+        catalog.createOrReplaceBranch(dbName, tblName, info2);
+        assertSnapshotRef(
+                icebergTable.refs().get(branch1),
+                icebergTable.currentSnapshot().snapshotId(),
+                true, null, null, null);
+
+        Assertions.assertEquals(4, icebergTable.refs().size());
+    }
+
+    private void addSomeDataIntoIcebergTable() throws IOException {
+        Path fileA = 
Files.createFile(tempDirectory.resolve(UUID.randomUUID().toString()));
+        DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
+                .withPath(fileA.toString())
+                .withFileSizeInBytes(10)
+                .withRecordCount(1)
+                .withFormat("parquet");
+        icebergTable.newFastAppend()
+                .appendFile(builder.build())
+                .commit();
+    }
+
+    private void assertSnapshotRef(
+            SnapshotRef ref,
+            Long snapshotId,
+            boolean isBranch,
+            Long maxSnapshotAgeMs,
+            Integer minSnapshotsToKeep,
+            Long maxRefAgeMs) {
+        if (snapshotId != null) {
+            Assertions.assertEquals(snapshotId, ref.snapshotId());
+        }
+        if (isBranch) {
+            Assertions.assertTrue(ref.isBranch());
+        } else {
+            Assertions.assertTrue(ref.isTag());
+        }
+        Assertions.assertEquals(maxSnapshotAgeMs, ref.maxSnapshotAgeMs());
+        Assertions.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
+        Assertions.assertEquals(maxRefAgeMs, ref.maxRefAgeMs());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java
new file mode 100644
index 00000000000..e3d9ad9b23b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/CreateOrReplaceBranchOrTagInfoTest.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class CreateOrReplaceBranchOrTagInfoTest {
+
+    @Test
+    public void testCreateBranchToSql() {
+        // Test CREATE BRANCH with no options
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, null);
+        String expected = "CREATE BRANCH test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testReplaceBranchToSql() {
+        // Test REPLACE BRANCH with no options
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", false, true, false, null);
+        String expected = "REPLACE BRANCH test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateOrReplaceBranchToSql() {
+        // Test CREATE OR REPLACE BRANCH with no options
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, true, false, null);
+        String expected = "CREATE OR REPLACE BRANCH test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchIfNotExistsToSql() {
+        // Test CREATE BRANCH IF NOT EXISTS
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, true, null);
+        String expected = "CREATE BRANCH IF NOT EXISTS test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateOrReplaceBranchIfNotExistsToSql() {
+        // Test CREATE OR REPLACE BRANCH IF NOT EXISTS
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, true, true, null);
+        String expected = "CREATE OR REPLACE BRANCH IF NOT EXISTS test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithSnapshotIdToSql() {
+        // Test CREATE BRANCH with snapshot ID
+        BranchOptions options = new BranchOptions(
+                Optional.of(123456L), Optional.empty(), Optional.empty(), 
Optional.empty());
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected = "CREATE BRANCH test_branch AS OF VERSION 123456";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithRetainToSql() {
+        // Test CREATE BRANCH with retain time (1 hour = 3600000ms)
+        BranchOptions options = new BranchOptions(
+                Optional.empty(), Optional.of(3600000L), Optional.empty(), 
Optional.empty());
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected = "CREATE BRANCH test_branch RETAIN 60 MINUTES";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithNumSnapshotsToSql() {
+        // Test CREATE BRANCH with number of snapshots
+        BranchOptions options = new BranchOptions(
+                Optional.empty(), Optional.empty(), Optional.of(5), 
Optional.empty());
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected = "CREATE BRANCH test_branch WITH SNAPSHOT RETENTION 5 
SNAPSHOTS";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithRetentionToSql() {
+        // Test CREATE BRANCH with retention time (1 day = 86400000ms)
+        BranchOptions options = new BranchOptions(
+                Optional.empty(), Optional.empty(), Optional.empty(), 
Optional.of(86400000L));
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected = "CREATE BRANCH test_branch WITH SNAPSHOT RETENTION 
1440 MINUTES";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithNumSnapshotsAndRetentionToSql() {
+        // Test CREATE BRANCH with both num snapshots and retention
+        BranchOptions options = new BranchOptions(
+                Optional.empty(), Optional.empty(), Optional.of(10), 
Optional.of(86400000L));
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected = "CREATE BRANCH test_branch WITH SNAPSHOT RETENTION 
10 SNAPSHOTS 1440 MINUTES";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testCreateBranchWithAllOptionsToSql() {
+        // Test CREATE BRANCH with all options
+        BranchOptions options = new BranchOptions(
+                Optional.of(123456L), Optional.of(3600000L), Optional.of(5), 
Optional.of(86400000L));
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, options);
+        String expected
+                = "CREATE BRANCH test_branch AS OF VERSION 123456 RETAIN 60 
MINUTES WITH SNAPSHOT RETENTION 5 SNAPSHOTS 1440 MINUTES";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testEmptyOptionsToSql() {
+        // Test with BranchOptions.EMPTY
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "test_branch", true, false, false, BranchOptions.EMPTY);
+        String expected = "CREATE BRANCH test_branch";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    @Test
+    public void testBranchNameWithSpecialCharacters() {
+        // Test branch name with underscores and numbers
+        CreateOrReplaceBranchInfo branchInfo = new CreateOrReplaceBranchInfo(
+                "feature_branch_v2_0", true, false, false, null);
+        String expected = "CREATE BRANCH feature_branch_v2_0";
+        Assertions.assertEquals(expected, branchInfo.toSql());
+    }
+
+    // ========================== Tag Tests ==========================
+
+    @Test
+    public void testCreateTagToSql() {
+        // Test CREATE TAG with no options
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, false, null);
+        String expected = "CREATE TAG test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testReplaceTagToSql() {
+        // Test REPLACE TAG with no options
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", false, true, false, null);
+        String expected = "REPLACE TAG test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateOrReplaceTagToSql() {
+        // Test CREATE OR REPLACE TAG with no options
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, true, false, null);
+        String expected = "CREATE OR REPLACE TAG test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateTagIfNotExistsToSql() {
+        // Test CREATE TAG IF NOT EXISTS
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, true, null);
+        String expected = "CREATE TAG IF NOT EXISTS test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateOrReplaceTagIfNotExistsToSql() {
+        // Test CREATE OR REPLACE TAG IF NOT EXISTS
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, true, true, null);
+        String expected = "CREATE OR REPLACE TAG IF NOT EXISTS test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateTagWithSnapshotIdToSql() {
+        // Test CREATE TAG with snapshot ID
+        TagOptions options = new TagOptions(Optional.of(123456L), 
Optional.empty());
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, false, options);
+        String expected = "CREATE TAG test_tag AS OF VERSION 123456";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateTagWithRetainToSql() {
+        // Test CREATE TAG with retain time (1 hour = 3600000ms)
+        TagOptions options = new TagOptions(Optional.empty(), 
Optional.of(3600000L));
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, false, options);
+        String expected = "CREATE TAG test_tag RETAIN 60 MINUTES";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateTagWithAllOptionsToSql() {
+        // Test CREATE TAG with all options (snapshot ID and retain)
+        TagOptions options = new TagOptions(Optional.of(123456L), 
Optional.of(3600000L));
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, false, options);
+        String expected = "CREATE TAG test_tag AS OF VERSION 123456 RETAIN 60 
MINUTES";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testEmptyTagOptionsToSql() {
+        // Test with TagOptions.EMPTY
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "test_tag", true, false, false, TagOptions.EMPTY);
+        String expected = "CREATE TAG test_tag";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testTagNameWithSpecialCharacters() {
+        // Test tag name with underscores and numbers
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "release_tag_v1_0", true, false, false, null);
+        String expected = "CREATE TAG release_tag_v1_0";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testReplaceTagWithOptionsToSql() {
+        // Test REPLACE TAG with snapshot ID and retain
+        TagOptions options = new TagOptions(Optional.of(789012L), 
Optional.of(7200000L));
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "production_tag", false, true, false, options);
+        String expected = "REPLACE TAG production_tag AS OF VERSION 789012 
RETAIN 120 MINUTES";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateOrReplaceTagWithSnapshotOnlyToSql() {
+        // Test CREATE OR REPLACE TAG with only snapshot ID
+        TagOptions options = new TagOptions(Optional.of(555666L), 
Optional.empty());
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "backup_tag", true, true, false, options);
+        String expected = "CREATE OR REPLACE TAG backup_tag AS OF VERSION 
555666";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+
+    @Test
+    public void testCreateTagIfNotExistsWithRetainToSql() {
+        // Test CREATE TAG IF NOT EXISTS with retain time
+        TagOptions options = new TagOptions(Optional.empty(), 
Optional.of(86400000L));
+        CreateOrReplaceTagInfo tagInfo = new CreateOrReplaceTagInfo(
+                "daily_tag", true, false, true, options);
+        String expected = "CREATE TAG IF NOT EXISTS daily_tag RETAIN 1440 
MINUTES";
+        Assertions.assertEquals(expected, tagInfo.toSql());
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out 
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out
new file mode 100644
index 00000000000..a7108fdcc02
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/iceberg_branch_tag_operate.out 
differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
new file mode 100644
index 00000000000..d96b2a08397
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_branch_tag_operate", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_branch_tag_operate"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """ use ${catalog_name}.test_db """
+
+    sql """ drop table if exists test_branch_tag_operate """
+    sql """ create table test_branch_tag_operate (id int) """
+
+    // with empty table
+
+    test {
+        sql """ alter table test_branch_tag_operate create tag b1 """
+        exception "main has no snapshot"
+    }
+
+    sql """ alter table test_branch_tag_operate create branch b1 """
+    sql """ alter table test_branch_tag_operate create branch if not exists b1 
"""
+
+    test {
+        sql """ alter table test_branch_tag_operate create or replace branch 
b1 """
+        exception "main has no snapshot"
+    }
+
+    test {
+        sql """ alter table test_branch_tag_operate create branch b1 """
+        exception "Ref b1 already exists"
+    }
+
+    qt_q1 """ select * from test_branch_tag_operate@branch(b1) """ // empty 
table
+
+    // with some data
+    sql """ insert into test_branch_tag_operate values (1) """
+    sql """ insert into test_branch_tag_operate values (2) """
+    sql """ insert into test_branch_tag_operate values (3) """
+    sql """ insert into test_branch_tag_operate values (4) """
+    sql """ insert into test_branch_tag_operate values (5) """
+
+    
+    List<List<Object>> snapshots = sql """ select snapshot_id from 
iceberg_meta("table" = "${catalog_name}.test_db.test_branch_tag_operate", 
"query_type" = "snapshots") order by committed_at; """
+    String s0 = snapshots.get(0)[0]
+    String s1 = snapshots.get(1)[0]
+    String s2 = snapshots.get(2)[0]
+    String s3 = snapshots.get(3)[0]
+    String s4 = snapshots.get(4)[0]
+
+    // branch
+    sql """ alter table test_branch_tag_operate create branch b2 as of version 
${s0} """
+    qt_q2 """ select * from test_branch_tag_operate@branch(b2) order by id """ 
// 0 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b2 AS 
OF VERSION ${s1} RETAIN 2 days """
+    qt_q3 """ select * from test_branch_tag_operate@branch(b2) order by id """ 
// 1 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b2 AS 
OF VERSION ${s2} RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS"""
+    qt_q4 """ select * from test_branch_tag_operate@branch(b2) order by id """ 
// 2 records
+
+    sql """ alter table test_branch_tag_operate replace branch b2 AS OF 
VERSION ${s3} RETAIN 2 hours WITH SNAPSHOT RETENTION 4 DAYS """
+    qt_q5 """ select * from test_branch_tag_operate@branch(b2) order by id """ 
// 3 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b2 
RETAIN 2 hours WITH SNAPSHOT RETENTION 3 SNAPSHOTS 4 DAYS """
+    qt_q6 """ select * from test_branch_tag_operate@branch(b2) order by id """ 
// 5 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b3 AS 
OF VERSION ${s1} RETAIN 2 days """
+    qt_q7 """ select * from test_branch_tag_operate@branch(b3) order by id """ 
// 1 records
+
+    sql """ alter table test_branch_tag_operate create branch if not exists b3 
AS OF VERSION ${s2} RETAIN 2 days """
+    qt_q8 """ select * from test_branch_tag_operate@branch(b3) order by id """ 
// still 1 records
+
+    sql """ alter table test_branch_tag_operate create branch if not exists b4 
AS OF VERSION ${s2} RETAIN 2 MINUTES WITH SNAPSHOT RETENTION 3 SNAPSHOTS """
+    qt_q9 """ select * from test_branch_tag_operate@branch(b4) order by id """ 
// 2 records
+
+    sql """ alter table test_branch_tag_operate create branch if not exists b5 
"""
+    qt_q10 """ select * from test_branch_tag_operate@branch(b5) order by id 
""" // 5 records
+
+    sql """ alter table test_branch_tag_operate create branch if not exists b6 
AS OF VERSION ${s2} """
+    qt_q11 """ select * from test_branch_tag_operate@branch(b6) order by id 
""" // 2 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b6 AS 
OF VERSION ${s3} """
+    qt_q12 """ select * from test_branch_tag_operate@branch(b6) order by id 
""" // 3 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b6 """
+    qt_q13 """ select * from test_branch_tag_operate@branch(b6) order by id 
""" // 5 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b6 """
+    qt_q14 """ select * from test_branch_tag_operate@branch(b6) order by id 
""" // still 5 records
+
+    sql """ alter table test_branch_tag_operate create or replace branch b6 
RETAIN 2 DAYS """
+    qt_q15 """ select * from test_branch_tag_operate@branch(b6) order by id 
""" // still 5 records
+
+    sql """ alter table test_branch_tag_operate create branch b7 """
+    qt_q16 """ select * from test_branch_tag_operate@branch(b7) order by id 
""" // 5 records
+
+    test {
+        sql """ alter table test_branch_tag_operate create branch b7 as of 
version ${s3} """
+        exception "Ref b7 already exists"
+    }
+
+    test {
+        sql """ alter table test_branch_tag_operate create branch b8 as of 
version 11223344 """
+        exception "Cannot set b8 to unknown snapshot: 11223344"
+    }
+
+
+    // tag
+    sql """ alter table test_branch_tag_operate create tag t2 as of version 
${s0} """
+    qt_q20 """ select * from test_branch_tag_operate@tag(t2) order by id """ 
// 0 records
+
+    sql """ alter table test_branch_tag_operate create or replace tag t2 as of 
version ${s1} """
+    qt_q21 """ select * from test_branch_tag_operate@tag(t2) order by id """ 
// 1 records
+
+    sql """ alter table test_branch_tag_operate create or replace tag t2 as of 
version ${s2} RETAIN 10 MINUTES """
+    qt_q22 """ select * from test_branch_tag_operate@tag(t2) order by id """ 
// 2 records
+
+    sql """ alter table test_branch_tag_operate create or replace tag t2 
RETAIN 10 MINUTES """
+    qt_q23 """ select * from test_branch_tag_operate@tag(t2) order by id """ 
// 5 records
+
+    sql """ alter table test_branch_tag_operate create tag if not exists t3 as 
of version ${s1} """
+    qt_q24 """ select * from test_branch_tag_operate@tag(t3) order by id """ 
// 1 records
+
+    sql """ alter table test_branch_tag_operate create tag if not exists t3 as 
of version ${s2} """  // still 1 records
+    qt_q25 """ select * from test_branch_tag_operate@tag(t3) order by id """
+
+    sql """ alter table test_branch_tag_operate create tag t4 as of version 
${s2} """
+    qt_q26 """ select * from test_branch_tag_operate@tag(t4) order by id """ 
// 2 records
+
+    sql """ alter table test_branch_tag_operate create or replace tag t5 as of 
version ${s3} """
+    qt_q27 """ select * from test_branch_tag_operate@tag(t5) order by id """ 
// 3 records
+
+    sql """ alter table test_branch_tag_operate create tag t6 """
+    qt_q28 """ select * from test_branch_tag_operate@tag(t6) order by id """ 
// 5 records
+
+    test {
+        sql """ alter table test_branch_tag_operate create tag t6 as of 
version ${s3} """
+        exception "Ref t6 already exists"
+    }
+
+    test {
+        sql """ alter table test_branch_tag_operate create branch t7 as of 
version 11223344 """
+        exception "Cannot set t7 to unknown snapshot: 11223344"
+    }
+
+    // test branch/tag with schema change
+    qt_sc01 """select * from tmp_schema_change_branch order by id;"""
+    /// select by branch will use table schema
+    qt_sc02 """select * from tmp_schema_change_branch@branch(test_branch) 
order by id;;"""
+    qt_sc03 """select * from tmp_schema_change_branch for version as of 
"test_branch" order by id;;"""
+    List<List<Object>> refs = sql """select * from 
tmp_schema_change_branch\$refs order by name"""
+    String s_main = refs.get(0)[2]
+    String s_test_branch = refs.get(1)[2]
+    
+    /// select by version will use branch schema
+    qt_sc04 """SELECT * FROM tmp_schema_change_branch for VERSION AS OF 
${s_test_branch} order by id;"""
+    qt_sc05 """SELECT * FROM tmp_schema_change_branch for VERSION AS OF 
${s_main} order by id;"""
+
+    /// select by tag will use tag schema
+    qt_sc06 """SELECT * FROM tmp_schema_change_branch@tag(test_tag) order by 
id;"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to