This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new b34c537940 HDDS-11763. Implement container repair logic within 
datanodes. (#7474)
b34c537940 is described below

commit b34c537940bce95470f2a6af6b0aa4abbbabfb0d
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Fri Apr 11 04:13:46 2025 +0530

    HDDS-11763. Implement container repair logic within datanodes. (#7474)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  11 +-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   4 +
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   5 +
 .../hadoop/hdds/client/ReplicationConfig.java      |   1 +
 .../hdds/scm/container/ContainerReplicaInfo.java   |   5 +-
 .../checksum/ContainerChecksumTreeManager.java     |  45 ++-
 .../checksum/DNContainerOperationClient.java       |   4 +
 .../container/common/helpers/ContainerMetrics.java |  10 +
 .../container/common/utils/ContainerLogger.java    |  23 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 350 +++++++++++++++--
 .../container/keyvalue/impl/BlockManagerImpl.java  |  15 +-
 .../keyvalue/interfaces/BlockManager.java          |   9 +
 .../checksum/ContainerMerkleTreeTestUtils.java     |   1 +
 .../checksum/TestContainerChecksumTreeManager.java |  36 +-
 .../ozone/container/common/ContainerTestUtils.java |  31 ++
 .../TestReconcileContainerCommandHandler.java      |  10 +
 .../keyvalue/TestContainerCorruptions.java         |   2 +-
 .../container/keyvalue/TestKeyValueHandler.java    | 412 +++++++++++++++++++--
 .../src/main/smoketest/admincli/container.robot    |   5 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |  14 +
 .../TestContainerCommandReconciliation.java        | 409 ++++++++++++++++++--
 21 files changed, 1277 insertions(+), 125 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 4867a2aa69..c9fe4ec679 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -112,6 +112,8 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
 
   private final Function<BlockID, BlockLocationInfo> refreshFunction;
 
+  private BlockData blockData;
+
   public BlockInputStream(
       BlockLocationInfo blockInfo,
       Pipeline pipeline,
@@ -153,7 +155,6 @@ public synchronized void initialize() throws IOException {
       return;
     }
 
-    BlockData blockData = null;
     List<ChunkInfo> chunks = null;
     IOException catchEx = null;
     do {
@@ -554,8 +555,7 @@ public long getLength() {
     return length;
   }
 
-  @VisibleForTesting
-  synchronized int getChunkIndex() {
+  public synchronized int getChunkIndex() {
     return chunkIndex;
   }
 
@@ -618,9 +618,12 @@ private void handleReadError(IOException cause) throws 
IOException {
     refreshBlockInfo(cause);
   }
 
-  @VisibleForTesting
   public synchronized List<ChunkInputStream> getChunkStreams() {
     return chunkStreams;
   }
 
+  public BlockData getStreamBlockData() {
+    return blockData;
+  }
+
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 23c96fc7d6..efad9ff76c 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -747,4 +747,8 @@ public synchronized void unbuffer() {
   public ByteBuffer[] getCachedBuffers() {
     return BufferUtils.getReadOnlyByteBuffers(buffers);
   }
+
+  public ChunkInfo getChunkInfo() {
+    return chunkInfo;
+  }
 }
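
The accessors added above (getStreamBlockData, getChunkIndex and getChunkStreams on
BlockInputStream, getChunkInfo on ChunkInputStream) are what the datanode-side repair
code later in this patch uses to walk a peer replica's block and chunk metadata through
a read stream. A minimal sketch of that usage, assuming a BlockInputStream built for a
single-datanode pipeline that has already been initialize()d; the class and method names
PeerChunkLookup, peerBlock and peerChunkAt are illustrative only:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;

final class PeerChunkLookup {
  // Metadata of the whole block, as fetched from the peer during initialize().
  static ContainerProtos.BlockData peerBlock(BlockInputStream in) {
    return in.getStreamBlockData();
  }

  // Metadata of the chunk covering chunkOffset, as reported by the peer.
  static ContainerProtos.ChunkInfo peerChunkAt(BlockInputStream in, long chunkOffset) throws IOException {
    in.seek(chunkOffset);  // seek also positions the internal chunk index
    List<ChunkInputStream> streams = in.getChunkStreams();
    return streams.get(in.getChunkIndex()).getChunkInfo();
  }
}
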
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 0b2928410c..cce329f91e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -882,6 +882,11 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
         : null;
   }
 
+  /** @return Hex string representation of {@code value} */
+  public static String checksumToString(long value) {
+    return Long.toHexString(value);
+  }
+
   /**
    * Logs a warning to report that the class is not closed properly.
    */
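
Since the new checksumToString helper simply delegates to Long.toHexString, data
checksums are rendered as unprefixed lower-case hex wherever it is used (the JSON
serializer and the container log updated below). A one-line illustration:

String printed = HddsUtils.checksumToString(3735928559L);  // "deadbeef"
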
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
index 4bd4709716..20ddf555bc 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
@@ -112,6 +112,7 @@ static HddsProtos.ReplicationFactor getLegacyFactor(
       return ((ReplicatedReplicationConfig) replicationConfig)
           .getReplicationFactor();
     }
+
     throw new UnsupportedOperationException(
         "Replication configuration of type "
             + replicationConfig.getReplicationType()
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
index b2884e3fb0..24bc4d6d32 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
+
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,7 +28,6 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
-
 /**
  * Class which stores ContainerReplica details on the client.
  */
@@ -102,7 +103,7 @@ public long getDataChecksum() {
   private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
     @Override
     public void serialize(Long value, JsonGenerator gen, SerializerProvider 
provider) throws IOException {
-      gen.writeString(Long.toHexString(value));
+      gen.writeString(checksumToString(value));
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index c247380fbe..99b5800c45 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -82,7 +82,9 @@ public void stop() {
    * file remains unchanged.
    * Concurrent writes to the same file are coordinated internally.
    */
-  public void writeContainerDataTree(ContainerData data, 
ContainerMerkleTreeWriter tree) throws IOException {
+  public ContainerProtos.ContainerChecksumInfo 
writeContainerDataTree(ContainerData data,
+                                                                      
ContainerMerkleTreeWriter tree)
+      throws IOException {
     long containerID = data.getContainerID();
     Lock writeLock = getLock(containerID);
     writeLock.lock();
@@ -98,11 +100,13 @@ public void writeContainerDataTree(ContainerData data, 
ContainerMerkleTreeWriter
         checksumInfoBuilder = 
ContainerProtos.ContainerChecksumInfo.newBuilder();
       }
 
-      checksumInfoBuilder
+      ContainerProtos.ContainerChecksumInfo checksumInfo = checksumInfoBuilder
           .setContainerID(containerID)
-          
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
 tree::toProto));
-      write(data, checksumInfoBuilder.build());
+          
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
 tree::toProto))
+          .build();
+      write(data, checksumInfo);
       LOG.debug("Data merkle tree for container {} updated", containerID);
+      return checksumInfo;
     } finally {
       writeLock.unlock();
     }
@@ -146,33 +150,32 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiffReport diff(KeyValueContainerData thisContainer,
+  /**
+   * Compares the checksum info of the container with the peer's checksum info 
and returns a report of the differences.
+   * @param thisChecksumInfo The checksum info of the container on this 
datanode.
+   * @param peerChecksumInfo The checksum info of the container on the peer 
datanode.
+   * @return A report of the differences between the local and peer checksum 
info.
+   */
+  public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
                                   ContainerProtos.ContainerChecksumInfo 
peerChecksumInfo) throws
       StorageContainerException {
 
     ContainerDiffReport report = new ContainerDiffReport();
     try {
       captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
-        Preconditions.assertNotNull(thisContainer, "Container data is null");
-        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
-        Optional<ContainerProtos.ContainerChecksumInfo> 
thisContainerChecksumInfo = read(thisContainer);
-        if (!thisContainerChecksumInfo.isPresent()) {
-          throw new StorageContainerException("The container #" + 
thisContainer.getContainerID() +
-              " doesn't have container checksum", 
ContainerProtos.Result.IO_EXCEPTION);
-        }
-
-        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
-          throw new StorageContainerException("Container Id does not match for 
container "
-              + thisContainer.getContainerID(), 
ContainerProtos.Result.CONTAINER_ID_MISMATCH);
+        Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum 
info is null.");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null.");
+        if (thisChecksumInfo.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new StorageContainerException("Container ID does not match. 
Local container ID "
+              + thisChecksumInfo.getContainerID() + " , Peer container ID " + 
peerChecksumInfo.getContainerID(),
+              ContainerProtos.Result.CONTAINER_ID_MISMATCH);
         }
 
-        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfo.get();
         compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
       });
     } catch (IOException ex) {
       metrics.incrementMerkleTreeDiffFailures();
-      throw new StorageContainerException("Container Diff failed for container 
#" + thisContainer.getContainerID(), ex,
-          ContainerProtos.Result.IO_EXCEPTION);
+      throw new StorageContainerException("Container Diff failed for container 
#" + thisChecksumInfo.getContainerID(),
+          ex, ContainerProtos.Result.IO_EXCEPTION);
     }
 
     // Update Container Diff metrics based on the diff report.
@@ -314,7 +317,7 @@ private Lock getLock(long containerID) {
    * Callers are not required to hold a lock while calling this since writes 
are done to a tmp file and atomically
    * swapped into place.
    */
-  private Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData 
data) throws IOException {
+  public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData 
data) throws IOException {
     long containerID = data.getContainerID();
     File checksumFile = getContainerChecksumFile(data);
     try {
@@ -361,6 +364,8 @@ private void write(ContainerData data, 
ContainerProtos.ContainerChecksumInfo che
       throw new IOException("Error occurred when writing container merkle tree 
for containerID "
           + data.getContainerID(), ex);
     }
+    // Set in-memory data checksum.
+    
data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
   }
 
   /**
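
With this change diff() no longer reads the local tree itself: callers load the local
ContainerChecksumInfo through the now-public read() and pass both protos in, as the
updated tests and the reconcile path in KeyValueHandler do. A minimal, self-contained
sketch of the new calling convention; the class and method names ChecksumDiffExample and
diffAgainstPeer are illustrative only:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerDiffReport;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

final class ChecksumDiffExample {
  static ContainerDiffReport diffAgainstPeer(ContainerChecksumTreeManager checksumManager,
      KeyValueContainerData containerData,
      ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws IOException {
    Optional<ContainerProtos.ContainerChecksumInfo> localInfo = checksumManager.read(containerData);
    if (!localInfo.isPresent()) {
      // No tree has been generated yet for this replica; KeyValueHandler rebuilds one
      // from the RocksDB block metadata in this case before diffing.
      throw new IOException("No checksum tree for container " + containerData.getContainerID());
    }
    return checksumManager.diff(localInfo.get(), peerChecksumInfo);
  }
}
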
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
index 31c397ce09..d5ba243dd1 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
@@ -84,6 +84,10 @@ public XceiverClientManager getXceiverClientManager() {
     return xceiverClientManager;
   }
 
+  public TokenHelper getTokenHelper() {
+    return tokenHelper;
+  }
+
   /**
    * Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified 
container for the specified datanode.
    *
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index af7662a131..652a90e978 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -56,6 +56,8 @@ public class ContainerMetrics implements Closeable {
   @Metric private MutableCounterLong containerForceDelete;
   @Metric private MutableCounterLong numReadStateMachine;
   @Metric private MutableCounterLong bytesReadStateMachine;
+  @Metric private MutableCounterLong numContainerReconciledWithoutChanges;
+  @Metric private MutableCounterLong numContainerReconciledWithChanges;
 
 
   private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
@@ -172,4 +174,12 @@ public void incBytesReadStateMachine(long bytes) {
   public long getBytesReadStateMachine() {
     return bytesReadStateMachine.value();
   }
+
+  public void incContainerReconciledWithoutChanges() {
+    numContainerReconciledWithoutChanges.incr();
+  }
+
+  public void incContainerReconciledWithChanges() {
+    numContainerReconciledWithChanges.incr();
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
index 4aacc7c2de..6f20f22a8b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
@@ -17,7 +17,10 @@
 
 package org.apache.hadoop.ozone.container.common.utils;
 
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
+
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,23 @@ public static void logRecovered(ContainerData 
containerData) {
     LOG.info(getMessage(containerData));
   }
 
+  /**
+   * Logged when a container is reconciled.
+   *
+   * @param containerData The container that was reconciled on this datanode.
+   * @param oldDataChecksum The old data checksum.
+   * @param peer The peer datanode this container was reconciled with.
+   */
+  public static void logReconciled(ContainerData containerData, long 
oldDataChecksum, DatanodeDetails peer) {
+    if (containerData.getDataChecksum() == oldDataChecksum) {
+      LOG.info(getMessage(containerData, "Container reconciled with peer " + 
peer.toString() +
+          ". No change in checksum."));
+    } else {
+      LOG.warn(getMessage(containerData, "Container reconciled with peer " + 
peer.toString() +
+          ". Checksum updated from " + checksumToString(oldDataChecksum) + " 
to "
+          + checksumToString(containerData.getDataChecksum())));
+    }
+  }
+
   private static String getMessage(ContainerData containerData,
                                    String message) {
     return String.join(FIELD_SEPARATOR, getMessage(containerData), message);
@@ -155,6 +175,7 @@ private static String getMessage(ContainerData 
containerData) {
         "ID=" + containerData.getContainerID(),
         "Index=" + containerData.getReplicaIndex(),
         "BCSID=" + containerData.getBlockCommitSequenceId(),
-        "State=" + containerData.getState());
+        "State=" + containerData.getState(),
+        "DataChecksum=" + checksumToString(containerData.getDataChecksum()));
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index d4bfa79142..43926ca5e2 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -17,10 +17,12 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -34,6 +36,8 @@
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
@@ -52,6 +56,7 @@
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
+import static 
org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
 import static 
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -67,17 +72,23 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -91,21 +102,29 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
 import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
-import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.ContainerDiffReport;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -136,7 +155,9 @@
 import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import 
org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -158,11 +179,13 @@ public class KeyValueHandler extends Handler {
   private final long maxDeleteLockWaitMs;
   private final Function<ByteBuffer, ByteString> byteBufferToByteString;
   private final boolean validateChunkChecksumData;
+  private final int chunkSize;
   // A striped lock that is held during container creation.
   private final Striped<Lock> containerCreationLocks;
   private final ContainerChecksumTreeManager checksumManager;
   private static FaultInjector injector;
   private final Clock clock;
+  private final BlockInputStreamFactoryImpl blockInputStreamFactory;
 
   public KeyValueHandler(ConfigurationSource config,
                          String datanodeId,
@@ -224,6 +247,9 @@ public KeyValueHandler(ConfigurationSource config,
         ByteStringConversion
             .createByteBufferConversion(isUnsafeByteBufferConversionEnabled);
 
+    blockInputStreamFactory = new BlockInputStreamFactoryImpl();
+    chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 
OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
+
     if (ContainerLayoutVersion.getConfiguredVersion(conf) ==
         ContainerLayoutVersion.FILE_PER_CHUNK) {
       LOG.warn("FILE_PER_CHUNK layout is not supported. Falling back to 
default : {}.",
@@ -600,24 +626,20 @@ ContainerCommandResponseProto handleCloseContainer(
     return getSuccessResponse(request);
   }
 
-  private void createContainerMerkleTree(Container container) {
+
+  /**
+   * Create a Merkle tree for the container if it does not exist.
+   * TODO: This method should be changed to private after HDDS-10374 is merged.
+   */
+  @VisibleForTesting
+  public void createContainerMerkleTree(Container container) {
     if (ContainerChecksumTreeManager.checksumFileExist(container)) {
       return;
     }
 
     try {
       KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
-      ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
-      try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
-           BlockIterator<BlockData> blockIterator = dbHandle.getStore().
-               getBlockIterator(containerData.getContainerID())) {
-        while (blockIterator.hasNext()) {
-          BlockData blockData = blockIterator.nextBlock();
-          List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
-          merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
-        }
-      }
-      checksumManager.writeContainerDataTree(containerData, merkleTree);
+      updateAndGetContainerChecksum(containerData);
     } catch (IOException ex) {
       LOG.error("Cannot create container checksum for container {} , 
Exception: ",
           container.getContainerData().getContainerID(), ex);
@@ -1483,21 +1505,297 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = 
checksumManager.read(containerData);
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      checksumInfo = updateAndGetContainerChecksum(containerData);
+    }
+    long oldDataChecksum = 
checksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    for (DatanodeDetails peer : peers) {
+      long start = Instant.now().toEpochMilli();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, 
peerChecksumInfo);
+      Pipeline pipeline = createSingleNodePipeline(peer);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, 
chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, pipeline, dnClient, 
entry.getKey(), entry.getValue(), chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, pipeline, dnClient, 
entry.getKey(), entry.getValue(), chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+      // Update the checksum based on RocksDB metadata. The chunk read path 
validates the checksum of the data
+      // we read, so we can update the checksum from the RocksDB metadata 
alone.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = 
updateAndGetContainerChecksum(containerData);
+      long dataChecksum = 
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Instant.now().toEpochMilli() - start;
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. 
Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from 
{} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the block metadata in RocksDB 
and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree 
needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo 
updateAndGetContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        // TODO: Add empty blocks to the merkle tree. Done in HDDS-10374, 
needs to be backported.
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    return checksumInfo;
+  }
+
+  /**
+   * Handles a missing block: reads the missing block data from the peer 
datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id of the container 
and of the block are not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, Pipeline 
pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree 
missingBlock, ByteBuffer chunkByteBuffer)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default 
block length we pass 0,
+    // as the length is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = 
dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. The block should not 
exist and our container merkle tree" +
+              " is stale. Skipping reconciliation for this block.", blockID, 
containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // The under construction flag is set here; it is used during 
BlockInputStream#initialize() to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) 
blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Gets the blockData from the peer, 
sets the block length and
+      // initializes ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+      ContainerProtos.BlockData peerBlockData = 
blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this 
blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+
+      // Don't update bcsId if chunk read fails
+      for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in 
the BlockInputStream.
+          blockInputStream.seek(chunkInfoProto.getOffset());
+
+          // Read the chunk data from the BlockInputStream and write it to the 
container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block 
input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+          // If the chunk read/write fails, we are expected to have holes in 
the blockData's chunk list.
+          // That is okay: if the read fails, there might be a hole in the 
peer datanode as well.
+          // If the chunk write fails then we don't want to add the metadata 
without the actual data as there is
+          // no data to verify the chunk checksum.
+          successfulChunksList.add(chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+              blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+        }
+      }
+
+      BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+      putBlockData.setChunks(successfulChunksList);
+      putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+      chunkManager.finishWriteChunks(container, putBlockData);
+    }
+  }
+
+  /**
+   * This method reconciles chunks per block. It reads the missing/corrupt 
chunk data from the peer
+   * datanode and writes it to the local container. If the chunk write fails, 
the block commit sequence
+   * id is not updated.
+   */
+  private void reconcileChunksPerBlock(KeyValueContainer container, Pipeline 
pipeline,
+                                       DNContainerOperationClient dnClient, 
long blockId,
+                                       List<ContainerProtos.ChunkMerkleTree> 
chunkList, ByteBuffer chunkByteBuffer)
+      throws IOException {
+
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    // The length of the block is not known, so instead of passing the default 
block length we pass 0,
+    // as the length is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = 
dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+    BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+
+    SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap = 
localBlockData.getChunks().stream()
+        .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+            Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // The under construction flag is set here; it is used during 
BlockInputStream#initialize() to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) 
blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Gets the blockData from the peer, 
sets the block length and
+      // initializes ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+      ContainerProtos.BlockData peerBlockData = 
blockInputStream.getStreamBlockData();
+      // Compare the local bcsId with the bcsId from the peer datanode.
+      long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+          localBlockData.getBlockCommitSequenceId());
+
+      for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) {
+        long chunkOffset = chunkMerkleTree.getOffset();
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in 
the BlockInputStream.
+          blockInputStream.seek(chunkOffset);
+          ChunkInputStream currentChunkStream = 
blockInputStream.getChunkStreams().get(
+              blockInputStream.getChunkIndex());
+          ContainerProtos.ChunkInfo chunkInfoProto = 
currentChunkStream.getChunkInfo();
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          verifyChunksLength(chunkInfoProto, localChunksMap.get(chunkOffset));
+
+          // Read the chunk data from the BlockInputStream and write it to the 
container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block 
input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+          // When reconciling missing chunks (which occur at the end of the 
block), the blockData's chunk list
+          // may end up with holes because we keep reconciling even if 
individual chunks fail. That is fine,
+          // as we don't update the bcsId in that case.
+          localChunksMap.put(chunkInfo.getOffset(), chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling chunk {} for block {} in 
container {}",
+              chunkOffset, blockID, containerData.getContainerID(), ex);
+        }
+      }
+
+      List<ContainerProtos.ChunkInfo> localChunkList = new 
ArrayList<>(localChunksMap.values());
+      localBlockData.setChunks(localChunkList);
+      putBlockForClosedContainer(container, localBlockData, maxBcsId, 
overwriteBcsId);
+      chunkManager.finishWriteChunks(container, localBlockData);
+    }
+  }
+
+  private void verifyChunksLength(ContainerProtos.ChunkInfo peerChunkInfo, 
ContainerProtos.ChunkInfo localChunkInfo)
+      throws StorageContainerException {
+    if (localChunkInfo == null || peerChunkInfo == null) {
+      return;
+    }
+
+    if (peerChunkInfo.getOffset() != localChunkInfo.getOffset()) {
+      throw new StorageContainerException("Offset mismatch for chunk. 
Expected: " + localChunkInfo.getOffset() +
+          ", Actual: " + peerChunkInfo.getOffset(), CHUNK_FILE_INCONSISTENCY);
+    }
+
+    if (peerChunkInfo.getLen() != localChunkInfo.getLen()) {
+      throw new StorageContainerException("Length mismatch for chunk at offset 
" + localChunkInfo.getOffset() +
+          ". Expected: " + localChunkInfo.getLen() + ", Actual: " + 
peerChunkInfo.getLen(), CHUNK_FILE_INCONSISTENCY);
+    }
+  }
+
   /**
    * Called by BlockDeletingService to delete all the chunks in a block
    * before proceeding to delete the block info from DB.
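
Taken together, the new reconcile path loads (or rebuilds from RocksDB) the local
checksum tree, diffs it against each peer, pulls missing blocks and missing or corrupt
chunks over a single-node pipeline, recomputes the tree, and reports the outcome through
ContainerMetrics, ContainerLogger and an ICR. A minimal sketch of triggering it, assuming
handler, dnClient, container and peers are already wired up by the datanode's reconcile
container command path (see TestReconcileContainerCommandHandler in this patch):

// Illustrative invocation only; all four objects come from the calling context.
handler.reconcileContainer(dnClient, container, peers);
// Afterwards container.getContainerData().getDataChecksum() holds the recomputed data
// checksum of the container merkle tree, rendered in hex via checksumToString in logs.
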
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index ba3d404897..20ab37cf1c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -112,8 +112,6 @@ public long putBlockForClosedContainer(Container container, 
BlockData data, bool
     // We are not locking the key manager since RocksDB serializes all actions
     // against a single DB. We rely on DB level locking to avoid conflicts.
     try (DBHandle db = BlockUtils.getDB(containerData, config)) {
-      // This is a post condition that acts as a hint to the user.
-      // Should never fail.
       Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
 
       long blockBcsID = data.getBlockCommitSequenceId();
@@ -441,6 +439,19 @@ public List<BlockData> listBlock(Container container, long 
startLocalID, int
     }
   }
 
+  @Override
+  public boolean blockExists(Container container, BlockID blockID) throws 
IOException {
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+      String blockKey = containerData.getBlockKey(blockID.getLocalID());
+      return db.getStore().getBlockDataTable().isExist(blockKey);
+    }
+  }
+
   /**
    * Shutdown KeyValueContainerManager.
    */
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 10562f450b..cf65bb819d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -94,6 +94,15 @@ long putBlockForClosedContainer(Container container, 
BlockData data, boolean ove
   List<BlockData> listBlock(Container container, long startLocalID, int count)
       throws IOException;
 
+  /**
+   * Checks if a block exists in the container.
+   *
+   * @param container - Container in which the block is checked.
+   * @param blockID - BlockID of the Block.
+   * @return True if block exists, false otherwise.
+   */
+  boolean blockExists(Container container, BlockID blockID) throws IOException;
+
   /**
    * Returns last committed length of the block.
    *
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
index 22559aa378..811e4b483a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
@@ -353,5 +353,6 @@ public static void 
writeContainerDataTreeProto(ContainerData data, ContainerProt
       throw new IOException("Error occurred when writing container merkle tree 
for containerID "
           + data.getContainerID(), ex);
     }
+    
data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
index e0e8930c94..987ff7cf81 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -39,6 +39,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -339,7 +340,8 @@ public void testContainerWithNoDiff() throws Exception {
     ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
ContainerProtos.ContainerChecksumInfo.newBuilder()
             .setContainerID(container.getContainerID())
             .setContainerMerkleTree(peerMerkleTree.toProto()).build();
-    ContainerDiffReport diff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), 
peerChecksumInfo);
     
assertTrue(checksumManager.getMetrics().getMerkleTreeDiffLatencyNS().lastStat().total()
 > 0);
     assertFalse(diff.needsRepair());
     assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1);
@@ -362,7 +364,8 @@ public void testContainerDiffWithMismatches(int 
numMissingBlock, int numMissingC
     ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
ContainerProtos.ContainerChecksumInfo.newBuilder()
         .setContainerID(container.getContainerID())
         .setContainerMerkleTree(peerMerkleTree.toProto()).build();
-    ContainerDiffReport diff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), 
peerChecksumInfo);
     assertTrue(metrics.getMerkleTreeDiffLatencyNS().lastStat().total() > 0);
     assertContainerDiffMatch(expectedDiff, diff);
     assertEquals(checksumManager.getMetrics().getRepairContainerDiffs(), 1);
@@ -385,15 +388,19 @@ public void testPeerWithMismatchesHasNoDiff(int 
numMissingBlock, int numMissingC
     ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
ContainerProtos.ContainerChecksumInfo.newBuilder()
         .setContainerID(container.getContainerID())
         .setContainerMerkleTree(peerMerkleTree).build();
-    ContainerDiffReport diff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), 
peerChecksumInfo);
     assertFalse(diff.needsRepair());
     assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1);
   }
 
   @Test
-  public void testFailureContainerMerkleTreeMetric() {
+  public void testFailureContainerMerkleTreeMetric() throws IOException {
     ContainerProtos.ContainerChecksumInfo peerChecksum = 
ContainerProtos.ContainerChecksumInfo.newBuilder().build();
-    assertThrows(StorageContainerException.class, () -> 
checksumManager.diff(container, peerChecksum));
+    ContainerMerkleTreeWriter ourMerkleTree = buildTestTree(config);
+    checksumManager.writeContainerDataTree(container, ourMerkleTree);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    assertThrows(StorageContainerException.class, () -> 
checksumManager.diff(checksumInfo.get(), peerChecksum));
     assertEquals(checksumManager.getMetrics().getMerkleTreeDiffFailure(), 1);
   }
 
@@ -413,7 +420,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
         .addAllDeletedBlocks(deletedBlockList).build();
 
     writeContainerDataTreeProto(container, ourMerkleTree);
-    ContainerDiffReport containerDiff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport containerDiff = 
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
 
     // The diff should not have any missing block/missing chunk/corrupt chunks 
as the blocks are deleted
     // in peer merkle tree.
@@ -423,7 +431,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
 
     // Delete blocks in our merkle tree as well.
     checksumManager.markBlocksAsDeleted(container, deletedBlockList);
-    containerDiff = checksumManager.diff(container, peerChecksumInfo);
+    checksumInfo = checksumManager.read(container);
+    containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
 
     // The diff should not have any missing block/missing chunk/corrupt chunks 
as the blocks are deleted
     // in both merkle tree.
@@ -449,7 +458,8 @@ void testDeletedBlocksInOurContainerOnly() throws Exception 
{
     writeContainerDataTreeProto(container, ourMerkleTree);
     checksumManager.markBlocksAsDeleted(container, deletedBlockList);
 
-    ContainerDiffReport containerDiff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport containerDiff = 
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
 
     // The diff should not have any missing block/missing chunk/corrupt chunks 
as the blocks are deleted
     // in our merkle tree.
@@ -475,7 +485,8 @@ void testCorruptionInOurMerkleTreeAndDeletedBlocksInPeer() 
throws Exception {
 
     writeContainerDataTreeProto(container, ourMerkleTree);
 
-    ContainerDiffReport containerDiff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport containerDiff = 
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
 
     // The diff should not have any missing block/missing chunk/corrupt chunks 
as the blocks are deleted
     // in peer merkle tree.
@@ -499,8 +510,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws 
Exception {
     writeContainerDataTreeProto(container, ourMerkleTree);
 
     ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
peerChecksumInfoBuilder.build();
-
-    ContainerDiffReport containerDiff = checksumManager.diff(container, 
peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(container);
+    ContainerDiffReport containerDiff = 
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
     // The diff should not have any missing block/missing chunk/corrupt chunks 
as the blocks are deleted
     // in peer merkle tree.
     assertFalse(containerDiff.getMissingBlocks().isEmpty());
@@ -512,7 +523,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws 
Exception {
 
     // Clear deleted blocks to add them in missing blocks.
     peerChecksumInfo = peerChecksumInfoBuilder.clearDeletedBlocks().build();
-    containerDiff = checksumManager.diff(container, peerChecksumInfo);
+    checksumInfo = checksumManager.read(container);
+    containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
 
     assertFalse(containerDiff.getMissingBlocks().isEmpty());
     // Missing block does not contain the deleted blocks 6L to 10L
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index aa771cb6c3..ab8929c18c 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -22,21 +22,25 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -53,6 +57,8 @@
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -60,6 +66,7 @@
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -76,6 +83,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
@@ -419,4 +427,27 @@ public static XceiverServerRatis newXceiverServerRatis(
         getNoopContainerDispatcher(), getEmptyContainerController(),
         null, null);
   }
+
+  /**
+   * Creates block metadata for the given container with the specified number 
of blocks and chunks per block.
+   */
+  public static void createBlockMetaData(KeyValueContainerData data, int 
numOfBlocksPerContainer,
+                                         int numOfChunksPerBlock) throws 
IOException {
+    try (DBHandle metadata = BlockUtils.getDB(data, new OzoneConfiguration())) 
{
+      for (int j = 0; j < numOfBlocksPerContainer; j++) {
+        BlockID blockID = new BlockID(data.getContainerID(), j);
+        String blockKey = data.getBlockKey(blockID.getLocalID());
+        BlockData kd = new BlockData(blockID);
+        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+        for (int k = 0; k < numOfChunksPerBlock; k++) {
+          long dataLen = 10L;
+          ChunkInfo chunkInfo = 
ContainerTestHelper.getChunk(blockID.getLocalID(), k, k * dataLen, dataLen);
+          ContainerTestHelper.setDataChecksum(chunkInfo, 
ContainerTestHelper.getData((int) dataLen));
+          chunks.add(chunkInfo.getProtoBufMessage());
+        }
+        kd.setChunks(chunks);
+        metadata.getStore().getBlockDataTable().put(blockKey, kd);
+      }
+    }
+  }
 }
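
A hedged usage sketch of the new createBlockMetaData helper above, mirroring how the handler and command-handler tests call it (a KeyValueContainerData whose metadata path and DB file point at @TempDir-backed locations is assumed):

    KeyValueContainerData data = ...;             // container under test
    data.setMetadataPath(tempDir.toString());     // temp metadata directory
    data.setDbFile(dbFile.toFile());              // temp RocksDB location
    // Populate 5 blocks, each with 3 chunk entries, in the block data table.
    ContainerTestUtils.createBlockMetaData(data, 5, 3);
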
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
index 6dab87cbac..d7d01f9349 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -20,6 +20,7 @@
 import static java.util.Collections.singletonMap;
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -28,6 +29,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,6 +59,7 @@
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +76,10 @@ public class TestReconcileContainerCommandHandler {
   private StateContext context;
   private ReconcileContainerCommandHandler subject;
   private ReplicationSupervisor mockSupervisor;
+  @TempDir
+  private Path tempDir;
+  @TempDir
+  private Path dbFile;
 
   public void init(ContainerLayoutVersion layout, 
IncrementalReportSender<Container> icrSender)
       throws Exception {
@@ -93,6 +100,9 @@ public void init(ContainerLayoutVersion layout, 
IncrementalReportSender<Containe
     for (int id = 1; id <= NUM_CONTAINERS; id++) {
       KeyValueContainerData data = new KeyValueContainerData(id, layout, GB,
           PipelineID.randomId().toString(), 
randomDatanodeDetails().getUuidString());
+      data.setMetadataPath(tempDir.toString());
+      data.setDbFile(dbFile.toFile());
+      createBlockMetaData(data, 5, 3);
       containerSet.addContainer(new KeyValueContainer(data, conf));
     }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
index 543bfc17b5..20ad00676b 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
@@ -195,7 +195,7 @@ private static void corruptFile(File file) {
     }
   }
 
-  private static File getBlock(Container<?> container, long blockID) {
+  public static File getBlock(Container<?> container, long blockID) {
     File blockFile;
     File chunksDir = new File(container.getContainerData().getContainerPath(),
         "chunks");
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 5e39d33192..33f4faefb6 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -26,53 +27,88 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static 
org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.getBlock;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -80,15 +116,24 @@
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.util.Sets;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * Unit tests for {@link KeyValueHandler}.
@@ -101,17 +146,43 @@ public class TestKeyValueHandler {
   @TempDir
   private Path dbFile;
 
-  private static final String DATANODE_UUID = UUID.randomUUID().toString();
-
   private static final long DUMMY_CONTAINER_ID = 9999;
   private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
+  private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB;
+  private static final int CHUNKS_PER_BLOCK = 4;
+  private static final String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
 
   private HddsDispatcher dispatcher;
   private KeyValueHandler handler;
+  private OzoneConfiguration conf;
+
+  /**
+   * Pairs of (corrupt block count, corrupt chunk count) used to parameterize the reconciliation tests.
+   */
+  public static Stream<Arguments> corruptionValues() {
+    return Stream.of(
+        Arguments.of(5, 0),
+        Arguments.of(0, 5),
+        Arguments.of(0, 10),
+        Arguments.of(10, 0),
+        Arguments.of(5, 10),
+        Arguments.of(10, 5),
+        Arguments.of(2, 3),
+        Arguments.of(3, 2),
+        Arguments.of(4, 6),
+        Arguments.of(6, 4),
+        Arguments.of(6, 9),
+        Arguments.of(9, 6)
+    );
+  }
 
   @BeforeEach
-  public void setup() throws StorageContainerException {
+  public void setup() throws IOException {
     // Create mock HddsDispatcher and KeyValueHandler.
+    conf = new OzoneConfiguration();
+    conf.set(HDDS_DATANODE_DIR_KEY, tempDir.toString());
+    conf.set(OZONE_METADATA_DIRS, tempDir.toString());
     handler = mock(KeyValueHandler.class);
 
     HashMap<ContainerType, Handler> handlers = new HashMap<>();
@@ -283,7 +354,7 @@ public void testVolumeSetInKeyValueHandler() throws 
Exception {
     File metadataDir =
         Files.createDirectory(tempDir.resolve("metadataDir")).toFile();
 
-    OzoneConfiguration conf = new OzoneConfiguration();
+    conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, datanodeDir.getAbsolutePath());
     conf.set(OZONE_METADATA_DIRS, metadataDir.getAbsolutePath());
     MutableVolumeSet
@@ -337,8 +408,8 @@ private ContainerCommandRequestProto 
getDummyCommandRequestProto(
   @ContainerLayoutTestInfo.ContainerTest
   public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
       throws IOException {
-    KeyValueHandler keyValueHandler = createKeyValueHandler();
-    OzoneConfiguration conf = new OzoneConfiguration();
+    KeyValueHandler keyValueHandler = createKeyValueHandler(tempDir);
+    conf = new OzoneConfiguration();
     KeyValueContainerData kvData = new 
KeyValueContainerData(DUMMY_CONTAINER_ID,
         layoutVersion,
         (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
@@ -382,7 +453,7 @@ public void testDeleteContainer() throws IOException {
       final long containerID = 1L;
       final String clusterId = UUID.randomUUID().toString();
       final String datanodeId = UUID.randomUUID().toString();
-      final ConfigurationSource conf = new OzoneConfiguration();
+      conf = new OzoneConfiguration();
       final ContainerSet containerSet = new ContainerSet(1000);
       final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
 
@@ -470,13 +541,16 @@ public void testDeleteContainer() throws IOException {
   }
 
   @ContainerLayoutTestInfo.ContainerTest
-  public void testReconcileContainer(ContainerLayoutVersion layoutVersion) 
throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
+  public void testContainerChecksumInvocation(ContainerLayoutVersion 
layoutVersion) throws Exception {
+    conf = new OzoneConfiguration();
 
     KeyValueContainerData data = new KeyValueContainerData(123L, 
layoutVersion, GB,
         PipelineID.randomId().toString(), 
randomDatanodeDetails().getUuidString());
+    data.setMetadataPath(tempDir.toString());
+    data.setDbFile(dbFile.toFile());
 
     Container container = new KeyValueContainer(data, conf);
+    createBlockMetaData(data, 5, 3);
     ContainerSet containerSet = new ContainerSet(1000);
     containerSet.addContainer(container);
 
@@ -489,7 +563,7 @@ public void testReconcileContainer(ContainerLayoutVersion 
layoutVersion) throws
       Assertions.assertEquals(container.getContainerData().getContainerID(), 
reportedID);
 
       long reportDataChecksum = report.getDataChecksum();
-      Assertions.assertNotEquals(0, reportDataChecksum,
+      assertNotEquals(0, reportDataChecksum,
           "Container report should have populated the checksum field with a 
non-zero value.");
       icrCount.incrementAndGet();
     };
@@ -499,10 +573,140 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+          checksumManager.read(kvContainer.getContainerData());
+      assertTrue(containerChecksumInfo.isPresent());
+      long dataChecksum = 
containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      assertEquals(kvContainer.getContainerData().getDataChecksum(), 
dataChecksum);
+      checksumsBeforeReconciliation.add(dataChecksum);
+    }
+    // There should be more than 1 checksum because of the corruption.
+    assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+    List<DatanodeDetails> datanodes = 
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+        randomDatanodeDetails());
+    Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+    dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+    dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+    dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+    // Setup mock for each datanode network calls needed for reconciliation.
+    try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+             Mockito.mockStatic(ContainerProtocolCalls.class)) {
+      mockContainerProtocolCalls(containerProtocolMock, dnToContainerMap, 
checksumManager, kvHandler, containerID);
+
+      kvHandler.reconcileContainer(dnClient, containers.get(0), 
Sets.newHashSet(datanodes));
+      kvHandler.reconcileContainer(dnClient, containers.get(1), 
Sets.newHashSet(datanodes));
+      kvHandler.reconcileContainer(dnClient, containers.get(2), 
Sets.newHashSet(datanodes));
+
+      // After reconciliation, checksums should be the same for all containers.
+      ContainerProtos.ContainerChecksumInfo prevContainerChecksumInfo = null;
+      for (KeyValueContainer kvContainer : containers) {
+        kvHandler.createContainerMerkleTree(kvContainer);
+        Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+            checksumManager.read(kvContainer.getContainerData());
+        assertTrue(containerChecksumInfo.isPresent());
+        long dataChecksum = 
containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
+        assertEquals(kvContainer.getContainerData().getDataChecksum(), 
dataChecksum);
+        if (prevContainerChecksumInfo != null) {
+          
assertEquals(prevContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(),
 dataChecksum);
+        }
+        prevContainerChecksumInfo = containerChecksumInfo.get();
+      }
+    }
+  }
+  private void mockContainerProtocolCalls(MockedStatic<ContainerProtocolCalls> 
containerProtocolMock,
+                                          Map<String, KeyValueContainer> 
dnToContainerMap,
+                                          ContainerChecksumTreeManager 
checksumManager,
+                                          KeyValueHandler kvHandler,
+                                          long containerID) {
+    // Mock getContainerChecksumInfo
+    containerProtocolMock.when(() -> 
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+        .thenAnswer(inv -> {
+          XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+          Pipeline pipeline = xceiverClientSpi.getPipeline();
+          assertEquals(1, pipeline.size());
+          DatanodeDetails dn = pipeline.getFirstNode();
+          KeyValueContainer container = 
dnToContainerMap.get(dn.getUuidString());
+          ByteString checksumInfo = 
checksumManager.getContainerChecksumInfo(container.getContainerData());
+          return 
ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
+              .setContainerID(containerID)
+              .setContainerChecksumInfo(checksumInfo)
+              .build();
+        });
+
+    // Mock getBlock
+    containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(), 
any(), any(), any(), anyMap()))
+        .thenAnswer(inv -> {
+          XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+          Pipeline pipeline = xceiverClientSpi.getPipeline();
+          assertEquals(1, pipeline.size());
+          DatanodeDetails dn = pipeline.getFirstNode();
+          KeyValueContainer container = 
dnToContainerMap.get(dn.getUuidString());
+          ContainerProtos.BlockData blockData = 
kvHandler.getBlockManager().getBlock(container, inv.getArgument(2))
+                .getProtoBufMessage();
+          return ContainerProtos.GetBlockResponseProto.newBuilder()
+              .setBlockData(blockData)
+              .build();
+        });
+
+    // Mock readChunk
+    containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(), 
any(), any(), any(), any()))
+        .thenAnswer(inv -> {
+          XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+          Pipeline pipeline = xceiverClientSpi.getPipeline();
+          assertEquals(1, pipeline.size());
+          DatanodeDetails dn = pipeline.getFirstNode();
+          KeyValueContainer container = 
dnToContainerMap.get(dn.getUuidString());
+          return createReadChunkResponse(inv, container, kvHandler);
+        });
+  }
+
+  // Helper method to create readChunk responses
+  private ContainerProtos.ReadChunkResponseProto 
createReadChunkResponse(InvocationOnMock inv,
+                                                                         
KeyValueContainer container,
+                                                                         
KeyValueHandler kvHandler) throws IOException {
+    ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2);
+    ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1);
+    return ContainerProtos.ReadChunkResponseProto.newBuilder()
+        .setBlockID(blockId)
+        .setChunkData(chunkInfo)
+        .setData(kvHandler.getChunkManager().readChunk(container, 
BlockID.getFromProtobuf(blockId),
+                ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString())
+        .build();
+  }
+
   @Test
   public void testGetContainerChecksumInfoOnInvalidContainerStates() {
     when(handler.handleGetContainerChecksumInfo(any(), 
any())).thenCallRealMethod();
@@ -537,7 +741,7 @@ public void testDeleteContainerTimeout() throws IOException 
{
     final long containerID = 1L;
     final String clusterId = UUID.randomUUID().toString();
     final String datanodeId = UUID.randomUUID().toString();
-    final ConfigurationSource conf = new OzoneConfiguration();
+    conf = new OzoneConfiguration();
     final ContainerSet containerSet = new ContainerSet(1000);
     final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
     final Clock clock = mock(Clock.class);
@@ -605,24 +809,184 @@ private static ContainerCommandRequestProto 
createContainerRequest(
         .build();
   }
 
-  private KeyValueHandler createKeyValueHandler() throws IOException {
-    final String clusterId = UUID.randomUUID().toString();
-    final String datanodeId = UUID.randomUUID().toString();
-    final ConfigurationSource conf = new OzoneConfiguration();
+  private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
     final ContainerSet containerSet = new ContainerSet(1000);
     final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
 
-    HddsVolume hddsVolume = new 
HddsVolume.Builder(tempDir.toString()).conf(conf)
-        .clusterID(clusterId).datanodeUuid(datanodeId)
+    HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
+        .clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID)
         .volumeSet(volumeSet)
         .build();
-    hddsVolume.format(clusterId);
-    hddsVolume.createWorkingDir(clusterId, null);
-    hddsVolume.createTmpDirs(clusterId);
+    hddsVolume.format(CLUSTER_ID);
+    hddsVolume.createWorkingDir(CLUSTER_ID, null);
+    hddsVolume.createTmpDirs(CLUSTER_ID);
     
when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume));
     final KeyValueHandler kvHandler = 
ContainerTestUtils.getKeyValueHandler(conf,
-        datanodeId, containerSet, volumeSet);
-    kvHandler.setClusterID(clusterId);
+        DATANODE_UUID, containerSet, volumeSet);
+    kvHandler.setClusterID(CLUSTER_ID);
+    // Clean up metrics for next tests.
+    hddsVolume.getVolumeInfoStats().unregister();
+    hddsVolume.getVolumeIOStats().unregister();
+    ContainerMetrics.remove();
     return kvHandler;
   }
+
+  /**
+   * Creates the requested number of copies of a closed container, each
+   * populated with the given number of blocks and CHUNKS_PER_BLOCK chunks
+   * per block.
+   */
+  protected List<KeyValueContainer> createContainerWithBlocks(KeyValueHandler 
kvHandler, long containerId,
+                                                              int blocks, int 
numContainerCopy)
+      throws Exception {
+    String strBlock = "block";
+    String strChunk = "chunkFile";
+    List<KeyValueContainer> containers = new ArrayList<>();
+    MutableVolumeSet volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, 
null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf);
+    int bytesPerChecksum = 2 * (int) OzoneConsts.KB;
+    Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
+        bytesPerChecksum);
+    byte[] chunkData = 
RandomStringUtils.randomAscii(CHUNK_LEN).getBytes(UTF_8);
+    ChecksumData checksumData = checksum.computeChecksum(chunkData);
+
+    for (int j =  0; j < numContainerCopy; j++) {
+      KeyValueContainerData containerData = new 
KeyValueContainerData(containerId,
+          ContainerLayoutVersion.FILE_PER_BLOCK, (long) CHUNKS_PER_BLOCK * 
CHUNK_LEN * blocks,
+          UUID.randomUUID().toString(), UUID.randomUUID().toString());
+      Path kvContainerPath = Files.createDirectory(tempDir.resolve(containerId 
+ "-" + j));
+      containerData.setMetadataPath(kvContainerPath.toString());
+      containerData.setDbFile(kvContainerPath.toFile());
+
+      KeyValueContainer container = new KeyValueContainer(containerData, conf);
+      StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
+          .forEach(hddsVolume -> 
hddsVolume.setDbParentDir(kvContainerPath.toFile()));
+      container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), 
UUID.randomUUID().toString());
+      assertNotNull(containerData.getChunksPath());
+      File chunksPath = new File(containerData.getChunksPath());
+      ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0, 
0);
+
+      List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+      for (int i = 0; i < blocks; i++) {
+        BlockID blockID = new BlockID(containerId, i);
+        BlockData blockData = new BlockData(blockID);
+
+        chunkList.clear();
+        for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) 
{
+          String chunkName = strBlock + i + strChunk + chunkCount;
+          long offset = chunkCount * CHUNK_LEN;
+          ChunkInfo info = new ChunkInfo(chunkName, offset, CHUNK_LEN);
+          info.setChecksumData(checksumData);
+          chunkList.add(info.getProtoBufMessage());
+          kvHandler.getChunkManager().writeChunk(container, blockID, info,
+              ByteBuffer.wrap(chunkData), WRITE_STAGE);
+        }
+        kvHandler.getChunkManager().finishWriteChunks(container, blockData);
+        blockData.setChunks(chunkList);
+        blockData.setBlockCommitSequenceId(i);
+        kvHandler.getBlockManager().putBlock(container, blockData);
+      }
+
+      ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 
blocks, (long) blocks * CHUNKS_PER_BLOCK);
+      container.markContainerForClose();
+      kvHandler.closeContainer(container);
+      containers.add(container);
+    }
+
+    return containers;
+  }
+
+  /**
+   * Introduce corruption in the container.
+   * 1. Delete blocks from the container.
+   * 2. Corrupt chunks at an offset.
+   * If reverse is true, blocks are deleted and chunks are corrupted in reverse order.
+   */
+  private void introduceCorruption(KeyValueHandler kvHandler, 
KeyValueContainer keyValueContainer, int numBlocks,
+                                   int numChunks, boolean reverse) throws 
IOException {
+    Random random = new Random();
+    KeyValueContainerData containerData = keyValueContainer.getContainerData();
+    // Simulate missing blocks
+    try (DBHandle handle = BlockUtils.getDB(containerData, conf);
+         BatchOperation batch = 
handle.getStore().getBatchHandler().initBatchOperation()) {
+      List<BlockData> blockDataList = 
kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
+      int size = blockDataList.size();
+      for (int i = 0; i < numBlocks; i++) {
+        BlockData blockData = reverse ? blockDataList.get(size - 1 - i) : 
blockDataList.get(i);
+        File blockFile = getBlock(keyValueContainer, 
blockData.getBlockID().getLocalID());
+        Assertions.assertTrue(blockFile.delete());
+        handle.getStore().getBlockDataTable().deleteWithBatch(batch, 
containerData.getBlockKey(blockData.getLocalID()));
+      }
+      handle.getStore().getBatchHandler().commitBatchOperation(batch);
+    }
+    
Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(keyValueContainer);
+
+    // Corrupt chunks at an offset.
+    List<BlockData> blockDataList = 
kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
+    int size = blockDataList.size();
+    for (int i = 0; i < numChunks; i++) {
+      int blockIndex = reverse ? size - 1 - (i % size) : i % size;
+      BlockData blockData = blockDataList.get(blockIndex);
+      int chunkIndex = i / size;
+      File blockFile = getBlock(keyValueContainer, 
blockData.getBlockID().getLocalID());
+      List<ContainerProtos.ChunkInfo> chunks = new 
ArrayList<>(blockData.getChunks());
+      ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex);
+      corruptFileAtOffset(blockFile, (int) chunkInfo.getOffset(), (int) 
chunkInfo.getLen());
+
+      // TODO: On-demand scanner (HDDS-10374) should detect this corruption 
and generate container merkle tree.
+      ContainerProtos.ContainerChecksumInfo.Builder builder = 
kvHandler.getChecksumManager()
+          .read(containerData).get().toBuilder();
+      List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = 
builder.getContainerMerkleTree()
+          .getBlockMerkleTreeList();
+      assertEquals(size, blockMerkleTreeList.size());
+
+      builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
+      for (int j = 0; j < blockMerkleTreeList.size(); j++) {
+        ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = 
blockMerkleTreeList.get(j).toBuilder();
+        if (j == blockIndex) {
+          List<ContainerProtos.ChunkMerkleTree.Builder> 
chunkMerkleTreeBuilderList =
+              blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
+          
chunkMerkleTreeBuilderList.get(chunkIndex).setIsHealthy(false).setDataChecksum(random.nextLong());
+          blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
+        }
+        
builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
+      }
+      
builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
+      
Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
+      writeContainerDataTreeProto(keyValueContainer.getContainerData(), 
builder.getContainerMerkleTree());
+    }
+  }
+
+  /**
+   * Corrupt the file by altering one byte at the end of the chunk and one byte within it, given the chunk's offset and length.
+   */
+  public static void corruptFileAtOffset(File file, int offset, int 
chunkLength) {
+    try {
+      final int fileLength = (int) file.length();
+      assertTrue(fileLength >= offset + chunkLength);
+      final int chunkEnd = offset + chunkLength;
+
+      Path path = file.toPath();
+      final byte[] original = IOUtils.readFully(Files.newInputStream(path), 
fileLength);
+
+      // Corrupt the last byte and middle bytes of the block. The scanner 
should log this as two errors.
+      final byte[] corruptedBytes = Arrays.copyOf(original, fileLength);
+      corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1);
+      final long chunkMid = offset + ((long) chunkLength - offset) / 2;
+      corruptedBytes[(int) (chunkMid / 2)] = (byte) (original[(int) (chunkMid 
/ 2)] << 1);
+
+
+      Files.write(path, corruptedBytes,
+          StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+      assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength))
+          .isEqualTo(corruptedBytes)
+          .isNotEqualTo(original);
+    } catch (IOException ex) {
+      // Fail the test.
+      throw new UncheckedIOException(ex);
+    }
+  }
 }
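
In outline, the full-reconciliation test added above follows a corrupt-then-reconcile loop. A condensed, hedged sketch of that flow (all names come from the test; the Mockito static mocking of ContainerProtocolCalls is elided):

    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
    // Three local copies of container 100, each with 15 blocks.
    List<KeyValueContainer> replicas = createContainerWithBlocks(kvHandler, 100L, 15, 3);
    introduceCorruption(kvHandler, replicas.get(1), numBlocks, numChunks, false);
    introduceCorruption(kvHandler, replicas.get(2), numBlocks, numChunks, true);
    // With peer calls mocked to serve the other copies, each replica repairs itself.
    for (KeyValueContainer replica : replicas) {
      kvHandler.reconcileContainer(dnClient, replica, Sets.newHashSet(datanodes));
    }
    // Afterwards every replica should report the same non-zero data checksum.
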
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot 
b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index b17973e1f3..55132123cd 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -150,11 +150,10 @@ Close container
 
 Reconcile closed container
     # Check that info does not show replica checksums, since manual 
reconciliation has not yet been triggered.
-    # TODO When the scanner is computing checksums automatically, this test 
may need to be updated.
     ${container} =      Execute          ozone admin container list --state 
CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | 
.containerID' | head -1
     ${data_checksum} =  Execute          ozone admin container info 
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
-    # 0 is the hex value of an empty checksum.
-    Should Be Equal As Strings    0    ${data_checksum}
+    # 0 is the hex value of an empty checksum. After the container is closed, the data checksum should not be 0.
+    Should Not Be Equal As Strings    0    ${data_checksum}
     # When reconciliation finishes, replica checksums should be shown.
     Execute    ozone admin container reconcile ${container}
     Wait until keyword succeeds    1min    5sec    Reconciliation complete    
${container}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index d5e13921d7..f835340ac3 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -377,6 +377,20 @@ public static void waitForContainerClose(MiniOzoneCluster 
cluster,
     }
   }
 
+  public static void waitForScmContainerState(MiniOzoneCluster cluster, long 
containerID,
+                                              HddsProtos.LifeCycleState 
lifeCycleState)
+      throws InterruptedException, TimeoutException {
+    GenericTestUtils.waitFor(() ->  {
+      try {
+        HddsProtos.LifeCycleState state = 
cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueOf(containerID)).getState();
+        return state == lifeCycleState;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 500, 100 * 1000);
+  }
+
   public static StateMachine getStateMachine(MiniOzoneCluster cluster)
       throws Exception {
     return getStateMachine(cluster.getHddsDatanodes().get(0), null);
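
waitForScmContainerState above polls SCM until the container reaches the requested lifecycle state, giving up after roughly 100 seconds. A minimal usage sketch (targeting the CLOSED state here is an illustrative assumption):

    // Wait until SCM records the container as CLOSED.
    TestHelper.waitForScmContainerState(cluster, containerID, HddsProtos.LifeCycleState.CLOSED);
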
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
index 2211aaf2b0..f51dbfed43 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -18,29 +18,70 @@
 package org.apache.hadoop.ozone.dn.checksum;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static 
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
 import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
 import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -50,15 +91,26 @@
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -73,24 +125,34 @@ public class TestContainerCommandReconciliation {
   private static ObjectStore store;
   private static OzoneConfiguration conf;
   private static DNContainerOperationClient dnClient;
+  private static final String KEY_NAME = "testkey";
 
   @TempDir
   private static File testDir;
+  @TempDir
+  private static File workDir;
+  private static MiniKdc miniKdc;
+  private static File ozoneKeytab;
+  private static File spnegoKeytab;
+  private static File testUserKeytab;
+  private static String testUserPrincipal;
+  private static String host;
 
   @BeforeAll
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+    conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024, 
StorageUnit.BYTES);
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE,  512 * 1024, StorageUnit.BYTES);
     // Disable the container scanner so it does not create merkle tree files 
that interfere with this test.
     conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3)
-        .build();
-    cluster.waitForClusterToBeReady();
-    rpcClient = OzoneClientFactory.getRpcClient(conf);
-    store = rpcClient.getObjectStore();
-    dnClient = new DNContainerOperationClient(conf, null, null);
+
+    startMiniKdc();
+    setSecureConfig();
+    createCredentialsInKDC();
+    setSecretKeysConfig();
+    startCluster();
   }
 
   @AfterAll
@@ -99,8 +161,16 @@ public static void stop() throws IOException {
       rpcClient.close();
     }
 
+    if (dnClient != null) {
+      dnClient.close();
+    }
+
+    if (miniKdc != null) {
+      miniKdc.stop();
+    }
+
     if (cluster != null) {
-      cluster.shutdown();
+      cluster.stop();
     }
   }
 
@@ -110,7 +180,9 @@ public static void stop() throws IOException {
    */
   @Test
   public void testGetChecksumInfoOpenReplica() throws Exception {
-    long containerID = writeDataAndGetContainer(false);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(false, volume, bucket);
     HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
     StorageContainerException ex = 
assertThrows(StorageContainerException.class,
         () -> dnClient.getContainerChecksumInfo(containerID, 
targetDN.getDatanodeDetails()));
@@ -145,12 +217,14 @@ public void testGetChecksumInfoNonexistentReplica() {
    */
   @Test
   public void testGetChecksumInfoNonexistentFile() throws Exception {
-    long containerID = writeDataAndGetContainer(true);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(true, volume, bucket);
     // Pick a datanode and remove its checksum file.
     HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
     Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
         .getContainerSet().getContainer(containerID);
-    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    File treeFile = getContainerChecksumFile(container.getContainerData());
     // Closing the container should have generated the tree file.
     assertTrue(treeFile.exists());
     assertTrue(treeFile.delete());
@@ -168,12 +242,14 @@ public void testGetChecksumInfoNonexistentFile() throws 
Exception {
    */
   @Test
   public void testGetChecksumInfoServerIOError() throws Exception {
-    long containerID = writeDataAndGetContainer(true);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(true, volume, bucket);
     // Pick a datanode and remove its checksum file.
     HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
     Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
         .getContainerSet().getContainer(containerID);
-    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    File treeFile = getContainerChecksumFile(container.getContainerData());
     assertTrue(treeFile.exists());
     // Make the server unable to read the file.
     assertTrue(treeFile.setReadable(false));
@@ -190,13 +266,15 @@ public void testGetChecksumInfoServerIOError() throws 
Exception {
    */
   @Test
   public void testGetCorruptChecksumInfo() throws Exception {
-    long containerID = writeDataAndGetContainer(true);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(true, volume, bucket);
 
     // Pick a datanode and corrupt its checksum file.
     HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
     Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
         .getContainerSet().getContainer(containerID);
-    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    File treeFile = getContainerChecksumFile(container.getContainerData());
     Files.write(treeFile.toPath(), new byte[]{1, 2, 3},
         StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
 
@@ -207,13 +285,15 @@ public void testGetCorruptChecksumInfo() throws Exception 
{
 
   @Test
   public void testGetEmptyChecksumInfo() throws Exception {
-    long containerID = writeDataAndGetContainer(true);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(true, volume, bucket);
 
     // Pick a datanode and truncate its checksum file to zero length.
     HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
     Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
         .getContainerSet().getContainer(containerID);
-    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    File treeFile = getContainerChecksumFile(container.getContainerData());
     // TODO After HDDS-10379 the file will already exist and need to be 
overwritten.
     assertTrue(treeFile.exists());
     Files.write(treeFile.toPath(), new byte[]{},
@@ -229,7 +309,9 @@ public void testGetEmptyChecksumInfo() throws Exception {
 
   @Test
   public void testGetChecksumInfoSuccess() throws Exception {
-    long containerID = writeDataAndGetContainer(true);
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    long containerID = writeDataAndGetContainer(true, volume, bucket);
     // Overwrite the existing tree with a custom one for testing. We will 
check that it is returned properly from the
     // API.
     ContainerMerkleTreeWriter tree = buildTestTree(conf);
@@ -247,26 +329,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back and check its hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 
* 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = 
cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
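+    // Delete the stale checksum file and rebuild it so the merkle tree reflects the missing blocks.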
+    // TODO: Use On-demand container scanner to build the new container merkle tree. (HDDS-10374)
+    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // Since the container is already closed, we have manually updated the container checksum file.
+    // This doesn't update the checksum reported to SCM, and we need to trigger an ICR.
+    // Marking a container unhealthy will send an ICR.
+    kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
+    waitForDataChecksumsAtSCM(containerID, 2);
+
+    // 3. Reconcile the container.
+    cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    // Compare and check if dataChecksum is same on all replicas.
+    waitForDataChecksumsAtSCM(containerID, 1);
+    ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData());
+    assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+        newContainerChecksumInfo.getContainerMerkleTree());
+    TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
+  }
+
+  @Test
+  public void testContainerChecksumChunkCorruption() throws Exception {
+    // 1. Write data to a container.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDatas = blockManager.listBlock(container, -1, 100);
+    long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Corrupt first chunk for all the blocks
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
+      for (BlockData blockData : blockDatas) {
+        // Modify the block metadata to simulate chunk corruption.
+        ContainerProtos.BlockData.Builder blockDataBuilder = blockData.getProtoBufMessage().toBuilder();
+        blockDataBuilder.clearChunks();
+
+        ContainerProtos.ChunkInfo chunkInfo = blockData.getChunks().get(0);
+        ContainerProtos.ChecksumData.Builder checksumDataBuilder = ContainerProtos.ChecksumData.newBuilder()
+            .setBytesPerChecksum(chunkInfo.getChecksumData().getBytesPerChecksum())
+            .setType(chunkInfo.getChecksumData().getType());
+
+        for (ByteString checksum : chunkInfo.getChecksumData().getChecksumsList()) {
+          byte[] checksumBytes = checksum.toByteArray();
+          // Modify the checksum bytes to simulate corruption.
+          checksumBytes[0] = (byte) (checksumBytes[0] - 1);
+          checksumDataBuilder.addChecksums(ByteString.copyFrom(checksumBytes)).build();
+        }
+        chunkInfo = chunkInfo.toBuilder().setChecksumData(checksumDataBuilder.build()).build();
+        blockDataBuilder.addChunks(chunkInfo);
+        for (int i = 1; i < blockData.getChunks().size(); i++) {
+          blockDataBuilder.addChunks(blockData.getChunks().get(i));
+        }
+
+        // Modify the block metadata from the container db to simulate chunk corruption.
+        db.getStore().getBlockDataTable().putWithBatch(op, containerData.getBlockKey(blockData.getLocalID()),
+            BlockData.getFromProtoBuf(blockDataBuilder.build()));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
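+    // Delete the stale checksum file and rebuild it so the merkle tree is computed from the corrupted block metadata.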
+    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    // To set unhealthy for chunks that are corrupted.
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterChunkCorruption =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterChunkCorruption = containerChecksumAfterChunkCorruption
+        .getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after chunk corruption.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterChunkCorruption);
+
+    // 3. Set unhealthy for the first chunk of all blocks. This should be done by the scanner; until then this is a
+    // manual step.
+    // TODO: Use On-demand container scanner to build the new container merkle tree (HDDS-10374)
+    Random random = new Random();
+    ContainerProtos.ContainerChecksumInfo.Builder builder = containerChecksumAfterChunkCorruption.toBuilder();
+    List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = builder.getContainerMerkleTree()
+        .getBlockMerkleTreeList();
+    builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
+    for (ContainerProtos.BlockMerkleTree blockMerkleTree : blockMerkleTreeList) {
+      ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = blockMerkleTree.toBuilder();
+      List<ContainerProtos.ChunkMerkleTree.Builder> chunkMerkleTreeBuilderList =
+          blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
+      chunkMerkleTreeBuilderList.get(0).setIsHealthy(false).setDataChecksum(random.nextLong());
+      blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
+      builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
+    }
+    builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
+    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    writeContainerDataTreeProto(container.getContainerData(), builder.getContainerMerkleTree());
+
+    // Since the container is already closed, we have manually updated the container checksum file.
+    // This doesn't update the checksum reported to SCM, and we need to trigger an ICR.
+    // Marking a container unhealthy will send an ICR.
+    kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
+    waitForDataChecksumsAtSCM(containerID, 2);
+
+    // 4. Reconcile the container.
+    cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    // Compare and check if dataChecksum is same on all replicas.
+    waitForDataChecksumsAtSCM(containerID, 1);
+    ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData());
+    assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+        newContainerChecksumInfo.getContainerMerkleTree());
+    Assertions.assertEquals(oldDataChecksum, newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+    TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
+  }
+
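+  // Polls SCM until the container's replicas report exactly expectedSize distinct data checksums
+  // (2 while a replica diverges, 1 once the replicas agree again).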
+  private void waitForDataChecksumsAtSCM(long containerID, int expectedSize) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        Set<Long> dataChecksums = cluster.getStorageContainerLocationClient().getContainerReplicas(containerID,
+                ClientVersion.CURRENT_VERSION).stream()
+            .map(HddsProtos.SCMContainerReplicaProto::getDataChecksum)
+            .collect(Collectors.toSet());
+        return dataChecksums.size() == expectedSize;
+      } catch (Exception ex) {
+        return false;
+      }
+    }, 500, 20000);
+  }
+
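+  // Creates a new volume and bucket, writes dataLen random bytes to KEY_NAME with RATIS/THREE replication,
+  // optionally waits for the container to close, and returns the container ID along with the written data.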
+  private Pair<Long, byte[]> getDataAndContainer(boolean close, int dataLen, String volumeName, String bucketName)
+          throws Exception {
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    byte[] data = "Test content".getBytes(UTF_8);
+    byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8);
     // Write Key
-    try (OzoneOutputStream os = TestHelper.createKey("testkey", RATIS, THREE, 
0, store, volumeName, bucketName)) {
+    try (OzoneOutputStream os = TestHelper.createKey(KEY_NAME, RATIS, THREE, 
dataLen, store, volumeName, bucketName)) {
       IOUtils.write(data, os);
     }
 
-    long containerID = bucket.getKey("testkey").getOzoneKeyLocations().stream()
+    long containerID = bucket.getKey(KEY_NAME).getOzoneKeyLocations().stream()
         .findFirst().get().getContainerID();
     if (close) {
       TestHelper.waitForContainerClose(cluster, containerID);
+      TestHelper.waitForScmContainerState(cluster, containerID, HddsProtos.LifeCycleState.CLOSED);
     }
-    return containerID;
+    return Pair.of(containerID, data);
+  }
+
+  private long writeDataAndGetContainer(boolean close, String volume, String bucket) throws Exception {
+    return getDataAndContainer(close, 5, volume, bucket).getLeft();
   }
 
   public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkleTreeWriter tree) throws Exception {
@@ -278,8 +551,84 @@ public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkl
       KeyValueContainer keyValueContainer =
           (KeyValueContainer) dn.getDatanodeStateMachine().getContainer().getController()
               .getContainer(containerID);
-      keyValueHandler.getChecksumManager().writeContainerDataTree(
-          keyValueContainer.getContainerData(), tree);
+      if (keyValueContainer != null) {
+        keyValueHandler.getChecksumManager().writeContainerDataTree(
+            keyValueContainer.getContainerData(), tree);
+      }
     }
   }
+
+  private static void setSecretKeysConfig() {
+    // Secret key lifecycle configs.
+    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "500s");
+    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "500s");
+    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "500s");
+
+    // enable tokens
+    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+    conf.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
+  }
+
+  private static void createCredentialsInKDC() throws Exception {
+    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+    SCMHTTPServerConfig httpServerConfig =
+        conf.getObject(SCMHTTPServerConfig.class);
+    createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
+    createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
+    createPrincipal(testUserKeytab, testUserPrincipal);
+  }
+
+  private static void createPrincipal(File keytab, String... principal)
+      throws Exception {
+    miniKdc.createPrincipal(keytab, principal);
+  }
+
+  private static void startMiniKdc() throws Exception {
+    Properties securityProperties = MiniKdc.createConf();
+    miniKdc = new MiniKdc(securityProperties, workDir);
+    miniKdc.start();
+  }
+
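+  // Configures Kerberos principals and keytabs for SCM, OM, and datanodes so the mini cluster runs secured.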
+  private static void setSecureConfig() throws IOException {
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    host = InetAddress.getLocalHost().getCanonicalHostName()
+        .toLowerCase();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name());
+    String curUser = UserGroupInformation.getCurrentUser().getUserName();
+    conf.set(OZONE_ADMINISTRATORS, curUser);
+    String realm = miniKdc.getRealm();
+    String hostAndRealm = host + "@" + realm;
+    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm);
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+
+    ozoneKeytab = new File(workDir, "scm.keytab");
+    spnegoKeytab = new File(workDir, "http.keytab");
+    testUserKeytab = new File(workDir, "testuser.keytab");
+    testUserPrincipal = "test@" + realm;
+
+    conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+    conf.set(HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY, spnegoKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE, spnegoKeytab.getAbsolutePath());
+    conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+  }
+
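+  // Starts a secure HA MiniOzoneCluster (3 SCMs, 1 OM, 3 datanodes) and creates the
+  // DNContainerOperationClient used by these tests.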
+  private static void startCluster() throws Exception {
+    OzoneManager.setTestSecureOmFlag(true);
+    cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setSCMServiceId("SecureSCM")
+        .setNumOfStorageContainerManagers(3)
+        .setNumOfOzoneManagers(1)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    rpcClient = OzoneClientFactory.getRpcClient(conf);
+    store = rpcClient.getObjectStore();
+    SecretKeyClient secretKeyClient = cluster.getStorageContainerManager().getSecretKeyManager();
+    CertificateClient certClient = cluster.getStorageContainerManager().getScmCertificateClient();
+    dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
