This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to
refs/heads/HDDS-10239-container-reconciliation by this push:
new 04c196c9f4 HDDS-10377. Allow datanodes to do chunk level modifications
to closed containers. (#7111)
04c196c9f4 is described below
commit 04c196c9f401de0f67356071188b01a49eaccf76
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Wed Oct 9 15:35:43 2024 -0700
HDDS-10377. Allow datanodes to do chunk level modifications to closed
containers. (#7111)
---
.../container/common/helpers/ContainerMetrics.java | 10 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 65 +++++++++
.../keyvalue/impl/TestFilePerBlockStrategy.java | 156 +++++++++++++++++++++
3 files changed, 230 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 883f6cd851..91bdb17cda 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -60,6 +60,7 @@ public class ContainerMetrics {
private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
private final EnumMap<ContainerProtos.Type, MutableCounterLong>
opsBytesArray;
+ private final EnumMap<ContainerProtos.Type, MutableCounterLong>
opsForClosedContainer;
private final EnumMap<ContainerProtos.Type, MutableRate> opsLatency;
private final EnumMap<ContainerProtos.Type, MutableQuantiles[]>
opsLatQuantiles;
private MetricsRegistry registry = null;
@@ -69,6 +70,7 @@ public class ContainerMetrics {
MutableQuantiles[] latQuantiles = new MutableQuantiles[len];
this.numOpsArray = new EnumMap<>(ContainerProtos.Type.class);
this.opsBytesArray = new EnumMap<>(ContainerProtos.Type.class);
+ this.opsForClosedContainer = new EnumMap<>(ContainerProtos.Type.class);
this.opsLatency = new EnumMap<>(ContainerProtos.Type.class);
this.opsLatQuantiles = new EnumMap<>(ContainerProtos.Type.class);
this.registry = new MetricsRegistry("StorageContainerMetrics");
@@ -77,7 +79,9 @@ public class ContainerMetrics {
numOpsArray.put(type, registry.newCounter(
"num" + type, "number of " + type + " ops", (long) 0));
opsBytesArray.put(type, registry.newCounter(
- "bytes" + type, "bytes used by " + type + "op", (long) 0));
+ "bytes" + type, "bytes used by " + type + " op", (long) 0));
+ opsForClosedContainer.put(type,
registry.newCounter("bytesForClosedContainer" + type,
+ "bytes used by " + type + " for closed container op", (long) 0));
opsLatency.put(type, registry.newRate("latencyNs" + type, type + " op"));
for (int j = 0; j < len; j++) {
@@ -121,6 +125,10 @@ public class ContainerMetrics {
opsBytesArray.get(type).incr(bytes);
}
+ public void incClosedContainerBytesStats(ContainerProtos.Type type, long
bytes) {
+ opsForClosedContainer.get(type).incr(bytes);
+ }
+
public void incContainerDeleteFailedBlockCountNotZero() {
containerDeleteFailedBlockCountNotZero.incr();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 66dc54e302..fa10ffe20a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1022,6 +1022,58 @@ public class KeyValueHandler extends Handler {
return getWriteChunkResponseSuccess(request, blockDataProto);
}
+ /**
+ * Handle Write Chunk operation for closed container. Calls ChunkManager to
process the request.
+ *
+ */
+ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID
blockID,
+ ChunkBuffer data, KeyValueContainer
kvContainer)
+ throws IOException {
+ Preconditions.checkNotNull(kvContainer);
+ Preconditions.checkNotNull(chunkInfo);
+ Preconditions.checkNotNull(data);
+ long writeChunkStartTime = Time.monotonicNowNanos();
+ if (!checkContainerClose(kvContainer)) {
+ throw new IOException("Container #" +
kvContainer.getContainerData().getContainerID() +
+ " is not in closed state, Container state is " +
kvContainer.getContainerState());
+ }
+
+ DispatcherContext dispatcherContext =
DispatcherContext.getHandleWriteChunk();
+ chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
+ dispatcherContext);
+
+ // Increment write stats for WriteChunk after write.
+ metrics.incClosedContainerBytesStats(Type.WriteChunk, chunkInfo.getLen());
+ metrics.incContainerOpsLatencies(Type.WriteChunk, Time.monotonicNowNanos()
- writeChunkStartTime);
+ }
+
+ /**
+ * Handle Put Block operation for closed container. Calls BlockManager to
process the request.
+ *
+ */
+ public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo>
chunkInfos, KeyValueContainer kvContainer,
+ BlockData blockData, long
blockCommitSequenceId)
+ throws IOException {
+ Preconditions.checkNotNull(kvContainer);
+ Preconditions.checkNotNull(blockData);
+ long startTime = Time.monotonicNowNanos();
+
+ if (!checkContainerClose(kvContainer)) {
+ throw new IOException("Container #" +
kvContainer.getContainerData().getContainerID() +
+ " is not in closed state, Container state is " +
kvContainer.getContainerState());
+ }
+ blockData.setChunks(chunkInfos);
+ // To be set from the Replica's BCSId
+ blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+
+ blockManager.putBlock(kvContainer, blockData, false);
+ ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
+ final long numBytes = blockDataProto.getSerializedSize();
+ // Increment write stats for PutBlock after write.
+ metrics.incClosedContainerBytesStats(Type.PutBlock, numBytes);
+ metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() -
startTime);
+ }
+
/**
* Handle Put Small File operation. Writes the chunk and associated key
* using a single RPC. Calls BlockManager and ChunkManager to process the
@@ -1198,6 +1250,19 @@ public class KeyValueHandler extends Handler {
throw new StorageContainerException(msg, result);
}
+ /**
+ * Check if container is Closed.
+ * @param kvContainer
+ */
+ private boolean checkContainerClose(KeyValueContainer kvContainer) {
+
+ final State containerState = kvContainer.getContainerState();
+ if (containerState == State.QUASI_CLOSED || containerState == State.CLOSED
|| containerState == State.UNHEALTHY) {
+ return true;
+ }
+ return false;
+ }
+
@Override
public Container importContainer(ContainerData originalContainerData,
final InputStream rawContainerStream,
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
index 36d7165519..8807f22026 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -19,20 +19,41 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Stream;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static
org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
@@ -133,6 +154,141 @@ public class TestFilePerBlockStrategy extends
CommonChunkManagerTestCases {
readData2.rewind().toByteString());
}
+ @ParameterizedTest
+ @MethodSource("getNonClosedStates")
+ public void testWriteChunkAndPutBlockFailureForNonClosedContainer(
+ ContainerProtos.ContainerDataProto.State state) throws IOException {
+ KeyValueContainer keyValueContainer = getKeyValueContainer();
+ keyValueContainer.getContainerData().setState(state);
+ ContainerSet containerSet = new ContainerSet(100);
+ containerSet.addContainer(keyValueContainer);
+ KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+ ChunkBuffer.wrap(getData());
+ Assertions.assertThrows(IOException.class, () ->
keyValueHandler.writeChunkForClosedContainer(
+ getChunkInfo(), getBlockID(), ChunkBuffer.wrap(getData()),
keyValueContainer));
+ Assertions.assertThrows(IOException.class, () ->
keyValueHandler.putBlockForClosedContainer(
+ null, keyValueContainer, new BlockData(getBlockID()), 0L));
+ }
+
+ @Test
+ public void testWriteChunkForClosedContainer()
+ throws IOException {
+ ChunkBuffer writeChunkData = ChunkBuffer.wrap(getData());
+ KeyValueContainer kvContainer = getKeyValueContainer();
+ KeyValueContainerData containerData = kvContainer.getContainerData();
+ closedKeyValueContainer();
+ ContainerSet containerSet = new ContainerSet(100);
+ containerSet.addContainer(kvContainer);
+ KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+ keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(),
writeChunkData, kvContainer);
+ ChunkBuffer readChunkData =
keyValueHandler.getChunkManager().readChunk(kvContainer,
+ getBlockID(), getChunkInfo(), WRITE_STAGE);
+ rewindBufferToDataStart();
+ Assertions.assertEquals(writeChunkData, readChunkData);
+ Assertions.assertEquals(containerData.getWriteBytes(),
writeChunkData.remaining());
+ Assertions.assertEquals(containerData.getBytesUsed(),
writeChunkData.remaining());
+
+ // Test Overwrite
+ keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(),
+ writeChunkData, kvContainer);
+ readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
+ getBlockID(), getChunkInfo(), WRITE_STAGE);
+ rewindBufferToDataStart();
+ Assertions.assertEquals(writeChunkData, readChunkData);
+ Assertions.assertEquals(containerData.getWriteBytes(), 2L *
writeChunkData.remaining());
+ // Overwrites won't increase the bytesUsed of a Container
+ Assertions.assertEquals(containerData.getBytesUsed(),
writeChunkData.remaining());
+
+ // Test new write chunk after overwrite
+ byte[] bytes = "testing write chunks with after overwrite".getBytes(UTF_8);
+ ChunkBuffer newWriteChunkData =
ChunkBuffer.allocate(bytes.length).put(bytes);
+ newWriteChunkData.rewind();
+
+ // Write chunk after the previous overwrite chunk.
+ ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d",
getBlockID()
+ .getLocalID(), writeChunkData.remaining()),
writeChunkData.remaining(), bytes.length);
+ keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(),
+ newWriteChunkData, kvContainer);
+ readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
+ getBlockID(), newChunkInfo, WRITE_STAGE);
+ newWriteChunkData.rewind();
+ Assertions.assertEquals(newWriteChunkData, readChunkData);
+ Assertions.assertEquals(containerData.getWriteBytes(), 2L *
writeChunkData.remaining()
+ + newWriteChunkData.remaining());
+ Assertions.assertEquals(containerData.getBytesUsed(),
writeChunkData.remaining() + newWriteChunkData.remaining());
+ }
+
+ @Test
+ public void testPutBlockForClosedContainer() throws IOException {
+ KeyValueContainer kvContainer = getKeyValueContainer();
+ KeyValueContainerData containerData = kvContainer.getContainerData();
+ closedKeyValueContainer();
+ ContainerSet containerSet = new ContainerSet(100);
+ containerSet.addContainer(kvContainer);
+ KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+ List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
+ chunkInfoList.add(getChunkInfo().getProtoBufMessage());
+ BlockData putBlockData = new BlockData(getBlockID());
+ keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer,
putBlockData, 1L);
+ Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
+ Assertions.assertEquals(containerData.getBlockCount(), 1L);
+
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, new
OzoneConfiguration())) {
+ long localID = putBlockData.getLocalID();
+ BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
+ .get(containerData.getBlockKey(localID));
+ Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+ }
+
+ // Add another chunk and check the put block data
+ ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d",
getBlockID()
+ .getLocalID(), 1L), 0, 20L);
+ chunkInfoList.add(newChunkInfo.getProtoBufMessage());
+ keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer,
putBlockData, 2L);
+ Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
+ Assertions.assertEquals(containerData.getBlockCount(), 1L);
+
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, new
OzoneConfiguration())) {
+ long localID = putBlockData.getLocalID();
+ BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
+ .get(containerData.getBlockKey(localID));
+ Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+ }
+
+ // Put block on bcsId <= containerBcsId should be a no-op
+ keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer,
putBlockData, 2L);
+ Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
+ }
+
+ private boolean blockDataEquals(BlockData putBlockData, BlockData
getBlockData) {
+ return getBlockData.getSize() == putBlockData.getSize() &&
+ Objects.equals(getBlockData.getBlockID(), putBlockData.getBlockID()) &&
+ Objects.equals(getBlockData.getMetadata(), putBlockData.getMetadata())
&&
+ Objects.equals(getBlockData.getChunks(), putBlockData.getChunks());
+ }
+
+
+ private static Stream<Arguments> getNonClosedStates() {
+ return Stream.of(
+ Arguments.of(ContainerProtos.ContainerDataProto.State.OPEN),
+ Arguments.of(ContainerProtos.ContainerDataProto.State.RECOVERING),
+ Arguments.of(ContainerProtos.ContainerDataProto.State.CLOSING),
+ Arguments.of(ContainerProtos.ContainerDataProto.State.INVALID));
+ }
+
+ public KeyValueHandler createKeyValueHandler(ContainerSet containerSet)
+ throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String dnUuid = UUID.randomUUID().toString();
+ MutableVolumeSet volumeSet = new MutableVolumeSet(dnUuid, conf,
+ null, StorageVolume.VolumeType.DATA_VOLUME, null);
+ return ContainerTestUtils.getKeyValueHandler(conf, dnUuid, containerSet,
volumeSet);
+ }
+
+ public void closedKeyValueContainer() {
+
getKeyValueContainer().getContainerData().setState(ContainerProtos.ContainerDataProto.State.CLOSED);
+ }
+
@Override
protected ContainerLayoutTestInfo getStrategy() {
return ContainerLayoutTestInfo.FILE_PER_BLOCK;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]