This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new 04c196c9f4 HDDS-10377. Allow datanodes to do chunk level modifications 
to closed containers. (#7111)
04c196c9f4 is described below

commit 04c196c9f401de0f67356071188b01a49eaccf76
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Wed Oct 9 15:35:43 2024 -0700

    HDDS-10377. Allow datanodes to do chunk level modifications to closed 
containers. (#7111)
---
 .../container/common/helpers/ContainerMetrics.java |  10 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  65 +++++++++
 .../keyvalue/impl/TestFilePerBlockStrategy.java    | 156 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 883f6cd851..91bdb17cda 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -60,6 +60,7 @@ public class ContainerMetrics {
 
   private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
   private final EnumMap<ContainerProtos.Type, MutableCounterLong> 
opsBytesArray;
+  private final EnumMap<ContainerProtos.Type, MutableCounterLong> 
opsForClosedContainer;
   private final EnumMap<ContainerProtos.Type, MutableRate> opsLatency;
   private final EnumMap<ContainerProtos.Type, MutableQuantiles[]> 
opsLatQuantiles;
   private MetricsRegistry registry = null;
@@ -69,6 +70,7 @@ public class ContainerMetrics {
     MutableQuantiles[] latQuantiles = new MutableQuantiles[len];
     this.numOpsArray = new EnumMap<>(ContainerProtos.Type.class);
     this.opsBytesArray = new EnumMap<>(ContainerProtos.Type.class);
+    this.opsForClosedContainer = new EnumMap<>(ContainerProtos.Type.class);
     this.opsLatency = new EnumMap<>(ContainerProtos.Type.class);
     this.opsLatQuantiles = new EnumMap<>(ContainerProtos.Type.class);
     this.registry = new MetricsRegistry("StorageContainerMetrics");
@@ -77,7 +79,9 @@ public class ContainerMetrics {
       numOpsArray.put(type, registry.newCounter(
           "num" + type, "number of " + type + " ops", (long) 0));
       opsBytesArray.put(type, registry.newCounter(
-          "bytes" + type, "bytes used by " + type + "op", (long) 0));
+          "bytes" + type, "bytes used by " + type + " op", (long) 0));
+      opsForClosedContainer.put(type, 
registry.newCounter("bytesForClosedContainer" + type,
+          "bytes used by " + type + " for closed container op", (long) 0));
       opsLatency.put(type, registry.newRate("latencyNs" + type, type + " op"));
 
       for (int j = 0; j < len; j++) {
@@ -121,6 +125,10 @@ public class ContainerMetrics {
     opsBytesArray.get(type).incr(bytes);
   }
 
+  public void incClosedContainerBytesStats(ContainerProtos.Type type, long 
bytes) {
+    opsForClosedContainer.get(type).incr(bytes);
+  }
+
   public void incContainerDeleteFailedBlockCountNotZero() {
     containerDeleteFailedBlockCountNotZero.incr();
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 66dc54e302..fa10ffe20a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1022,6 +1022,58 @@ public class KeyValueHandler extends Handler {
     return getWriteChunkResponseSuccess(request, blockDataProto);
   }
 
+  /**
+   * Handle a WriteChunk operation for a closed container. Verifies the
+   * container is in a closed state, then delegates the write to the
+   * ChunkManager and records closed-container byte/latency metrics.
+   */
+  public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID 
blockID,
+                                           ChunkBuffer data, KeyValueContainer 
kvContainer)
+      throws IOException {
+    Preconditions.checkNotNull(kvContainer);
+    Preconditions.checkNotNull(chunkInfo);
+    Preconditions.checkNotNull(data);
+    long writeChunkStartTime = Time.monotonicNowNanos();
+    if (!checkContainerClose(kvContainer)) {
+      throw new IOException("Container #" + 
kvContainer.getContainerData().getContainerID() +
+          " is not in closed state, Container state is " + 
kvContainer.getContainerState());
+    }
+
+    DispatcherContext dispatcherContext = 
DispatcherContext.getHandleWriteChunk();
+    chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
+        dispatcherContext);
+
+    // Increment write stats for WriteChunk after write.
+    metrics.incClosedContainerBytesStats(Type.WriteChunk, chunkInfo.getLen());
+    metrics.incContainerOpsLatencies(Type.WriteChunk, Time.monotonicNowNanos() 
- writeChunkStartTime);
+  }
+
+  /**
+   * Handle a PutBlock operation for a closed container. Verifies the
+   * container is in a closed state, attaches the chunk list and replica
+   * BCSId to the block, then delegates to the BlockManager.
+   */
+  public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> 
chunkInfos, KeyValueContainer kvContainer,
+                                          BlockData blockData, long 
blockCommitSequenceId)
+      throws IOException {
+    Preconditions.checkNotNull(kvContainer);
+    Preconditions.checkNotNull(blockData);
+    long startTime = Time.monotonicNowNanos();
+
+    if (!checkContainerClose(kvContainer)) {
+      throw new IOException("Container #" + 
kvContainer.getContainerData().getContainerID() +
+          " is not in closed state, Container state is " + 
kvContainer.getContainerState());
+    }
+    blockData.setChunks(chunkInfos);
+    // To be set from the Replica's BCSId
+    blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+
+    blockManager.putBlock(kvContainer, blockData, false);
+    ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
+    final long numBytes = blockDataProto.getSerializedSize();
+    // Increment write stats for PutBlock after write.
+    metrics.incClosedContainerBytesStats(Type.PutBlock, numBytes);
+    metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - 
startTime);
+  }
+
   /**
    * Handle Put Small File operation. Writes the chunk and associated key
    * using a single RPC. Calls BlockManager and ChunkManager to process the
@@ -1198,6 +1250,19 @@ public class KeyValueHandler extends Handler {
     throw new StorageContainerException(msg, result);
   }
 
+  /**
+   * Check whether the container is in a closed state
+   * (QUASI_CLOSED, CLOSED or UNHEALTHY).
+   * @param kvContainer the container whose state is checked
+   */
+  private boolean checkContainerClose(KeyValueContainer kvContainer) {
+
+    final State containerState = kvContainer.getContainerState();
+    if (containerState == State.QUASI_CLOSED || containerState == State.CLOSED 
|| containerState == State.UNHEALTHY) {
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public Container importContainer(ContainerData originalContainerData,
       final InputStream rawContainerStream,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
index 36d7165519..8807f22026 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -19,20 +19,41 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Stream;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
 import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
@@ -133,6 +154,141 @@ public class TestFilePerBlockStrategy extends 
CommonChunkManagerTestCases {
         readData2.rewind().toByteString());
   }
 
+  @ParameterizedTest
+  @MethodSource("getNonClosedStates")
+  public void testWriteChunkAndPutBlockFailureForNonClosedContainer(
+      ContainerProtos.ContainerDataProto.State state) throws IOException {
+    KeyValueContainer keyValueContainer = getKeyValueContainer();
+    keyValueContainer.getContainerData().setState(state);
+    ContainerSet containerSet = new ContainerSet(100);
+    containerSet.addContainer(keyValueContainer);
+    KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+    ChunkBuffer.wrap(getData());
+    Assertions.assertThrows(IOException.class, () -> 
keyValueHandler.writeChunkForClosedContainer(
+        getChunkInfo(), getBlockID(), ChunkBuffer.wrap(getData()), 
keyValueContainer));
+    Assertions.assertThrows(IOException.class, () -> 
keyValueHandler.putBlockForClosedContainer(
+        null, keyValueContainer, new BlockData(getBlockID()), 0L));
+  }
+
+  @Test
+  public void testWriteChunkForClosedContainer()
+      throws IOException {
+    ChunkBuffer writeChunkData = ChunkBuffer.wrap(getData());
+    KeyValueContainer kvContainer = getKeyValueContainer();
+    KeyValueContainerData containerData = kvContainer.getContainerData();
+    closedKeyValueContainer();
+    ContainerSet containerSet = new ContainerSet(100);
+    containerSet.addContainer(kvContainer);
+    KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+    keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(), 
writeChunkData, kvContainer);
+    ChunkBuffer readChunkData = 
keyValueHandler.getChunkManager().readChunk(kvContainer,
+        getBlockID(), getChunkInfo(), WRITE_STAGE);
+    rewindBufferToDataStart();
+    Assertions.assertEquals(writeChunkData, readChunkData);
+    Assertions.assertEquals(containerData.getWriteBytes(), 
writeChunkData.remaining());
+    Assertions.assertEquals(containerData.getBytesUsed(), 
writeChunkData.remaining());
+
+    // Test Overwrite
+    keyValueHandler.writeChunkForClosedContainer(getChunkInfo(), getBlockID(),
+        writeChunkData, kvContainer);
+    readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
+        getBlockID(), getChunkInfo(), WRITE_STAGE);
+    rewindBufferToDataStart();
+    Assertions.assertEquals(writeChunkData, readChunkData);
+    Assertions.assertEquals(containerData.getWriteBytes(), 2L * 
writeChunkData.remaining());
+    // Overwrites won't increase the bytesUsed of a Container
+    Assertions.assertEquals(containerData.getBytesUsed(), 
writeChunkData.remaining());
+
+    // Test new write chunk after overwrite
+    byte[] bytes = "testing write chunks with after overwrite".getBytes(UTF_8);
+    ChunkBuffer newWriteChunkData = 
ChunkBuffer.allocate(bytes.length).put(bytes);
+    newWriteChunkData.rewind();
+
+    // Write chunk after the previous overwrite chunk.
+    ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", 
getBlockID()
+        .getLocalID(), writeChunkData.remaining()), 
writeChunkData.remaining(), bytes.length);
+    keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(),
+        newWriteChunkData, kvContainer);
+    readChunkData = keyValueHandler.getChunkManager().readChunk(kvContainer,
+        getBlockID(), newChunkInfo, WRITE_STAGE);
+    newWriteChunkData.rewind();
+    Assertions.assertEquals(newWriteChunkData, readChunkData);
+    Assertions.assertEquals(containerData.getWriteBytes(), 2L * 
writeChunkData.remaining()
+        + newWriteChunkData.remaining());
+    Assertions.assertEquals(containerData.getBytesUsed(), 
writeChunkData.remaining() + newWriteChunkData.remaining());
+  }
+
+  @Test
+  public void testPutBlockForClosedContainer() throws IOException {
+    KeyValueContainer kvContainer = getKeyValueContainer();
+    KeyValueContainerData containerData = kvContainer.getContainerData();
+    closedKeyValueContainer();
+    ContainerSet containerSet = new ContainerSet(100);
+    containerSet.addContainer(kvContainer);
+    KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
+    List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
+    chunkInfoList.add(getChunkInfo().getProtoBufMessage());
+    BlockData putBlockData = new BlockData(getBlockID());
+    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, 
putBlockData, 1L);
+    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
+    Assertions.assertEquals(containerData.getBlockCount(), 1L);
+
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, new 
OzoneConfiguration())) {
+      long localID = putBlockData.getLocalID();
+      BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
+          .get(containerData.getBlockKey(localID));
+      Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+    }
+
+    // Add another chunk and check the put block data
+    ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", 
getBlockID()
+        .getLocalID(), 1L), 0, 20L);
+    chunkInfoList.add(newChunkInfo.getProtoBufMessage());
+    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, 
putBlockData, 2L);
+    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
+    Assertions.assertEquals(containerData.getBlockCount(), 1L);
+
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, new 
OzoneConfiguration())) {
+      long localID = putBlockData.getLocalID();
+      BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
+          .get(containerData.getBlockKey(localID));
+      Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+    }
+
+    // Put block on bcsId <= containerBcsId should be a no-op
+    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, 
putBlockData, 2L);
+    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
+  }
+
+  private boolean blockDataEquals(BlockData putBlockData, BlockData 
getBlockData) {
+    return getBlockData.getSize() == putBlockData.getSize() &&
+        Objects.equals(getBlockData.getBlockID(), putBlockData.getBlockID()) &&
+        Objects.equals(getBlockData.getMetadata(), putBlockData.getMetadata()) 
&&
+        Objects.equals(getBlockData.getChunks(), putBlockData.getChunks());
+  }
+
+
+  private static Stream<Arguments> getNonClosedStates() {
+    return Stream.of(
+        Arguments.of(ContainerProtos.ContainerDataProto.State.OPEN),
+        Arguments.of(ContainerProtos.ContainerDataProto.State.RECOVERING),
+        Arguments.of(ContainerProtos.ContainerDataProto.State.CLOSING),
+        Arguments.of(ContainerProtos.ContainerDataProto.State.INVALID));
+  }
+
+  public KeyValueHandler createKeyValueHandler(ContainerSet containerSet)
+      throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String dnUuid = UUID.randomUUID().toString();
+    MutableVolumeSet volumeSet = new MutableVolumeSet(dnUuid, conf,
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
+    return ContainerTestUtils.getKeyValueHandler(conf, dnUuid, containerSet, 
volumeSet);
+  }
+
+  public void closedKeyValueContainer() {
+    
getKeyValueContainer().getContainerData().setState(ContainerProtos.ContainerDataProto.State.CLOSED);
+  }
+
   @Override
   protected ContainerLayoutTestInfo getStrategy() {
     return ContainerLayoutTestInfo.FILE_PER_BLOCK;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to