amogh-jahagirdar commented on code in PR #14215:
URL: https://github.com/apache/iceberg/pull/14215#discussion_r2478700370


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    // Create tag with very short max ref age
+    String tagName = "temp-analysis-snapshot";
+    long maxRefAgeMs = Duration.ofSeconds(1).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Wait for tag to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxRefAgeMs);
+
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify tag was expired
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testMultipleBranchesAndTags() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    String branch1 = "feature-1";
+    String branch2 = "feature-2";
+    String branch3 = "hotfix";
+
+    table
+        .manageSnapshots()
+        .createBranch(branch1, baseSnapshot.snapshotId())
+        .createBranch(branch2, baseSnapshot.snapshotId())
+        .createBranch(branch3, baseSnapshot.snapshotId())
+        .commit();
+
+    String tag1 = "end-of-month-2024-01";
+    String tag2 = "end-of-month-2024-02";
+    String tag3 = "compliance-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createTag(tag1, baseSnapshot.snapshotId())
+        .createTag(tag2, baseSnapshot.snapshotId())
+        .createTag(tag3, baseSnapshot.snapshotId())
+        .commit();
+
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs)
+        .containsKey(branch1)
+        .containsKey(branch2)
+        .containsKey(branch3)
+        .containsKey(tag1)
+        .containsKey(tag2)
+        .containsKey(tag3);
+
+    assertThat(refs.get(branch1).isBranch()).isTrue();
+    assertThat(refs.get(branch2).isBranch()).isTrue();
+    assertThat(refs.get(branch3).isBranch()).isTrue();
+    assertThat(refs.get(tag1).isTag()).isTrue();
+    assertThat(refs.get(tag2).isTag()).isTrue();
+    assertThat(refs.get(tag3).isTag()).isTrue();
+  }
+
+  @Test
+  public void testBranchAndTagSnapshotIsolation() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // Create branch and tag from same snapshot
+    String branchName = "etl-pipeline";
+    String tagName = "data-migration-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, baseSnapshot.snapshotId())
+        .createTag(tagName, baseSnapshot.snapshotId())
+        .commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify isolation
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+    SnapshotRef tagRef = refs.get(tagName);
+
+    // Main branch should point to latest main snapshot
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+
+    // Feature branch should point to its own snapshot (different from main)
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Tag should still point to original snapshot
+    assertThat(tagRef.snapshotId()).isEqualTo(baseSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFailsWhenBranchExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "production-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Attempt to create the same branch again should fail
+    assertThatThrownBy(() -> table.manageSnapshots().createBranch(branchName).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", branchName);
+  }
+
+  @Test
+  public void testCreateTagFailsWhenTagExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "production-checkpoint";
+    table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit();
+
+    // Attempt to create the same tag again should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", tagName);
+  }
+
+  @Test
+  public void testRemoveBranchFailsWhenBranchDoesNotExist() {
+    C catalog = catalog();

Review Comment:
   These tests aren't hitting the catalog; they're just asserting failures that happen on the client.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    // Create tag with very short max ref age
+    String tagName = "temp-analysis-snapshot";
+    long maxRefAgeMs = Duration.ofSeconds(1).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Wait for tag to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxRefAgeMs);
+
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify tag was expired
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testMultipleBranchesAndTags() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    String branch1 = "feature-1";
+    String branch2 = "feature-2";
+    String branch3 = "hotfix";
+
+    table
+        .manageSnapshots()
+        .createBranch(branch1, baseSnapshot.snapshotId())
+        .createBranch(branch2, baseSnapshot.snapshotId())
+        .createBranch(branch3, baseSnapshot.snapshotId())
+        .commit();
+
+    String tag1 = "end-of-month-2024-01";
+    String tag2 = "end-of-month-2024-02";
+    String tag3 = "compliance-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createTag(tag1, baseSnapshot.snapshotId())
+        .createTag(tag2, baseSnapshot.snapshotId())
+        .createTag(tag3, baseSnapshot.snapshotId())
+        .commit();
+
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs)
+        .containsKey(branch1)
+        .containsKey(branch2)
+        .containsKey(branch3)
+        .containsKey(tag1)
+        .containsKey(tag2)
+        .containsKey(tag3);
+
+    assertThat(refs.get(branch1).isBranch()).isTrue();
+    assertThat(refs.get(branch2).isBranch()).isTrue();
+    assertThat(refs.get(branch3).isBranch()).isTrue();
+    assertThat(refs.get(tag1).isTag()).isTrue();
+    assertThat(refs.get(tag2).isTag()).isTrue();
+    assertThat(refs.get(tag3).isTag()).isTrue();
+  }
+
+  @Test
+  public void testBranchAndTagSnapshotIsolation() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // Create branch and tag from same snapshot
+    String branchName = "etl-pipeline";
+    String tagName = "data-migration-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, baseSnapshot.snapshotId())
+        .createTag(tagName, baseSnapshot.snapshotId())
+        .commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify isolation
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+    SnapshotRef tagRef = refs.get(tagName);
+
+    // Main branch should point to latest main snapshot
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+
+    // Feature branch should point to its own snapshot (different from main)
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Tag should still point to original snapshot
+    assertThat(tagRef.snapshotId()).isEqualTo(baseSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFailsWhenBranchExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "production-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Attempt to create the same branch again should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().createBranch(branchName).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", branchName);
+  }
+
+  @Test
+  public void testCreateTagFailsWhenTagExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "production-checkpoint";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Attempt to create the same tag again should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", tagName);
+  }
+
+  @Test
+  public void testRemoveBranchFailsWhenBranchDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to remove non-existent branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch(nonExistentBranch).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testRemoveTagFailsWhenTagDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentTag = "missing-checkpoint";
+
+    // Attempt to remove non-existent tag should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeTag(nonExistentTag).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist: %s", nonExistentTag);
+  }
+
+  @Test
+  public void testRemoveMainBranchFails() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    // Attempt to remove main branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch("main").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot remove main branch");
+  }
+
+  @Test
+  public void testCreateBranchWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String branchName = "corrupted-data-branch";
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to create branch with invalid snapshot ID should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createBranch(branchName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testCreateTagWithInvalidSnapshotId() {

Review Comment:
   Same as above: this is not testing against the catalog; it just fails on the 
client. We should already have coverage of these cases in TestSnapshotManager.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    
assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    
assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    
assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, 
mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the 
branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short 
for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    // Create tag with very short max ref age
+    String tagName = "temp-analysis-snapshot";
+    long maxRefAgeMs = Duration.ofSeconds(1).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Wait for tag to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxRefAgeMs);
+
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify tag was expired
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testMultipleBranchesAndTags() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    String branch1 = "feature-1";
+    String branch2 = "feature-2";
+    String branch3 = "hotfix";
+
+    table
+        .manageSnapshots()
+        .createBranch(branch1, baseSnapshot.snapshotId())
+        .createBranch(branch2, baseSnapshot.snapshotId())
+        .createBranch(branch3, baseSnapshot.snapshotId())
+        .commit();
+
+    String tag1 = "end-of-month-2024-01";
+    String tag2 = "end-of-month-2024-02";
+    String tag3 = "compliance-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createTag(tag1, baseSnapshot.snapshotId())
+        .createTag(tag2, baseSnapshot.snapshotId())
+        .createTag(tag3, baseSnapshot.snapshotId())
+        .commit();
+
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs)
+        .containsKey(branch1)
+        .containsKey(branch2)
+        .containsKey(branch3)
+        .containsKey(tag1)
+        .containsKey(tag2)
+        .containsKey(tag3);
+
+    assertThat(refs.get(branch1).isBranch()).isTrue();
+    assertThat(refs.get(branch2).isBranch()).isTrue();
+    assertThat(refs.get(branch3).isBranch()).isTrue();
+    assertThat(refs.get(tag1).isTag()).isTrue();
+    assertThat(refs.get(tag2).isTag()).isTrue();
+    assertThat(refs.get(tag3).isTag()).isTrue();
+  }
+
+  @Test
+  public void testBranchAndTagSnapshotIsolation() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // Create branch and tag from same snapshot
+    String branchName = "etl-pipeline";
+    String tagName = "data-migration-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, baseSnapshot.snapshotId())
+        .createTag(tagName, baseSnapshot.snapshotId())
+        .commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify isolation
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+    SnapshotRef tagRef = refs.get(tagName);
+
+    // Main branch should point to latest main snapshot
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+
+    // Feature branch should point to its own snapshot (different from main)
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Tag should still point to original snapshot
+    assertThat(tagRef.snapshotId()).isEqualTo(baseSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFailsWhenBranchExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "production-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Attempt to create the same branch again should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().createBranch(branchName).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", branchName);
+  }
+
+  @Test
+  public void testCreateTagFailsWhenTagExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "production-checkpoint";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Attempt to create the same tag again should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", tagName);
+  }
+
+  @Test
+  public void testRemoveBranchFailsWhenBranchDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to remove non-existent branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch(nonExistentBranch).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testRemoveTagFailsWhenTagDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentTag = "missing-checkpoint";
+
+    // Attempt to remove non-existent tag should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeTag(nonExistentTag).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist: %s", nonExistentTag);
+  }
+
+  @Test
+  public void testRemoveMainBranchFails() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    // Attempt to remove main branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch("main").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot remove main branch");
+  }
+
+  @Test
+  public void testCreateBranchWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String branchName = "corrupted-data-branch";
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to create branch with invalid snapshot ID should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createBranch(branchName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testCreateTagWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String tagName = "corrupted-checkpoint";
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to create tag with invalid snapshot ID should fail
+    assertThatThrownBy(() -> table.manageSnapshots().createTag(tagName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testSetRetentionPolicyOnNonExistentBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to set retention policy on non-existent branch should fail
+    assertThatThrownBy(
+            () -> 
table.manageSnapshots().setMinSnapshotsToKeep(nonExistentBranch, 5).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testSetRetentionPolicyOnTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "analytics-checkpoint";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Attempt to set branch-specific retention policy on tag should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().setMinSnapshotsToKeep(tagName, 5).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tags do not support setting 
minSnapshotsToKeep");
+  }
+
+  @Test
+  public void testReplaceBranchWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "analytics-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to replace branch with invalid snapshot ID should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().replaceBranch(branchName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testReplaceTagWithInvalidSnapshotId() {
+    C catalog = catalog();

Review Comment:
   Same as above: this just validates a failure on the client and never hits the 
catalog. It should have coverage in TestSnapshotManager.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    
assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    
assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    
assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, 
mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    // Create tag with very short max ref age
+    String tagName = "temp-analysis-snapshot";
+    long maxRefAgeMs = Duration.ofSeconds(1).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Wait for tag to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxRefAgeMs);
+
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify tag was expired
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testMultipleBranchesAndTags() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    String branch1 = "feature-1";
+    String branch2 = "feature-2";
+    String branch3 = "hotfix";
+
+    table
+        .manageSnapshots()
+        .createBranch(branch1, baseSnapshot.snapshotId())
+        .createBranch(branch2, baseSnapshot.snapshotId())
+        .createBranch(branch3, baseSnapshot.snapshotId())
+        .commit();
+
+    String tag1 = "end-of-month-2024-01";
+    String tag2 = "end-of-month-2024-02";
+    String tag3 = "compliance-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createTag(tag1, baseSnapshot.snapshotId())
+        .createTag(tag2, baseSnapshot.snapshotId())
+        .createTag(tag3, baseSnapshot.snapshotId())
+        .commit();
+
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs)
+        .containsKey(branch1)
+        .containsKey(branch2)
+        .containsKey(branch3)
+        .containsKey(tag1)
+        .containsKey(tag2)
+        .containsKey(tag3);
+
+    assertThat(refs.get(branch1).isBranch()).isTrue();
+    assertThat(refs.get(branch2).isBranch()).isTrue();
+    assertThat(refs.get(branch3).isBranch()).isTrue();
+    assertThat(refs.get(tag1).isTag()).isTrue();
+    assertThat(refs.get(tag2).isTag()).isTrue();
+    assertThat(refs.get(tag3).isTag()).isTrue();
+  }
+
+  @Test
+  public void testBranchAndTagSnapshotIsolation() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // Create branch and tag from same snapshot
+    String branchName = "etl-pipeline";
+    String tagName = "data-migration-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, baseSnapshot.snapshotId())
+        .createTag(tagName, baseSnapshot.snapshotId())
+        .commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify isolation
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+    SnapshotRef tagRef = refs.get(tagName);
+
+    // Main branch should point to latest main snapshot
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+
+    // Feature branch should point to its own snapshot (different from main)
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Tag should still point to original snapshot
+    assertThat(tagRef.snapshotId()).isEqualTo(baseSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFailsWhenBranchExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "production-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Attempt to create the same branch again should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().createBranch(branchName).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", branchName);
+  }
+
+  @Test
+  public void testCreateTagFailsWhenTagExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "production-checkpoint";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Attempt to create the same tag again should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", tagName);
+  }
+
+  @Test
+  public void testRemoveBranchFailsWhenBranchDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to remove non-existent branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch(nonExistentBranch).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testRemoveTagFailsWhenTagDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentTag = "missing-checkpoint";
+
+    // Attempt to remove non-existent tag should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeTag(nonExistentTag).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist: %s", nonExistentTag);
+  }
+
+  @Test
+  public void testRemoveMainBranchFails() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    // Attempt to remove main branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch("main").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot remove main branch");
+  }
+
+  @Test
+  public void testCreateBranchWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String branchName = "corrupted-data-branch";
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to create branch with invalid snapshot ID should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createBranch(branchName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testCreateTagWithInvalidSnapshotId() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String tagName = "corrupted-checkpoint";
+    long invalidSnapshotId = 999999L; // Non-existent snapshot ID
+
+    // Attempt to create tag with invalid snapshot ID should fail
+    assertThatThrownBy(() -> table.manageSnapshots().createTag(tagName, 
invalidSnapshotId).commit())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("unknown snapshot");
+  }
+
+  @Test
+  public void testSetRetentionPolicyOnNonExistentBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to set retention policy on non-existent branch should fail
+    assertThatThrownBy(
+            () -> 
table.manageSnapshots().setMinSnapshotsToKeep(nonExistentBranch, 5).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testSetRetentionPolicyOnTag() {

Review Comment:
   This just tests a client-side failure, not anything involving the catalog, 
and we already test that failure in `TestSnapshotManager`.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    
assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    
assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    
assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {

Review Comment:
   Most of this test just repeats what we already test in 
`TestSnapshotManager`; nothing in it specifically verifies a particular 
catalog behavior. I'd probably remove this.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {

Review Comment:
   I would probably keep just one of these tests. In the context of 
`CatalogTests`, we're really only trying to test whether a catalog allows 
new branches to be created.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    
assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    
assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    
assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, 
mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the 
branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short 
for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    // Create tag with very short max ref age
+    String tagName = "temp-analysis-snapshot";
+    long maxRefAgeMs = Duration.ofSeconds(1).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Wait for tag to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxRefAgeMs);
+
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify tag was expired
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testMultipleBranchesAndTags() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    String branch1 = "feature-1";
+    String branch2 = "feature-2";
+    String branch3 = "hotfix";
+
+    table
+        .manageSnapshots()
+        .createBranch(branch1, baseSnapshot.snapshotId())
+        .createBranch(branch2, baseSnapshot.snapshotId())
+        .createBranch(branch3, baseSnapshot.snapshotId())
+        .commit();
+
+    String tag1 = "end-of-month-2024-01";
+    String tag2 = "end-of-month-2024-02";
+    String tag3 = "compliance-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createTag(tag1, baseSnapshot.snapshotId())
+        .createTag(tag2, baseSnapshot.snapshotId())
+        .createTag(tag3, baseSnapshot.snapshotId())
+        .commit();
+
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs)
+        .containsKey(branch1)
+        .containsKey(branch2)
+        .containsKey(branch3)
+        .containsKey(tag1)
+        .containsKey(tag2)
+        .containsKey(tag3);
+
+    assertThat(refs.get(branch1).isBranch()).isTrue();
+    assertThat(refs.get(branch2).isBranch()).isTrue();
+    assertThat(refs.get(branch3).isBranch()).isTrue();
+    assertThat(refs.get(tag1).isTag()).isTrue();
+    assertThat(refs.get(tag2).isTag()).isTrue();
+    assertThat(refs.get(tag3).isTag()).isTrue();
+  }
+
+  @Test
+  public void testBranchAndTagSnapshotIsolation() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // Create branch and tag from same snapshot
+    String branchName = "etl-pipeline";
+    String tagName = "data-migration-checkpoint";
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, baseSnapshot.snapshotId())
+        .createTag(tagName, baseSnapshot.snapshotId())
+        .commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify isolation
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+    SnapshotRef tagRef = refs.get(tagName);
+
+    // Main branch should point to latest main snapshot
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+
+    // Feature branch should point to its own snapshot (different from main)
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Tag should still point to original snapshot
+    assertThat(tagRef.snapshotId()).isEqualTo(baseSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFailsWhenBranchExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "production-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Attempt to create the same branch again should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().createBranch(branchName).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", branchName);
+  }
+
+  @Test
+  public void testCreateTagFailsWhenTagExists() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "production-checkpoint";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Attempt to create the same tag again should fail
+    assertThatThrownBy(
+            () -> table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref %s already exists", tagName);
+  }
+
+  @Test
+  public void testRemoveBranchFailsWhenBranchDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentBranch = "missing-data-pipeline";
+
+    // Attempt to remove non-existent branch should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeBranch(nonExistentBranch).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Branch does not exist: %s", nonExistentBranch);
+  }
+
+  @Test
+  public void testRemoveTagFailsWhenTagDoesNotExist() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    String nonExistentTag = "missing-checkpoint";
+
+    // Attempt to remove non-existent tag should fail
+    assertThatThrownBy(() -> 
table.manageSnapshots().removeTag(nonExistentTag).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist: %s", nonExistentTag);
+  }
+
+  @Test
+  public void testRemoveMainBranchFails() {
+    C catalog = catalog();

Review Comment:
   Same as above. These aren't testing anything against the catalog; the 
validation happens on the client.



##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java:
##########
@@ -3167,6 +3170,687 @@ public void testRegisterExistingTable() {
     assertThat(catalog.dropTable(identifier)).isTrue();
   }
 
+  @Test
+  public void testCreateBranchFromCurrentSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "data-quality-check";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    assertThat(refs.get(branchName).isBranch()).isTrue();
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchFromSpecificSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    String branchName = "historical-analysis";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Verify branch points to the specific snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+    
assertThat(refs.get(branchName).snapshotId()).isEqualTo(firstSnapshot.snapshotId());
+    
assertThat(refs.get(branchName).snapshotId()).isNotEqualTo(secondSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateBranchWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 5;
+    long maxSnapshotAgeMs = Duration.ofDays(7).toMillis();
+    long maxRefAgeMs = Duration.ofDays(30).toMillis();
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, currentSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .setMaxRefAgeMs(branchName, maxRefAgeMs)
+        .commit();
+
+    // Verify branch with retention policies
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+    assertThat(branchRef.isBranch()).isTrue();
+    assertThat(branchRef.minSnapshotsToKeep()).isEqualTo(minSnapshotsToKeep);
+    assertThat(branchRef.maxSnapshotAgeMs()).isEqualTo(maxSnapshotAgeMs);
+    assertThat(branchRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testCreateTagFromSnapshot() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "end-of-month-2024-01";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag was created
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+    assertThat(refs.get(tagName).isTag()).isTrue();
+    
assertThat(refs.get(tagName).snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testCreateTagWithRetentionPolicies() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "yearly-archive-2024";
+    long maxRefAgeMs = Duration.ofDays(365).toMillis();
+
+    table
+        .manageSnapshots()
+        .createTag(tagName, currentSnapshot.snapshotId())
+        .setMaxRefAgeMs(tagName, maxRefAgeMs)
+        .commit();
+
+    // Verify tag with retention policy
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef).isNotNull();
+    assertThat(tagRef.isTag()).isTrue();
+    assertThat(tagRef.maxRefAgeMs()).isEqualTo(maxRefAgeMs);
+  }
+
+  @Test
+  public void testRemoveBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    String branchName = "staging-pipeline";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Verify branch exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(branchName);
+
+    // Remove branch
+    table.manageSnapshots().removeBranch(branchName).commit();
+
+    // Verify branch is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(branchName);
+  }
+
+  @Test
+  public void testRemoveTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    String tagName = "pre-migration-backup";
+    table.manageSnapshots().createTag(tagName, 
currentSnapshot.snapshotId()).commit();
+
+    // Verify tag exists
+    Map<String, SnapshotRef> refs = table.refs();
+    assertThat(refs).containsKey(tagName);
+
+    // Remove tag
+    table.manageSnapshots().removeTag(tagName).commit();
+
+    // Verify tag is removed
+    refs = table.refs();
+    assertThat(refs).doesNotContainKey(tagName);
+  }
+
+  @Test
+  public void testReplaceBranch() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String branchName = "data-validation";
+    table.manageSnapshots().createBranch(branchName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace branch to point to new snapshot
+    table.manageSnapshots().replaceBranch(branchName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify branch points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    
assertThat(branchRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testReplaceTag() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    String tagName = "quarterly-audit-2024-q1";
+    table.manageSnapshots().createTag(tagName, 
firstSnapshot.snapshotId()).commit();
+
+    // Add more data and create new snapshot
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Replace tag to point to new snapshot
+    table.manageSnapshots().replaceTag(tagName, 
secondSnapshot.snapshotId()).commit();
+
+    // Verify tag points to new snapshot
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef tagRef = refs.get(tagName);
+    assertThat(tagRef.snapshotId()).isEqualTo(secondSnapshot.snapshotId());
+    assertThat(tagRef.snapshotId()).isNotEqualTo(firstSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testBranchIndependentLineage() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot mainSnapshot = table.currentSnapshot();
+
+    // Create branch from main
+    String branchName = "experimental-features";
+    table.manageSnapshots().createBranch(branchName, 
mainSnapshot.snapshotId()).commit();
+
+    // Add data to main branch
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot mainSnapshot2 = table.currentSnapshot();
+
+    // Add data to feature branch (this should create a new snapshot on the 
branch)
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Verify both branches have independent snapshots
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef mainRef = refs.get(SnapshotRef.MAIN_BRANCH);
+    SnapshotRef branchRef = refs.get(branchName);
+
+    assertThat(mainRef.snapshotId()).isEqualTo(mainSnapshot2.snapshotId());
+    assertThat(branchRef.snapshotId()).isNotEqualTo(mainRef.snapshotId());
+
+    // Verify the branch snapshot contains FILE_C
+    Snapshot branchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(branchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testBranchRetentionPolicyEnforcement() {
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog.buildTable(TABLE, 
SCHEMA).withProperty(TableProperties.GC_ENABLED, "true").create();
+
+    // Create initial snapshot
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Create branch with retention policy
+    String branchName = "audit-branch";
+    int minSnapshotsToKeep = 2;
+    long maxSnapshotAgeMs = Duration.ofSeconds(1).toMillis(); // Very short 
for testing
+
+    table
+        .manageSnapshots()
+        .createBranch(branchName, firstSnapshot.snapshotId())
+        .setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
+        .setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
+        .commit();
+
+    // Add more snapshots to the branch
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_B).commit();
+    table.newFastAppend().toBranch(branchName).appendFile(FILE_C).commit();
+
+    // Wait for snapshots to age
+    TestHelpers.waitUntilAfter(System.currentTimeMillis() + maxSnapshotAgeMs);
+
+    // Expire snapshots
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    // Verify retention policy was enforced
+    Map<String, SnapshotRef> refs = table.refs();
+    SnapshotRef branchRef = refs.get(branchName);
+    assertThat(branchRef).isNotNull();
+
+    // The branch should still exist and point to a recent snapshot
+    Snapshot currentBranchSnapshot = table.snapshot(branchRef.snapshotId());
+    assertThat(currentBranchSnapshot).isNotNull();
+  }
+
+  @Test
+  public void testTagMaxRefAgeExpiration() {

Review Comment:
   I'd try to combine this with the test above: basically just have one general 
"does the catalog allow me to set custom retention on tags/branches" test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to