This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-10239-container-reconciliation by this push:
     new b34c537940  HDDS-11763. Implement container repair logic within datanodes. (#7474)
b34c537940 is described below
commit b34c537940bce95470f2a6af6b0aa4abbbabfb0d
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Fri Apr 11 04:13:46 2025 +0530
HDDS-11763. Implement container repair logic within datanodes. (#7474)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 11 +-
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 4 +
.../java/org/apache/hadoop/hdds/HddsUtils.java | 5 +
.../hadoop/hdds/client/ReplicationConfig.java | 1 +
.../hdds/scm/container/ContainerReplicaInfo.java | 5 +-
.../checksum/ContainerChecksumTreeManager.java | 45 ++-
.../checksum/DNContainerOperationClient.java | 4 +
.../container/common/helpers/ContainerMetrics.java | 10 +
.../container/common/utils/ContainerLogger.java | 23 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 350 +++++++++++++++--
.../container/keyvalue/impl/BlockManagerImpl.java | 15 +-
.../keyvalue/interfaces/BlockManager.java | 9 +
.../checksum/ContainerMerkleTreeTestUtils.java | 1 +
.../checksum/TestContainerChecksumTreeManager.java | 36 +-
.../ozone/container/common/ContainerTestUtils.java | 31 ++
.../TestReconcileContainerCommandHandler.java | 10 +
.../keyvalue/TestContainerCorruptions.java | 2 +-
.../container/keyvalue/TestKeyValueHandler.java | 412 +++++++++++++++++++--
.../src/main/smoketest/admincli/container.robot | 5 +-
.../apache/hadoop/ozone/container/TestHelper.java | 14 +
.../TestContainerCommandReconciliation.java | 409 ++++++++++++++++++--
21 files changed, 1277 insertions(+), 125 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 4867a2aa69..c9fe4ec679 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -112,6 +112,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
private final Function<BlockID, BlockLocationInfo> refreshFunction;
+ private BlockData blockData;
+
public BlockInputStream(
BlockLocationInfo blockInfo,
Pipeline pipeline,
@@ -153,7 +155,6 @@ public synchronized void initialize() throws IOException {
return;
}
- BlockData blockData = null;
List<ChunkInfo> chunks = null;
IOException catchEx = null;
do {
@@ -554,8 +555,7 @@ public long getLength() {
return length;
}
- @VisibleForTesting
- synchronized int getChunkIndex() {
+ public synchronized int getChunkIndex() {
return chunkIndex;
}
@@ -618,9 +618,12 @@ private void handleReadError(IOException cause) throws IOException {
refreshBlockInfo(cause);
}
- @VisibleForTesting
public synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}
+ public BlockData getStreamBlockData() {
+ return blockData;
+ }
+
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 23c96fc7d6..efad9ff76c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -747,4 +747,8 @@ public synchronized void unbuffer() {
public ByteBuffer[] getCachedBuffers() {
return BufferUtils.getReadOnlyByteBuffers(buffers);
}
+
+ public ChunkInfo getChunkInfo() {
+ return chunkInfo;
+ }
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 0b2928410c..cce329f91e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -882,6 +882,11 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
: null;
}
+ /** @return Hex string representation of {@code value} */
+ public static String checksumToString(long value) {
+ return Long.toHexString(value);
+ }
+
/**
* Logs a warning to report that the class is not closed properly.
*/
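For orientation (editor's sketch, not part of the patch): the new helper is simply an unpadded hex rendering of the long checksum, so logs and JSON output show values such as "deadbeef". A minimal illustration, using an arbitrary value:

    // Illustrative only; the input value is made up.
    long dataChecksum = 0xDEADBEEFL;
    String printable = org.apache.hadoop.hdds.HddsUtils.checksumToString(dataChecksum);
    // printable == "deadbeef"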
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
index 4bd4709716..20ddf555bc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
@@ -112,6 +112,7 @@ static HddsProtos.ReplicationFactor getLegacyFactor(
return ((ReplicatedReplicationConfig) replicationConfig)
.getReplicationFactor();
}
+
throw new UnsupportedOperationException(
"Replication configuration of type "
+ replicationConfig.getReplicationType()
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
index b2884e3fb0..24bc4d6d32 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.hdds.scm.container;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
+
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,7 +28,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-
/**
* Class which stores ContainerReplica details on the client.
*/
@@ -102,7 +103,7 @@ public long getDataChecksum() {
private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
@Override
    public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
- gen.writeString(Long.toHexString(value));
+ gen.writeString(checksumToString(value));
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index c247380fbe..99b5800c45 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -82,7 +82,9 @@ public void stop() {
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
*/
-  public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter tree) throws IOException {
+  public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data,
+                                                                      ContainerMerkleTreeWriter tree)
+      throws IOException {
long containerID = data.getContainerID();
Lock writeLock = getLock(containerID);
writeLock.lock();
@@ -98,11 +100,13 @@ public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter
       checksumInfoBuilder =
           ContainerProtos.ContainerChecksumInfo.newBuilder();
     }
-    checksumInfoBuilder
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumInfoBuilder
         .setContainerID(containerID)
-        .setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto));
-    write(data, checksumInfoBuilder.build());
+        .setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto))
+        .build();
+    write(data, checksumInfo);
LOG.debug("Data merkle tree for container {} updated", containerID);
+ return checksumInfo;
} finally {
writeLock.unlock();
}
@@ -146,33 +150,32 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
     }
   }
-  public ContainerDiffReport diff(KeyValueContainerData thisContainer,
+  /**
+   * Compares the checksum info of the container with the peer's checksum info and returns a report of the differences.
+   * @param thisChecksumInfo The checksum info of the container on this datanode.
+   * @param peerChecksumInfo The checksum info of the container on the peer datanode.
+   */
+  public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo,
                                   ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws
       StorageContainerException {
     ContainerDiffReport report = new ContainerDiffReport();
     try {
       captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
-        Preconditions.assertNotNull(thisContainer, "Container data is null");
-        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null");
-        Optional<ContainerProtos.ContainerChecksumInfo> thisContainerChecksumInfo = read(thisContainer);
-        if (!thisContainerChecksumInfo.isPresent()) {
-          throw new StorageContainerException("The container #" + thisContainer.getContainerID() +
-              " doesn't have container checksum", ContainerProtos.Result.IO_EXCEPTION);
-        }
-
-        if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) {
-          throw new StorageContainerException("Container Id does not match for container "
-              + thisContainer.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH);
+        Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum info is null.");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null.");
+        if (thisChecksumInfo.getContainerID() != peerChecksumInfo.getContainerID()) {
+          throw new StorageContainerException("Container ID does not match. Local container ID "
+              + thisChecksumInfo.getContainerID() + " , Peer container ID " + peerChecksumInfo.getContainerID(),
+              ContainerProtos.Result.CONTAINER_ID_MISMATCH);
         }
-        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = thisContainerChecksumInfo.get();
         compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
});
} catch (IOException ex) {
metrics.incrementMerkleTreeDiffFailures();
-      throw new StorageContainerException("Container Diff failed for container #" + thisContainer.getContainerID(), ex,
-          ContainerProtos.Result.IO_EXCEPTION);
+      throw new StorageContainerException("Container Diff failed for container #" + thisChecksumInfo.getContainerID(),
+          ex, ContainerProtos.Result.IO_EXCEPTION);
}
// Update Container Diff metrics based on the diff report.
@@ -314,7 +317,7 @@ private Lock getLock(long containerID) {
   * Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
   * swapped into place.
   */
-  private Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
+  public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
long containerID = data.getContainerID();
File checksumFile = getContainerChecksumFile(data);
try {
@@ -361,6 +364,8 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
       throw new IOException("Error occurred when writing container merkle tree for containerID "
           + data.getContainerID(), ex);
     }
+    // Set in-memory data checksum.
+    data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
}
/**
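A caller-side sketch (editor's illustration, not part of the patch) of the new pattern: the caller now reads its own ContainerChecksumInfo and passes it to diff() instead of handing over the container data. The names checksumManager, containerData, peerChecksumInfo and LOG are assumed to be in scope:

    Optional<ContainerProtos.ContainerChecksumInfo> localInfo = checksumManager.read(containerData);
    if (localInfo.isPresent()) {
      ContainerDiffReport report = checksumManager.diff(localInfo.get(), peerChecksumInfo);
      if (report.needsRepair()) {
        // Missing blocks and missing/corrupt chunks in the report drive the repair in KeyValueHandler.
        LOG.info("Container {} needs repair: {} missing blocks", containerData.getContainerID(),
            report.getMissingBlocks().size());
      }
    }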
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
index 31c397ce09..d5ba243dd1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
@@ -84,6 +84,10 @@ public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}
+ public TokenHelper getTokenHelper() {
+ return tokenHelper;
+ }
+
/**
   * Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified container for the specified datanode.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index af7662a131..652a90e978 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -56,6 +56,8 @@ public class ContainerMetrics implements Closeable {
@Metric private MutableCounterLong containerForceDelete;
@Metric private MutableCounterLong numReadStateMachine;
@Metric private MutableCounterLong bytesReadStateMachine;
+ @Metric private MutableCounterLong numContainerReconciledWithoutChanges;
+ @Metric private MutableCounterLong numContainerReconciledWithChanges;
private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
@@ -172,4 +174,12 @@ public void incBytesReadStateMachine(long bytes) {
public long getBytesReadStateMachine() {
return bytesReadStateMachine.value();
}
+
+ public void incContainerReconciledWithoutChanges() {
+ numContainerReconciledWithoutChanges.incr();
+ }
+
+ public void incContainerReconciledWithChanges() {
+ numContainerReconciledWithChanges.incr();
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
index 4aacc7c2de..6f20f22a8b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
@@ -17,7 +17,10 @@
package org.apache.hadoop.ozone.container.common.utils;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
+
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,23 @@ public static void logRecovered(ContainerData containerData) {
LOG.info(getMessage(containerData));
}
+ /**
+ * Logged when a container is reconciled.
+ *
+ * @param containerData The container that was reconciled on this datanode.
+ * @param oldDataChecksum The old data checksum.
+ */
+  public static void logReconciled(ContainerData containerData, long oldDataChecksum, DatanodeDetails peer) {
+    if (containerData.getDataChecksum() == oldDataChecksum) {
+      LOG.info(getMessage(containerData, "Container reconciled with peer " + peer.toString() +
+          ". No change in checksum."));
+    } else {
+      LOG.warn(getMessage(containerData, "Container reconciled with peer " + peer.toString() +
+          ". Checksum updated from " + checksumToString(oldDataChecksum) + " to "
+          + checksumToString(containerData.getDataChecksum())));
+    }
+  }
+
private static String getMessage(ContainerData containerData,
String message) {
return String.join(FIELD_SEPARATOR, getMessage(containerData), message);
@@ -155,6 +175,7 @@ private static String getMessage(ContainerData containerData) {
"ID=" + containerData.getContainerID(),
"Index=" + containerData.getReplicaIndex(),
"BCSID=" + containerData.getBlockCommitSequenceId(),
- "State=" + containerData.getState());
+ "State=" + containerData.getState(),
+ "DataChecksum=" + checksumToString(containerData.getDataChecksum()));
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index d4bfa79142..43926ca5e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -17,10 +17,12 @@
package org.apache.hadoop.ozone.container.keyvalue;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -34,6 +36,8 @@
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
@@ -52,6 +56,7 @@
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
+import static
org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
import static
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
import com.google.common.annotations.VisibleForTesting;
@@ -67,17 +72,23 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -91,21 +102,29 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
-import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.ContainerDiffReport;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -136,7 +155,9 @@
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import
org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -158,11 +179,13 @@ public class KeyValueHandler extends Handler {
private final long maxDeleteLockWaitMs;
private final Function<ByteBuffer, ByteString> byteBufferToByteString;
private final boolean validateChunkChecksumData;
+ private final int chunkSize;
// A striped lock that is held during container creation.
private final Striped<Lock> containerCreationLocks;
private final ContainerChecksumTreeManager checksumManager;
private static FaultInjector injector;
private final Clock clock;
+ private final BlockInputStreamFactoryImpl blockInputStreamFactory;
public KeyValueHandler(ConfigurationSource config,
String datanodeId,
@@ -224,6 +247,9 @@ public KeyValueHandler(ConfigurationSource config,
ByteStringConversion
.createByteBufferConversion(isUnsafeByteBufferConversionEnabled);
+ blockInputStreamFactory = new BlockInputStreamFactoryImpl();
+ chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
+
if (ContainerLayoutVersion.getConfiguredVersion(conf) ==
ContainerLayoutVersion.FILE_PER_CHUNK) {
LOG.warn("FILE_PER_CHUNK layout is not supported. Falling back to
default : {}.",
@@ -600,24 +626,20 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}
- private void createContainerMerkleTree(Container container) {
+
+ /**
+ * Create a Merkle tree for the container if it does not exist.
+ * TODO: This method should be changed to private after HDDS-10374 is merged.
+ */
+ @VisibleForTesting
+ public void createContainerMerkleTree(Container container) {
if (ContainerChecksumTreeManager.checksumFileExist(container)) {
return;
}
try {
KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
- ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
- try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
- BlockIterator<BlockData> blockIterator = dbHandle.getStore().
- getBlockIterator(containerData.getContainerID())) {
- while (blockIterator.hasNext()) {
- BlockData blockData = blockIterator.nextBlock();
- List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
- merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
- }
- }
- checksumManager.writeContainerDataTree(containerData, merkleTree);
+ updateAndGetContainerChecksum(containerData);
} catch (IOException ex) {
LOG.error("Cannot create container checksum for container {} ,
Exception: ",
container.getContainerData().getContainerID(), ex);
@@ -1483,21 +1505,297 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = updateAndGetContainerChecksum(containerData);
+ }
+ long oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ for (DatanodeDetails peer : peers) {
+ long start = Instant.now().toEpochMilli();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ Pipeline pipeline = createSingleNodePipeline(peer);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock,
chunkByteBuffer);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, pipeline, dnClient,
entry.getKey(), entry.getValue(), chunkByteBuffer);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, pipeline, dnClient,
entry.getKey(), entry.getValue(), chunkByteBuffer);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata. The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateAndGetContainerChecksum(containerData);
+ long dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Instant.now().toEpochMilli() - start;
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateAndGetContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ // TODO: Add empty blocks to the merkle tree. Done in HDDS-10374,
needs to be backported.
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+ return checksumInfo;
+ }
+
+ /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id of the container and the block are not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, Pipeline
pipeline, DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree
missingBlock, ByteBuffer chunkByteBuffer)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. The block should not
exist and our container merkle tree" +
+ " is stale. Skipping reconciliation for this block.", blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Gets the blockData from the peer,
sets the block length and
+ // initializes ChunkInputStream for each chunk.
+ blockInputStream.initialize();
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkInfoProto.getOffset());
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ // If the chunk read/write fails, we are expected to have holes in
the blockData's chunk list.
+ // But that is okay, if the read fails it means there might be a
hole in the peer datanode as well.
+ // If the chunk write fails then we don't want to add the metadata
without the actual data as there is
+ // no data to verify the chunk checksum.
+ successfulChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfulChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ chunkManager.finishWriteChunks(container, putBlockData);
+ }
+ }
+
+ /**
+   * This method reconciles chunks per block. It reads the missing/corrupt chunk data from the peer
+   * datanode and writes it to the local container. If the chunk write fails, the block commit sequence
+   * id is not updated.
+ */
+ private void reconcileChunksPerBlock(KeyValueContainer container, Pipeline
pipeline,
+ DNContainerOperationClient dnClient,
long blockId,
+ List<ContainerProtos.ChunkMerkleTree>
chunkList, ByteBuffer chunkByteBuffer)
+ throws IOException {
+
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+ BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+
+ SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap =
localBlockData.getChunks().stream()
+ .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+ Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Gets the blockData from the peer,
sets the block length and
+ // initializes ChunkInputStream for each chunk.
+ blockInputStream.initialize();
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+ localBlockData.getBlockCommitSequenceId());
+
+ for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) {
+ long chunkOffset = chunkMerkleTree.getOffset();
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkOffset);
+ ChunkInputStream currentChunkStream =
blockInputStream.getChunkStreams().get(
+ blockInputStream.getChunkIndex());
+ ContainerProtos.ChunkInfo chunkInfoProto =
currentChunkStream.getChunkInfo();
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ verifyChunksLength(chunkInfoProto, localChunksMap.get(chunkOffset));
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ // In reconciling missing chunks which happens at the end of the
block, we are expected to have holes in
+ // the blockData's chunk list because we continue to reconcile even
if there are failures while reconciling
+ // chunks which is fine as we don't update the bcsId.
+ localChunksMap.put(chunkInfo.getOffset(), chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling chunk {} for block {} in
container {}",
+ chunkOffset, blockID, containerData.getContainerID(), ex);
+ }
+ }
+
+ List<ContainerProtos.ChunkInfo> localChunkList = new
ArrayList<>(localChunksMap.values());
+ localBlockData.setChunks(localChunkList);
+ putBlockForClosedContainer(container, localBlockData, maxBcsId,
overwriteBcsId);
+ chunkManager.finishWriteChunks(container, localBlockData);
+ }
+ }
+
+ private void verifyChunksLength(ContainerProtos.ChunkInfo peerChunkInfo,
ContainerProtos.ChunkInfo localChunkInfo)
+ throws StorageContainerException {
+ if (localChunkInfo == null || peerChunkInfo == null) {
+ return;
+ }
+
+ if (peerChunkInfo.getOffset() != localChunkInfo.getOffset()) {
+ throw new StorageContainerException("Offset mismatch for chunk.
Expected: " + localChunkInfo.getOffset() +
+ ", Actual: " + peerChunkInfo.getOffset(), CHUNK_FILE_INCONSISTENCY);
+ }
+
+ if (peerChunkInfo.getLen() != localChunkInfo.getLen()) {
+ throw new StorageContainerException("Length mismatch for chunk at offset
" + localChunkInfo.getOffset() +
+ ". Expected: " + localChunkInfo.getLen() + ", Actual: " +
peerChunkInfo.getLen(), CHUNK_FILE_INCONSISTENCY);
+ }
+ }
+
/**
* Called by BlockDeletingService to delete all the chunks in a block
* before proceeding to delete the block info from DB.
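A condensed outline (editor's illustration, not the literal patch code) of the per-peer reconciliation flow added above; repairFromPeer is a hypothetical stand-in for the three repair loops over missing blocks, missing chunks and corrupt chunks, and localInfo is assumed to hold this replica's ContainerChecksumInfo:

    for (DatanodeDetails peer : peers) {
      ContainerProtos.ContainerChecksumInfo peerInfo =
          dnClient.getContainerChecksumInfo(containerData.getContainerID(), peer);
      if (peerInfo == null) {
        continue;                                    // peer has not generated a checksum yet
      }
      ContainerDiffReport diff = checksumManager.diff(localInfo, peerInfo);
      repairFromPeer(diff, peer);                    // hypothetical helper: pull blocks/chunks from the peer, rewrite locally
      localInfo = updateAndGetContainerChecksum(containerData);   // recompute the tree from RocksDB metadata
    }
    OnDemandContainerDataScanner.scanContainer(container);        // re-scan the repaired replica
    sendICR(container);                                           // report the updated checksum via ICR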
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index ba3d404897..20ab37cf1c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -112,8 +112,6 @@ public long putBlockForClosedContainer(Container container, BlockData data, bool
// We are not locking the key manager since RocksDB serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
long blockBcsID = data.getBlockCommitSequenceId();
@@ -441,6 +439,19 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
}
}
+ @Override
+  public boolean blockExists(Container container, BlockID blockID) throws IOException {
+ KeyValueContainerData containerData = (KeyValueContainerData) container
+ .getContainerData();
+ try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+ // This is a post condition that acts as a hint to the user.
+ // Should never fail.
+ Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+ String blockKey = containerData.getBlockKey(blockID.getLocalID());
+ return db.getStore().getBlockDataTable().isExist(blockKey);
+ }
+ }
+
/**
* Shutdown KeyValueContainerManager.
*/
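A usage sketch (editor's illustration, not part of the patch) of the new existence check: guard a block repair so a stale merkle tree does not overwrite a block that already exists locally, mirroring handleMissingBlock in KeyValueHandler. blockManager, container, containerData and missingBlock are assumed to be in scope:

    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
    if (blockManager.blockExists(container, blockID)) {
      LOG.warn("Block {} already exists in container {}; skipping repair for this block",
          blockID, containerData.getContainerID());
      return;
    }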
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 10562f450b..cf65bb819d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -94,6 +94,15 @@ long putBlockForClosedContainer(Container container, BlockData data, boolean ove
List<BlockData> listBlock(Container container, long startLocalID, int count)
throws IOException;
+ /**
+ * Check if a block exists in the container.
+ *
+ * @param container - Container from which blocks need to be listed.
+ * @param blockID - BlockID of the Block.
+ * @return True if block exists, false otherwise.
+ */
+ boolean blockExists(Container container, BlockID blockID) throws IOException;
+
/**
* Returns last committed length of the block.
*
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
index 22559aa378..811e4b483a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
@@ -353,5 +353,6 @@ public static void writeContainerDataTreeProto(ContainerData data, ContainerProt
       throw new IOException("Error occurred when writing container merkle tree for containerID "
           + data.getContainerID(), ex);
}
+    data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
index e0e8930c94..987ff7cf81 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -339,7 +340,8 @@ public void testContainerWithNoDiff() throws Exception {
ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
ContainerProtos.ContainerChecksumInfo.newBuilder()
.setContainerID(container.getContainerID())
.setContainerMerkleTree(peerMerkleTree.toProto()).build();
-    ContainerDiffReport diff = checksumManager.diff(container, peerChecksumInfo);
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = checksumManager.read(container);
+    ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
assertTrue(checksumManager.getMetrics().getMerkleTreeDiffLatencyNS().lastStat().total()
> 0);
assertFalse(diff.needsRepair());
assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1);
@@ -362,7 +364,8 @@ public void testContainerDiffWithMismatches(int numMissingBlock, int numMissingC
ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
ContainerProtos.ContainerChecksumInfo.newBuilder()
.setContainerID(container.getContainerID())
.setContainerMerkleTree(peerMerkleTree.toProto()).build();
- ContainerDiffReport diff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(),
peerChecksumInfo);
assertTrue(metrics.getMerkleTreeDiffLatencyNS().lastStat().total() > 0);
assertContainerDiffMatch(expectedDiff, diff);
assertEquals(checksumManager.getMetrics().getRepairContainerDiffs(), 1);
@@ -385,15 +388,19 @@ public void testPeerWithMismatchesHasNoDiff(int numMissingBlock, int numMissingC
ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
ContainerProtos.ContainerChecksumInfo.newBuilder()
.setContainerID(container.getContainerID())
.setContainerMerkleTree(peerMerkleTree).build();
- ContainerDiffReport diff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(),
peerChecksumInfo);
assertFalse(diff.needsRepair());
assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1);
}
@Test
- public void testFailureContainerMerkleTreeMetric() {
+ public void testFailureContainerMerkleTreeMetric() throws IOException {
ContainerProtos.ContainerChecksumInfo peerChecksum =
ContainerProtos.ContainerChecksumInfo.newBuilder().build();
- assertThrows(StorageContainerException.class, () ->
checksumManager.diff(container, peerChecksum));
+ ContainerMerkleTreeWriter ourMerkleTree = buildTestTree(config);
+ checksumManager.writeContainerDataTree(container, ourMerkleTree);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ assertThrows(StorageContainerException.class, () ->
checksumManager.diff(checksumInfo.get(), peerChecksum));
assertEquals(checksumManager.getMetrics().getMerkleTreeDiffFailure(), 1);
}
@@ -413,7 +420,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
.addAllDeletedBlocks(deletedBlockList).build();
writeContainerDataTreeProto(container, ourMerkleTree);
- ContainerDiffReport containerDiff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport containerDiff =
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
// The diff should not have any missing block/missing chunk/corrupt chunks
as the blocks are deleted
// in peer merkle tree.
@@ -423,7 +431,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
// Delete blocks in our merkle tree as well.
checksumManager.markBlocksAsDeleted(container, deletedBlockList);
- containerDiff = checksumManager.diff(container, peerChecksumInfo);
+ checksumInfo = checksumManager.read(container);
+ containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
// The diff should not have any missing block/missing chunk/corrupt chunks
as the blocks are deleted
// in both merkle tree.
@@ -449,7 +458,8 @@ void testDeletedBlocksInOurContainerOnly() throws Exception
{
writeContainerDataTreeProto(container, ourMerkleTree);
checksumManager.markBlocksAsDeleted(container, deletedBlockList);
- ContainerDiffReport containerDiff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport containerDiff =
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
// The diff should not have any missing block/missing chunk/corrupt chunks
as the blocks are deleted
// in our merkle tree.
@@ -475,7 +485,8 @@ void testCorruptionInOurMerkleTreeAndDeletedBlocksInPeer()
throws Exception {
writeContainerDataTreeProto(container, ourMerkleTree);
- ContainerDiffReport containerDiff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport containerDiff =
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
// The diff should not have any missing block/missing chunk/corrupt chunks
as the blocks are deleted
// in peer merkle tree.
@@ -499,8 +510,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws
Exception {
writeContainerDataTreeProto(container, ourMerkleTree);
ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
peerChecksumInfoBuilder.build();
-
- ContainerDiffReport containerDiff = checksumManager.diff(container,
peerChecksumInfo);
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(container);
+ ContainerDiffReport containerDiff =
checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
// The diff should not have any missing block/missing chunk/corrupt chunks
as the blocks are deleted
// in peer merkle tree.
assertFalse(containerDiff.getMissingBlocks().isEmpty());
@@ -512,7 +523,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws
Exception {
// Clear deleted blocks to add them in missing blocks.
peerChecksumInfo = peerChecksumInfoBuilder.clearDeletedBlocks().build();
- containerDiff = checksumManager.diff(container, peerChecksumInfo);
+ checksumInfo = checksumManager.read(container);
+ containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);
assertFalse(containerDiff.getMissingBlocks().isEmpty());
// Missing block does not contain the deleted blocks 6L to 10L
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index aa771cb6c3..ab8929c18c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -22,21 +22,25 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -53,6 +57,8 @@
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -60,6 +66,7 @@
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -76,6 +83,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
@@ -419,4 +427,27 @@ public static XceiverServerRatis newXceiverServerRatis(
getNoopContainerDispatcher(), getEmptyContainerController(),
null, null);
}
+
+ /**
+ * Creates block metadata for the given container with the specified number
of blocks and chunks per block.
+ */
+  public static void createBlockMetaData(KeyValueContainerData data, int numOfBlocksPerContainer,
+                                         int numOfChunksPerBlock) throws IOException {
+    try (DBHandle metadata = BlockUtils.getDB(data, new OzoneConfiguration())) {
+      for (int j = 0; j < numOfBlocksPerContainer; j++) {
+        BlockID blockID = new BlockID(data.getContainerID(), j);
+        String blockKey = data.getBlockKey(blockID.getLocalID());
+        BlockData kd = new BlockData(blockID);
+        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+        for (int k = 0; k < numOfChunksPerBlock; k++) {
+          long dataLen = 10L;
+          ChunkInfo chunkInfo = ContainerTestHelper.getChunk(blockID.getLocalID(), k, k * dataLen, dataLen);
+          ContainerTestHelper.setDataChecksum(chunkInfo, ContainerTestHelper.getData((int) dataLen));
+          chunks.add(chunkInfo.getProtoBufMessage());
+        }
+        kd.setChunks(chunks);
+        metadata.getStore().getBlockDataTable().put(blockKey, kd);
+      }
+    }
+  }
}
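A test-side sketch (editor's illustration, not part of the patch) of the intended usage, mirroring TestReconcileContainerCommandHandler below; containerId, layout, conf and containerSet are assumed to be in scope, and tempDir/dbFile are assumed to be JUnit @TempDir paths:

    KeyValueContainerData data = new KeyValueContainerData(containerId, layout, GB,
        PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString());
    data.setMetadataPath(tempDir.toString());
    data.setDbFile(dbFile.toFile());
    ContainerTestUtils.createBlockMetaData(data, 5, 3);   // 5 blocks, 3 chunks per block
    containerSet.addContainer(new KeyValueContainer(data, conf));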
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
index 6dab87cbac..d7d01f9349 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -20,6 +20,7 @@
import static java.util.Collections.singletonMap;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -28,6 +29,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -57,6 +59,7 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +76,10 @@ public class TestReconcileContainerCommandHandler {
private StateContext context;
private ReconcileContainerCommandHandler subject;
private ReplicationSupervisor mockSupervisor;
+ @TempDir
+ private Path tempDir;
+ @TempDir
+ private Path dbFile;
public void init(ContainerLayoutVersion layout,
IncrementalReportSender<Container> icrSender)
throws Exception {
@@ -93,6 +100,9 @@ public void init(ContainerLayoutVersion layout,
IncrementalReportSender<Containe
for (int id = 1; id <= NUM_CONTAINERS; id++) {
KeyValueContainerData data = new KeyValueContainerData(id, layout, GB,
PipelineID.randomId().toString(),
randomDatanodeDetails().getUuidString());
+ data.setMetadataPath(tempDir.toString());
+ data.setDbFile(dbFile.toFile());
+ createBlockMetaData(data, 5, 3);
containerSet.addContainer(new KeyValueContainer(data, conf));
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
index 543bfc17b5..20ad00676b 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java
@@ -195,7 +195,7 @@ private static void corruptFile(File file) {
}
}
- private static File getBlock(Container<?> container, long blockID) {
+ public static File getBlock(Container<?> container, long blockID) {
File blockFile;
File chunksDir = new File(container.getContainerData().getContainerPath(),
"chunks");
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 5e39d33192..33f4faefb6 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.keyvalue;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -26,53 +27,88 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto;
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static
org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.getBlock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -80,15 +116,24 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.util.Sets;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
/**
* Unit tests for {@link KeyValueHandler}.
@@ -101,17 +146,43 @@ public class TestKeyValueHandler {
@TempDir
private Path dbFile;
- private static final String DATANODE_UUID = UUID.randomUUID().toString();
-
private static final long DUMMY_CONTAINER_ID = 9999;
private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
+ private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB;
+ private static final int CHUNKS_PER_BLOCK = 4;
+ private static final String DATANODE_UUID = UUID.randomUUID().toString();
+ private static final String CLUSTER_ID = UUID.randomUUID().toString();
private HddsDispatcher dispatcher;
private KeyValueHandler handler;
+ private OzoneConfiguration conf;
+
+ /**
+ * Pairs of (number of corrupt blocks, number of corrupt chunks) supplied to the parameterized reconciliation tests.
+ */
+ public static Stream<Arguments> corruptionValues() {
+ return Stream.of(
+ Arguments.of(5, 0),
+ Arguments.of(0, 5),
+ Arguments.of(0, 10),
+ Arguments.of(10, 0),
+ Arguments.of(5, 10),
+ Arguments.of(10, 5),
+ Arguments.of(2, 3),
+ Arguments.of(3, 2),
+ Arguments.of(4, 6),
+ Arguments.of(6, 4),
+ Arguments.of(6, 9),
+ Arguments.of(9, 6)
+ );
+ }
@BeforeEach
- public void setup() throws StorageContainerException {
+ public void setup() throws IOException {
// Create mock HddsDispatcher and KeyValueHandler.
+ conf = new OzoneConfiguration();
+ conf.set(HDDS_DATANODE_DIR_KEY, tempDir.toString());
+ conf.set(OZONE_METADATA_DIRS, tempDir.toString());
handler = mock(KeyValueHandler.class);
HashMap<ContainerType, Handler> handlers = new HashMap<>();
@@ -283,7 +354,7 @@ public void testVolumeSetInKeyValueHandler() throws
Exception {
File metadataDir =
Files.createDirectory(tempDir.resolve("metadataDir")).toFile();
- OzoneConfiguration conf = new OzoneConfiguration();
+ conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, datanodeDir.getAbsolutePath());
conf.set(OZONE_METADATA_DIRS, metadataDir.getAbsolutePath());
MutableVolumeSet
@@ -337,8 +408,8 @@ private ContainerCommandRequestProto
getDummyCommandRequestProto(
@ContainerLayoutTestInfo.ContainerTest
public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
throws IOException {
- KeyValueHandler keyValueHandler = createKeyValueHandler();
- OzoneConfiguration conf = new OzoneConfiguration();
+ KeyValueHandler keyValueHandler = createKeyValueHandler(tempDir);
+ conf = new OzoneConfiguration();
KeyValueContainerData kvData = new
KeyValueContainerData(DUMMY_CONTAINER_ID,
layoutVersion,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
@@ -382,7 +453,7 @@ public void testDeleteContainer() throws IOException {
final long containerID = 1L;
final String clusterId = UUID.randomUUID().toString();
final String datanodeId = UUID.randomUUID().toString();
- final ConfigurationSource conf = new OzoneConfiguration();
+ conf = new OzoneConfiguration();
final ContainerSet containerSet = new ContainerSet(1000);
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
@@ -470,13 +541,16 @@ public void testDeleteContainer() throws IOException {
}
@ContainerLayoutTestInfo.ContainerTest
- public void testReconcileContainer(ContainerLayoutVersion layoutVersion)
throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
+ public void testContainerChecksumInvocation(ContainerLayoutVersion
layoutVersion) throws Exception {
+ conf = new OzoneConfiguration();
KeyValueContainerData data = new KeyValueContainerData(123L,
layoutVersion, GB,
PipelineID.randomId().toString(),
randomDatanodeDetails().getUuidString());
+ data.setMetadataPath(tempDir.toString());
+ data.setDbFile(dbFile.toFile());
Container container = new KeyValueContainer(data, conf);
+ createBlockMetaData(data, 5, 3);
ContainerSet containerSet = new ContainerSet(1000);
containerSet.addContainer(container);
@@ -489,7 +563,7 @@ public void testReconcileContainer(ContainerLayoutVersion
layoutVersion) throws
Assertions.assertEquals(container.getContainerData().getContainerID(),
reportedID);
long reportDataChecksum = report.getDataChecksum();
- Assertions.assertNotEquals(0, reportDataChecksum,
+ assertNotEquals(0, reportDataChecksum,
"Container report should have populated the checksum field with a
non-zero value.");
icrCount.incrementAndGet();
};
@@ -499,10 +573,140 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Each peer replica should be asked for its checksum info at most once during reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 replica copies of the same container, each with 15 blocks.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce different corruption in two of the three replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+ long dataChecksum =
containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ assertEquals(kvContainer.getContainerData().getDataChecksum(),
dataChecksum);
+ checksumsBeforeReconciliation.add(dataChecksum);
+ }
+ // There should be more than 1 checksum because of the corruption.
+ assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+ List<DatanodeDetails> datanodes =
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+ randomDatanodeDetails());
+ Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+ dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+ dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+ dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+ // Setup mock for each datanode network calls needed for reconciliation.
+ try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+ Mockito.mockStatic(ContainerProtocolCalls.class)) {
+ mockContainerProtocolCalls(containerProtocolMock, dnToContainerMap,
checksumManager, kvHandler, containerID);
+
+ kvHandler.reconcileContainer(dnClient, containers.get(0),
Sets.newHashSet(datanodes));
+ kvHandler.reconcileContainer(dnClient, containers.get(1),
Sets.newHashSet(datanodes));
+ kvHandler.reconcileContainer(dnClient, containers.get(2),
Sets.newHashSet(datanodes));
+
+ // After reconciliation, checksums should be the same for all containers.
+ ContainerProtos.ContainerChecksumInfo prevContainerChecksumInfo = null;
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+ long dataChecksum =
containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ assertEquals(kvContainer.getContainerData().getDataChecksum(),
dataChecksum);
+ if (prevContainerChecksumInfo != null) {
+
assertEquals(prevContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(),
dataChecksum);
+ }
+ prevContainerChecksumInfo = containerChecksumInfo.get();
+ }
+ }
+ }
+ private void mockContainerProtocolCalls(MockedStatic<ContainerProtocolCalls>
containerProtocolMock,
+ Map<String, KeyValueContainer>
dnToContainerMap,
+ ContainerChecksumTreeManager
checksumManager,
+ KeyValueHandler kvHandler,
+ long containerID) {
+ // Mock getContainerChecksumInfo
+ containerProtocolMock.when(() ->
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ Pipeline pipeline = xceiverClientSpi.getPipeline();
+ assertEquals(1, pipeline.size());
+ DatanodeDetails dn = pipeline.getFirstNode();
+ KeyValueContainer container =
dnToContainerMap.get(dn.getUuidString());
+ ByteString checksumInfo =
checksumManager.getContainerChecksumInfo(container.getContainerData());
+ return
ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
+ .setContainerID(containerID)
+ .setContainerChecksumInfo(checksumInfo)
+ .build();
+ });
+
+ // Mock getBlock
+ containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(),
any(), any(), any(), anyMap()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ Pipeline pipeline = xceiverClientSpi.getPipeline();
+ assertEquals(1, pipeline.size());
+ DatanodeDetails dn = pipeline.getFirstNode();
+ KeyValueContainer container =
dnToContainerMap.get(dn.getUuidString());
+ ContainerProtos.BlockData blockData =
kvHandler.getBlockManager().getBlock(container, inv.getArgument(2))
+ .getProtoBufMessage();
+ return ContainerProtos.GetBlockResponseProto.newBuilder()
+ .setBlockData(blockData)
+ .build();
+ });
+
+ // Mock readChunk
+ containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(),
any(), any(), any(), any()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ Pipeline pipeline = xceiverClientSpi.getPipeline();
+ assertEquals(1, pipeline.size());
+ DatanodeDetails dn = pipeline.getFirstNode();
+ KeyValueContainer container =
dnToContainerMap.get(dn.getUuidString());
+ return createReadChunkResponse(inv, container, kvHandler);
+ });
+ }
+
+ // Helper method to create readChunk responses
+ private ContainerProtos.ReadChunkResponseProto
createReadChunkResponse(InvocationOnMock inv,
+
KeyValueContainer container,
+
KeyValueHandler kvHandler) throws IOException {
+ ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2);
+ ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1);
+ return ContainerProtos.ReadChunkResponseProto.newBuilder()
+ .setBlockID(blockId)
+ .setChunkData(chunkInfo)
+ .setData(kvHandler.getChunkManager().readChunk(container,
BlockID.getFromProtobuf(blockId),
+ ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString())
+ .build();
+ }
+
@Test
public void testGetContainerChecksumInfoOnInvalidContainerStates() {
when(handler.handleGetContainerChecksumInfo(any(),
any())).thenCallRealMethod();
@@ -537,7 +741,7 @@ public void testDeleteContainerTimeout() throws IOException
{
final long containerID = 1L;
final String clusterId = UUID.randomUUID().toString();
final String datanodeId = UUID.randomUUID().toString();
- final ConfigurationSource conf = new OzoneConfiguration();
+ conf = new OzoneConfiguration();
final ContainerSet containerSet = new ContainerSet(1000);
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
final Clock clock = mock(Clock.class);
@@ -605,24 +809,184 @@ private static ContainerCommandRequestProto
createContainerRequest(
.build();
}
- private KeyValueHandler createKeyValueHandler() throws IOException {
- final String clusterId = UUID.randomUUID().toString();
- final String datanodeId = UUID.randomUUID().toString();
- final ConfigurationSource conf = new OzoneConfiguration();
+ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
final ContainerSet containerSet = new ContainerSet(1000);
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
- HddsVolume hddsVolume = new
HddsVolume.Builder(tempDir.toString()).conf(conf)
- .clusterID(clusterId).datanodeUuid(datanodeId)
+ HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
+ .clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID)
.volumeSet(volumeSet)
.build();
- hddsVolume.format(clusterId);
- hddsVolume.createWorkingDir(clusterId, null);
- hddsVolume.createTmpDirs(clusterId);
+ hddsVolume.format(CLUSTER_ID);
+ hddsVolume.createWorkingDir(CLUSTER_ID, null);
+ hddsVolume.createTmpDirs(CLUSTER_ID);
when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume));
final KeyValueHandler kvHandler =
ContainerTestUtils.getKeyValueHandler(conf,
- datanodeId, containerSet, volumeSet);
- kvHandler.setClusterID(clusterId);
+ DATANODE_UUID, containerSet, volumeSet);
+ kvHandler.setClusterID(CLUSTER_ID);
+ // Clean up metrics for next tests.
+ hddsVolume.getVolumeInfoStats().unregister();
+ hddsVolume.getVolumeIOStats().unregister();
+ ContainerMetrics.remove();
return kvHandler;
}
+
+ /**
+ * Creates the given number of replica copies of a closed container, each
+ * populated with the requested number of blocks and CHUNKS_PER_BLOCK chunks
+ * per block.
+ */
+ protected List<KeyValueContainer> createContainerWithBlocks(KeyValueHandler
kvHandler, long containerId,
+ int blocks, int
numContainerCopy)
+ throws Exception {
+ String strBlock = "block";
+ String strChunk = "chunkFile";
+ List<KeyValueContainer> containers = new ArrayList<>();
+ MutableVolumeSet volumeSet = new MutableVolumeSet(DATANODE_UUID, conf,
null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf);
+ int bytesPerChecksum = 2 * (int) OzoneConsts.KB;
+ Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
+ bytesPerChecksum);
+ byte[] chunkData =
RandomStringUtils.randomAscii(CHUNK_LEN).getBytes(UTF_8);
+ ChecksumData checksumData = checksum.computeChecksum(chunkData);
+
+ for (int j = 0; j < numContainerCopy; j++) {
+ KeyValueContainerData containerData = new
KeyValueContainerData(containerId,
+ ContainerLayoutVersion.FILE_PER_BLOCK, (long) CHUNKS_PER_BLOCK *
CHUNK_LEN * blocks,
+ UUID.randomUUID().toString(), UUID.randomUUID().toString());
+ Path kvContainerPath = Files.createDirectory(tempDir.resolve(containerId
+ "-" + j));
+ containerData.setMetadataPath(kvContainerPath.toString());
+ containerData.setDbFile(kvContainerPath.toFile());
+
+ KeyValueContainer container = new KeyValueContainer(containerData, conf);
+ StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
+ .forEach(hddsVolume ->
hddsVolume.setDbParentDir(kvContainerPath.toFile()));
+ container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
UUID.randomUUID().toString());
+ assertNotNull(containerData.getChunksPath());
+ File chunksPath = new File(containerData.getChunksPath());
+ ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0,
0);
+
+ List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+ for (int i = 0; i < blocks; i++) {
+ BlockID blockID = new BlockID(containerId, i);
+ BlockData blockData = new BlockData(blockID);
+
+ chunkList.clear();
+ for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++)
{
+ String chunkName = strBlock + i + strChunk + chunkCount;
+ long offset = chunkCount * CHUNK_LEN;
+ ChunkInfo info = new ChunkInfo(chunkName, offset, CHUNK_LEN);
+ info.setChecksumData(checksumData);
+ chunkList.add(info.getProtoBufMessage());
+ kvHandler.getChunkManager().writeChunk(container, blockID, info,
+ ByteBuffer.wrap(chunkData), WRITE_STAGE);
+ }
+ kvHandler.getChunkManager().finishWriteChunks(container, blockData);
+ blockData.setChunks(chunkList);
+ blockData.setBlockCommitSequenceId(i);
+ kvHandler.getBlockManager().putBlock(container, blockData);
+ }
+
+ ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath,
blocks, (long) blocks * CHUNKS_PER_BLOCK);
+ container.markContainerForClose();
+ kvHandler.closeContainer(container);
+ containers.add(container);
+ }
+
+ return containers;
+ }
+
+ /**
+ * Introduce corruption in the container.
+ * 1. Delete blocks from the container.
+ * 2. Corrupt chunks at an offset.
+ * If reverse is true, blocks are deleted and chunks corrupted starting from the end of the block list.
+ */
+ private void introduceCorruption(KeyValueHandler kvHandler,
KeyValueContainer keyValueContainer, int numBlocks,
+ int numChunks, boolean reverse) throws
IOException {
+ Random random = new Random();
+ KeyValueContainerData containerData = keyValueContainer.getContainerData();
+ // Simulate missing blocks
+ try (DBHandle handle = BlockUtils.getDB(containerData, conf);
+ BatchOperation batch =
handle.getStore().getBatchHandler().initBatchOperation()) {
+ List<BlockData> blockDataList =
kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
+ int size = blockDataList.size();
+ for (int i = 0; i < numBlocks; i++) {
+ BlockData blockData = reverse ? blockDataList.get(size - 1 - i) :
blockDataList.get(i);
+ File blockFile = getBlock(keyValueContainer,
blockData.getBlockID().getLocalID());
+ Assertions.assertTrue(blockFile.delete());
+ handle.getStore().getBlockDataTable().deleteWithBatch(batch,
containerData.getBlockKey(blockData.getLocalID()));
+ }
+ handle.getStore().getBatchHandler().commitBatchOperation(batch);
+ }
+
Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(keyValueContainer);
+
+ // Corrupt chunks at an offset.
+ List<BlockData> blockDataList =
kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
+ int size = blockDataList.size();
+ for (int i = 0; i < numChunks; i++) {
+ int blockIndex = reverse ? size - 1 - (i % size) : i % size;
+ BlockData blockData = blockDataList.get(blockIndex);
+ int chunkIndex = i / size;
+ File blockFile = getBlock(keyValueContainer,
blockData.getBlockID().getLocalID());
+ List<ContainerProtos.ChunkInfo> chunks = new
ArrayList<>(blockData.getChunks());
+ ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex);
+ corruptFileAtOffset(blockFile, (int) chunkInfo.getOffset(), (int)
chunkInfo.getLen());
+
+ // TODO: On-demand scanner (HDDS-10374) should detect this corruption
and generate container merkle tree.
+ ContainerProtos.ContainerChecksumInfo.Builder builder =
kvHandler.getChecksumManager()
+ .read(containerData).get().toBuilder();
+ List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList =
builder.getContainerMerkleTree()
+ .getBlockMerkleTreeList();
+ assertEquals(size, blockMerkleTreeList.size());
+
+ builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
+ for (int j = 0; j < blockMerkleTreeList.size(); j++) {
+ ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder =
blockMerkleTreeList.get(j).toBuilder();
+ if (j == blockIndex) {
+ List<ContainerProtos.ChunkMerkleTree.Builder>
chunkMerkleTreeBuilderList =
+ blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
+
chunkMerkleTreeBuilderList.get(chunkIndex).setIsHealthy(false).setDataChecksum(random.nextLong());
+ blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
+ }
+
builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
+ }
+
builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
+
Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
+ writeContainerDataTreeProto(keyValueContainer.getContainerData(),
builder.getContainerMerkleTree());
+ }
+ }
+
+ /**
+ * Corrupt the block file by modifying the last byte of the chunk at the given offset and length, plus one additional byte earlier in the file.
+ */
+ public static void corruptFileAtOffset(File file, int offset, int
chunkLength) {
+ try {
+ final int fileLength = (int) file.length();
+ assertTrue(fileLength >= offset + chunkLength);
+ final int chunkEnd = offset + chunkLength;
+
+ Path path = file.toPath();
+ final byte[] original = IOUtils.readFully(Files.newInputStream(path),
fileLength);
+
+ // Corrupt the last byte of the chunk and one earlier byte in the file so the damage shows up in two places.
+ final byte[] corruptedBytes = Arrays.copyOf(original, fileLength);
+ corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1);
+ final long chunkMid = offset + ((long) chunkLength - offset) / 2;
+ corruptedBytes[(int) (chunkMid / 2)] = (byte) (original[(int) (chunkMid
/ 2)] << 1);
+
+
+ Files.write(path, corruptedBytes,
+ StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+ assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength))
+ .isEqualTo(corruptedBytes)
+ .isNotEqualTo(original);
+ } catch (IOException ex) {
+ // Fail the test.
+ throw new UncheckedIOException(ex);
+ }
+ }
}
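
To make the byte-level corruption above concrete, here is a hedged, illustrative walk-through of corruptFileAtOffset for the chunk size these tests use (CHUNK_LEN = 3 KB = 3072 bytes); the blockFile variable is hypothetical:

    // Corrupt the first chunk of a block file (offset 0, length 3072).
    corruptFileAtOffset(blockFile, 0, 3072);
    // chunkEnd - 1 = 3071                -> the chunk's last byte is shifted left one bit
    // chunkMid = 0 + (3072 - 0) / 2 = 1536
    // corrupted index = 1536 / 2 = 768   -> a second byte earlier in the file is shifted left one bit

Two on-disk bytes then no longer match the checksummed data, which the reconciliation flow exercised by testFullContainerReconciliation is expected to repair from a healthy peer replica.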
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index b17973e1f3..55132123cd 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -150,11 +150,10 @@ Close container
Reconcile closed container
# Check that info does not show replica checksums, since manual
reconciliation has not yet been triggered.
- # TODO When the scanner is computing checksums automatically, this test
may need to be updated.
${container} = Execute ozone admin container list --state
CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") |
.containerID' | head -1
${data_checksum} = Execute ozone admin container info
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
- # 0 is the hex value of an empty checksum.
- Should Be Equal As Strings 0 ${data_checksum}
+ # 0 is the hex value of an empty checksum. After container close, the data checksum should not be 0.
+ Should Not Be Equal As Strings 0 ${data_checksum}
# When reconciliation finishes, replica checksums should be shown.
Execute ozone admin container reconcile ${container}
Wait until keyword succeeds 1min 5sec Reconciliation complete
${container}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index d5e13921d7..f835340ac3 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -377,6 +377,20 @@ public static void waitForContainerClose(MiniOzoneCluster
cluster,
}
}
+ public static void waitForScmContainerState(MiniOzoneCluster cluster, long
containerID,
+ HddsProtos.LifeCycleState
lifeCycleState)
+ throws InterruptedException, TimeoutException {
+ GenericTestUtils.waitFor(() -> {
+ try {
+ HddsProtos.LifeCycleState state =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainer(ContainerID.valueOf(containerID)).getState();
+ return state == lifeCycleState;
+ } catch (ContainerNotFoundException e) {
+ return false;
+ }
+ }, 500, 100 * 1000);
+ }
+
public static StateMachine getStateMachine(MiniOzoneCluster cluster)
throws Exception {
return getStateMachine(cluster.getHddsDatanodes().get(0), null);
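
A hedged sketch of how the new waitForScmContainerState helper is used together with the existing close and reconcile flow; the containerID is assumed to come from the test, as in TestContainerCommandReconciliation:

    // Wait until the replicas are closed on the datanodes...
    TestHelper.waitForContainerClose(cluster, containerID);
    // ...and until SCM's own view of the container reaches CLOSED.
    TestHelper.waitForScmContainerState(cluster, containerID, HddsProtos.LifeCycleState.CLOSED);
    // Only then trigger reconciliation through SCM.
    cluster.getStorageContainerLocationClient().reconcileContainer(containerID);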
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
index 2211aaf2b0..f51dbfed43 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -18,29 +18,70 @@
package org.apache.hadoop.ozone.dn.checksum;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
+import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -50,15 +91,26 @@
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -73,24 +125,34 @@ public class TestContainerCommandReconciliation {
private static ObjectStore store;
private static OzoneConfiguration conf;
private static DNContainerOperationClient dnClient;
+ private static final String KEY_NAME = "testkey";
@TempDir
private static File testDir;
+ @TempDir
+ private static File workDir;
+ private static MiniKdc miniKdc;
+ private static File ozoneKeytab;
+ private static File spnegoKeytab;
+ private static File testUserKeytab;
+ private static String testUserPrincipal;
+ private static String host;
@BeforeAll
public static void init() throws Exception {
conf = new OzoneConfiguration();
- conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+ conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024,
StorageUnit.BYTES);
+ conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 512 * 1024, StorageUnit.BYTES);
// Disable the container scanner so it does not create merkle tree files
that interfere with this test.
conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(3)
- .build();
- cluster.waitForClusterToBeReady();
- rpcClient = OzoneClientFactory.getRpcClient(conf);
- store = rpcClient.getObjectStore();
- dnClient = new DNContainerOperationClient(conf, null, null);
+
+ startMiniKdc();
+ setSecureConfig();
+ createCredentialsInKDC();
+ setSecretKeysConfig();
+ startCluster();
}
@AfterAll
@@ -99,8 +161,16 @@ public static void stop() throws IOException {
rpcClient.close();
}
+ if (dnClient != null) {
+ dnClient.close();
+ }
+
+ if (miniKdc != null) {
+ miniKdc.stop();
+ }
+
if (cluster != null) {
- cluster.shutdown();
+ cluster.stop();
}
}
@@ -110,7 +180,9 @@ public static void stop() throws IOException {
*/
@Test
public void testGetChecksumInfoOpenReplica() throws Exception {
- long containerID = writeDataAndGetContainer(false);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(false, volume, bucket);
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
StorageContainerException ex =
assertThrows(StorageContainerException.class,
() -> dnClient.getContainerChecksumInfo(containerID,
targetDN.getDatanodeDetails()));
@@ -145,12 +217,14 @@ public void testGetChecksumInfoNonexistentReplica() {
*/
@Test
public void testGetChecksumInfoNonexistentFile() throws Exception {
- long containerID = writeDataAndGetContainer(true);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(true, volume, bucket);
// Pick a datanode and remove its checksum file.
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
- File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ File treeFile = getContainerChecksumFile(container.getContainerData());
// Closing the container should have generated the tree file.
assertTrue(treeFile.exists());
assertTrue(treeFile.delete());
@@ -168,12 +242,14 @@ public void testGetChecksumInfoNonexistentFile() throws
Exception {
*/
@Test
public void testGetChecksumInfoServerIOError() throws Exception {
- long containerID = writeDataAndGetContainer(true);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(true, volume, bucket);
// Pick a datanode and remove its checksum file.
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
- File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ File treeFile = getContainerChecksumFile(container.getContainerData());
assertTrue(treeFile.exists());
// Make the server unable to read the file.
assertTrue(treeFile.setReadable(false));
@@ -190,13 +266,15 @@ public void testGetChecksumInfoServerIOError() throws
Exception {
*/
@Test
public void testGetCorruptChecksumInfo() throws Exception {
- long containerID = writeDataAndGetContainer(true);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(true, volume, bucket);
// Pick a datanode and corrupt its checksum file.
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
- File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ File treeFile = getContainerChecksumFile(container.getContainerData());
Files.write(treeFile.toPath(), new byte[]{1, 2, 3},
StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
@@ -207,13 +285,15 @@ public void testGetCorruptChecksumInfo() throws Exception
{
@Test
public void testGetEmptyChecksumInfo() throws Exception {
- long containerID = writeDataAndGetContainer(true);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(true, volume, bucket);
// Pick a datanode and truncate its checksum file to zero length.
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
- File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ File treeFile = getContainerChecksumFile(container.getContainerData());
// TODO After HDDS-10379 the file will already exist and need to be
overwritten.
assertTrue(treeFile.exists());
Files.write(treeFile.toPath(), new byte[]{},
@@ -229,7 +309,9 @@ public void testGetEmptyChecksumInfo() throws Exception {
@Test
public void testGetChecksumInfoSuccess() throws Exception {
- long containerID = writeDataAndGetContainer(true);
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ long containerID = writeDataAndGetContainer(true, volume, bucket);
// Overwrite the existing tree with a custom one for testing. We will
check that it is returned properly from the
// API.
ContainerMerkleTreeWriter tree = buildTestTree(conf);
@@ -247,26 +329,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back and check its hash.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+ String chunksPath = container.getContainerData().getChunksPath();
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Delete every other block to simulate missing blocks.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (int i = 0; i < blockDataList.size(); i += 2) {
+ BlockData blockData = blockDataList.get(i);
+ // Delete the block metadata from the container db
+ db.getStore().getBlockDataTable().deleteWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()));
+ // Delete the block file.
+ Files.deleteIfExists(Paths.get(chunksPath + "/" +
blockData.getBlockID().getLocalID() + ".block"));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+ // TODO: Use On-demand container scanner to build the new container merkle
tree. (HDDS-10374)
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterBlockDelete =
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after block delete.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+ // Since the container is already closed, we have manually updated the
container checksum file.
+ // This doesn't update the checksum reported to SCM, and we need to
trigger an ICR.
+ // Marking a container unhealthy will send an ICR.
+ kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
+ waitForDataChecksumsAtSCM(containerID, 2);
+
+ // 3. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ // Compare and check if dataChecksum is same on all replicas.
+ waitForDataChecksumsAtSCM(containerID, 1);
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+ newContainerChecksumInfo.getContainerMerkleTree());
+ TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
+ }
+
+ @Test
+ public void testContainerChecksumChunkCorruption() throws Exception {
+ // 1. Write data to a container.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDatas = blockManager.listBlock(container, -1, 100);
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Simulate corruption of the first chunk of every block by altering its checksums in the block metadata.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (BlockData blockData : blockDatas) {
+ // Modify the block metadata to simulate chunk corruption.
+ ContainerProtos.BlockData.Builder blockDataBuilder =
blockData.getProtoBufMessage().toBuilder();
+ blockDataBuilder.clearChunks();
+
+ ContainerProtos.ChunkInfo chunkInfo = blockData.getChunks().get(0);
+ ContainerProtos.ChecksumData.Builder checksumDataBuilder =
ContainerProtos.ChecksumData.newBuilder()
+
.setBytesPerChecksum(chunkInfo.getChecksumData().getBytesPerChecksum())
+ .setType(chunkInfo.getChecksumData().getType());
+
+ for (ByteString checksum :
chunkInfo.getChecksumData().getChecksumsList()) {
+ byte[] checksumBytes = checksum.toByteArray();
+ // Modify the checksum bytes to simulate corruption.
+ checksumBytes[0] = (byte) (checksumBytes[0] - 1);
+
checksumDataBuilder.addChecksums(ByteString.copyFrom(checksumBytes)).build();
+ }
+ chunkInfo =
chunkInfo.toBuilder().setChecksumData(checksumDataBuilder.build()).build();
+ blockDataBuilder.addChunks(chunkInfo);
+ for (int i = 1; i < blockData.getChunks().size(); i++) {
+ blockDataBuilder.addChunks(blockData.getChunks().get(i));
+ }
+
+ // Modify the block metadata from the container db to simulate chunk
corruption.
+ db.getStore().getBlockDataTable().putWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()),
+ BlockData.getFromProtoBuf(blockDataBuilder.build()));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ // Read back the regenerated checksum info; the corrupted chunks are marked unhealthy manually below.
+ ContainerProtos.ContainerChecksumInfo
containerChecksumAfterChunkCorruption =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterChunkCorruption = containerChecksumAfterChunkCorruption
+ .getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after chunk corruption.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterChunkCorruption);
+
+ // 3. Set the first chunk of every block to unhealthy. This should be done by the scanner; until then it is a
+ // manual step.
+ // TODO: Use the on-demand container scanner to build the new container merkle tree (HDDS-10374)
+ Random random = new Random();
+ ContainerProtos.ContainerChecksumInfo.Builder builder =
containerChecksumAfterChunkCorruption.toBuilder();
+ List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList =
builder.getContainerMerkleTree()
+ .getBlockMerkleTreeList();
+ builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
+ for (ContainerProtos.BlockMerkleTree blockMerkleTree :
blockMerkleTreeList) {
+ ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder =
blockMerkleTree.toBuilder();
+ List<ContainerProtos.ChunkMerkleTree.Builder> chunkMerkleTreeBuilderList
=
+ blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
+
chunkMerkleTreeBuilderList.get(0).setIsHealthy(false).setDataChecksum(random.nextLong());
+ blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
+
builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
+ }
+ builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ writeContainerDataTreeProto(container.getContainerData(),
builder.getContainerMerkleTree());
+
+ // Since the container is already closed, we have manually updated the
container checksum file.
+ // This doesn't update the checksum reported to SCM, and we need to
trigger an ICR.
+ // Marking a container unhealthy will send an ICR.
+ kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
+ waitForDataChecksumsAtSCM(containerID, 2);
+
+ // 4. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ // Compare and check that the dataChecksum is the same on all replicas.
+ waitForDataChecksumsAtSCM(containerID, 1);
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData());
+ assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+ newContainerChecksumInfo.getContainerMerkleTree());
+ Assertions.assertEquals(oldDataChecksum, newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+ TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
+ }
+
+ private void waitForDataChecksumsAtSCM(long containerID, int expectedSize) throws Exception {
+ GenericTestUtils.waitFor(() -> {
+ try {
+ Set<Long> dataChecksums = cluster.getStorageContainerLocationClient().getContainerReplicas(containerID,
+ ClientVersion.CURRENT_VERSION).stream()
+ .map(HddsProtos.SCMContainerReplicaProto::getDataChecksum)
+ .collect(Collectors.toSet());
+ return dataChecksums.size() == expectedSize;
+ } catch (Exception ex) {
+ return false;
+ }
+ }, 500, 20000);
+ }
+
+ private Pair<Long, byte[]> getDataAndContainer(boolean close, int dataLen, String volumeName, String bucketName)
+ throws Exception {
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- byte[] data = "Test content".getBytes(UTF_8);
+ byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8);
// Write Key
- try (OzoneOutputStream os = TestHelper.createKey("testkey", RATIS, THREE, 0, store, volumeName, bucketName)) {
+ try (OzoneOutputStream os = TestHelper.createKey(KEY_NAME, RATIS, THREE, dataLen, store, volumeName, bucketName)) {
IOUtils.write(data, os);
}
- long containerID = bucket.getKey("testkey").getOzoneKeyLocations().stream()
+ long containerID = bucket.getKey(KEY_NAME).getOzoneKeyLocations().stream()
.findFirst().get().getContainerID();
if (close) {
TestHelper.waitForContainerClose(cluster, containerID);
+ TestHelper.waitForScmContainerState(cluster, containerID, HddsProtos.LifeCycleState.CLOSED);
}
- return containerID;
+ return Pair.of(containerID, data);
+ }
+
+ private long writeDataAndGetContainer(boolean close, String volume, String bucket) throws Exception {
+ return getDataAndContainer(close, 5, volume, bucket).getLeft();
}
public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkleTreeWriter tree) throws Exception {
@@ -278,8 +551,84 @@ public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkl
KeyValueContainer keyValueContainer = (KeyValueContainer) dn.getDatanodeStateMachine().getContainer().getController()
.getContainer(containerID);
- keyValueHandler.getChecksumManager().writeContainerDataTree(
- keyValueContainer.getContainerData(), tree);
+ if (keyValueContainer != null) {
+ keyValueHandler.getChecksumManager().writeContainerDataTree(
+ keyValueContainer.getContainerData(), tree);
+ }
}
}
+
+ private static void setSecretKeysConfig() {
+ // Secret key lifecycle configs.
+ conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "500s");
+ conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "500s");
+ conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "500s");
+
+ // enable tokens
+ conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+ conf.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
+ }
+
+ private static void createCredentialsInKDC() throws Exception {
+ ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+ SCMHTTPServerConfig httpServerConfig =
+ conf.getObject(SCMHTTPServerConfig.class);
+ createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
+ createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
+ createPrincipal(testUserKeytab, testUserPrincipal);
+ }
+
+ private static void createPrincipal(File keytab, String... principal)
+ throws Exception {
+ miniKdc.createPrincipal(keytab, principal);
+ }
+
+ private static void startMiniKdc() throws Exception {
+ Properties securityProperties = MiniKdc.createConf();
+ miniKdc = new MiniKdc(securityProperties, workDir);
+ miniKdc.start();
+ }
+
+ private static void setSecureConfig() throws IOException {
+ conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ host = InetAddress.getLocalHost().getCanonicalHostName()
+ .toLowerCase();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name());
+ String curUser = UserGroupInformation.getCurrentUser().getUserName();
+ conf.set(OZONE_ADMINISTRATORS, curUser);
+ String realm = miniKdc.getRealm();
+ String hostAndRealm = host + "@" + realm;
+ conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+ conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm);
+ conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+ conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm);
+ conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+
+ ozoneKeytab = new File(workDir, "scm.keytab");
+ spnegoKeytab = new File(workDir, "http.keytab");
+ testUserKeytab = new File(workDir, "testuser.keytab");
+ testUserPrincipal = "test@" + realm;
+
+ conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+ conf.set(HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY, spnegoKeytab.getAbsolutePath());
+ conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+ conf.set(OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE, spnegoKeytab.getAbsolutePath());
+ conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath());
+ }
+
+ private static void startCluster() throws Exception {
+ OzoneManager.setTestSecureOmFlag(true);
+ cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setSCMServiceId("SecureSCM")
+ .setNumOfStorageContainerManagers(3)
+ .setNumOfOzoneManagers(1)
+ .setNumDatanodes(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ rpcClient = OzoneClientFactory.getRpcClient(conf);
+ store = rpcClient.getObjectStore();
+ SecretKeyClient secretKeyClient = cluster.getStorageContainerManager().getSecretKeyManager();
+ CertificateClient certClient = cluster.getStorageContainerManager().getScmCertificateClient();
+ dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);
+ }
}