This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f60ad6101e HDDS-11705. Snapshot operations on linked buckets should work on actual underlying bucket (#7434)
f60ad6101e is described below

commit f60ad6101e1a3d807ca893d5d4859a08ffb6a3b0
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Nov 15 17:21:51 2024 -0800

    HDDS-11705. Snapshot operations on linked buckets should work on actual underlying bucket (#7434)
---
 .../java/org/apache/hadoop/ozone/TestDataUtil.java |  44 +++++-
 .../hadoop/ozone/om/snapshot/TestOmSnapshot.java   | 152 ++++++++++++---------
 .../om/snapshot/TestOmSnapshotFileSystem.java      |  70 ++++++----
 .../om/snapshot/TestOmSnapshotFileSystemFso.java   |   4 +-
 ...tOmSnapshotFileSystemFsoWithLinkedBuckets.java} |   6 +-
 .../snapshot/TestOmSnapshotFileSystemLegacy.java   |   4 +-
 ...SnapshotFileSystemLegacyWithLinkedBuckets.java} |   6 +-
 .../snapshot/TestOmSnapshotFsoWithNativeLib.java   |   2 +-
 ...SnapshotFsoWithNativeLibWithLinkedBuckets.java} |   6 +-
 .../TestOmSnapshotFsoWithoutNativeLib.java         |   2 +-
 ...pshotFsoWithoutNativeLibWithLinkedBuckets.java} |   6 +-
 .../om/snapshot/TestOmSnapshotObjectStore.java     |   2 +-
 ...estOmSnapshotObjectStoreWithLinkedBuckets.java} |   6 +-
 ... => TestOmSnapshotWithBucketLinkingLegacy.java} |   6 +-
 ... TestOmSnapshotWithoutBucketLinkingLegacy.java} |   6 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   9 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  46 ++++---
 .../request/snapshot/OMSnapshotCreateRequest.java  |  24 ++--
 .../request/snapshot/OMSnapshotDeleteRequest.java  |   7 +
 .../request/snapshot/OMSnapshotRenameRequest.java  |   8 ++
 .../snapshot/TestOMSnapshotCreateRequest.java      |  26 ++++
 .../snapshot/TestOMSnapshotDeleteRequest.java      |  27 ++++
 .../snapshot/TestOMSnapshotRenameRequest.java      |  28 ++++
 .../snapshot/TestSnapshotRequestAndResponse.java   |   6 +
 24 files changed, 346 insertions(+), 157 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 4488e467c2..eb54e4a251 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -164,21 +164,57 @@ public final class TestDataUtil {
   public static OzoneBucket createBucket(OzoneClient client,
       String vol, BucketArgs bucketArgs, String bukName)
       throws IOException {
+    return createBucket(client, vol, bucketArgs, bukName, false);
+  }
+
+  public static OzoneBucket createBucket(OzoneClient client,
+                                         String vol, BucketArgs bucketArgs, 
String bukName,
+                                         boolean createLinkedBucket)
+      throws IOException {
     ObjectStore objectStore = client.getObjectStore();
     OzoneVolume volume = objectStore.getVolume(vol);
-    volume.createBucket(bukName, bucketArgs);
-    return volume.getBucket(bukName);
+    String sourceBucket = bukName;
+    if (createLinkedBucket) {
+      sourceBucket = bukName + RandomStringUtils.randomNumeric(5);
+    }
+    volume.createBucket(sourceBucket, bucketArgs);
+    OzoneBucket ozoneBucket = volume.getBucket(sourceBucket);
+    if (createLinkedBucket) {
+      ozoneBucket = createLinkedBucket(client, vol, sourceBucket, bukName);
+    }
+    return ozoneBucket;
+  }
+
+  public static OzoneBucket createLinkedBucket(OzoneClient client, String vol, 
String sourceBucketName,
+                                               String linkedBucketName) throws 
IOException {
+    BucketArgs.Builder bb = new BucketArgs.Builder()
+        .setStorageType(StorageType.DEFAULT)
+        .setVersioning(false)
+        .setSourceVolume(vol)
+        .setSourceBucket(sourceBucketName);
+    return createBucket(client, vol, bb.build(), linkedBucketName);
+  }
+
+  public static OzoneBucket createVolumeAndBucket(OzoneClient client,
+                                                  BucketLayout bucketLayout)
+      throws IOException {
+    return createVolumeAndBucket(client, bucketLayout, false);
   }
 
   public static OzoneBucket createVolumeAndBucket(OzoneClient client,
-      BucketLayout bucketLayout) throws IOException {
+      BucketLayout bucketLayout, boolean createLinkedBucket) throws 
IOException {
     final int attempts = 5;
     for (int i = 0; i < attempts; i++) {
       try {
         String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
         String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-        return createVolumeAndBucket(client, volumeName, bucketName,
+        OzoneBucket ozoneBucket = createVolumeAndBucket(client, volumeName, 
bucketName,
             bucketLayout);
+        if (createLinkedBucket) {
+          String targetBucketName = ozoneBucket.getName() + 
RandomStringUtils.randomNumeric(5);
+          ozoneBucket = createLinkedBucket(client, volumeName, bucketName, 
targetBucketName);
+        }
+        return ozoneBucket;
       } catch (OMException e) {
         if (e.getResult() != OMException.ResultCodes.VOLUME_ALREADY_EXISTS
             && e.getResult() != OMException.ResultCodes.BUCKET_ALREADY_EXISTS) 
{
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
index 9a6bca29b8..4d0e56dec0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.snapshot;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.collect.Lists;
@@ -100,6 +101,7 @@ import java.util.Arrays;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -127,6 +129,7 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCK
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE;
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_ALREADY_CANCELLED_JOB;
@@ -182,17 +185,21 @@ public abstract class TestOmSnapshot {
   private final boolean forceFullSnapshotDiff;
   private final boolean disableNativeDiff;
   private final AtomicInteger counter;
+  private final boolean createLinkedBucket;
+  private final Map<String, String> linkedBuckets = new HashMap<>();
 
   public TestOmSnapshot(BucketLayout newBucketLayout,
                         boolean newEnableFileSystemPaths,
                         boolean forceFullSnapDiff,
-                        boolean disableNativeDiff)
+                        boolean disableNativeDiff,
+                        boolean createLinkedBucket)
       throws Exception {
     this.enabledFileSystemPaths = newEnableFileSystemPaths;
     this.bucketLayout = newBucketLayout;
     this.forceFullSnapshotDiff = forceFullSnapDiff;
     this.disableNativeDiff = disableNativeDiff;
     this.counter = new AtomicInteger();
+    this.createLinkedBucket = createLinkedBucket;
     init();
   }
 
@@ -218,7 +225,10 @@ public abstract class TestOmSnapshot {
     cluster.waitForClusterToBeReady();
     client = cluster.newClient();
     // create a volume and a bucket to be used by OzoneFileSystem
-    ozoneBucket = TestDataUtil.createVolumeAndBucket(client, bucketLayout);
+    ozoneBucket = TestDataUtil.createVolumeAndBucket(client, bucketLayout, 
createLinkedBucket);
+    if (createLinkedBucket) {
+      this.linkedBuckets.put(ozoneBucket.getName(), 
ozoneBucket.getSourceBucket());
+    }
     volumeName = ozoneBucket.getVolumeName();
     bucketName = ozoneBucket.getName();
     ozoneManager = cluster.getOzoneManager();
@@ -232,6 +242,17 @@ public abstract class TestOmSnapshot {
     finalizeOMUpgrade();
   }
 
+  private void createBucket(OzoneVolume volume, String bucketVal) throws 
IOException {
+    if (createLinkedBucket) {
+      String sourceBucketName = linkedBuckets.computeIfAbsent(bucketVal, (k) 
-> bucketVal + counter.incrementAndGet());
+      volume.createBucket(sourceBucketName);
+      TestDataUtil.createLinkedBucket(client, volume.getName(), 
sourceBucketName, bucketVal);
+      this.linkedBuckets.put(bucketVal, sourceBucketName);
+    } else {
+      volume.createBucket(bucketVal);
+    }
+  }
+
   private void stopKeyManager() throws IOException {
     KeyManagerImpl keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
         .getInternalState(ozoneManager, "keyManager");
@@ -319,10 +340,10 @@ public abstract class TestOmSnapshot {
     store.createVolume(volumeB);
     OzoneVolume volA = store.getVolume(volumeA);
     OzoneVolume volB = store.getVolume(volumeB);
-    volA.createBucket(bucketA);
-    volA.createBucket(bucketB);
-    volB.createBucket(bucketA);
-    volB.createBucket(bucketB);
+    createBucket(volA, bucketA);
+    createBucket(volA, bucketB);
+    createBucket(volB, bucketA);
+    createBucket(volB, bucketB);
     OzoneBucket volAbucketA = volA.getBucket(bucketA);
     OzoneBucket volAbucketB = volA.getBucket(bucketB);
     OzoneBucket volBbucketA = volB.getBucket(bucketA);
@@ -401,7 +422,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buc-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume vol = store.getVolume(volume);
-    vol.createBucket(bucket);
+    createBucket(vol, bucket);
     String snapshotKeyPrefix = createSnapshot(volume, bucket);
     OzoneBucket buc = vol.getBucket(bucket);
     Iterator<? extends OzoneKey> keys = buc.listKeys(snapshotKeyPrefix);
@@ -478,7 +499,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buc-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume vol = store.getVolume(volume);
-    vol.createBucket(bucket);
+    createBucket(vol, bucket);
     OzoneBucket volBucket = vol.getBucket(bucket);
 
     String key = "key-";
@@ -503,7 +524,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buc-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume vol = store.getVolume(volume);
-    vol.createBucket(bucket);
+    createBucket(vol, bucket);
     OzoneBucket bucket1 = vol.getBucket(bucket);
 
     String key1 = "key-1-";
@@ -553,7 +574,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -597,11 +618,11 @@ public abstract class TestOmSnapshot {
   private void getOmKeyInfo(String volume, String bucket,
                             String key) throws IOException {
     ResolvedBucket resolvedBucket = new ResolvedBucket(volume, bucket,
-        volume, bucket, "", bucketLayout);
+        volume, this.linkedBuckets.getOrDefault(bucket, bucket), "", 
bucketLayout);
     cluster.getOzoneManager().getKeyManager()
         .getKeyInfo(new OmKeyArgs.Builder()
                 .setVolumeName(volume)
-                .setBucketName(bucket)
+                .setBucketName(this.linkedBuckets.getOrDefault(bucket, bucket))
                 .setKeyName(key).build(),
             resolvedBucket, null);
   }
@@ -621,7 +642,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -659,7 +680,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -706,7 +727,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -760,7 +781,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -805,7 +826,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket" + counter.incrementAndGet();
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -864,7 +885,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -908,7 +929,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String snap1 = "snap1";
     createSnapshot(testVolumeName, testBucketName, snap1);
@@ -942,7 +963,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String snap1 = "snap1";
     String key1 = "k1";
@@ -984,7 +1005,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String snap1 = "snap1";
     String key1 = "k1";
@@ -1042,7 +1063,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String snap1 = "snap1";
     String key1 = "k1";
@@ -1078,7 +1099,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String snap1 = "snap1";
     String key1 = "k1";
@@ -1110,7 +1131,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket" + counter.incrementAndGet();
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     String rootPath = String.format("%s://%s.%s/",
         OzoneConsts.OZONE_URI_SCHEME, testBucketName, testVolumeName);
     try (FileSystem fs = FileSystem.get(new URI(rootPath), cluster.getConf())) 
{
@@ -1153,7 +1174,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket" + counter.incrementAndGet();
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     String rootPath = String.format("%s://%s.%s/",
         OzoneConsts.OZONE_URI_SCHEME, testBucketName, testVolumeName);
     try (FileSystem fs = FileSystem.get(new URI(rootPath), cluster.getConf())) 
{
@@ -1196,7 +1217,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket" + counter.incrementAndGet();
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     String rootPath = String.format("%s://%s.%s/",
         OzoneConsts.OZONE_URI_SCHEME, testBucketName, testVolumeName);
     try (FileSystem fs = FileSystem.get(new URI(rootPath), cluster.getConf())) 
{
@@ -1239,8 +1260,8 @@ public abstract class TestOmSnapshot {
     String bucket2 = "buc-" + counter.incrementAndGet();
     store.createVolume(volume1);
     OzoneVolume volume = store.getVolume(volume1);
-    volume.createBucket(bucket1);
-    volume.createBucket(bucket2);
+    createBucket(volume, bucket1);
+    createBucket(volume, bucket2);
     OzoneBucket bucketWithSnapshot = volume.getBucket(bucket1);
     OzoneBucket bucketWithoutSnapshot = volume.getBucket(bucket2);
     String key = "key-";
@@ -1250,7 +1271,7 @@ public abstract class TestOmSnapshot {
     deleteKeys(bucketWithSnapshot);
     deleteKeys(bucketWithoutSnapshot);
     OMException omException = assertThrows(OMException.class,
-        () -> volume.deleteBucket(bucket1));
+        () -> volume.deleteBucket(linkedBuckets.getOrDefault(bucket1, 
bucket1)));
     assertEquals(CONTAINS_SNAPSHOT, omException.getResult());
     // TODO: Delete snapshot then delete bucket1 when deletion is implemented
     // no exception for bucket without snapshot
@@ -1263,7 +1284,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
 
     createFileKey(bucket1, "key-1");
@@ -1278,12 +1299,12 @@ public abstract class TestOmSnapshot {
 
     assertEquals(snap1, snapshot1.getName());
     assertEquals(volume, snapshot1.getVolumeName());
-    assertEquals(bucket, snapshot1.getBucketName());
+    assertEquals(linkedBuckets.getOrDefault(bucket, bucket), 
snapshot1.getBucketName());
 
     OzoneSnapshot snapshot2 = store.getSnapshotInfo(volume, bucket, snap2);
     assertEquals(snap2, snapshot2.getName());
     assertEquals(volume, snapshot2.getVolumeName());
-    assertEquals(bucket, snapshot2.getBucketName());
+    assertEquals(linkedBuckets.getOrDefault(bucket, bucket), 
snapshot2.getBucketName());
 
     testGetSnapshotInfoFailure(null, bucket, "snapshotName",
         "volume can't be null or empty.");
@@ -1292,9 +1313,10 @@ public abstract class TestOmSnapshot {
     testGetSnapshotInfoFailure(volume, bucket, null,
         "snapshot name can't be null or empty.");
     testGetSnapshotInfoFailure(volume, bucket, "snapshotName",
-        "Snapshot '/" + volume + "/" + bucket + "/snapshotName' is not 
found.");
+        "Snapshot '/" + volume + "/" + linkedBuckets.getOrDefault(bucket, 
bucket) +
+            "/snapshotName' is not found.");
     testGetSnapshotInfoFailure(volume, "bucketName", "snapshotName",
-        "Snapshot '/" + volume + "/bucketName/snapshotName' is not found.");
+        "Bucket not found: " + volume + "/bucketName");
   }
 
   public void testGetSnapshotInfoFailure(String volName,
@@ -1313,7 +1335,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     bucket1.createDirectory("dir1");
     String snap1 = "snap1";
@@ -1335,7 +1357,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1473,9 +1495,9 @@ public abstract class TestOmSnapshot {
     assertEquals(CANCELLED, response.getJobStatus());
 
     String fromSnapshotTableKey =
-        SnapshotInfo.getTableKey(volumeName, bucketName, fromSnapName);
+        SnapshotInfo.getTableKey(volumeName, 
linkedBuckets.getOrDefault(bucketName, bucketName), fromSnapName);
     String toSnapshotTableKey =
-        SnapshotInfo.getTableKey(volumeName, bucketName, toSnapName);
+        SnapshotInfo.getTableKey(volumeName, 
linkedBuckets.getOrDefault(bucketName, bucketName), toSnapName);
 
     UUID fromSnapshotID = SnapshotUtils.getSnapshotInfo(ozoneManager, 
fromSnapshotTableKey).getSnapshotId();
     UUID toSnapshotID = SnapshotUtils.getSnapshotInfo(ozoneManager, 
toSnapshotTableKey).getSnapshotId();
@@ -1567,7 +1589,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1599,7 +1621,7 @@ public abstract class TestOmSnapshot {
     String bucketb = "buck-" + counter.incrementAndGet();
     store.createVolume(volumea);
     OzoneVolume volume1 = store.getVolume(volumea);
-    volume1.createBucket(bucketa);
+    createBucket(volume1, bucketa);
     OzoneBucket bucket1 = volume1.getBucket(bucketa);
     // Create Key1 and take 2 snapshots
     String key1 = "key-1-";
@@ -1612,16 +1634,16 @@ public abstract class TestOmSnapshot {
     OMException omException = assertThrows(OMException.class,
         () -> store.snapshotDiff(volumea, bucketb, snap1, snap2,
             null, 0, forceFullSnapshotDiff, disableNativeDiff));
-    assertEquals(KEY_NOT_FOUND, omException.getResult());
+    assertEquals(BUCKET_NOT_FOUND, omException.getResult());
     // Volume is nonexistent
     omException = assertThrows(OMException.class,
         () -> store.snapshotDiff(volumeb, bucketa, snap2, snap1,
             null, 0, forceFullSnapshotDiff, disableNativeDiff));
-    assertEquals(KEY_NOT_FOUND, omException.getResult());
+    assertEquals(VOLUME_NOT_FOUND, omException.getResult());
     omException = assertThrows(OMException.class,
         () -> store.snapshotDiff(volumeb, bucketb, snap2, snap1,
             null, 0, forceFullSnapshotDiff, disableNativeDiff));
-    assertEquals(KEY_NOT_FOUND, omException.getResult());
+    assertEquals(VOLUME_NOT_FOUND, omException.getResult());
   }
 
   /**
@@ -1638,7 +1660,7 @@ public abstract class TestOmSnapshot {
     String testBucketName = "bucket1";
     store.createVolume(testVolumeName);
     OzoneVolume volume = store.getVolume(testVolumeName);
-    volume.createBucket(testBucketName);
+    createBucket(volume, testBucketName);
     OzoneBucket bucket = volume.getBucket(testBucketName);
     String key1 = "k1";
     key1 = createFileKeyWithPrefix(bucket, key1);
@@ -1663,7 +1685,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1700,8 +1722,8 @@ public abstract class TestOmSnapshot {
     String bucketName2 = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucketName1);
-    volume1.createBucket(bucketName2);
+    createBucket(volume1, bucketName1);
+    createBucket(volume1, bucketName2);
     OzoneBucket bucket1 = volume1.getBucket(bucketName1);
     OzoneBucket bucket2 = volume1.getBucket(bucketName2);
     // Create Key1 and take snapshot
@@ -1726,19 +1748,18 @@ public abstract class TestOmSnapshot {
     String volume = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucket = "buck-" + RandomStringUtils.randomNumeric(5);
 
-    String volBucketErrorMessage = "Provided volume name " + volume +
-        " or bucket name " + bucket + " doesn't exist";
+    String volErrorMessage = "Volume not found: " + volume;
 
     Exception volBucketEx = assertThrows(OMException.class,
         () -> store.listSnapshotDiffJobs(volume, bucket,
             "", true));
-    assertEquals(volBucketErrorMessage,
+    assertEquals(volErrorMessage,
         volBucketEx.getMessage());
 
     // Create the volume and the bucket.
     store.createVolume(volume);
     OzoneVolume ozVolume = store.getVolume(volume);
-    ozVolume.createBucket(bucket);
+    createBucket(ozVolume, bucket);
 
     assertDoesNotThrow(() ->
         store.listSnapshotDiffJobs(volume, bucket, "", true));
@@ -1785,8 +1806,8 @@ public abstract class TestOmSnapshot {
     String bucketName2 = "buck2";
     store.createVolume(volumeName1);
     OzoneVolume volume1 = store.getVolume(volumeName1);
-    volume1.createBucket(bucketName1);
-    volume1.createBucket(bucketName2);
+    createBucket(volume1, bucketName1);
+    createBucket(volume1, bucketName2);
     OzoneBucket bucket1 = volume1.getBucket(bucketName1);
     OzoneBucket bucket2 = volume1.getBucket(bucketName2);
     String keyPrefix = "key-";
@@ -1822,7 +1843,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1843,7 +1864,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1870,7 +1891,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
@@ -1895,9 +1916,10 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
-    bucket1.setQuota(OzoneQuota.parseQuota("102400000", "500"));
+    OzoneBucket originalBucket1 = 
volume1.getBucket(linkedBuckets.getOrDefault(bucket, bucket));
+    originalBucket1.setQuota(OzoneQuota.parseQuota("102400000", "500"));
     volume1.setQuota(OzoneQuota.parseQuota("204800000", "1000"));
 
     long volUsedNamespaceInitial = volume1.getUsedNamespace();
@@ -1973,7 +1995,7 @@ public abstract class TestOmSnapshot {
         OmSnapshotManager.getSnapshotPrefix(snapshotName);
     SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
         .getSnapshotInfoTable()
-        .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
+        .get(SnapshotInfo.getTableKey(volName, 
linkedBuckets.getOrDefault(buckName, buckName), snapshotName));
     String snapshotDirName =
         OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(),
             snapshotInfo) + OM_KEY_PREFIX + "CURRENT";
@@ -2182,7 +2204,7 @@ public abstract class TestOmSnapshot {
     String bucketA = "buc-a-" + RandomStringUtils.randomNumeric(5);
     store.createVolume(volumeA);
     OzoneVolume volA = store.getVolume(volumeA);
-    volA.createBucket(bucketA);
+    createBucket(volA, bucketA);
     OzoneBucket volAbucketA = volA.getBucket(bucketA);
 
     int latestDayIndex = 0;
@@ -2309,7 +2331,7 @@ public abstract class TestOmSnapshot {
       // Validate keys metadata in active Ozone namespace
       OzoneKeyDetails ozoneKeyDetails = ozoneBucketClient.getKey(keyName);
       assertEquals(keyName, ozoneKeyDetails.getName());
-      assertEquals(ozoneBucketClient.getName(),
+      assertEquals(linkedBuckets.getOrDefault(ozoneBucketClient.getName(), 
ozoneBucketClient.getName()),
           ozoneKeyDetails.getBucketName());
       assertEquals(ozoneBucketClient.getVolumeName(),
           ozoneKeyDetails.getVolumeName());
@@ -2391,7 +2413,7 @@ public abstract class TestOmSnapshot {
 
     store.createVolume(volume1);
     OzoneVolume ozoneVolume = store.getVolume(volume1);
-    ozoneVolume.createBucket(bucket1);
+    createBucket(ozoneVolume, bucket1);
     OzoneBucket ozoneBucket1 = ozoneVolume.getBucket(bucket1);
 
     DBStore activeDbStore = ozoneManager.getMetadataManager().getStore();
@@ -2404,7 +2426,7 @@ public abstract class TestOmSnapshot {
     createSnapshot(volume1, bucket1, "bucket1-snap1");
     activeDbStore.compactDB();
 
-    ozoneVolume.createBucket(bucket2);
+    createBucket(ozoneVolume, bucket2);
     OzoneBucket ozoneBucket2 = ozoneVolume.getBucket(bucket2);
 
     for (int i = 100; i < 200; i++) {
@@ -2417,7 +2439,7 @@ public abstract class TestOmSnapshot {
     createSnapshot(volume1, bucket2, "bucket2-snap1");
     activeDbStore.compactDB();
 
-    ozoneVolume.createBucket(bucket3);
+    createBucket(ozoneVolume, bucket3);
     OzoneBucket ozoneBucket3 = ozoneVolume.getBucket(bucket3);
 
     for (int i = 200; i < 300; i++) {
@@ -2496,7 +2518,7 @@ public abstract class TestOmSnapshot {
     String bucket = "buck-" + counter.incrementAndGet();
     store.createVolume(volume);
     OzoneVolume volume1 = store.getVolume(volume);
-    volume1.createBucket(bucket);
+    createBucket(volume1, bucket);
     OzoneBucket bucket1 = volume1.getBucket(bucket);
     // Create Key1 and take snapshot
     String key1 = "key-1-";
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystem.java
index 0849b90078..c43ec9c33c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystem.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystem.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneSnapshot;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -54,9 +55,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,9 +71,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -99,6 +102,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Abstract class for OmSnapshot file system tests.
  */
 @Timeout(120)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class TestOmSnapshotFileSystem {
   protected static final String VOLUME_NAME =
       "volume" + RandomStringUtils.randomNumeric(5);
@@ -107,26 +111,29 @@ public abstract class TestOmSnapshotFileSystem {
   protected static final String BUCKET_NAME_LEGACY =
       "bucket-legacy-" + RandomStringUtils.randomNumeric(5);
 
-  private static MiniOzoneCluster cluster = null;
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static OzoneConfiguration conf;
-  private static OzoneManagerProtocol writeClient;
-  private static OzoneManager ozoneManager;
-  private static String keyPrefix;
+  private MiniOzoneCluster cluster = null;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private OzoneConfiguration conf;
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager ozoneManager;
+  private String keyPrefix;
   private final String bucketName;
+  private final boolean createLinkedBuckets;
   private FileSystem fs;
   private OzoneFileSystem o3fs;
+  private Map<String, String> linkedBucketMaps = new HashMap<>();
 
   private static final Logger LOG =
       LoggerFactory.getLogger(TestOmSnapshot.class);
 
-  public TestOmSnapshotFileSystem(String bucketName) {
+  public TestOmSnapshotFileSystem(String bucketName, boolean createLinkedBuckets) throws Exception {
     this.bucketName = bucketName;
+    this.createLinkedBuckets = createLinkedBuckets;
+    init();
   }
 
-  @BeforeAll
-  public static void init() throws Exception {
+  private void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, true);
     cluster = MiniOzoneCluster.newBuilder(conf).build();
@@ -138,12 +145,20 @@ public abstract class TestOmSnapshotFileSystem {
     ozoneManager = cluster.getOzoneManager();
 
     TestDataUtil.createVolume(client, VOLUME_NAME);
-    TestDataUtil.createBucket(client, VOLUME_NAME,
+    OzoneBucket bucket = TestDataUtil.createBucket(client, VOLUME_NAME,
         new BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED).build(),
-        BUCKET_NAME_FSO);
-    TestDataUtil.createBucket(client, VOLUME_NAME,
+        BUCKET_NAME_FSO, createLinkedBuckets);
+    if (createLinkedBuckets) {
+      linkedBucketMaps.put(bucket.getName(), bucket.getSourceBucket());
+    }
+    bucket = TestDataUtil.createBucket(client, VOLUME_NAME,
         new BucketArgs.Builder().setBucketLayout(LEGACY).build(),
-        BUCKET_NAME_LEGACY);
+        BUCKET_NAME_LEGACY, createLinkedBuckets);
+    if (createLinkedBuckets) {
+      linkedBucketMaps.put(bucket.getName(), bucket.getSourceBucket());
+    }
+
+
 
     // stop the deletion services so that keys can still be read
     KeyManagerImpl keyManager = (KeyManagerImpl) ozoneManager.getKeyManager();
@@ -163,7 +178,7 @@ public abstract class TestOmSnapshotFileSystem {
   }
 
   @AfterAll
-  public static void tearDown() throws Exception {
+  void tearDown() {
     IOUtils.closeQuietly(client);
     if (cluster != null) {
       cluster.shutdown();
@@ -273,7 +288,7 @@ public abstract class TestOmSnapshotFileSystem {
     deleteSnapshot(snapshotName);
     String expectedMessage = String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName);
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName);
     OMException exception = assertThrows(OMException.class,
         () -> ozoneBucket.listKeys(keyPrefix + "a/", null));
     assertEquals(expectedMessage, exception.getMessage());
@@ -376,7 +391,7 @@ public abstract class TestOmSnapshotFileSystem {
     assertEquals(inputString, new String(read, StandardCharsets.UTF_8));
   }
 
-  private static void setKeyPrefix(String s) {
+  private void setKeyPrefix(String s) {
     keyPrefix = s;
   }
 
@@ -493,21 +508,21 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.listStatus(snapshotRoot1));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName1), exception1.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName1), exception1.getMessage());
 
     deleteSnapshot(snapshotName2);
     FileNotFoundException exception2 = assertThrows(FileNotFoundException.class,
         () -> fs.listStatus(snapshotRoot2));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName2), exception2.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName2), exception2.getMessage());
 
     deleteSnapshot(snapshotName3);
     FileNotFoundException exception3 = assertThrows(FileNotFoundException.class,
         () -> fs.listStatus(snapshotParent3));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName3), exception3.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName3), exception3.getMessage());
   }
 
   @Test
@@ -542,7 +557,7 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.listStatus(snapshotParent));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName), exception.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName), exception.getMessage());
   }
 
   @Test
@@ -578,7 +593,7 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.listStatus(snapshotParent));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName), exception.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName), exception.getMessage());
   }
 
   @Test
@@ -619,7 +634,7 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.open(fileInSnapshot));
     assertEquals(String.format("FILE_NOT_FOUND: Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName), exception.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName), exception.getMessage());
   }
 
   private void createAndCommitKey(String keyName) throws IOException {
@@ -669,7 +684,7 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.listStatus(snapshotRoot));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName), exception.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName), exception.getMessage());
   }
 
   /**
@@ -726,7 +741,7 @@ public abstract class TestOmSnapshotFileSystem {
         () -> fs.listStatus(snapshotRoot));
     assertEquals(String.format("Unable to load snapshot. " +
             "Snapshot with table key '/%s/%s/%s' is no longer active",
-        VOLUME_NAME, bucketName, snapshotName), exception.getMessage());
+        VOLUME_NAME, linkedBucketMaps.getOrDefault(bucketName, bucketName), snapshotName), exception.getMessage());
   }
 
   private String createSnapshot(String snapshotName)
@@ -736,9 +751,10 @@ public abstract class TestOmSnapshotFileSystem {
     writeClient.createSnapshot(VOLUME_NAME, bucketName, snapshotName);
 
     // wait till the snapshot directory exists
+    OzoneSnapshot snapshot = objectStore.getSnapshotInfo(VOLUME_NAME, bucketName, snapshotName);
     SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
         .getSnapshotInfoTable()
-        .get(SnapshotInfo.getTableKey(VOLUME_NAME, bucketName, snapshotName));
+        .get(SnapshotInfo.getTableKey(snapshot.getVolumeName(), snapshot.getBucketName(), snapshotName));
     String snapshotDirName = getSnapshotPath(conf, snapshotInfo) +
         OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
index 47bdd8f3bd..17adf6cce7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Timeout;
  */
 @Timeout(120)
 public class TestOmSnapshotFileSystemFso extends TestOmSnapshotFileSystem {
-  TestOmSnapshotFileSystemFso() {
-    super(BUCKET_NAME_FSO);
+  TestOmSnapshotFileSystemFso() throws Exception {
+    super(BUCKET_NAME_FSO, false);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFsoWithLinkedBuckets.java
similarity index 83%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFsoWithLinkedBuckets.java
index 47bdd8f3bd..e9d1017cdd 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFso.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemFsoWithLinkedBuckets.java
@@ -24,8 +24,8 @@ import org.junit.jupiter.api.Timeout;
  * OmSnapshot file system tests for FSO.
  */
 @Timeout(120)
-public class TestOmSnapshotFileSystemFso extends TestOmSnapshotFileSystem {
-  TestOmSnapshotFileSystemFso() {
-    super(BUCKET_NAME_FSO);
+public class TestOmSnapshotFileSystemFsoWithLinkedBuckets extends TestOmSnapshotFileSystem {
+  TestOmSnapshotFileSystemFsoWithLinkedBuckets() throws Exception {
+    super(BUCKET_NAME_FSO, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
index b8d81c31cf..effaaa5d4e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Timeout;
  */
 @Timeout(120)
 public class TestOmSnapshotFileSystemLegacy extends TestOmSnapshotFileSystem {
-  TestOmSnapshotFileSystemLegacy() {
-    super(BUCKET_NAME_LEGACY);
+  TestOmSnapshotFileSystemLegacy() throws Exception {
+    super(BUCKET_NAME_LEGACY, false);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacyWithLinkedBuckets.java
similarity index 82%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacyWithLinkedBuckets.java
index b8d81c31cf..61f92cc7c0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFileSystemLegacyWithLinkedBuckets.java
@@ -24,8 +24,8 @@ import org.junit.jupiter.api.Timeout;
  * OmSnapshot file system tests for Legacy.
  */
 @Timeout(120)
-public class TestOmSnapshotFileSystemLegacy extends TestOmSnapshotFileSystem {
-  TestOmSnapshotFileSystemLegacy() {
-    super(BUCKET_NAME_LEGACY);
+public class TestOmSnapshotFileSystemLegacyWithLinkedBuckets extends TestOmSnapshotFileSystem {
+  TestOmSnapshotFileSystemLegacyWithLinkedBuckets() throws Exception {
+    super(BUCKET_NAME_LEGACY, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
index 06fbebb2ef..c303b24ad2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
@@ -31,6 +31,6 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMI
 @Timeout(300)
 class TestOmSnapshotFsoWithNativeLib extends TestOmSnapshot {
   TestOmSnapshotFsoWithNativeLib() throws Exception {
-    super(FILE_SYSTEM_OPTIMIZED, false, false, false);
+    super(FILE_SYSTEM_OPTIMIZED, false, false, false, false);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLibWithLinkedBuckets.java
similarity index 85%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLibWithLinkedBuckets.java
index 06fbebb2ef..c499a70564 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLib.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithNativeLibWithLinkedBuckets.java
@@ -29,8 +29,8 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMI
  */
 @Native(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)
 @Timeout(300)
-class TestOmSnapshotFsoWithNativeLib extends TestOmSnapshot {
-  TestOmSnapshotFsoWithNativeLib() throws Exception {
-    super(FILE_SYSTEM_OPTIMIZED, false, false, false);
+class TestOmSnapshotFsoWithNativeLibWithLinkedBuckets extends TestOmSnapshot {
+  TestOmSnapshotFsoWithNativeLibWithLinkedBuckets() throws Exception {
+    super(FILE_SYSTEM_OPTIMIZED, false, false, false, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
index c1782b73d1..26262916cb 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
@@ -29,6 +29,6 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMI
 public class TestOmSnapshotFsoWithoutNativeLib extends TestOmSnapshot {
 
   public TestOmSnapshotFsoWithoutNativeLib() throws Exception {
-    super(FILE_SYSTEM_OPTIMIZED, false, false, true);
+    super(FILE_SYSTEM_OPTIMIZED, false, false, true, false);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLibWithLinkedBuckets.java
similarity index 82%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLibWithLinkedBuckets.java
index c1782b73d1..4387f77b3f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLib.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotFsoWithoutNativeLibWithLinkedBuckets.java
@@ -26,9 +26,9 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMI
  * Test OmSnapshot for FSO bucket type when native lib is disabled.
  */
 @Timeout(300)
-public class TestOmSnapshotFsoWithoutNativeLib extends TestOmSnapshot {
+public class TestOmSnapshotFsoWithoutNativeLibWithLinkedBuckets extends TestOmSnapshot {
 
-  public TestOmSnapshotFsoWithoutNativeLib() throws Exception {
-    super(FILE_SYSTEM_OPTIMIZED, false, false, true);
+  public TestOmSnapshotFsoWithoutNativeLibWithLinkedBuckets() throws Exception {
+    super(FILE_SYSTEM_OPTIMIZED, false, false, true, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
index 13c8cb5fca..bad51103a5 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
@@ -29,6 +29,6 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
 public class TestOmSnapshotObjectStore extends TestOmSnapshot {
 
   public TestOmSnapshotObjectStore() throws Exception {
-    super(OBJECT_STORE, false, false, false);
+    super(OBJECT_STORE, false, false, false, false);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStoreWithLinkedBuckets.java
similarity index 83%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStoreWithLinkedBuckets.java
index 13c8cb5fca..64765e7171 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStore.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotObjectStoreWithLinkedBuckets.java
@@ -26,9 +26,9 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
  * Test OmSnapshot for Object Store bucket type.
  */
 @Timeout(300)
-public class TestOmSnapshotObjectStore extends TestOmSnapshot {
+public class TestOmSnapshotObjectStoreWithLinkedBuckets extends TestOmSnapshot {
 
-  public TestOmSnapshotObjectStore() throws Exception {
-    super(OBJECT_STORE, false, false, false);
+  public TestOmSnapshotObjectStoreWithLinkedBuckets() throws Exception {
+    super(OBJECT_STORE, false, false, false, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithBucketLinkingLegacy.java
similarity index 84%
copy from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
copy to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithBucketLinkingLegacy.java
index bf4a2fee0d..f1ced6c4a8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithBucketLinkingLegacy.java
@@ -26,9 +26,9 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.LEGACY;
  * Test OmSnapshot for Legacy bucket type.
  */
 @Timeout(300)
-public class TestOmSnapshotLegacy extends TestOmSnapshot {
+public class TestOmSnapshotWithBucketLinkingLegacy extends TestOmSnapshot {
 
-  public TestOmSnapshotLegacy() throws Exception {
-    super(LEGACY, false, false, false);
+  public TestOmSnapshotWithBucketLinkingLegacy() throws Exception {
+    super(LEGACY, false, false, false, true);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithoutBucketLinkingLegacy.java
similarity index 83%
rename from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
rename to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithoutBucketLinkingLegacy.java
index bf4a2fee0d..95549471e6 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLegacy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotWithoutBucketLinkingLegacy.java
@@ -26,9 +26,9 @@ import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.LEGACY;
  * Test OmSnapshot for Legacy bucket type.
  */
 @Timeout(300)
-public class TestOmSnapshotLegacy extends TestOmSnapshot {
+public class TestOmSnapshotWithoutBucketLinkingLegacy extends TestOmSnapshot {
 
-  public TestOmSnapshotLegacy() throws Exception {
-    super(LEGACY, false, false, false);
+  public TestOmSnapshotWithoutBucketLinkingLegacy() throws Exception {
+    super(LEGACY, false, false, false, false);
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index dde5b22e79..cf52635125 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
 import java.util.UUID;
 
 import com.google.common.cache.RemovalListener;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.server.ServerUtils;
@@ -624,7 +625,12 @@ public final class OmSnapshotManager implements 
AutoCloseable {
     String[] keyParts = keyName.split(OM_KEY_PREFIX);
     if (isSnapshotKey(keyParts)) {
       String snapshotName = keyParts[1];
-
+      // Updating the volumeName & bucketName in case the bucket is a linked bucket. We need to do this before a
+      // permission check, since linked bucket permissions and source bucket permissions could be different.
+      ResolvedBucket resolvedBucket = ozoneManager.resolveBucketLink(Pair.of(volumeName,
+          bucketName), false);
+      volumeName = resolvedBucket.realVolume();
+      bucketName = resolvedBucket.realBucket();
       return (ReferenceCounted<IOmMetadataReader>) (ReferenceCounted<?>)
           getActiveSnapshot(volumeName, bucketName, snapshotName);
     } else {
@@ -656,7 +662,6 @@ public final class OmSnapshotManager implements 
AutoCloseable {
       // don't allow snapshot indicator without snapshot name
       throw new OMException(INVALID_KEY_NAME);
     }
-
     String snapshotTableKey = SnapshotInfo.getTableKey(volumeName,
         bucketName, snapshotName);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 2bb8c915b3..2facdaccd2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2971,12 +2971,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     Map<String, String> auditMap = buildAuditMap(volumeName);
     auditMap.put(OzoneConsts.BUCKET, bucketName);
     try {
-      if (isAclEnabled) {
-        omMetadataReader.checkAcls(ResourceType.BUCKET, StoreType.OZONE,
-            ACLType.READ, volumeName, bucketName, null);
-      }
+      // Updating the volumeName & bucketName in case the bucket is a linked bucket. We need to do this before a
+      // permission check, since linked bucket permissions and source bucket permissions could be different.
+      ResolvedBucket resolvedBucket = resolveBucketLink(Pair.of(volumeName, bucketName));
+      auditMap = buildAuditMap(resolvedBucket.realVolume());
+      auditMap.put(OzoneConsts.BUCKET, resolvedBucket.realBucket());
       SnapshotInfo snapshotInfo =
-          metadataManager.getSnapshotInfo(volumeName, bucketName, snapshotName);
+          metadataManager.getSnapshotInfo(resolvedBucket.realVolume(), resolvedBucket.realBucket(), snapshotName);
 
       AUDIT.logReadSuccess(buildAuditMessageForSuccess(
           OMAction.SNAPSHOT_INFO, auditMap));
@@ -2997,12 +2998,17 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     Map<String, String> auditMap = buildAuditMap(volumeName);
     auditMap.put(OzoneConsts.BUCKET, bucketName);
     try {
+      // Updating the volumeName & bucketName in case the bucket is a linked bucket. We need to do this before a
+      // permission check, since linked bucket permissions and source bucket permissions could be different.
+      ResolvedBucket resolvedBucket = resolveBucketLink(Pair.of(volumeName, bucketName));
+      auditMap = buildAuditMap(resolvedBucket.realVolume());
+      auditMap.put(OzoneConsts.BUCKET, resolvedBucket.realBucket());
       if (isAclEnabled) {
         omMetadataReader.checkAcls(ResourceType.BUCKET, StoreType.OZONE,
-            ACLType.LIST, volumeName, bucketName, null);
+            ACLType.LIST, resolvedBucket.realVolume(), resolvedBucket.realBucket(), null);
       }
       ListSnapshotResponse listSnapshotResponse =
-          metadataManager.listSnapshot(volumeName, bucketName,
+          metadataManager.listSnapshot(resolvedBucket.realVolume(), resolvedBucket.realBucket(),
               snapshotPrefix, prevSnapshot, maxListResult);
 
       AUDIT.logReadSuccess(buildAuditMessageForSuccess(
@@ -4898,13 +4904,11 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
                                            boolean forceFullDiff,
                                            boolean disableNativeDiff)
       throws IOException {
-
-    if (isAclEnabled) {
-      omMetadataReader.checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ, volume, bucket, null);
-    }
-
-    return omSnapshotManager.getSnapshotDiffReport(volume, bucket, fromSnapshot, toSnapshot,
-        token, pageSize, forceFullDiff, disableNativeDiff);
+    // Updating the volumeName & bucketName in case the bucket is a linked bucket. We need to do this before a
+    // permission check, since linked bucket permissions and source bucket permissions could be different.
+    ResolvedBucket resolvedBucket = resolveBucketLink(Pair.of(volume, bucket), false);
+    return omSnapshotManager.getSnapshotDiffReport(resolvedBucket.realVolume(), resolvedBucket.realBucket(),
+        fromSnapshot, toSnapshot, token, pageSize, forceFullDiff, disableNativeDiff);
   }
 
   public CancelSnapshotDiffResponse cancelSnapshotDiff(String volume,
@@ -4912,12 +4916,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
                                                        String fromSnapshot,
                                                        String toSnapshot)
       throws IOException {
-
-    if (isAclEnabled) {
-      omMetadataReader.checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ, volume, bucket, null);
-    }
-
-    return omSnapshotManager.cancelSnapshotDiff(volume, bucket, fromSnapshot, toSnapshot);
+    ResolvedBucket resolvedBucket = this.resolveBucketLink(Pair.of(volume, bucket), false);
+    return omSnapshotManager.cancelSnapshotDiff(resolvedBucket.realVolume(), resolvedBucket.realBucket(),
+        fromSnapshot, toSnapshot);
   }
 
   public List<SnapshotDiffJob> listSnapshotDiffJobs(String volume,
@@ -4925,12 +4926,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
                                                     String jobStatus,
                                                     boolean listAll)
       throws IOException {
-
+    ResolvedBucket resolvedBucket = this.resolveBucketLink(Pair.of(volume, 
bucket), false);
     if (isAclEnabled) {
       omMetadataReader.checkAcls(ResourceType.BUCKET, StoreType.OZONE, 
ACLType.LIST, volume, bucket, null);
     }
 
-    return omSnapshotManager.getSnapshotDiffList(volume, bucket, jobStatus, 
listAll);
+    return omSnapshotManager.getSnapshotDiffList(resolvedBucket.realVolume(), 
resolvedBucket.realBucket(),
+        jobStatus, listAll);
   }
 
   public String printCompactionLogDag(String fileNamePrefix,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
index 2ded4f6a83..59cc02b6fd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -75,8 +77,8 @@ public class OMSnapshotCreateRequest extends OMClientRequest {
       LoggerFactory.getLogger(OMSnapshotCreateRequest.class);
 
   private final String snapshotPath;
-  private final String volumeName;
-  private final String bucketName;
+  private String volumeName;
+  private String bucketName;
   private final String snapshotName;
   private final SnapshotInfo snapshotInfo;
 
@@ -106,7 +108,11 @@ public class OMSnapshotCreateRequest extends 
OMClientRequest {
     final OMRequest omRequest = super.preExecute(ozoneManager);
     // Verify name
     OmUtils.validateSnapshotName(snapshotName);
-
+    // Updating the volumeName & bucketName in case the bucket is a linked 
bucket. We need to do this before a
+    // permission check, since linked bucket permissions and source bucket 
permissions could be different.
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(Pair.of(volumeName, 
bucketName), this);
+    this.volumeName = bucket.realVolume();
+    this.bucketName = bucket.realBucket();
     UserGroupInformation ugi = createUGIForApi();
     String bucketOwner = ozoneManager.getBucketOwner(volumeName, bucketName,
         IAccessAuthorizer.ACLType.READ, OzoneObj.ResourceType.BUCKET);
@@ -116,12 +122,12 @@ public class OMSnapshotCreateRequest extends 
OMClientRequest {
           "Only bucket owners and Ozone admins can create snapshots",
           OMException.ResultCodes.PERMISSION_DENIED);
     }
-
-    return omRequest.toBuilder().setCreateSnapshotRequest(
-        omRequest.getCreateSnapshotRequest().toBuilder()
-            .setSnapshotId(toProtobuf(UUID.randomUUID()))
-            .setCreationTime(Time.now())
-            .build()).build();
+    CreateSnapshotRequest.Builder createSnapshotRequest = 
omRequest.getCreateSnapshotRequest().toBuilder()
+        .setSnapshotId(toProtobuf(UUID.randomUUID()))
+        .setVolumeName(volumeName)
+        .setBucketName(this.bucketName)
+        .setCreationTime(Time.now());
+    return 
omRequest.toBuilder().setCreateSnapshotRequest(createSnapshotRequest.build()).build();
   }
   
   @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotDeleteRequest.java
index a2b00138cf..95f99c627c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotDeleteRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -82,6 +84,11 @@ public class OMSnapshotDeleteRequest extends OMClientRequest 
{
 
     String volumeName = deleteSnapshotRequest.getVolumeName();
     String bucketName = deleteSnapshotRequest.getBucketName();
+    // Updating the volumeName & bucketName in case the bucket is a linked 
bucket. We need to do this before a
+    // permission check, since linked bucket permissions and source bucket 
permissions could be different.
+    ResolvedBucket resolvedBucket = 
ozoneManager.resolveBucketLink(Pair.of(volumeName, bucketName), this);
+    volumeName = resolvedBucket.realVolume();
+    bucketName = resolvedBucket.realBucket();
 
     // Permission check
     UserGroupInformation ugi = createUGIForApi();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotRenameRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotRenameRequest.java
index 8341f87550..8cf0579647 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotRenameRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotRenameRequest.java
@@ -25,6 +25,8 @@ import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAP
 
 import java.io.IOException;
 import java.nio.file.InvalidPathException;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OmUtils;
@@ -32,6 +34,7 @@ import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -75,6 +78,11 @@ public class OMSnapshotRenameRequest extends OMClientRequest 
{
 
     String volumeName = renameSnapshotRequest.getVolumeName();
     String bucketName = renameSnapshotRequest.getBucketName();
+    // Updating the volumeName & bucketName in case the bucket is a linked 
bucket. We need to do this before a
+    // permission check, since linked bucket permissions and source bucket 
permissions could be different.
+    ResolvedBucket resolvedBucket = 
ozoneManager.resolveBucketLink(Pair.of(volumeName, bucketName), this);
+    volumeName = resolvedBucket.realVolume();
+    bucketName = resolvedBucket.realBucket();
 
     // Permission check
     UserGroupInformation ugi = createUGIForApi();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
index af90438225..b7b7ff0a46 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
@@ -18,15 +18,18 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponse;
@@ -86,6 +89,29 @@ public class TestOMSnapshotCreateRequest extends 
TestSnapshotRequestAndResponse
     doPreExecute(omRequest);
   }
 
+  @ValueSource(strings = {
+      // '-' is allowed.
+      "9cdf0e8a-6946-41ad-a2d1-9eb724fab126",
+      // 3 chars name is allowed.
+      "sn1",
+      // less than or equal to 63 chars are allowed.
+      "snap75795657617173401188448010125899089001363595171500499231286"
+  })
+  @ParameterizedTest
+  public void testPreExecuteWithLinkedBucket(String snapshotName) throws 
Exception {
+    when(getOzoneManager().isOwner(any(), any())).thenReturn(true);
+    String resolvedBucketName = getBucketName() + "1";
+    String resolvedVolumeName = getVolumeName() + "1";
+    when(getOzoneManager().resolveBucketLink(any(Pair.class), 
any(OMClientRequest.class)))
+        .thenAnswer(i -> new ResolvedBucket(i.getArgument(0), 
Pair.of(resolvedVolumeName, resolvedBucketName),
+            "owner", BucketLayout.FILE_SYSTEM_OPTIMIZED));
+    OMRequest omRequest = createSnapshotRequest(getVolumeName(),
+        getBucketName(), snapshotName);
+    OMSnapshotCreateRequest omSnapshotCreateRequest = doPreExecute(omRequest);
+    assertEquals(resolvedVolumeName, 
omSnapshotCreateRequest.getOmRequest().getCreateSnapshotRequest().getVolumeName());
+    assertEquals(resolvedBucketName, 
omSnapshotCreateRequest.getOmRequest().getCreateSnapshotRequest().getBucketName());
+  }
+
   @ValueSource(strings = {
       // ? is not allowed in snapshot name.
       "a?b",
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
index 4c5dc2e77f..9e19e59484 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
@@ -19,10 +19,14 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.snapshot.TestSnapshotRequestAndResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -81,6 +85,29 @@ public class TestOMSnapshotDeleteRequest extends 
TestSnapshotRequestAndResponse
     doPreExecute(omRequest);
   }
 
+  @ValueSource(strings = {
+      // '-' is allowed.
+      "9cdf0e8a-6946-41ad-a2d1-9eb724fab126",
+      // 3 chars name is allowed.
+      "sn1",
+      // less than or equal to 63 chars are allowed.
+      "snap75795657617173401188448010125899089001363595171500499231286"
+  })
+  @ParameterizedTest
+  public void testPreExecuteWithLinkedBuckets(String deleteSnapshotName) 
throws Exception {
+    when(getOzoneManager().isOwner(any(), any())).thenReturn(true);
+    String resolvedBucketName = getBucketName() + "1";
+    String resolvedVolumeName = getVolumeName() + "1";
+    when(getOzoneManager().resolveBucketLink(any(Pair.class), 
any(OMClientRequest.class)))
+        .thenAnswer(i -> new ResolvedBucket(i.getArgument(0), 
Pair.of(resolvedVolumeName, resolvedBucketName),
+            "owner", BucketLayout.FILE_SYSTEM_OPTIMIZED));
+    OMRequest omRequest = deleteSnapshotRequest(getVolumeName(),
+        getBucketName(), deleteSnapshotName);
+    OMSnapshotDeleteRequest omSnapshotDeleteRequest = doPreExecute(omRequest);
+    assertEquals(resolvedVolumeName, 
omSnapshotDeleteRequest.getOmRequest().getDeleteSnapshotRequest().getVolumeName());
+    assertEquals(resolvedBucketName, 
omSnapshotDeleteRequest.getOmRequest().getDeleteSnapshotRequest().getBucketName());
+  }
+
   @ValueSource(strings = {
       // ? is not allowed in snapshot name.
       "a?b",
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotRenameRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotRenameRequest.java
index a746597288..8059c3ce50 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotRenameRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotRenameRequest.java
@@ -16,14 +16,18 @@
  */
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.snapshot.TestSnapshotRequestAndResponse;
@@ -83,6 +87,30 @@ public class TestOMSnapshotRenameRequest extends 
TestSnapshotRequestAndResponse
     doPreExecute(omRequest);
   }
 
+  @ValueSource(strings = {
+      // '-' is allowed.
+      "9cdf0e8a-6946-41ad-a2d1-9eb724fab126",
+      // 3 chars name is allowed.
+      "sn1",
+      // less than or equal to 63 chars are allowed.
+      "snap75795657617173401188448010125899089001363595171500499231286"
+  })
+  @ParameterizedTest
+  public void testPreExecuteWithLinkedBucket(String toSnapshotName) throws 
Exception {
+    when(getOzoneManager().isOwner(any(), any())).thenReturn(true);
+    String resolvedBucketName = getBucketName() + "1";
+    String resolvedVolumeName = getVolumeName() + "1";
+    when(getOzoneManager().resolveBucketLink(any(Pair.class), 
any(OMClientRequest.class)))
+        .thenAnswer(i -> new ResolvedBucket(i.getArgument(0), 
Pair.of(resolvedVolumeName, resolvedBucketName),
+            "owner", BucketLayout.FILE_SYSTEM_OPTIMIZED));
+    String currentSnapshotName = "current";
+    OzoneManagerProtocolProtos.OMRequest omRequest = 
renameSnapshotRequest(getVolumeName(),
+        getBucketName(), currentSnapshotName, toSnapshotName);
+    OMSnapshotRenameRequest omSnapshotRenameRequest = doPreExecute(omRequest);
+    assertEquals(resolvedVolumeName, 
omSnapshotRenameRequest.getOmRequest().getRenameSnapshotRequest().getVolumeName());
+    assertEquals(resolvedBucketName, 
omSnapshotRenameRequest.getOmRequest().getRenameSnapshotRequest().getBucketName());
+  }
+
   @ValueSource(strings = {
       // ? is not allowed in snapshot name.
       "a?b",
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
index e60e23de22..b037b68fd7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
@@ -32,9 +32,12 @@ import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest;
 import org.apache.hadoop.ozone.om.request.snapshot.TestOMSnapshotCreateRequest;
@@ -140,6 +143,9 @@ public class TestSnapshotRequestAndResponse {
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration,
         ozoneManager);
     when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
+    when(ozoneManager.resolveBucketLink(any(Pair.class), 
any(OMClientRequest.class)))
+        .thenAnswer(i -> new ResolvedBucket(i.getArgument(0),
+            i.getArgument(0), "dummyBucketOwner", 
BucketLayout.FILE_SYSTEM_OPTIMIZED));
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to