This is an automated email from the ASF dual-hosted git repository.

aswinshakil pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 47c1eaa1eb903175c1729186da48b7bffda85bd8
Merge: 2b4708b70e a355664093
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Fri May 16 17:41:00 2025 +0530

    Merge branch 'HDDS-10239-container-reconciliation' of https://github.com/apache/ozone into HDDS-10239-container-reconciliation

    Conflicts:
        hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
        hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
        hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

 .../checksum/ContainerChecksumTreeManager.java     |  47 +-
 .../checksum/ContainerMerkleTreeMetrics.java       |  30 +-
 .../checksum/ContainerMerkleTreeWriter.java        |  72 ++-
 .../ozone/container/common/impl/ContainerSet.java  |  26 +
 .../container/common/impl/HddsDispatcher.java      |   3 +-
 .../ozone/container/common/interfaces/Handler.java |   4 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 413 ++++++++------
 .../container/ozoneimpl/ContainerController.java   |  12 +-
 .../container/ozoneimpl/ContainerScanError.java    |  33 +-
 .../ozoneimpl/OnDemandContainerDataScanner.java    |  76 +--
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   6 +-
 .../checksum/ContainerMerkleTreeTestUtils.java     |   3 +-
 .../checksum/TestContainerMerkleTreeWriter.java    |  63 ++-
 .../container/common/impl/TestContainerSet.java    |  24 +
 ...stContainerReconciliationWithMockDatanodes.java | 623 +++++++++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    | 361 +-----------
 .../TestOnDemandContainerDataScanner.java          |  73 +--
 .../hdds/scm/container/ContainerReplica.java       |   2 +-
 .../TestContainerCommandReconciliation.java        |   6 +-
 19 files changed, 1190 insertions(+), 687 deletions(-)

diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
index aa21389e25,aa21389e25..63154cddd5
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
@@@ -30,21 -30,21 +30,6 @@@ import org.apache.hadoop.metrics2.lib.M
  public class ContainerMerkleTreeMetrics {
    private static final String METRICS_SOURCE_NAME = ContainerMerkleTreeMetrics.class.getSimpleName();
  
--  public static ContainerMerkleTreeMetrics create() {
--    MetricsSystem ms = DefaultMetricsSystem.instance();
--    MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
--    if (source != null) {
--      ms.unregisterSource(METRICS_SOURCE_NAME);
--    }
--    return ms.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics",
--        new ContainerMerkleTreeMetrics());
--  }
--
--  public static void unregister() {
--    MetricsSystem ms = DefaultMetricsSystem.instance();
--    ms.unregisterSource(METRICS_SOURCE_NAME);
--  }
--
    @Metric(about = "Number of Merkle tree write failure")
    private MutableCounterLong numMerkleTreeWriteFailure;
  
@@@ -72,6 -72,6 +57,21 @@@
    @Metric(about = "Merkle tree diff latency")
    private MutableRate merkleTreeDiffLatencyNS;
  
++  public static ContainerMerkleTreeMetrics create() {
++    MetricsSystem ms = DefaultMetricsSystem.instance();
++    MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
++    if (source != null) {
++      ms.unregisterSource(METRICS_SOURCE_NAME);
++    }
++    return ms.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics",
++        new ContainerMerkleTreeMetrics());
++  }
++
++  public static void
unregister() { ++ MetricsSystem ms = DefaultMetricsSystem.instance(); ++ ms.unregisterSource(METRICS_SOURCE_NAME); ++ } ++ public void incrementMerkleTreeWriteFailures() { this.numMerkleTreeWriteFailure.incr(); } diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 0f5c19fd33,8204f58953..7e95518e2c --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@@ -37,7 -37,7 +37,8 @@@ import java.util.concurrent.ConcurrentS import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import java.util.function.Consumer; +import java.util.function.ToLongFunction; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@@ -64,27 -64,28 +65,29 @@@ public class ContainerSet implements It new ConcurrentSkipListSet<>(); private final ConcurrentSkipListMap<Long, Long> recoveringContainerMap = new ConcurrentSkipListMap<>(); - private Clock clock; + private final Clock clock; private long recoveringTimeout; - private final Table<Long, String> containerIdsTable; + private final Table<ContainerID, String> containerIdsTable; + // Handler that will be invoked when a scan of a container in this set is requested. + private Consumer<Container<?>> containerScanHandler; - @VisibleForTesting - public ContainerSet(long recoveringTimeout) { - this(new InMemoryTestTable<>(), recoveringTimeout); + public static ContainerSet newReadOnlyContainerSet(long recoveringTimeout) { + return new ContainerSet(null, recoveringTimeout); + } + + public static ContainerSet newRwContainerSet(Table<ContainerID, String> containerIdsTable, long recoveringTimeout) { + Objects.requireNonNull(containerIdsTable, "containerIdsTable == null"); + return new ContainerSet(containerIdsTable, recoveringTimeout); } - public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout) { - this(continerIdsTable, recoveringTimeout, false); + private ContainerSet(Table<ContainerID, String> continerIdsTable, long recoveringTimeout) { + this(continerIdsTable, recoveringTimeout, null); } - public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout, boolean readOnly) { - this.clock = Clock.system(ZoneOffset.UTC); + ContainerSet(Table<ContainerID, String> continerIdsTable, long recoveringTimeout, Clock clock) { + this.clock = clock != null ? 
clock : Clock.systemUTC(); this.containerIdsTable = continerIdsTable; this.recoveringTimeout = recoveringTimeout; - if (!readOnly && containerIdsTable == null) { - throw new IllegalArgumentException("Container table cannot be null when container set is not read only"); - } } public long getCurrentTime() { diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 9c8fed3ca5,e240ff6bf4..a46e097c0b --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@@ -70,9 -71,9 +70,8 @@@ import org.apache.hadoop.ozone.containe import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError; import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult; - import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum; diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 8fa7157091,5feec61a66..26ec6226fc --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@@ -21,8 -21,7 +21,8 @@@ import com.google.common.annotations.Vi import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Clock; - import java.util.Set; + import java.util.Collection; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java index db558f4847,db558f4847..1b167dabd8 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java @@@ -23,22 -23,22 +23,6 @@@ import java.io.File * This class is used to identify any error that may be seen while scanning a container. */ public class ContainerScanError { -- /** -- * Represents the reason a container scan failed and a container should -- * be marked unhealthy. -- */ -- public enum FailureType { -- MISSING_CONTAINER_DIR, -- MISSING_METADATA_DIR, -- MISSING_CONTAINER_FILE, -- MISSING_CHUNKS_DIR, -- MISSING_CHUNK_FILE, -- CORRUPT_CONTAINER_FILE, -- CORRUPT_CHUNK, -- INCONSISTENT_CHUNK_LENGTH, -- INACCESSIBLE_DB, -- WRITE_FAILURE, -- } private final File unhealthyFile; private final FailureType failureType; @@@ -66,4 -66,4 +50,21 @@@ public String toString() { return failureType + " for file " + unhealthyFile + " with exception: " + exception; } ++ ++ /** ++ * Represents the reason a container scan failed and a container should ++ * be marked unhealthy. 
++ */ ++ public enum FailureType { ++ MISSING_CONTAINER_DIR, ++ MISSING_METADATA_DIR, ++ MISSING_CONTAINER_FILE, ++ MISSING_CHUNKS_DIR, ++ MISSING_CHUNK_FILE, ++ CORRUPT_CONTAINER_FILE, ++ CORRUPT_CHUNK, ++ INCONSISTENT_CHUNK_LENGTH, ++ INACCESSIBLE_DB, ++ WRITE_FAILURE, ++ } } diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 2b66ead23b,6197657114..ad30aa0c43 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@@ -42,11 -41,9 +41,9 @@@ import org.slf4j.LoggerFactory * Class for performing on demand scans of containers. */ public final class OnDemandContainerDataScanner { - public static final Logger LOG = + private static final Logger LOG = LoggerFactory.getLogger(OnDemandContainerDataScanner.class); - private static volatile OnDemandContainerDataScanner instance; - private final ExecutorService scanExecutor; private final ContainerController containerController; private final DataTransferThrottler throttler; diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index 0000000000,d290cea5bb..ced198eecc mode 000000,100644..100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@@ -1,0 -1,621 +1,623 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.hadoop.ozone.container.keyvalue; + + import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; + import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; + import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE; + import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded; ++import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; + import static org.assertj.core.api.Assertions.assertThat; + import static org.assertj.core.api.Assertions.fail; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertFalse; + import static org.junit.jupiter.api.Assertions.assertTrue; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyLong; + import static org.mockito.ArgumentMatchers.anyMap; + + import java.io.File; + import java.io.IOException; + import java.io.UncheckedIOException; + import java.nio.ByteBuffer; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + import java.nio.file.StandardOpenOption; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.Comparator; + import java.util.List; + import java.util.Map; + import java.util.Optional; + import java.util.Random; + import java.util.UUID; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Future; + import java.util.concurrent.TimeoutException; + import java.util.function.Function; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import org.apache.commons.io.IOUtils; + import org.apache.commons.text.RandomStringGenerator; + import org.apache.hadoop.hdds.HddsUtils; + import org.apache.hadoop.hdds.client.BlockID; + import org.apache.hadoop.hdds.conf.OzoneConfiguration; + import org.apache.hadoop.hdds.protocol.DatanodeDetails; + import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; + import org.apache.hadoop.hdds.scm.XceiverClientSpi; + import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; + import org.apache.hadoop.hdds.utils.db.BatchOperation; + import org.apache.hadoop.ozone.OzoneConsts; + import org.apache.hadoop.ozone.common.Checksum; + import org.apache.hadoop.ozone.common.ChecksumData; + import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; + import org.apache.hadoop.ozone.container.common.ContainerTestUtils; + import org.apache.hadoop.ozone.container.common.helpers.BlockData; + import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; + import org.apache.hadoop.ozone.container.common.impl.ContainerSet; + import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; + import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; + import org.apache.hadoop.ozone.container.common.volume.StorageVolume; + import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; + import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; + import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; + import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; + import org.apache.ozone.test.GenericTestUtils; + import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.Assertions; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.api.io.TempDir; + import org.junit.jupiter.params.provider.Arguments; + import org.junit.jupiter.params.provider.MethodSource; + import org.mockito.MockedStatic; + import org.mockito.Mockito; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * This unit test simulates three datanodes with replicas of a container that need to be reconciled. + * It creates three KeyValueHandler instances to represent each datanode, and each instance is working on a container + * replica that is stored in a local directory. The reconciliation client is mocked to return the corresponding local + * container for each datanode peer. + */ + public class TestContainerReconciliationWithMockDatanodes { ++ ++ public static final Logger LOG = LoggerFactory.getLogger(TestContainerReconciliationWithMockDatanodes.class); ++ ++ // All container replicas will be placed in this directory, and the same replicas will be re-used for each test run. ++ @TempDir ++ private static Path containerDir; ++ private static DNContainerOperationClient dnClient; ++ private static MockedStatic<ContainerProtocolCalls> containerProtocolMock; ++ private static List<MockDatanode> datanodes; ++ private static long healthyDataChecksum; ++ ++ private static final String CLUSTER_ID = UUID.randomUUID().toString(); ++ private static final long CONTAINER_ID = 100L; ++ private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB; ++ private static final int CHUNKS_PER_BLOCK = 4; ++ private static final int NUM_DATANODES = 3; ++ + /** + * Number of corrupt blocks and chunks. + * + * TODO HDDS-11942 support more combinations of corruptions. + */ + public static Stream<Arguments> corruptionValues() { + return Stream.of( + Arguments.of(5, 0), + Arguments.of(0, 5), + Arguments.of(0, 10), + Arguments.of(10, 0), + Arguments.of(5, 10), + Arguments.of(10, 5), + Arguments.of(2, 3), + Arguments.of(3, 2), + Arguments.of(4, 6), + Arguments.of(6, 4), + Arguments.of(6, 9), + Arguments.of(9, 6) + ); + } + - public static final Logger LOG = LoggerFactory.getLogger(TestContainerReconciliationWithMockDatanodes.class); - - // All container replicas will be placed in this directory, and the same replicas will be re-used for each test run. - @TempDir - private static Path containerDir; - private static DNContainerOperationClient dnClient; - private static MockedStatic<ContainerProtocolCalls> containerProtocolMock; - private static List<MockDatanode> datanodes; - private static long healthyDataChecksum; - - private static final String CLUSTER_ID = UUID.randomUUID().toString(); - private static final long CONTAINER_ID = 100L; - private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB; - private static final int CHUNKS_PER_BLOCK = 4; - private static final int NUM_DATANODES = 3; - + /** + * Use the same container instances throughout the tests. Each reconciliation should make a full repair, resetting + * the state for the next test. + */ + @BeforeAll + public static void setup() throws Exception { + LOG.info("Data written to {}", containerDir); + dnClient = new DNContainerOperationClient(new OzoneConfiguration(), null, null); + datanodes = new ArrayList<>(); + + // Create a container with 15 blocks and 3 replicas. 
+ for (int i = 0; i < NUM_DATANODES; i++) { + DatanodeDetails dnDetails = randomDatanodeDetails(); + // Use this fake host name to track the node through the test since it's easier to visualize than a UUID. + dnDetails.setHostName("dn" + (i + 1)); + MockDatanode dn = new MockDatanode(dnDetails, containerDir); + dn.addContainerWithBlocks(CONTAINER_ID, 15); + datanodes.add(dn); + } + + datanodes.forEach(d -> d.scanContainer(CONTAINER_ID)); + healthyDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1); + // Do not count the initial synchronous scan to build the merkle tree towards the scan count in the tests. + // This lets each test run start counting the number of scans from zero. + datanodes.forEach(MockDatanode::resetOnDemandScanCount); + + containerProtocolMock = Mockito.mockStatic(ContainerProtocolCalls.class); + mockContainerProtocolCalls(); + } + + @AfterEach + public void reset() { + datanodes.forEach(MockDatanode::resetOnDemandScanCount); + } + + @AfterAll + public static void teardown() { + if (containerProtocolMock != null) { + containerProtocolMock.close(); + } + } + + // TODO HDDS-10374 once on-demand scanner can build merkle trees this test should pass. + // @ParameterizedTest + @MethodSource("corruptionValues") + public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception { + LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID, + HddsUtils.checksumToString(healthyDataChecksum)); + // Introduce corruption in each container on different replicas. + List<MockDatanode> dnsToCorrupt = datanodes.stream().limit(2).collect(Collectors.toList()); + + dnsToCorrupt.get(0).introduceCorruption(CONTAINER_ID, numBlocksToDelete, numChunksToCorrupt, false); + dnsToCorrupt.get(1).introduceCorruption(CONTAINER_ID, numBlocksToDelete, numChunksToCorrupt, true); + // Use synchronous on-demand scans to re-build the merkle trees after corruption. + datanodes.forEach(d -> d.scanContainer(CONTAINER_ID)); + // Without reconciliation, checksums should be different because of the corruption. + assertUniqueChecksumCount(CONTAINER_ID, datanodes, 3); + + // Each datanode should have had one on-demand scan during test setup, and a second one after corruption was + // introduced. + waitForExpectedScanCount(1); + + // Reconcile each datanode with its peers. + // In a real cluster, SCM will not send a command to reconcile a datanode with itself. + for (MockDatanode current : datanodes) { + List<DatanodeDetails> peers = datanodes.stream() + .map(MockDatanode::getDnDetails) + .filter(other -> !current.getDnDetails().equals(other)) + .collect(Collectors.toList()); + current.reconcileContainer(dnClient, peers, CONTAINER_ID); + } + // Reconciliation should have triggered a second on-demand scan for each replica. Wait for them to finish before + // checking the results. + waitForExpectedScanCount(2); + // After reconciliation, checksums should be the same for all containers. + long repairedDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1); + assertEquals(healthyDataChecksum, repairedDataChecksum); + } + + /** + * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on + * every datanode. 
+ */ + private void waitForExpectedScanCount(int expectedCount) throws Exception { + for (MockDatanode datanode: datanodes) { + try { + GenericTestUtils.waitFor(() -> datanode.getOnDemandScanCount() == expectedCount, 100, 10_000); + } catch (TimeoutException ex) { + LOG.error("Timed out waiting for on-demand scan count {} to reach expected count {} on datanode {}", + datanode.getOnDemandScanCount(), expectedCount, datanode); + throw ex; + } + } + } + + /** + * Checks for the expected number of unique checksums among a container on the provided datanodes. + * @return The data checksum from one of the nodes. Useful if expectedUniqueChecksums = 1. + */ + private static long assertUniqueChecksumCount(long containerID, Collection<MockDatanode> nodes, + long expectedUniqueChecksums) { + long actualUniqueChecksums = nodes.stream() + .mapToLong(d -> d.checkAndGetDataChecksum(containerID)) + .distinct() + .count(); + assertEquals(expectedUniqueChecksums, actualUniqueChecksums); + return nodes.stream().findAny().get().checkAndGetDataChecksum(containerID); + } + + private static void mockContainerProtocolCalls() { + Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream() + .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity())); + + // Mock getContainerChecksumInfo + containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + long containerID = inv.getArgument(1); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + return dnMap.get(dn).getChecksumInfo(containerID); + }); + + // Mock getBlock + containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(), any(), any(), any(), anyMap())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + BlockID blockID = inv.getArgument(2); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + return dnMap.get(dn).getBlock(blockID); + }); + + // Mock readChunk + containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), any(), any())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1); + ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2); + List<XceiverClientSpi.Validator> checksumValidators = inv.getArgument(3); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + return dnMap.get(dn).readChunk(blockId, chunkInfo, checksumValidators); + }); + + containerProtocolMock.when(() -> ContainerProtocolCalls.toValidatorList(any())).thenCallRealMethod(); + } + + /** + * This class wraps a KeyValueHandler instance with just enough features to test its reconciliation functionality. 
+ */ + private static class MockDatanode { + private final KeyValueHandler handler; + private final DatanodeDetails dnDetails; + private final OnDemandContainerDataScanner onDemandScanner; + private final ContainerSet containerSet; + private final OzoneConfiguration conf; + + private final Logger log; + + MockDatanode(DatanodeDetails dnDetails, Path tempDir) throws IOException { + this.dnDetails = dnDetails; + log = LoggerFactory.getLogger("mock-datanode-" + dnDetails.getHostName()); + Path dataVolume = Paths.get(tempDir.toString(), dnDetails.getHostName(), "data"); + Path metadataVolume = Paths.get(tempDir.toString(), dnDetails.getHostName(), "metadata"); + + this.conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, dataVolume.toString()); + conf.set(OZONE_METADATA_DIRS, metadataVolume.toString()); + - containerSet = new ContainerSet(1000); ++ containerSet = newContainerSet(); + MutableVolumeSet volumeSet = createVolumeSet(); + handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet); + handler.setClusterID(CLUSTER_ID); + + ContainerController controller = new ContainerController(containerSet, + Collections.singletonMap(ContainerProtos.ContainerType.KeyValueContainer, handler)); + onDemandScanner = new OnDemandContainerDataScanner( + conf.getObject(ContainerScannerConfiguration.class), controller); + // Register the on-demand container scanner with the container set used by the KeyValueHandler. + containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); + } + + public DatanodeDetails getDnDetails() { + return dnDetails; + } + + /** + * @throws IOException for general IO errors accessing the checksum file + * @throws java.io.FileNotFoundException When the checksum file does not exist. + */ + public ContainerProtos.GetContainerChecksumInfoResponseProto getChecksumInfo(long containerID) throws IOException { + KeyValueContainer container = getContainer(containerID); + ByteString checksumInfo = handler.getChecksumManager().getContainerChecksumInfo(container.getContainerData()); + return ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder() + .setContainerID(containerID) + .setContainerChecksumInfo(checksumInfo) + .build(); + } + + /** + * Verifies that the data checksum on disk matches the one in memory, and returns the data checksum. 
+ */ + public long checkAndGetDataChecksum(long containerID) { + KeyValueContainer container = getContainer(containerID); + long dataChecksum = 0; + try { + Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo = + handler.getChecksumManager().read(container.getContainerData()); + assertTrue(containerChecksumInfo.isPresent()); + dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum(); + assertEquals(container.getContainerData().getDataChecksum(), dataChecksum); + } catch (IOException ex) { + fail("Failed to read container checksum from disk", ex); + } + log.info("Retrieved data checksum {} from container {}", HddsUtils.checksumToString(dataChecksum), + containerID); + return dataChecksum; + } + + public ContainerProtos.GetBlockResponseProto getBlock(BlockID blockID) throws IOException { + KeyValueContainer container = getContainer(blockID.getContainerID()); + ContainerProtos.BlockData blockData = handler.getBlockManager().getBlock(container, blockID).getProtoBufMessage(); + return ContainerProtos.GetBlockResponseProto.newBuilder() + .setBlockData(blockData) + .build(); + } + + public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId, + ContainerProtos.ChunkInfo chunkInfo, List<XceiverClientSpi.Validator> validators) throws IOException { + KeyValueContainer container = getContainer(blockId.getContainerID()); + ContainerProtos.ReadChunkResponseProto readChunkResponseProto = + ContainerProtos.ReadChunkResponseProto.newBuilder() + .setBlockID(blockId) + .setChunkData(chunkInfo) + .setData(handler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId), + ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString()) + .build(); + verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators); + return readChunkResponseProto; + } + + public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResponseProto, + ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo, + List<XceiverClientSpi.Validator> validators) throws IOException { + assertFalse(validators.isEmpty()); + ContainerProtos.ContainerCommandRequestProto requestProto = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setContainerID(blockId.getContainerID()) + .setDatanodeUuid(dnDetails.getUuidString()) + .setReadChunk( + ContainerProtos.ReadChunkRequestProto.newBuilder() + .setBlockID(blockId) + .setChunkData(chunkInfo) + .build()) + .build(); + ContainerProtos.ContainerCommandResponseProto responseProto = + ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setResult(ContainerProtos.Result.SUCCESS) + .setReadChunk(readChunkResponseProto).build(); + for (XceiverClientSpi.Validator function : validators) { + function.accept(requestProto, responseProto); + } + } + + public KeyValueContainer getContainer(long containerID) { + return (KeyValueContainer) containerSet.getContainer(containerID); + } + + /** + * Triggers a synchronous scan of the container. This method will block until the scan completes. 
+ */ + public void scanContainer(long containerID) { + Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(containerSet.getContainer(containerID)); + assertTrue(scanFuture.isPresent()); + + try { + scanFuture.get().get(); + } catch (InterruptedException | ExecutionException e) { + fail("On demand container scan failed", e); + } + } + + public int getOnDemandScanCount() { + return onDemandScanner.getMetrics().getNumContainersScanned(); + } + + public void resetOnDemandScanCount() { + onDemandScanner.getMetrics().resetNumContainersScanned(); + } + + public void reconcileContainer(DNContainerOperationClient client, Collection<DatanodeDetails> peers, + long containerID) { + log.info("Beginning reconciliation on this mock datanode"); + try { + handler.reconcileContainer(client, containerSet.getContainer(containerID), peers); + } catch (IOException ex) { + fail("Container reconciliation failed", ex); + } + } + + /** + * Create a container with the specified number of blocks. Block data is human-readable so the block files can be + * inspected when debugging the test. + */ + public void addContainerWithBlocks(long containerId, int blocks) throws Exception { + ContainerProtos.CreateContainerRequestProto createRequest = + ContainerProtos.CreateContainerRequestProto.newBuilder() + .setContainerType(ContainerProtos.ContainerType.KeyValueContainer) + .build(); + ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.CreateContainer) + .setCreateContainer(createRequest) + .setContainerID(containerId) + .setDatanodeUuid(dnDetails.getUuidString()) + .build(); + + handler.handleCreateContainer(request, null); + KeyValueContainer container = getContainer(containerId); + + // Verify container is initially empty. + File chunksPath = new File(container.getContainerData().getChunksPath()); + ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0, 0); + + // Create data to put in the container. + // Seed using the container ID so that all replicas are identical. + RandomStringGenerator generator = new RandomStringGenerator.Builder() + .withinRange('a', 'z') + .usingRandom(new Random(containerId)::nextInt) + .get(); + + // This array will keep getting populated with new bytes for each chunk. + byte[] chunkData = new byte[CHUNK_LEN]; + int bytesPerChecksum = 2 * (int) OzoneConsts.KB; + + // Add data to the container. + List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>(); + for (int i = 0; i < blocks; i++) { + BlockID blockID = new BlockID(containerId, i); + BlockData blockData = new BlockData(blockID); + + chunkList.clear(); + for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) { + String chunkName = "chunk" + chunkCount; + long offset = chunkCount * chunkData.length; + ChunkInfo info = new ChunkInfo(chunkName, offset, chunkData.length); + + // Generate data for the chunk and compute its checksum. + // Data is generated as one ascii character per line, so block files are human-readable if further + // debugging is needed. + for (int c = 0; c < chunkData.length; c += 2) { + chunkData[c] = (byte)generator.generate(1).charAt(0); + chunkData[c + 1] = (byte)'\n'; + } + + Checksum checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, bytesPerChecksum); + ChecksumData checksumData = checksum.computeChecksum(chunkData); + info.setChecksumData(checksumData); + // Write chunk and checksum into the container. 
+ chunkList.add(info.getProtoBufMessage()); + handler.getChunkManager().writeChunk(container, blockID, info, + ByteBuffer.wrap(chunkData), WRITE_STAGE); + } + handler.getChunkManager().finishWriteChunks(container, blockData); + blockData.setChunks(chunkList); + blockData.setBlockCommitSequenceId(i); + handler.getBlockManager().putBlock(container, blockData); + } + ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, blocks, (long) blocks * CHUNKS_PER_BLOCK); + container.markContainerForClose(); + handler.closeContainer(container); + } + + @Override + public String toString() { + return dnDetails.toString(); + } + + /** + * Returns a list of all blocks in the container sorted numerically by blockID. + * For example, the unsorted list would have the first blocks as 1, 10, 11... + * The list returned by this method would have the first blocks as 1, 2, 3... + */ + private List<BlockData> getSortedBlocks(KeyValueContainer container) throws IOException { + List<BlockData> blockDataList = handler.getBlockManager().listBlock(container, -1, 100); + blockDataList.sort(Comparator.comparingLong(BlockData::getLocalID)); + return blockDataList; + } + + /** + * Introduce corruption in the container. + * 1. Delete blocks from the container. + * 2. Corrupt chunks at an offset. + * If revers is true, the blocks and chunks are deleted in reverse order. + */ + public void introduceCorruption(long containerID, int numBlocksToDelete, int numChunksToCorrupt, boolean reverse) + throws IOException { + KeyValueContainer container = getContainer(containerID); + KeyValueContainerData containerData = container.getContainerData(); + // Simulate missing blocks + try (DBHandle handle = BlockUtils.getDB(containerData, conf); + BatchOperation batch = handle.getStore().getBatchHandler().initBatchOperation()) { + List<BlockData> blockDataList = getSortedBlocks(container); + int size = blockDataList.size(); + for (int i = 0; i < numBlocksToDelete; i++) { + BlockData blockData = reverse ? blockDataList.get(size - 1 - i) : blockDataList.get(i); + File blockFile = TestContainerCorruptions.getBlock(container, blockData.getBlockID().getLocalID()); + Assertions.assertTrue(blockFile.delete()); + handle.getStore().getBlockDataTable().deleteWithBatch(batch, + containerData.getBlockKey(blockData.getLocalID())); + log.info("Deleting block {} from container {}", blockData.getBlockID().getLocalID(), containerID); + } + handle.getStore().getBatchHandler().commitBatchOperation(batch); + // Check that the correct number of blocks were deleted. + blockDataList = getSortedBlocks(container); + assertEquals(numBlocksToDelete, size - blockDataList.size()); + } + + // Corrupt chunks at an offset. + List<BlockData> blockDataList = getSortedBlocks(container); + int size = blockDataList.size(); + for (int i = 0; i < numChunksToCorrupt; i++) { + int blockIndex = reverse ? 
size - 1 - (i % size) : i % size; + BlockData blockData = blockDataList.get(blockIndex); + int chunkIndex = i / size; + File blockFile = TestContainerCorruptions.getBlock(container, blockData.getBlockID().getLocalID()); + List<ContainerProtos.ChunkInfo> chunks = new ArrayList<>(blockData.getChunks()); + ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex); + corruptFileAtOffset(blockFile, chunkInfo.getOffset(), chunkInfo.getLen()); + log.info("Corrupting block {} at offset {} in container {}", blockData.getBlockID().getLocalID(), + chunkInfo.getOffset(), containerID); + } + } + + private MutableVolumeSet createVolumeSet() throws IOException { + MutableVolumeSet volumeSet = new MutableVolumeSet(dnDetails.getUuidString(), conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf); + return volumeSet; + } + + /** + * Overwrite the file with random bytes at an offset within the given length. + */ + private static void corruptFileAtOffset(File file, long offset, long chunkLength) { + try { + final int fileLength = (int) file.length(); + assertTrue(fileLength >= offset + chunkLength); + final int chunkEnd = (int)(offset + chunkLength); + + Path path = file.toPath(); + final byte[] original = IOUtils.readFully(Files.newInputStream(path), fileLength); + + // Corrupt the last byte and middle bytes of the block. The scanner should log this as two errors. + final byte[] corruptedBytes = Arrays.copyOf(original, fileLength); + corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1); + final long chunkMid = offset + (chunkLength - offset) / 2; + corruptedBytes[(int) (chunkMid / 2)] = (byte) (original[(int) (chunkMid / 2)] << 1); + + + Files.write(path, corruptedBytes, + StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC); + + assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength)) + .isEqualTo(corruptedBytes) + .isNotEqualTo(original); + } catch (IOException ex) { + // Fail the test. 
+ throw new UncheckedIOException(ex); + } + } + } + } diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index da1bb06471,7530a33327..95ed16b2d9 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@@ -17,7 -17,7 +17,6 @@@ package org.apache.hadoop.ozone.container.keyvalue; - import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; @@@ -26,28 -26,20 +25,22 @@@ import static org.apache.hadoop.hdds.pr import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY; import static org.apache.hadoop.ozone.OzoneConsts.GB; - import static org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile; - import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto; - import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE; import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData; - import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; - import static org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.getBlock; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; - import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@@ -64,26 -52,13 +53,15 @@@ import java.time.Clock import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; - import java.util.HashSet; import java.util.List; - import java.util.Map; - import java.util.Optional; - import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; - import java.util.stream.Stream; import org.apache.commons.io.FileUtils; - import org.apache.commons.io.IOUtils; - import 
org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; - import org.apache.hadoop.hdds.client.BlockID; - import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; @@@ -92,17 -67,9 +70,10 @@@ import org.apache.hadoop.hdds.protocol. import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; - import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; - import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; - import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.hdds.security.token.TokenVerifier; - import org.apache.hadoop.hdds.utils.db.BatchOperation; - import org.apache.hadoop.ozone.OzoneConsts; - import org.apache.hadoop.ozone.common.Checksum; - import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; @@@ -121,27 -85,19 +89,22 @@@ import org.apache.hadoop.ozone.containe import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; - import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; - import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; + import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; + import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; + import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.util.Sets; +import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.GenericTestUtils.LogCapturer; - import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; - import org.junit.jupiter.params.ParameterizedTest; - import org.junit.jupiter.params.provider.Arguments; - import org.junit.jupiter.params.provider.MethodSource; - import org.mockito.MockedStatic; import org.mockito.Mockito; - import org.mockito.invocation.InvocationOnMock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for {@link KeyValueHandler}. @@@ -165,28 -118,8 +126,9 @@@ public class TestKeyValueHandler private HddsDispatcher dispatcher; private KeyValueHandler handler; private OzoneConfiguration conf; + private ContainerSet mockContainerSet; + private long maxContainerSize; - /** - * Number of corrupt blocks and chunks. 
- */ - public static Stream<Arguments> corruptionValues() { - return Stream.of( - Arguments.of(5, 0), - Arguments.of(0, 5), - Arguments.of(0, 10), - Arguments.of(10, 0), - Arguments.of(5, 10), - Arguments.of(10, 5), - Arguments.of(2, 3), - Arguments.of(3, 2), - Arguments.of(4, 6), - Arguments.of(6, 4), - Arguments.of(6, 9), - Arguments.of(9, 6) - ); - } - @BeforeEach public void setup() throws IOException { // Create mock HddsDispatcher and KeyValueHandler. @@@ -441,68 -389,6 +385,68 @@@ "Close container should return Invalid container error"); } + @Test + public void testCreateContainerWithFailure() throws Exception { + final String testDir = tempDir.toString(); + final long containerID = 1L; + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); - final ConfigurationSource conf = new OzoneConfiguration(); ++ conf = new OzoneConfiguration(); + final ContainerSet containerSet = spy(newContainerSet()); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + List<HddsVolume> hddsVolumeList = StorageVolumeUtil + .getHddsVolumesList(volumeSet.getVolumesList()); + + assertEquals(1, hddsVolumeList.size()); + + final ContainerMetrics metrics = ContainerMetrics.create(conf); + + final AtomicInteger icrReceived = new AtomicInteger(0); + + final KeyValueHandler kvHandler = new KeyValueHandler(conf, + datanodeId, containerSet, volumeSet, metrics, + c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf)); + kvHandler.setClusterID(clusterId); + + final ContainerCommandRequestProto createContainer = + createContainerRequest(datanodeId, containerID); + + Semaphore semaphore = new Semaphore(1); + doAnswer(invocation -> { + semaphore.acquire(); + throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION); + }).when(containerSet).addContainer(any()); + + semaphore.acquire(); + CompletableFuture.runAsync(() -> + kvHandler.handleCreateContainer(createContainer, null) + ); + + // commit bytes has been allocated by volumeChoosingPolicy which is called in KeyValueContainer#create + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize, + 1000, 50000); + semaphore.release(); + + LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes()); + + // release committed bytes as exception is thrown + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0, + 1000, 50000); + } + @Test public void testDeleteContainer() throws IOException { final String testDir = tempDir.toString(); @@@ -597,60 -484,6 +541,60 @@@ } } + /** + * Tests that deleting a container decrements the cached used space of its volume. 
+ */ + @Test + public void testDeleteDecrementsVolumeUsedSpace() throws IOException { + final long containerID = 1; + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + final ContainerSet containerSet = newContainerSet(); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + final HddsVolume hddsVolume = mock(HddsVolume.class); + when(hddsVolume.getDeletedContainerDir()).thenReturn(new File("")); + - final ConfigurationSource conf = new OzoneConfiguration(); ++ conf = new OzoneConfiguration(); + final ContainerMetrics metrics = ContainerMetrics.create(conf); + final AtomicInteger icrReceived = new AtomicInteger(0); + final long containerBytesUsed = 1024 * 1024; + + // We're testing KeyValueHandler in this test, all the other objects are mocked + final KeyValueHandler kvHandler = new KeyValueHandler(conf, + datanodeId, containerSet, volumeSet, metrics, + c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf)); + kvHandler.setClusterID(clusterId); + + // Setup ContainerData and Container mocks + KeyValueContainerData containerData = mock(KeyValueContainerData.class); + when(containerData.getContainerID()).thenReturn(containerID); + when(containerData.getVolume()).thenReturn(hddsVolume); + when(containerData.getBytesUsed()).thenReturn(containerBytesUsed); + when(containerData.getState()).thenReturn(ContainerProtos.ContainerDataProto.State.CLOSED); + when(containerData.isOpen()).thenReturn(false); + when(containerData.getLayoutVersion()).thenReturn(ContainerLayoutVersion.FILE_PER_BLOCK); + when(containerData.getDbFile()).thenReturn(new File(tempDir.toFile(), "dummy.db")); + when(containerData.getContainerPath()).thenReturn(tempDir.toString()); + when(containerData.getMetadataPath()).thenReturn(tempDir.toString()); + + KeyValueContainer container = mock(KeyValueContainer.class); + when(container.getContainerData()).thenReturn(containerData); + when(container.hasBlocks()).thenReturn(true); + + containerSet.addContainer(container); + assertNotNull(containerSet.getContainer(containerID)); + + // This is the method we're testing. 
It should decrement used space in the volume when deleting this container + kvHandler.deleteContainer(container, true); + assertNull(containerSet.getContainer(containerID)); + + // Verify ICR was sent (once for delete) + assertEquals(1, icrReceived.get(), "ICR should be sent for delete"); + verify(container, times(1)).delete(); + // Verify decrementUsedSpace was called with the correct amount + verify(hddsVolume, times(1)).decrementUsedSpace(eq(containerBytesUsed)); + } + @ContainerLayoutTestInfo.ContainerTest public void testContainerChecksumInvocation(ContainerLayoutVersion layoutVersion) throws Exception { conf = new OzoneConfiguration(); diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index de76af2555,6ea5ff0b12..4045b959d8 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@@ -60,8 -60,9 +60,10 @@@ import org.mockito.stubbing.Answer @MockitoSettings(strictness = Strictness.LENIENT) public class TestOnDemandContainerDataScanner extends TestContainerScannersAbstract { + + private OnDemandContainerDataScanner onDemandScanner; + @Override @BeforeEach public void setup() { super.setup(); diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java index 67bf1ad194,0e2baeeeb3..6cba1e8f1c --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@@ -185,17 -177,19 +185,17 @@@ public final class ContainerReplica imp @Override public String toString() { - return "ContainerReplica{" + - "containerID=" + containerID + - ", state=" + state + - ", datanodeDetails=" + datanodeDetails + - ", placeOfBirth=" + placeOfBirth + - ", sequenceId=" + sequenceId + - ", keyCount=" + keyCount + - ", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ? - ",replicaIndex=" + replicaIndex : - "") + - ", isEmpty=" + isEmpty + - ", dataChecksum=" + dataChecksum + - '}'; + return "ContainerReplica{" + containerID + + " (" + state + + ") currentDN=" + datanodeDetails + + (originDatanodeId != null ? ", originDN=" + originDatanodeId : " (origin)") + + ", bcsid=" + sequenceId + + (replicaIndex > 0 ? ", replicaIndex=" + replicaIndex : "") + + ", keyCount=" + keyCount + + ", bytesUsed=" + bytesUsed + + ", " + (isEmpty ? "empty" : "non-empty") - + ", dataChecksum=" + dataChecksum + ++ + ", dataChecksum=" + dataChecksum + + '}'; } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
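
For readers following the reconciliation work, the core of this merge is that OnDemandContainerDataScanner is no longer reached through static state: it is constructed from a ContainerScannerConfiguration and a ContainerController, and ContainerSet gains a registerContainerScanHandler hook (plus the newReadOnlyContainerSet/newRwContainerSet factories). The Java sketch below mirrors the wiring used by the new TestContainerReconciliationWithMockDatanodes in this diff; it is an illustration only, not part of the commit. The sketch class and method names are invented, and only the Ozone calls that appear in the hunks above are assumed to exist.

    import java.util.Collections;
    import java.util.Optional;
    import java.util.concurrent.Future;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
    import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
    import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
    import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
    import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;

    /** Illustrative sketch only; not part of this commit. */
    public final class OnDemandScanWiringSketch {

      private OnDemandScanWiringSketch() {
      }

      /**
       * Builds an instance-based on-demand scanner and registers it with the
       * container set, the way the mock-datanode test in this commit does.
       */
      static OnDemandContainerDataScanner wireScanner(OzoneConfiguration conf,
          ContainerSet containerSet, KeyValueHandler handler) {
        ContainerController controller = new ContainerController(containerSet,
            Collections.singletonMap(ContainerProtos.ContainerType.KeyValueContainer, handler));

        OnDemandContainerDataScanner onDemandScanner = new OnDemandContainerDataScanner(
            conf.getObject(ContainerScannerConfiguration.class), controller);

        // Scan requests now flow through the ContainerSet hook instead of a static singleton.
        containerSet.registerContainerScanHandler(onDemandScanner::scanContainer);
        return onDemandScanner;
      }

      /** Requests a scan of one container and blocks until it completes, as the test does. */
      static void scanSynchronously(OnDemandContainerDataScanner onDemandScanner,
          ContainerSet containerSet, long containerID) throws Exception {
        Optional<Future<?>> scanFuture =
            onDemandScanner.scanContainer(containerSet.getContainer(containerID));
        if (scanFuture.isPresent()) {
          scanFuture.get().get();
        }
      }
    }

With this wiring, code such as reconciliation can request a scan through the ContainerSet's containerScanHandler without holding a scanner reference, which is what the new field added to ContainerSet in this merge is for.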
