This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7a5129d6953 HDDS-12810. Check and reserve space atomically in VolumeChoosingPolicy (#8360)
7a5129d6953 is described below
commit 7a5129d6953302430c4b7813b33902b29379de99
Author: Peter Lee <[email protected]>
AuthorDate: Wed May 14 16:43:05 2025 +0800
HDDS-12810. Check and reserve space atomically in VolumeChoosingPolicy (#8360)
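In short: VolumeChoosingPolicy#chooseVolume now checks a volume's free space and reserves the requested bytes (incCommittedBytes) in one synchronized step, instead of letting callers check and reserve separately, and every caller releases the reservation when container creation or import fails. A minimal sketch of the check-and-reserve pattern, with Volume and ReservingPolicy as simplified, hypothetical stand-ins rather than the real HddsVolume and VolumeChoosingPolicy APIs:

import java.util.List;

class Volume {
  private final long capacity;   // total bytes on this volume
  private long committedBytes;   // bytes reserved but not yet written

  Volume(long capacity) {
    this.capacity = capacity;
  }

  synchronized void incCommittedBytes(long delta) {
    committedBytes += delta;     // a negative delta releases a reservation
  }

  synchronized long usableSpace() {
    return capacity - committedBytes;
  }
}

class ReservingPolicy {
  // The check and the reservation happen under one lock, so two concurrent
  // callers can no longer both pass the space check and oversubscribe a volume.
  synchronized Volume chooseVolume(List<Volume> volumes, long requiredSpace)
      throws Exception {
    for (Volume v : volumes) {
      if (v.usableSpace() >= requiredSpace) {
        v.incCommittedBytes(requiredSpace);  // reserve before returning
        return v;
      }
    }
    throw new Exception("disk out of space on all volumes");
  }
}

class Caller {
  void createContainer(ReservingPolicy policy, List<Volume> volumes,
      long maxContainerSize) throws Exception {
    Volume volume = policy.chooseVolume(volumes, maxContainerSize);
    try {
      // ... create the container on the chosen volume ...
    } catch (Exception e) {
      volume.incCommittedBytes(-maxContainerSize);  // failure path: release
      throw e;
    }
  }
}

In the diff below, the release on failure shows up as newContainerData.releaseCommitSpace() in KeyValueHandler#handleCreateContainer and as incCommittedBytes(-getDefaultReplicationSpace()) in the replication handlers.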
---
.../ozone/container/common/impl/ContainerData.java | 20 +---
.../ozone/container/common/impl/ContainerSet.java | 2 -
.../volume/CapacityVolumeChoosingPolicy.java | 20 ++--
.../volume/RoundRobinVolumeChoosingPolicy.java | 11 +-
.../container/keyvalue/KeyValueContainer.java | 13 ++-
.../ozone/container/keyvalue/KeyValueHandler.java | 1 +
.../ozone/container/ozoneimpl/ContainerReader.java | 100 ++++++++++-------
.../container/replication/ContainerImporter.java | 13 +--
.../replication/DownloadAndImportReplicator.java | 18 +--
.../replication/SendContainerRequestHandler.java | 16 +--
.../common/impl/TestContainerPersistence.java | 5 +-
.../volume/TestCapacityVolumeChoosingPolicy.java | 11 ++
.../volume/TestRoundRobinVolumeChoosingPolicy.java | 13 +++
.../container/keyvalue/TestKeyValueContainer.java | 17 +++
.../container/keyvalue/TestKeyValueHandler.java | 80 ++++++++++++-
.../container/ozoneimpl/TestContainerReader.java | 31 +++++-
.../TestDownloadAndImportReplicator.java | 123 ++++++++++++++++++++
.../replication/TestReplicationSupervisor.java | 31 +++---
.../TestSendContainerRequestHandler.java | 124 +++++++++++++++++++--
19 files changed, 494 insertions(+), 155 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index d431b494d78..7bb59247ca5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -29,7 +29,6 @@
import static org.apache.hadoop.ozone.OzoneConsts.STATE;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
@@ -214,21 +213,14 @@ public synchronized void setState(ContainerDataProto.State state) {
(state != oldState)) {
releaseCommitSpace();
}
+ }
- /**
- * commit space when container transitions (back) to Open.
- * when? perhaps closing a container threw an exception
- */
- if ((state == ContainerDataProto.State.OPEN) &&
- (state != oldState)) {
- Preconditions.checkState(getMaxSize() > 0);
- commitSpace();
- }
+ public boolean isCommittedSpace() {
+ return committedSpace;
}
- @VisibleForTesting
- void setCommittedSpace(boolean committedSpace) {
- this.committedSpace = committedSpace;
+ public void setCommittedSpace(boolean committed) {
+ committedSpace = committed;
}
/**
@@ -356,7 +348,7 @@ public synchronized void closeContainer() {
setState(ContainerDataProto.State.CLOSED);
}
- private void releaseCommitSpace() {
+ public void releaseCommitSpace() {
long unused = getMaxSize() - getBytesUsed();
// only if container size < max size
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 9b5c89e1f73..0f5c19fd336 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -154,8 +154,6 @@ private boolean addContainer(Container<?> container, boolean overwrite) throws
throw new StorageContainerException(e,
ContainerProtos.Result.IO_EXCEPTION);
}
missingContainerSet.remove(containerId);
- // wish we could have done this from ContainerData.setState
- container.getContainerData().commitSpace();
if (container.getContainerData().getState() == RECOVERING) {
recoveringContainerMap.put(
clock.millis() + recoveringTimeout, containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
index 89c686645ea..e323eeb4b17 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
@@ -22,7 +22,7 @@
import java.io.IOException;
import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -44,11 +44,8 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy {
private static final Logger LOG = LoggerFactory.getLogger(
CapacityVolumeChoosingPolicy.class);
- // Stores the index of the next volume to be returned.
- private final Random random = new Random();
-
@Override
- public HddsVolume chooseVolume(List<HddsVolume> volumes,
+ public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {
// No volumes available to choose from
@@ -69,9 +66,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
}
int count = volumesWithEnoughSpace.size();
- if (count == 1) {
- return volumesWithEnoughSpace.get(0);
- } else {
+ HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
+ if (count > 1) {
// Even if we don't have too many volumes in volumesWithEnoughSpace, this
// algorithm will still help us choose the volume with larger
// available space than other volumes.
@@ -83,8 +79,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
// 4. vol2 + vol2: 25%, result is vol2
// So we have a total of 75% chances to choose vol1, which meets our
// expectation.
- int firstIndex = random.nextInt(count);
- int secondIndex = random.nextInt(count);
+ int firstIndex = ThreadLocalRandom.current().nextInt(count);
+ int secondIndex = ThreadLocalRandom.current().nextInt(count);
HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
@@ -93,7 +89,9 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
- firstVolume.getCommittedBytes();
long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
- secondVolume.getCommittedBytes();
- return firstAvailable < secondAvailable ? secondVolume : firstVolume;
+ selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume;
}
+ selectedVolume.incCommittedBytes(maxContainerSize);
+ return selectedVolume;
}
}
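As an aside on the probability comment above: the 75% figure is the plain best-of-two ("power of two random choices") enumeration. A small standalone sketch that replays that arithmetic, assuming vol1 (index 0) has more available-minus-committed space than vol2:

public class BestOfTwoCheck {
  public static void main(String[] args) {
    int vol1Wins = 0;
    int trials = 0;
    for (int first = 0; first < 2; first++) {
      for (int second = 0; second < 2; second++) {
        trials++;
        // the roomier volume (vol1) wins unless both picks land on vol2
        if (!(first == 1 && second == 1)) {
          vol1Wins++;
        }
      }
    }
    // prints: vol1 chosen in 3 of 4 equally likely cases (75%)
    System.out.println("vol1 chosen in " + vol1Wins + " of " + trials
        + " equally likely cases (" + (100 * vol1Wins / trials) + "%)");
  }
}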
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
index 9945a3256b3..52c8c599703 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
@@ -38,10 +37,10 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
RoundRobinVolumeChoosingPolicy.class);
// Stores the index of the next volume to be returned.
- private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
+ private int nextVolumeIndex = 0;
@Override
- public HddsVolume chooseVolume(List<HddsVolume> volumes,
+ public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {
// No volumes available to choose from
@@ -53,8 +52,7 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
// since volumes could've been removed because of the failure
// make sure we are not out of bounds
- int nextIndex = nextVolumeIndex.get();
- int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
+ int currentVolumeIndex = nextVolumeIndex < volumes.size() ? nextVolumeIndex : 0;
int startVolumeIndex = currentVolumeIndex;
@@ -67,7 +65,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
if (hasEnoughSpace) {
logIfSomeVolumesOutOfSpace(filter, LOG);
- nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
+ nextVolumeIndex = currentVolumeIndex;
+ volume.incCommittedBytes(maxContainerSize);
return volume;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 09cadd5d13f..030392045d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -157,8 +157,15 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
= StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
while (true) {
HddsVolume containerVolume;
+ String hddsVolumeDir;
try {
containerVolume = volumeChoosingPolicy.chooseVolume(volumes,
maxSize);
+ hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+ // Set volume before getContainerDBFile(), because we may need the
+ // volume to deduce the db file.
+ containerData.setVolume(containerVolume);
+ // commit bytes have been reserved in volumeChoosingPolicy#chooseVolume
+ containerData.setCommittedSpace(true);
} catch (DiskOutOfSpaceException ex) {
throw new StorageContainerException("Container creation failed, " +
"due to disk out of space", ex, DISK_OUT_OF_SPACE);
@@ -169,11 +176,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
}
try {
- String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
- // Set volume before getContainerDBFile(), because we may need the
- // volume to deduce the db file.
- containerData.setVolume(containerVolume);
-
long containerID = containerData.getContainerID();
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
containerVolume, clusterId);
@@ -206,7 +208,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
// Create .container file
File containerFile = getContainerFile();
createContainerFile(containerFile);
-
return;
} catch (StorageContainerException ex) {
if (containerMetaDataPath != null
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index c933dc76cef..7f192afc29e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -434,6 +434,7 @@ ContainerCommandResponseProto handleCreateContainer(
LOG.debug("Container already exists. container Id {}", containerID);
}
} catch (StorageContainerException ex) {
+ newContainerData.releaseCommitSpace();
return ContainerUtils.logAndReturnError(LOG, ex, request);
} finally {
containerIdLock.unlock();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 90bbb3186ad..f3b39333e08 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -202,49 +202,56 @@ public void verifyAndFixupContainerData(ContainerData containerData)
throws IOException {
switch (containerData.getContainerType()) {
case KeyValueContainer:
- if (containerData instanceof KeyValueContainerData) {
- KeyValueContainerData kvContainerData = (KeyValueContainerData)
- containerData;
- containerData.setVolume(hddsVolume);
- KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
- KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
- config);
- if (kvContainer.getContainerState() == RECOVERING) {
- if (shouldDelete) {
- // delete Ratis replicated RECOVERING containers
- if (kvContainer.getContainerData().getReplicaIndex() == 0) {
- cleanupContainer(hddsVolume, kvContainer);
- } else {
- kvContainer.markContainerUnhealthy();
- LOG.info("Stale recovering container {} marked UNHEALTHY",
- kvContainerData.getContainerID());
- containerSet.addContainer(kvContainer);
- }
- }
- return;
- }
- if (kvContainer.getContainerState() == DELETED) {
- if (shouldDelete) {
+ if (!(containerData instanceof KeyValueContainerData)) {
+ throw new StorageContainerException("Container File is corrupted. " +
+ "ContainerType is KeyValueContainer but cast to " +
+ "KeyValueContainerData failed. ",
+ ContainerProtos.Result.CONTAINER_METADATA_ERROR);
+ }
+
+ KeyValueContainerData kvContainerData = (KeyValueContainerData)
+ containerData;
+ containerData.setVolume(hddsVolume);
+ KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
+ KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
+ config);
+ if (kvContainer.getContainerState() == RECOVERING) {
+ if (shouldDelete) {
+ // delete Ratis replicated RECOVERING containers
+ if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
+ } else {
+ kvContainer.markContainerUnhealthy();
+ LOG.info("Stale recovering container {} marked UNHEALTHY",
+ kvContainerData.getContainerID());
+ containerSet.addContainer(kvContainer);
}
- return;
}
- try {
- containerSet.addContainer(kvContainer);
- } catch (StorageContainerException e) {
- if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
- throw e;
- }
- if (shouldDelete) {
- resolveDuplicate((KeyValueContainer) containerSet.getContainer(
- kvContainer.getContainerData().getContainerID()), kvContainer);
+ return;
+ } else if (kvContainer.getContainerState() == DELETED) {
+ if (shouldDelete) {
+ cleanupContainer(hddsVolume, kvContainer);
+ }
+ return;
+ }
+
+ try {
+ containerSet.addContainer(kvContainer);
+ // this should be the last step of this block
+ containerData.commitSpace();
+ } catch (StorageContainerException e) {
+ if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
+ throw e;
+ }
+ if (shouldDelete) {
+ KeyValueContainer existing = (KeyValueContainer) containerSet.getContainer(
+ kvContainer.getContainerData().getContainerID());
+ boolean swapped = resolveDuplicate(existing, kvContainer);
+ if (swapped) {
+ existing.getContainerData().releaseCommitSpace();
+ kvContainer.getContainerData().commitSpace();
}
}
- } else {
- throw new StorageContainerException("Container File is corrupted. " +
- "ContainerType is KeyValueContainer but cast to " +
- "KeyValueContainerData failed. ",
- ContainerProtos.Result.CONTAINER_METADATA_ERROR);
}
break;
default:
@@ -254,7 +261,14 @@ public void verifyAndFixupContainerData(ContainerData containerData)
}
}
- private void resolveDuplicate(KeyValueContainer existing,
+ /**
+ * Resolve duplicate containers.
+ * @param existing
+ * @param toAdd
+ * @return true if the container was swapped, false otherwise
+ * @throws IOException
+ */
+ private boolean resolveDuplicate(KeyValueContainer existing,
KeyValueContainer toAdd) throws IOException {
if (existing.getContainerData().getReplicaIndex() != 0 ||
toAdd.getContainerData().getReplicaIndex() != 0) {
@@ -268,7 +282,7 @@ private void resolveDuplicate(KeyValueContainer existing,
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
- return;
+ return false;
}
long existingBCSID = existing.getBlockCommitSequenceId();
@@ -288,7 +302,7 @@ private void resolveDuplicate(KeyValueContainer existing,
toAdd.getContainerData().getContainerPath(), toAddState);
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
- return;
+ return false;
} else if (toAddState == CLOSED) {
LOG.warn("Container {} is present at {} with state CLOSED and at " +
"{} with state {}. Removing the latter container.",
@@ -296,7 +310,7 @@ private void resolveDuplicate(KeyValueContainer existing,
toAdd.getContainerData().getContainerPath(),
existing.getContainerData().getContainerPath(), existingState);
swapAndRemoveContainer(existing, toAdd);
- return;
+ return true;
}
}
@@ -309,6 +323,7 @@ private void resolveDuplicate(KeyValueContainer existing,
toAdd.getContainerData().getContainerPath());
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
+ return false;
} else {
LOG.warn("Container {} is present at {} with a lesser BCSID " +
"than at {}. Removing the former container.",
@@ -316,6 +331,7 @@ private void resolveDuplicate(KeyValueContainer existing,
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
swapAndRemoveContainer(existing, toAdd);
+ return true;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index f69516f94e1..ff7b1d3b732 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) {
}
public void importContainer(long containerID, Path tarFilePath,
- HddsVolume hddsVolume, CopyContainerCompression compression)
+ HddsVolume targetVolume, CopyContainerCompression compression)
throws IOException {
if (!importContainerProgress.add(containerID)) {
deleteFileQuietely(tarFilePath);
@@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath,
ContainerProtos.Result.CONTAINER_EXISTS);
}
- HddsVolume targetVolume = hddsVolume;
- if (targetVolume == null) {
- targetVolume = chooseNextVolume();
- }
-
KeyValueContainerData containerData;
TarContainerPacker packer = getPacker(compression);
@@ -148,7 +143,7 @@ HddsVolume chooseNextVolume() throws IOException {
// Choose volume that can hold both container in tmp and dest directory
return volumeChoosingPolicy.chooseVolume(
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
- HddsServerUtil.requiredReplicationSpace(containerSize));
+ getDefaultReplicationSpace());
}
public static Path getUntarDirectory(HddsVolume hddsVolume)
@@ -171,7 +166,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
return new TarContainerPacker(compression);
}
- public long getDefaultContainerSize() {
- return containerSize;
+ public long getDefaultReplicationSpace() {
+ return HddsServerUtil.requiredReplicationSpace(containerSize);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 9a943c63338..240ba9473d3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,11 +22,8 @@
import java.nio.file.Path;
import java.util.List;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
import org.slf4j.Logger;
@@ -47,7 +44,6 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
private final ContainerDownloader downloader;
private final ContainerImporter containerImporter;
private final ContainerSet containerSet;
- private final long containerSize;
public DownloadAndImportReplicator(
ConfigurationSource conf, ContainerSet containerSet,
@@ -57,9 +53,6 @@ public DownloadAndImportReplicator(
this.containerSet = containerSet;
this.downloader = downloader;
this.containerImporter = containerImporter;
- containerSize = (long) conf.getStorageSize(
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
}
@Override
@@ -81,15 +74,6 @@ public void replicate(ReplicationTask task) {
try {
targetVolume = containerImporter.chooseNextVolume();
- // Increment committed bytes and verify if it doesn't cross the space left.
- targetVolume.incCommittedBytes(containerSize * 2);
- StorageLocationReport volumeReport = targetVolume.getReport();
- // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
- if (volumeReport.getUsableSpace() <= 0) {
- LOG.warn("Container {} replication was unsuccessful, no space left on volume {}", containerID, volumeReport);
- task.setStatus(Status.FAILED);
- return;
- }
// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
@@ -114,7 +98,7 @@ public void replicate(ReplicationTask task) {
task.setStatus(Status.FAILED);
} finally {
if (targetVolume != null) {
- targetVolume.incCommittedBytes(-containerSize * 2);
+ targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace());
}
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 5224498e727..9cb07a21c5d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -29,9 +29,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.util.DiskChecker;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -87,16 +85,6 @@ public void onNext(SendContainerRequest req) {
if (containerId == -1) {
containerId = req.getContainerID();
volume = importer.chooseNextVolume();
- // Increment committed bytes and verify if it doesn't cross the space left.
- volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
- StorageLocationReport volumeReport = volume.getReport();
- // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
- if (volumeReport.getUsableSpace() <= 0) {
- volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
- LOG.warn("Container {} import was unsuccessful, no space left on volume {}", containerId, volumeReport);
- volume = null;
- throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
- }
Path dir = ContainerImporter.getUntarDirectory(volume);
Files.createDirectories(dir);
@@ -130,7 +118,7 @@ public void onError(Throwable t) {
responseObserver.onError(t);
} finally {
if (volume != null) {
- volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+ volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
}
}
}
@@ -159,7 +147,7 @@ public void onCompleted() {
}
} finally {
if (volume != null) {
- volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+ volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
}
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index cf75342efdf..02f999013e6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -202,10 +202,11 @@ private KeyValueContainer addContainer(ContainerSet cSet, long cID)
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
KeyValueContainer container = new KeyValueContainer(data, conf);
+ commitBytesBefore = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()).get(0).getCommittedBytes();
+
container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
- commitBytesBefore = container.getContainerData()
- .getVolume().getCommittedBytes();
cSet.addContainer(container);
+
commitBytesAfter = container.getContainerData()
.getVolume().getCommittedBytes();
commitIncrement = commitBytesAfter - commitBytesBefore;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
index d6c97c5f1a3..07ae372a4cd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
@@ -149,4 +149,15 @@ public void testVolumeChoosingPolicyFactory()
VolumeChoosingPolicyFactory.getPolicy(CONF).getClass());
}
+ @Test
+ public void testVolumeCommittedSpace() throws Exception {
+ Map<HddsVolume, Long> initialCommittedSpace = new HashMap<>();
+ volumes.forEach(vol ->
+ initialCommittedSpace.put(vol, vol.getCommittedBytes()));
+
+ HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);
+
+ assertEquals(initialCommittedSpace.get(selectedVolume) + 50,
+ selectedVolume.getCommittedBytes());
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
index 1c07fe7ab7b..2406011a3d1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
@@ -25,7 +25,9 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
@@ -115,4 +117,15 @@ public void throwsDiskOutOfSpaceIfRequestMoreThanAvailable() {
"Most available space: 150 bytes");
}
+ @Test
+ public void testVolumeCommittedSpace() throws Exception {
+ Map<HddsVolume, Long> initialCommittedSpace = new HashMap<>();
+ volumes.forEach(vol ->
+ initialCommittedSpace.put(vol, vol.getCommittedBytes()));
+
+ HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);
+
+ assertEquals(initialCommittedSpace.get(selectedVolume) + 50,
+ selectedVolume.getCommittedBytes());
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index c700b235faf..51a949e496f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -38,6 +38,8 @@
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -1097,4 +1099,19 @@ private void testMixedSchemaImport(String dir,
assertEquals(pendingDeleteBlockCount,
importedContainer.getContainerData().getNumPendingDeletionBlocks());
}
+
+ @ContainerTestVersionInfo.ContainerTest
+ public void testContainerCreationCommitSpaceReserve(
+ ContainerTestVersionInfo versionInfo) throws Exception {
+ init(versionInfo);
+ keyValueContainerData = spy(keyValueContainerData);
+ keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
+ keyValueContainer = spy(keyValueContainer);
+
+ keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+ // verify that
+ verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space
+ assertTrue(keyValueContainerData.isCommittedSpace());
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 256ca20e938..7927864861b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -25,11 +25,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -43,6 +45,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
@@ -53,6 +57,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
@@ -70,17 +75,22 @@
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Unit tests for {@link KeyValueHandler}.
*/
public class TestKeyValueHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(TestKeyValueHandler.class);
+
@TempDir
private Path tempDir;
@@ -91,9 +101,11 @@ public class TestKeyValueHandler {
private HddsDispatcher dispatcher;
private KeyValueHandler handler;
+ private long maxContainerSize;
@BeforeEach
public void setup() throws StorageContainerException {
+ OzoneConfiguration conf = new OzoneConfiguration();
// Create mock HddsDispatcher and KeyValueHandler.
handler = mock(KeyValueHandler.class);
@@ -109,6 +121,10 @@ public void setup() throws StorageContainerException {
mock(ContainerMetrics.class),
mock(TokenVerifier.class)
);
+
+ maxContainerSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
}
/**
@@ -337,6 +353,68 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
"Close container should return Invalid container error");
}
+ @Test
+ public void testCreateContainerWithFailure() throws Exception {
+ final String testDir = tempDir.toString();
+ final long containerID = 1L;
+ final String clusterId = UUID.randomUUID().toString();
+ final String datanodeId = UUID.randomUUID().toString();
+ final ConfigurationSource conf = new OzoneConfiguration();
+ final ContainerSet containerSet = spy(newContainerSet());
+ final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+ HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf)
+ .clusterID(clusterId).datanodeUuid(datanodeId)
+ .volumeSet(volumeSet)
+ .build();
+
+ hddsVolume.format(clusterId);
+ hddsVolume.createWorkingDir(clusterId, null);
+ hddsVolume.createTmpDirs(clusterId);
+
+ when(volumeSet.getVolumesList())
+ .thenReturn(Collections.singletonList(hddsVolume));
+
+ List<HddsVolume> hddsVolumeList = StorageVolumeUtil
+ .getHddsVolumesList(volumeSet.getVolumesList());
+
+ assertEquals(1, hddsVolumeList.size());
+
+ final ContainerMetrics metrics = ContainerMetrics.create(conf);
+
+ final AtomicInteger icrReceived = new AtomicInteger(0);
+
+ final KeyValueHandler kvHandler = new KeyValueHandler(conf,
+ datanodeId, containerSet, volumeSet, metrics,
+ c -> icrReceived.incrementAndGet());
+ kvHandler.setClusterID(clusterId);
+
+ final ContainerCommandRequestProto createContainer =
+ createContainerRequest(datanodeId, containerID);
+
+ Semaphore semaphore = new Semaphore(1);
+ doAnswer(invocation -> {
+ semaphore.acquire();
+ throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION);
+ }).when(containerSet).addContainer(any());
+
+ semaphore.acquire();
+ CompletableFuture.runAsync(() ->
+ kvHandler.handleCreateContainer(createContainer, null)
+ );
+
+ // commit bytes have been allocated by volumeChoosingPolicy which is called in KeyValueContainer#create
+ GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize,
+ 1000, 50000);
+ semaphore.release();
+
+ LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes());
+
+ // release committed bytes as exception is thrown
+ GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0,
+ 1000, 50000);
+ }
+
@Test
public void testDeleteContainer() throws IOException {
final String testDir = tempDir.toString();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 6a48765c1a9..ec5c6743e72 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -217,8 +227,18 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
throws Exception {
setLayoutAndSchemaVersion(versionInfo);
setup(versionInfo);
+
+ ContainerReader containerReader = new ContainerReader(volumeSet,
+ hddsVolume, containerSet, conf, true);
+ Thread thread = new Thread(containerReader);
+ thread.start();
+ thread.join();
+ long originalCommittedBytes = hddsVolume.getCommittedBytes();
+ ContainerCache.getInstance(conf).shutdownCache();
+
+ long recoveringContainerId = 10;
KeyValueContainerData recoveringContainerData = new KeyValueContainerData(
- 10, layout, (long) StorageUnit.GB.toBytes(5),
+ recoveringContainerId, layout, (long) StorageUnit.GB.toBytes(5),
UUID.randomUUID().toString(), datanodeId.toString());
//create a container with recovering state
recoveringContainerData.setState(RECOVERING);
@@ -229,13 +239,13 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
recoveringKeyValueContainer.create(
volumeSet, volumeChoosingPolicy, clusterId);
- ContainerReader containerReader = new ContainerReader(volumeSet,
- hddsVolume, containerSet, conf, true);
-
- Thread thread = new Thread(containerReader);
+ thread = new Thread(containerReader);
thread.start();
thread.join();
+ // no change, only open containers have committed space
+ assertEquals(originalCommittedBytes, hddsVolume.getCommittedBytes());
+
// Ratis replicated recovering containers are deleted upon datanode startup
if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) {
assertNull(containerSet.getContainer(recoveringContainerData.getContainerID()));
@@ -262,6 +272,8 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
assertEquals(i,
keyValueContainerData.getNumPendingDeletionBlocks());
+
+ assertTrue(keyValueContainerData.isCommittedSpace());
}
}
@@ -313,6 +325,14 @@ public void testContainerReaderWithLoadException(
hddsVolume1, containerSet1, conf, true);
containerReader.readVolume(hddsVolume1.getHddsRootDir());
assertEquals(containerCount - 1, containerSet1.containerCount());
+ for (Container c : containerSet1.getContainerMap().values()) {
+ if (c.getContainerData().getContainerID() == 0) {
+ assertFalse(c.getContainerData().isCommittedSpace());
+ } else {
+ assertTrue(c.getContainerData().isCommittedSpace());
+ }
+ }
+ assertEquals(hddsVolume1.getCommittedBytes(), (containerCount - 1) * StorageUnit.GB.toBytes(5));
}
@ContainerTestVersionInfo.ContainerTest
@@ -361,6 +381,7 @@ public void testContainerReaderWithInvalidDbPath(
hddsVolume1, containerSet1, conf, true);
containerReader.readVolume(hddsVolume1.getHddsRootDir());
assertEquals(0, containerSet1.containerCount());
+ assertEquals(0, hddsVolume1.getCommittedBytes());
assertThat(dnLogs.getOutput()).contains("Container DB file is missing");
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java
new file mode 100644
index 00000000000..5993e43e661
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Test for DownloadAndImportReplicator.
+ */
+@Timeout(300)
+public class TestDownloadAndImportReplicator {
+
+ @TempDir
+ private File tempDir;
+
+ private OzoneConfiguration conf;
+ private VolumeChoosingPolicy volumeChoosingPolicy;
+ private ContainerSet containerSet;
+ private MutableVolumeSet volumeSet;
+ private ContainerImporter importer;
+ private SimpleContainerDownloader downloader;
+ private DownloadAndImportReplicator replicator;
+ private long containerMaxSize;
+
+ @BeforeEach
+ void setup() throws IOException {
+ conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+ volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
+ containerSet = newContainerSet(0);
+ volumeSet = new MutableVolumeSet("test", conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ importer = new ContainerImporter(conf, containerSet,
+ mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
+ downloader = mock(SimpleContainerDownloader.class);
+ replicator = new DownloadAndImportReplicator(conf, containerSet, importer,
+ downloader);
+ containerMaxSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+ }
+
+ @Test
+ public void testCommitSpaceReleasedOnReplicationFailure() throws Exception {
+ long containerId = 1;
+ HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+ long initialCommittedBytes = volume.getCommittedBytes();
+
+ // Mock downloader to throw exception
+ Semaphore semaphore = new Semaphore(1);
+ when(downloader.getContainerDataFromReplicas(anyLong(), any(), any(), any()))
+ .thenAnswer(invocation -> {
+ semaphore.acquire();
+ throw new IOException("Download failed");
+ });
+
+ ReplicationTask task = new ReplicationTask(containerId,
+ Collections.singletonList(mock(DatanodeDetails.class)), replicator);
+
+ // Acquire semaphore so that container import will pause before downloading.
+ semaphore.acquire();
+ CompletableFuture.runAsync(() -> {
+ assertThrows(IOException.class, () -> replicator.replicate(task));
+ });
+
+ // Wait such that first container import reserve space
+ GenericTestUtils.waitFor(() ->
+ volume.getCommittedBytes() > initialCommittedBytes,
+ 1000, 50000);
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+ semaphore.release();
+
+ GenericTestUtils.waitFor(() ->
+ volume.getCommittedBytes() == initialCommittedBytes,
+ 1000, 50000);
+
+ // Verify commit space is released
+ assertEquals(initialCommittedBytes, volume.getCommittedBytes());
+ }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 9acb73486a0..1e69eac2ea9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -355,11 +355,12 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout
@ContainerLayoutTestInfo.ContainerTest
public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
throws IOException, InterruptedException, TimeoutException {
+ final long containerUsedSize = 100;
this.layoutVersion = layout;
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
- long containerSize = (long) conf.getStorageSize(
+ long containerMaxSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
@@ -369,13 +370,16 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
.clock(clock)
.build();
+ MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+
long containerId = 1;
// create container
KeyValueContainerData containerData = new KeyValueContainerData(containerId,
- ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
- HddsVolume vol = mock(HddsVolume.class);
- containerData.setVolume(vol);
- containerData.incrBytesUsed(100);
+ ContainerLayoutVersion.FILE_PER_BLOCK, containerMaxSize, "test", "test");
+ HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+ containerData.setVolume(vol1);
+ containerData.incrBytesUsed(containerUsedSize);
KeyValueContainer container = new KeyValueContainer(containerData, conf);
ContainerController controllerMock = mock(ContainerController.class);
Semaphore semaphore = new Semaphore(1);
@@ -384,8 +388,7 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
semaphore.acquire();
return container;
});
- MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
- StorageVolume.VolumeType.DATA_VOLUME, null);
+
File tarFile = containerTarFile(containerId, containerData);
SimpleContainerDownloader moc =
@@ -398,14 +401,13 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
ContainerImporter importer =
new ContainerImporter(conf, set, controllerMock, volumeSet,
volumeChoosingPolicy);
- HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
// Initially volume has 0 commit space
assertEquals(0, vol1.getCommittedBytes());
long usedSpace = vol1.getCurrentUsage().getUsedSpace();
// Initially volume has 0 used space
assertEquals(0, usedSpace);
// Increase committed bytes so that volume has only remaining 3 times container size space
- long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3;
+ long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerMaxSize * 3;
vol1.incCommittedBytes(initialCommittedBytes);
ContainerReplicator replicator =
new DownloadAndImportReplicator(conf, set, importer, moc);
@@ -424,11 +426,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
// Wait such that first container import reserve space
GenericTestUtils.waitFor(() ->
- vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3,
+ vol1.getCommittedBytes() > initialCommittedBytes,
1000, 50000);
// Volume has reserved space of 2 * containerSize
- assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerSize);
+ assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
// Container 2 import will fail as container 1 has reserved space and no space left to import new container
// New container import requires at least (2 * container size)
long containerId2 = 2;
@@ -443,10 +445,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
usedSpace = vol1.getCurrentUsage().getUsedSpace();
// After replication, volume used space should be increased by container used bytes
- assertEquals(100, usedSpace);
+ assertEquals(containerUsedSize, usedSpace);
- // Volume committed bytes should become initial committed bytes which was before replication
- assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+ // Volume committed bytes used for replication have been released, no need to reserve space for imported container
+ // only closed containers get replicated, so no new data will be written to it
+ assertEquals(vol1.getCommittedBytes(), initialCommittedBytes);
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
index 0d15e265ad9..441bc7890b6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
@@ -21,18 +21,25 @@
import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.File;
+import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
@@ -48,7 +55,7 @@
/**
* Test for {@link SendContainerRequestHandler}.
*/
-class TestSendContainerRequestHandler {
+public class TestSendContainerRequestHandler {
@TempDir
private File tempDir;
@@ -57,38 +64,48 @@ class TestSendContainerRequestHandler {
private VolumeChoosingPolicy volumeChoosingPolicy;
+ private ContainerSet containerSet;
+ private MutableVolumeSet volumeSet;
+ private ContainerImporter importer;
+ private StreamObserver<ContainerProtos.SendContainerResponse> responseObserver;
+ private SendContainerRequestHandler sendContainerRequestHandler;
+ private long containerMaxSize;
+
@BeforeEach
- void setup() {
+ void setup() throws IOException {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
+ containerSet = newContainerSet(0);
+ volumeSet = new MutableVolumeSet("test", conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ importer = new ContainerImporter(conf, containerSet,
+ mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
+ importer = spy(importer);
+ responseObserver = mock(StreamObserver.class);
+ sendContainerRequestHandler = new SendContainerRequestHandler(importer, responseObserver, null);
+ containerMaxSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
}
@Test
void testReceiveDataForExistingContainer() throws Exception {
long containerId = 1;
// create containerImporter
- ContainerSet containerSet = newContainerSet(0);
- MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
- StorageVolume.VolumeType.DATA_VOLUME, null);
- ContainerImporter containerImporter = new ContainerImporter(conf,
- newContainerSet(0), mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
KeyValueContainerData containerData = new KeyValueContainerData(containerId,
ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
// add container to container set
KeyValueContainer container = new KeyValueContainer(containerData, conf);
containerSet.addContainer(container);
- StreamObserver observer = mock(StreamObserver.class);
doAnswer(invocation -> {
Object arg = invocation.getArgument(0);
assertInstanceOf(StorageContainerException.class, arg);
assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
((StorageContainerException) arg).getResult());
return null;
- }).when(observer).onError(any());
- SendContainerRequestHandler sendContainerRequestHandler
- = new SendContainerRequestHandler(containerImporter, observer, null);
+ }).when(responseObserver).onError(any());
ByteString data = ByteString.copyFromUtf8("test");
ContainerProtos.SendContainerRequest request
= ContainerProtos.SendContainerRequest.newBuilder()
@@ -99,4 +116,87 @@ void testReceiveDataForExistingContainer() throws Exception {
.build();
sendContainerRequestHandler.onNext(request);
}
+
+ @Test
+ public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception {
+ long containerId = 1;
+ HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+ long initialCommittedBytes = volume.getCommittedBytes();
+
+ // Create request
+ ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder()
+ .setContainerID(containerId)
+ .setData(ByteString.EMPTY)
+ .setOffset(0)
+ .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto())
+ .build();
+
+ // Execute request
+ sendContainerRequestHandler.onNext(request);
+
+ // Verify commit space is reserved
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+ // complete the request
+ sendContainerRequestHandler.onCompleted();
+
+ // Verify commit space is released
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+ }
+
+ @Test
+ public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception {
+ long containerId = 1;
+ HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+ long initialCommittedBytes = volume.getCommittedBytes();
+
+ // Create request
+ ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0);
+
+ // Execute request
+ sendContainerRequestHandler.onNext(request);
+
+ // Verify commit space is reserved
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+ // mock the importer is not allowed to import this container
+ when(importer.isAllowedContainerImport(containerId)).thenReturn(false);
+
+ sendContainerRequestHandler.onNext(request);
+
+ // Verify commit space is released
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+ }
+
+ @Test
+ public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception {
+ long containerId = 1;
+ HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+ long initialCommittedBytes = volume.getCommittedBytes();
+
+ // Create request
+ ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0);
+
+ // Execute request
+ sendContainerRequestHandler.onNext(request);
+
+ // Verify commit space is reserved
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+ doThrow(new IOException("Failed")).when(importer).importContainer(anyLong(), any(), any(), any());
+
+ sendContainerRequestHandler.onCompleted();
+
+ // Verify commit space is released
+ assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+ }
+
+ private ContainerProtos.SendContainerRequest createRequest(long containerId, ByteString data, int offset) {
+ return ContainerProtos.SendContainerRequest.newBuilder()
+ .setContainerID(containerId)
+ .setData(data)
+ .setOffset(offset)
+ .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto())
+ .build();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]