This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 47f3f315357 HDDS-13792. Move container related metadata from Derby to
OM DB. (#9154)
47f3f315357 is described below
commit 47f3f315357a9a41724c7eb2611b0698d7e22b0c
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Wed Oct 15 19:51:20 2025 +0530
HDDS-13792. Move container related metadata from Derby to OM DB. (#9154)
---
.../src/main/proto/OmStorageProtocol.proto | 4 +
.../hadoop/ozone/recon/ReconControllerModule.java | 4 +
.../ozone/recon/api/UtilizationEndpoint.java | 76 +++++++-----
.../scm/ReconStorageContainerManagerFacade.java | 15 +--
.../spi/ReconContainerSizeMetadataManager.java | 100 +++++++++++++++
.../ReconContainerSizeMetadataManagerImpl.java | 117 ++++++++++++++++++
.../ozone/recon/spi/impl/ReconDBDefinition.java | 11 +-
.../ozone/recon/tasks/ContainerSizeCountKey.java | 83 +++++++++++++
.../ozone/recon/tasks/ContainerSizeCountTask.java | 132 +++++++-------------
.../ozone/recon/tasks/ReconTaskControllerImpl.java | 7 +-
.../hadoop/ozone/recon/ReconTestInjector.java | 5 +
.../hadoop/ozone/recon/api/TestEndpoints.java | 10 +-
.../recon/tasks/TestContainerSizeCountTask.java | 137 +++++++++++++--------
.../ozone/recon/tasks/TestEventBufferOverflow.java | 25 ++--
.../TestNSSummaryTaskControllerIntegration.java | 4 +-
.../recon/tasks/TestReconTaskControllerImpl.java | 7 +-
16 files changed, 528 insertions(+), 209 deletions(-)
diff --git
a/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
b/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
index ee57233f6de..a2e20c7a22a 100644
--- a/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
+++ b/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
@@ -73,4 +73,8 @@ message FileSizeCountKeyProto {
required string volume = 1;
required string bucket = 2;
required int64 fileSizeUpperBound = 3;
+}
+
+message ContainerSizeCountKeyProto {
+ required int64 containerSizeUpperBound = 1;
}
\ No newline at end of file
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 3f7e99056e4..e6361339314 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -51,12 +51,14 @@
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import
org.apache.hadoop.ozone.recon.spi.impl.ReconContainerMetadataManagerImpl;
+import
org.apache.hadoop.ozone.recon.spi.impl.ReconContainerSizeMetadataManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ReconFileMetadataManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.ReconGlobalStatsManagerImpl;
@@ -109,6 +111,8 @@ protected void configure() {
.to(ReconFileMetadataManagerImpl.class).in(Singleton.class);
bind(ReconGlobalStatsManager.class)
.to(ReconGlobalStatsManagerImpl.class).in(Singleton.class);
+ bind(ReconContainerSizeMetadataManager.class)
+ .to(ReconContainerSizeMetadataManagerImpl.class).in(Singleton.class);
bind(ReconNamespaceSummaryManager.class)
.to(ReconNamespaceSummaryManagerImpl.class).in(Singleton.class);
bind(OzoneManagerServiceProvider.class)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
index 5a19890d3dc..7912f9ed6b4 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
@@ -21,12 +21,9 @@
import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_CONTAINER_SIZE;
import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_FILE_SIZE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_VOLUME;
-import static
org.apache.ozone.recon.schema.generated.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -37,14 +34,12 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator;
import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountKey;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountKey;
-import org.apache.ozone.recon.schema.UtilizationSchemaDefinition;
-import
org.apache.ozone.recon.schema.generated.tables.daos.ContainerCountBySizeDao;
import
org.apache.ozone.recon.schema.generated.tables.pojos.ContainerCountBySize;
import org.apache.ozone.recon.schema.generated.tables.pojos.FileCountBySize;
-import org.jooq.DSLContext;
-import org.jooq.Record1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,19 +50,17 @@
@Produces(MediaType.APPLICATION_JSON)
public class UtilizationEndpoint {
- private UtilizationSchemaDefinition utilizationSchemaDefinition;
- private ContainerCountBySizeDao containerCountBySizeDao;
private ReconFileMetadataManager reconFileMetadataManager;
+ private ReconContainerSizeMetadataManager reconContainerSizeMetadataManager;
private static final Logger LOG = LoggerFactory
.getLogger(UtilizationEndpoint.class);
@Inject
- public UtilizationEndpoint(ContainerCountBySizeDao containerCountBySizeDao,
- UtilizationSchemaDefinition
utilizationSchemaDefinition,
- ReconFileMetadataManager
reconFileMetadataManager) {
- this.utilizationSchemaDefinition = utilizationSchemaDefinition;
- this.containerCountBySizeDao = containerCountBySizeDao;
+ public UtilizationEndpoint(
+ ReconFileMetadataManager reconFileMetadataManager,
+ ReconContainerSizeMetadataManager reconContainerSizeMetadataManager) {
this.reconFileMetadataManager = reconFileMetadataManager;
+ this.reconContainerSizeMetadataManager = reconContainerSizeMetadataManager;
}
/**
@@ -136,7 +129,7 @@ public Response getFileCounts(
}
/**
- * Return the container size counts from Recon DB.
+ * Return the container size counts from RocksDB.
*
* @return {@link Response}
*/
@@ -145,31 +138,46 @@ public Response getFileCounts(
public Response getContainerCounts(
@QueryParam(RECON_QUERY_CONTAINER_SIZE)
long upperBound) {
- DSLContext dslContext = utilizationSchemaDefinition.getDSLContext();
- Long containerSizeUpperBound =
- ReconUtils.getContainerSizeUpperBound(upperBound);
- List<ContainerCountBySize> resultSet;
+ List<ContainerCountBySize> resultSet = new ArrayList<>();
try {
+ Table<ContainerSizeCountKey, Long> containerCountTable =
+ reconContainerSizeMetadataManager.getContainerCountTable();
+
+ Long containerSizeUpperBound =
+ ReconUtils.getContainerSizeUpperBound(upperBound);
+
if (upperBound > 0) {
- // Get the current count from database and update
- Record1<Long> recordToFind =
- dslContext.newRecord(
- CONTAINER_COUNT_BY_SIZE.CONTAINER_SIZE)
- .value1(containerSizeUpperBound);
- ContainerCountBySize record =
- containerCountBySizeDao.findById(recordToFind.value1());
- resultSet = record != null ?
- Collections.singletonList(record) : Collections.emptyList();
+ // Query for specific container size
+ ContainerSizeCountKey key =
+ new ContainerSizeCountKey(containerSizeUpperBound);
+ Long count = containerCountTable.get(key);
+ if (count != null && count > 0) {
+ ContainerCountBySize record = new ContainerCountBySize();
+ record.setContainerSize(containerSizeUpperBound);
+ record.setCount(count);
+ resultSet.add(record);
+ }
} else {
- // fetch all records having values greater than zero
- resultSet = containerCountBySizeDao.findAll().stream()
- .filter(record -> record.getCount() > 0)
- .collect(Collectors.toList());
+ // Iterate through all records
+ try (KeyValueIterator<ContainerSizeCountKey, Long> iterator =
+ containerCountTable.iterator()) {
+ while (iterator.hasNext()) {
+ Table.KeyValue<ContainerSizeCountKey, Long> entry =
iterator.next();
+ ContainerSizeCountKey key = entry.getKey();
+ Long count = entry.getValue();
+
+ if (count != null && count > 0) {
+ ContainerCountBySize record = new ContainerCountBySize();
+ record.setContainerSize(key.getContainerSizeUpperBound());
+ record.setCount(count);
+ resultSet.add(record);
+ }
+ }
+ }
}
return Response.ok(resultSet).build();
} catch (Exception e) {
- // Log the exception and return a server error response
- LOG.error("Error retrieving container counts", e);
+ LOG.error("Error retrieving container counts from RocksDB", e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index b6b7e3cf5b4..d01964311d4 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -117,12 +117,11 @@
import org.apache.hadoop.ozone.recon.fsck.ReconSafeModeMgrTask;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
-import org.apache.ozone.recon.schema.UtilizationSchemaDefinition;
-import
org.apache.ozone.recon.schema.generated.tables.daos.ContainerCountBySizeDao;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,7 +164,6 @@ public class ReconStorageContainerManagerFacade
private ReconSafeModeManager safeModeManager;
private ReconSafeModeMgrTask reconSafeModeMgrTask;
private ContainerSizeCountTask containerSizeCountTask;
- private ContainerCountBySizeDao containerCountBySizeDao;
private AtomicBoolean isSyncDataFromSCMRunning;
private final String threadNamePrefix;
@@ -175,8 +173,7 @@ public class ReconStorageContainerManagerFacade
@SuppressWarnings({"checkstyle:ParameterNumber", "checkstyle:MethodLength"})
public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
StorageContainerServiceProvider
scmServiceProvider,
- ContainerCountBySizeDao
containerCountBySizeDao,
- UtilizationSchemaDefinition
utilizationSchemaDefinition,
+ ReconContainerSizeMetadataManager
reconContainerSizeMetadataManager,
ContainerHealthSchemaManager
containerHealthSchemaManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
ReconUtils reconUtils,
@@ -249,7 +246,6 @@ public
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
scmhaManager, sequenceIdGen, pendingOps);
this.scmServiceProvider = scmServiceProvider;
this.isSyncDataFromSCMRunning = new AtomicBoolean();
- this.containerCountBySizeDao = containerCountBySizeDao;
NodeReportHandler nodeReportHandler =
new NodeReportHandler(nodeManager);
@@ -270,7 +266,7 @@ public
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
reconTaskConfig, reconContainerMetadataManager, conf,
taskStatusUpdaterManager);
this.containerSizeCountTask = new ContainerSizeCountTask(containerManager,
- reconTaskConfig, containerCountBySizeDao, utilizationSchemaDefinition,
taskStatusUpdaterManager);
+ reconTaskConfig, reconContainerSizeMetadataManager,
taskStatusUpdaterManager);
this.dataSource = dataSource;
@@ -715,11 +711,6 @@ public ContainerHealthTask getContainerHealthTask() {
return containerHealthTask;
}
- @VisibleForTesting
- public ContainerCountBySizeDao getContainerCountBySizeDao() {
- return containerCountBySizeDao;
- }
-
public ReconContext getReconContext() {
return reconContext;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerSizeMetadataManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerSizeMetadataManager.java
new file mode 100644
index 00000000000..c4c58008334
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerSizeMetadataManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi;
+
+import java.io.IOException;
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
+import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountKey;
+
+/**
+ * The Recon Container Size Metadata DB Service interface for container size
counts.
+ */
[email protected]
+public interface ReconContainerSizeMetadataManager {
+
+ /**
+ * Returns staged DB container size metadata manager.
+ *
+ * @param stagedReconDbStore staged Recon DB store
+ * @return ReconContainerSizeMetadataManager
+ */
+ ReconContainerSizeMetadataManager getStagedReconContainerSizeMetadataManager(
+ DBStore stagedReconDbStore);
+
+ /**
+ * reinitialize the ReconContainerSizeMetadataManager.
+ *
+ * @param reconDBProvider recon DB provider to reinitialize with.
+ */
+ void reinitialize(ReconDBProvider reconDBProvider);
+
+ /**
+ * Store the container size count mapping into a batch.
+ *
+ * @param batch the batch operation we store into
+ * @param containerSizeCountKey the container size count key.
+ * @param count Count of containers with that size range.
+ */
+ void batchStoreContainerSizeCount(BatchOperation batch,
+ ContainerSizeCountKey
containerSizeCountKey,
+ Long count) throws IOException;
+
+ /**
+ * Delete container size count mapping from a batch.
+ *
+ * @param batch the batch operation we add the deletion to
+ * @param containerSizeCountKey the container size count key to be deleted.
+ */
+ void batchDeleteContainerSizeCount(BatchOperation batch,
+ ContainerSizeCountKey
containerSizeCountKey)
+ throws IOException;
+
+ /**
+ * Get the stored container size count for the given key.
+ *
+ * @param containerSizeCountKey the container size count key.
+ * @return count of containers with that size range.
+ */
+ Long getContainerSizeCount(ContainerSizeCountKey containerSizeCountKey)
+ throws IOException;
+
+ /**
+ * Get the entire containerCountTable.
+ * @return containerCountTable
+ */
+ Table<ContainerSizeCountKey, Long> getContainerCountTable();
+
+ /**
+ * Commit a batch operation into the containerSizeMetadataDbStore.
+ *
+ * @param rdbBatchOperation batch operation we want to commit
+ */
+ void commitBatchOperation(RDBBatchOperation rdbBatchOperation)
+ throws IOException;
+
+ /**
+ * Clear all container size count data from the table.
+ * This method is used during reprocess operations.
+ */
+ void clearContainerCountTable() throws IOException;
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerSizeMetadataManagerImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerSizeMetadataManagerImpl.java
new file mode 100644
index 00000000000..6d580cc92bd
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerSizeMetadataManagerImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static
org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.CONTAINER_COUNT_BY_SIZE;
+import static
org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider.truncateTable;
+
+import java.io.IOException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the Recon Container Size Metadata DB Service.
+ */
+@Singleton
+public class ReconContainerSizeMetadataManagerImpl
+ implements ReconContainerSizeMetadataManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReconContainerSizeMetadataManagerImpl.class);
+
+ private Table<ContainerSizeCountKey, Long> containerCountTable;
+ private DBStore containerSizeMetadataDbStore;
+
+ @Inject
+ public ReconContainerSizeMetadataManagerImpl(ReconDBProvider
reconDBProvider) {
+ this(reconDBProvider.getDbStore());
+ }
+
+ private ReconContainerSizeMetadataManagerImpl(DBStore reconDBStore) {
+ containerSizeMetadataDbStore = reconDBStore;
+ initializeTables();
+ }
+
+ @Override
+ public ReconContainerSizeMetadataManager
getStagedReconContainerSizeMetadataManager(
+ DBStore stagedReconDbStore) {
+ return new ReconContainerSizeMetadataManagerImpl(stagedReconDbStore);
+ }
+
+ @Override
+ public void reinitialize(ReconDBProvider reconDBProvider) {
+ containerSizeMetadataDbStore = reconDBProvider.getDbStore();
+ initializeTables();
+ }
+
+ /**
+ * Initialize the container size metadata DB tables.
+ */
+ private void initializeTables() {
+ try {
+ this.containerCountTable =
CONTAINER_COUNT_BY_SIZE.getTable(containerSizeMetadataDbStore);
+ } catch (IOException e) {
+ LOG.error("Unable to create Container Size Count table.", e);
+ }
+ }
+
+ @Override
+ public void batchStoreContainerSizeCount(BatchOperation batch,
+ ContainerSizeCountKey
containerSizeCountKey,
+ Long count) throws IOException {
+ containerCountTable.putWithBatch(batch, containerSizeCountKey, count);
+ }
+
+ @Override
+ public void batchDeleteContainerSizeCount(BatchOperation batch,
+ ContainerSizeCountKey
containerSizeCountKey)
+ throws IOException {
+ containerCountTable.deleteWithBatch(batch, containerSizeCountKey);
+ }
+
+ @Override
+ public Long getContainerSizeCount(ContainerSizeCountKey
containerSizeCountKey)
+ throws IOException {
+ return containerCountTable.get(containerSizeCountKey);
+ }
+
+ @Override
+ public Table<ContainerSizeCountKey, Long> getContainerCountTable() {
+ return containerCountTable;
+ }
+
+ @Override
+ public void commitBatchOperation(RDBBatchOperation rdbBatchOperation)
+ throws IOException {
+ containerSizeMetadataDbStore.commitBatchOperation(rdbBatchOperation);
+ }
+
+ @Override
+ public void clearContainerCountTable() throws IOException {
+ truncateTable(containerCountTable);
+ LOG.info("Successfully cleared container count table");
+ }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBDefinition.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBDefinition.java
index 3fe489da79d..f06a5b7d90d 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBDefinition.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBDefinition.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
import org.apache.hadoop.ozone.recon.codec.NSSummaryCodec;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistoryList;
+import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountKey;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountKey;
import org.apache.hadoop.ozone.recon.tasks.GlobalStatsValue;
@@ -95,6 +96,13 @@ public class ReconDBDefinition extends DBDefinition.WithMap {
StringCodec.get(),
GlobalStatsValue.getCodec());
+ public static final DBColumnFamilyDefinition<ContainerSizeCountKey, Long>
+ CONTAINER_COUNT_BY_SIZE =
+ new DBColumnFamilyDefinition<>(
+ "containerCountBySizeTable",
+ ContainerSizeCountKey.getCodec(),
+ LongCodec.get());
+
private static final Map<String, DBColumnFamilyDefinition<?, ?>>
COLUMN_FAMILIES = DBColumnFamilyDefinition.newUnmodifiableMap(
CONTAINER_KEY,
@@ -104,7 +112,8 @@ public class ReconDBDefinition extends DBDefinition.WithMap
{
REPLICA_HISTORY,
REPLICA_HISTORY_V2,
FILE_COUNT_BY_SIZE,
- GLOBAL_STATS);
+ GLOBAL_STATS,
+ CONTAINER_COUNT_BY_SIZE);
public ReconDBDefinition(String dbName) {
super(COLUMN_FAMILIES);
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountKey.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountKey.java
new file mode 100644
index 00000000000..8f840dba2e9
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountKey.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import
org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.ContainerSizeCountKeyProto;
+
+/**
+ * Key class used for grouping container size counts in RocksDB storage.
+ * Represents a key of containerSizeUpperBound for CONTAINER_COUNT_BY_SIZE
column family.
+ */
+public class ContainerSizeCountKey {
+ private static final Codec<ContainerSizeCountKey> CODEC = new
DelegatedCodec<>(
+ Proto2Codec.get(ContainerSizeCountKeyProto.getDefaultInstance()),
+ ContainerSizeCountKey::fromProto,
+ ContainerSizeCountKey::toProto,
+ ContainerSizeCountKey.class);
+
+ private final Long containerSizeUpperBound;
+
+ public ContainerSizeCountKey(Long containerSizeUpperBound) {
+ this.containerSizeUpperBound = containerSizeUpperBound;
+ }
+
+ public static Codec<ContainerSizeCountKey> getCodec() {
+ return CODEC;
+ }
+
+ public Long getContainerSizeUpperBound() {
+ return containerSizeUpperBound;
+ }
+
+ public ContainerSizeCountKeyProto toProto() {
+ return ContainerSizeCountKeyProto.newBuilder()
+ .setContainerSizeUpperBound(containerSizeUpperBound)
+ .build();
+ }
+
+ public static ContainerSizeCountKey fromProto(ContainerSizeCountKeyProto
proto) {
+ return new ContainerSizeCountKey(proto.getContainerSizeUpperBound());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ContainerSizeCountKey that = (ContainerSizeCountKey) o;
+ return containerSizeUpperBound.equals(that.containerSizeUpperBound);
+ }
+
+ @Override
+ public int hashCode() {
+ return containerSizeUpperBound.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "ContainerSizeCountKey{" +
+ "containerSizeUpperBound=" + containerSizeUpperBound +
+ '}';
+ }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java
index 2a343af5d6e..826df60208a 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java
@@ -18,10 +18,9 @@
package org.apache.hadoop.ozone.recon.tasks;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
-import static
org.apache.ozone.recon.schema.generated.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE;
import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,15 +29,12 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
-import org.apache.ozone.recon.schema.UtilizationSchemaDefinition;
-import
org.apache.ozone.recon.schema.generated.tables.daos.ContainerCountBySizeDao;
-import
org.apache.ozone.recon.schema.generated.tables.pojos.ContainerCountBySize;
-import org.jooq.DSLContext;
-import org.jooq.Record1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +50,7 @@ public class ContainerSizeCountTask extends ReconScmTask {
private ContainerManager containerManager;
private final long interval;
- private ContainerCountBySizeDao containerCountBySizeDao;
- private DSLContext dslContext;
+ private ReconContainerSizeMetadataManager reconContainerSizeMetadataManager;
private HashMap<ContainerID, Long> processedContainers = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final ReconTaskStatusUpdater taskStatusUpdater;
@@ -63,13 +58,11 @@ public class ContainerSizeCountTask extends ReconScmTask {
public ContainerSizeCountTask(
ContainerManager containerManager,
ReconTaskConfig reconTaskConfig,
- ContainerCountBySizeDao containerCountBySizeDao,
- UtilizationSchemaDefinition utilizationSchemaDefinition,
+ ReconContainerSizeMetadataManager reconContainerSizeMetadataManager,
ReconTaskStatusUpdaterManager taskStatusUpdaterManager) {
super(taskStatusUpdaterManager);
this.containerManager = containerManager;
- this.containerCountBySizeDao = containerCountBySizeDao;
- this.dslContext = utilizationSchemaDefinition.getDSLContext();
+ this.reconContainerSizeMetadataManager = reconContainerSizeMetadataManager;
interval = reconTaskConfig.getContainerSizeCountTaskInterval().toMillis();
this.taskStatusUpdater = getTaskStatusUpdater();
}
@@ -129,8 +122,9 @@ private void process(ContainerInfo container,
protected void runTask() throws Exception {
final List<ContainerInfo> containers = containerManager.getContainers();
if (processedContainers.isEmpty()) {
- int execute = dslContext.truncate(CONTAINER_COUNT_BY_SIZE).execute();
- LOG.debug("Deleted {} records from {}", execute,
CONTAINER_COUNT_BY_SIZE);
+ // Clear RocksDB table instead of truncating Derby
+ reconContainerSizeMetadataManager.clearContainerCountTable();
+ LOG.debug("Cleared container count table in RocksDB");
}
processContainers(containers);
}
@@ -201,20 +195,14 @@ public void processContainers(List<ContainerInfo>
containers) {
}
/**
- * Populate DB with the counts of container sizes calculated
- * using the dao.
+ * Populate RocksDB with the counts of container sizes using batch
operations.
* <p>
- * The writeCountsToDB function updates the database with the count of
- * container sizes. It does this by creating two lists of records to be
- * inserted or updated in the database. It iterates over the keys of the
- * containerSizeCountMap and creates a new record for each key. It then
- * checks whether the database has been truncated or not. If it has not been
- * truncated, it attempts to find the current count for the container size
- * in the database and either inserts a new record or updates the current
- * record with the updated count. If the database has been truncated,
- * it only inserts a new record if the count is non-zero. Finally, it
- * uses the containerCountBySizeDao to insert the new records and update
- * the existing records in the database.
+ * The writeCountsToDB function updates RocksDB with the count of
+ * container sizes. It uses batch operations for atomic writes. If the
database
+ * has not been truncated, it reads the current count from RocksDB, adds the
+ * delta, and either updates the entry (if new count > 0) or deletes it
+ * (if new count = 0). If the database has been truncated, it only inserts
+ * entries with non-zero counts.
*
* @param isDbTruncated that checks if the database has been truncated or
not.
* @param containerSizeCountMap stores counts of container sizes
@@ -222,36 +210,37 @@ public void processContainers(List<ContainerInfo>
containers) {
private void writeCountsToDB(boolean isDbTruncated,
Map<ContainerSizeCountKey, Long>
containerSizeCountMap) {
- List<ContainerCountBySize> insertToDb = new ArrayList<>();
- List<ContainerCountBySize> updateInDb = new ArrayList<>();
-
- containerSizeCountMap.keySet().forEach((ContainerSizeCountKey key) -> {
- ContainerCountBySize newRecord = new ContainerCountBySize();
- newRecord.setContainerSize(key.containerSizeUpperBound);
- newRecord.setCount(containerSizeCountMap.get(key));
- if (!isDbTruncated) {
- // Get the current count from database and update
- Record1<Long> recordToFind =
- dslContext.newRecord(
- CONTAINER_COUNT_BY_SIZE.CONTAINER_SIZE)
- .value1(key.containerSizeUpperBound);
- ContainerCountBySize containerCountRecord =
- containerCountBySizeDao.findById(recordToFind.value1());
- if (containerCountRecord == null && newRecord.getCount() > 0L) {
- // insert new row only for non-zero counts.
- insertToDb.add(newRecord);
- } else if (containerCountRecord != null) {
- newRecord.setCount(containerCountRecord.getCount() +
- containerSizeCountMap.get(key));
- updateInDb.add(newRecord);
+ try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+ for (Map.Entry<ContainerSizeCountKey, Long> entry :
+ containerSizeCountMap.entrySet()) {
+ ContainerSizeCountKey key = entry.getKey();
+ Long delta = entry.getValue();
+
+ if (!isDbTruncated) {
+ // Get current count from RocksDB
+ Long currentCount = reconContainerSizeMetadataManager
+ .getContainerSizeCount(key);
+ long newCount = (currentCount != null ? currentCount : 0L) + delta;
+
+ if (newCount > 0) {
+ reconContainerSizeMetadataManager.batchStoreContainerSizeCount(
+ rdbBatchOperation, key, newCount);
+ } else if (newCount == 0 && currentCount != null) {
+ // Delete the entry if count reaches zero
+ reconContainerSizeMetadataManager.batchDeleteContainerSizeCount(
+ rdbBatchOperation, key);
+ }
+ } else if (delta > 0) {
+ // After truncate, just insert non-zero counts
+ reconContainerSizeMetadataManager.batchStoreContainerSizeCount(
+ rdbBatchOperation, key, delta);
}
- } else if (newRecord.getCount() > 0) {
- // insert new row only for non-zero counts.
- insertToDb.add(newRecord);
}
- });
- containerCountBySizeDao.insert(insertToDb);
- containerCountBySizeDao.update(updateInDb);
+
reconContainerSizeMetadataManager.commitBatchOperation(rdbBatchOperation);
+ } catch (IOException e) {
+ LOG.error("Failed to write container size counts to RocksDB", e);
+ throw new RuntimeException(e);
+ }
}
/**
@@ -353,35 +342,4 @@ private static ContainerSizeCountKey
getContainerSizeCountKey(
ReconUtils.getContainerSizeUpperBound(containerSize));
}
- /**
- * The ContainerSizeCountKey class is a simple key class that has a single
- * field, containerSizeUpperBound, which is a Long representing the upper
- * bound of the container size range.
- */
- private static class ContainerSizeCountKey {
-
- private Long containerSizeUpperBound;
-
- ContainerSizeCountKey(
- Long containerSizeUpperBound) {
- this.containerSizeUpperBound = containerSizeUpperBound;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ContainerSizeCountKey) {
- ContainerSizeCountTask.ContainerSizeCountKey
- s = (ContainerSizeCountTask.ContainerSizeCountKey) obj;
- return
- containerSizeUpperBound.equals(s.containerSizeUpperBound);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return (containerSizeUpperBound).hashCode();
- }
- }
-
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index f9b3d40d118..628404f7795 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.ozone.recon.metrics.ReconTaskMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
@@ -79,6 +80,7 @@ public class ReconTaskControllerImpl implements
ReconTaskController {
private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private final ReconGlobalStatsManager reconGlobalStatsManager;
private final ReconFileMetadataManager reconFileMetadataManager;
+ private final ReconContainerSizeMetadataManager
reconContainerSizeMetadataManager;
private Map<String, ReconOmTask> reconOmTasks;
private ExecutorService executorService;
@@ -109,13 +111,15 @@ public ReconTaskControllerImpl(OzoneConfiguration
configuration,
ReconContainerMetadataManager
reconContainerMetadataManager,
ReconNamespaceSummaryManager
reconNamespaceSummaryManager,
ReconGlobalStatsManager
reconGlobalStatsManager,
- ReconFileMetadataManager
reconFileMetadataManager) {
+ ReconFileMetadataManager
reconFileMetadataManager,
+ ReconContainerSizeMetadataManager
reconContainerSizeMetadataManager) {
this.configuration = configuration;
this.reconDBProvider = reconDBProvider;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
this.reconGlobalStatsManager = reconGlobalStatsManager;
this.reconFileMetadataManager = reconFileMetadataManager;
+ this.reconContainerSizeMetadataManager = reconContainerSizeMetadataManager;
reconOmTasks = new HashMap<>();
threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
@@ -278,6 +282,7 @@ public synchronized boolean
reInitializeTasks(ReconOMMetadataManager omMetadataM
reconContainerMetadataManager.reinitialize(reconDBProvider);
reconGlobalStatsManager.reinitialize(reconDBProvider);
reconFileMetadataManager.reinitialize(reconDBProvider);
+ reconContainerSizeMetadataManager.reinitialize(reconDBProvider);
recordAllTaskStatus(localReconOmTaskMap, 0,
omMetadataManager.getLastSequenceNumberFromDB());
// Track reprocess success
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/ReconTestInjector.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/ReconTestInjector.java
index e93e4d71f52..d14bc87baa5 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/ReconTestInjector.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/ReconTestInjector.java
@@ -43,10 +43,12 @@
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import
org.apache.hadoop.ozone.recon.spi.impl.ReconContainerMetadataManagerImpl;
+import
org.apache.hadoop.ozone.recon.spi.impl.ReconContainerSizeMetadataManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ReconFileMetadataManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.ReconGlobalStatsManagerImpl;
@@ -183,6 +185,9 @@ protected void configure() {
bind(ReconGlobalStatsManager.class)
.to(ReconGlobalStatsManagerImpl.class)
.in(Singleton.class);
+ bind(ReconContainerSizeMetadataManager.class)
+ .to(ReconContainerSizeMetadataManagerImpl.class)
+ .in(Singleton.class);
bind(ReconNamespaceSummaryManager.class)
.to(ReconNamespaceSummaryManagerImpl.class)
.in(Singleton.class);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index d0a8fed15ed..f233c351887 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -139,8 +139,6 @@
import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.ozone.recon.schema.UtilizationSchemaDefinition;
-import
org.apache.ozone.recon.schema.generated.tables.daos.ContainerCountBySizeDao;
import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
import
org.apache.ozone.recon.schema.generated.tables.pojos.ContainerCountBySize;
import org.apache.ozone.recon.schema.generated.tables.pojos.FileCountBySize;
@@ -294,16 +292,10 @@ private void initializeInjector() throws Exception {
pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
volumeEndpoint = reconTestInjector.getInstance(VolumeEndpoint.class);
bucketEndpoint = reconTestInjector.getInstance(BucketEndpoint.class);
- ContainerCountBySizeDao containerCountBySizeDao =
reconScm.getContainerCountBySizeDao();
GlobalStatsDao globalStatsDao = getDao(GlobalStatsDao.class);
- UtilizationSchemaDefinition utilizationSchemaDefinition =
- getSchemaDefinition(UtilizationSchemaDefinition.class);
reconFileMetadataManager =
reconTestInjector.getInstance(ReconFileMetadataManager.class);
ReconGlobalStatsManager reconGlobalStatsManager =
reconTestInjector.getInstance(ReconGlobalStatsManager.class);
- utilizationEndpoint = new UtilizationEndpoint(
- containerCountBySizeDao,
- utilizationSchemaDefinition,
- reconFileMetadataManager);
+ utilizationEndpoint =
reconTestInjector.getInstance(UtilizationEndpoint.class);
OzoneConfiguration configuration =
reconTestInjector.getInstance(OzoneConfiguration.class);
fileSizeCountTaskFSO =
new FileSizeCountTaskFSO(reconFileMetadataManager, configuration);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java
index de00a73af7c..f39db6270f2 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java
@@ -22,66 +22,80 @@
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED;
-import static
org.apache.ozone.recon.schema.generated.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.recon.ReconTestInjector;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
-import org.apache.ozone.recon.schema.UtilizationSchemaDefinition;
-import
org.apache.ozone.recon.schema.generated.tables.daos.ContainerCountBySizeDao;
-import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
-import org.jooq.DSLContext;
-import org.jooq.Record1;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
/**
* Class to test the process method of ContainerSizeCountTask.
*/
-public class TestContainerSizeCountTask extends AbstractReconSqlDBTest {
+public class TestContainerSizeCountTask {
- private ContainerCountBySizeDao containerCountBySizeDao;
- private ContainerSizeCountTask task;
- private DSLContext dslContext;
+ @TempDir
+ private static java.nio.file.Path temporaryFolder;
- public TestContainerSizeCountTask() {
- super();
+ private static ReconContainerSizeMetadataManager
reconContainerSizeMetadataManager;
+ private ContainerSizeCountTask task;
+ private static Table<ContainerSizeCountKey, Long> containerCountTable;
+
+ @BeforeAll
+ public static void setupOnce() throws Exception {
+ ReconOMMetadataManager reconOMMetadataManager =
getTestReconOmMetadataManager(
+ initializeNewOmMetadataManager(Files.createDirectory(
+ temporaryFolder.resolve("JunitOmDBDir")).toFile()),
+ Files.createDirectory(temporaryFolder.resolve("NewDir")).toFile());
+ ReconTestInjector reconTestInjector = new
ReconTestInjector.Builder(temporaryFolder.toFile())
+ .withReconSqlDb()
+ .withReconOm(reconOMMetadataManager)
+ .withContainerDB()
+ .build();
+ reconContainerSizeMetadataManager = reconTestInjector.getInstance(
+ ReconContainerSizeMetadataManager.class);
+ containerCountTable =
reconContainerSizeMetadataManager.getContainerCountTable();
}
@BeforeEach
- public void setUp() {
- UtilizationSchemaDefinition utilizationSchemaDefinition =
getSchemaDefinition(UtilizationSchemaDefinition.class);
- dslContext = utilizationSchemaDefinition.getDSLContext();
- containerCountBySizeDao = getDao(ContainerCountBySizeDao.class);
+ public void setUp() throws Exception {
ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
reconTaskConfig.setContainerSizeCountTaskInterval(Duration.ofSeconds(1));
ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager =
mock(ReconTaskStatusUpdaterManager.class);
-
when(reconTaskStatusUpdaterManager.getTaskStatusUpdater(anyString())).thenReturn(new
ReconTaskStatusUpdater(
- getDao(ReconTaskStatusDao.class), "mockedTask-" +
System.currentTimeMillis()));
+
when(reconTaskStatusUpdaterManager.getTaskStatusUpdater(anyString())).thenReturn(
+ mock(ReconTaskStatusUpdater.class));
ContainerManager containerManager = mock(ContainerManager.class);
task = new ContainerSizeCountTask(
containerManager,
reconTaskConfig,
- containerCountBySizeDao,
- utilizationSchemaDefinition,
+ reconContainerSizeMetadataManager,
reconTaskStatusUpdaterManager);
- // Truncate table before running each test
- dslContext.truncate(CONTAINER_COUNT_BY_SIZE);
+ // Clear table before running each test
+ reconContainerSizeMetadataManager.clearContainerCountTable();
}
@Test
- public void testProcess() {
+ public void testProcess() throws IOException {
// mock a container with invalid used bytes
ContainerInfo omContainerInfo0 = mock(ContainerInfo.class);
given(omContainerInfo0.containerID()).willReturn(ContainerID.valueOf(0));
@@ -108,23 +122,26 @@ public void testProcess() {
task.processContainers(containers);
// Verify 3 containers are in correct bins.
- assertEquals(3, containerCountBySizeDao.count());
+ // Note: getEstimatedKeyCount() may be inaccurate in RocksDB, so we count
actual entries
+ int firstCount = 0;
+ try
(org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator<ContainerSizeCountKey,
Long> iterator =
+ containerCountTable.iterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ firstCount++;
+ }
+ }
+ assertEquals(3, firstCount);
// container size upper bound for
// 1500000000L (1.5GB) is 2147483648L = 2^31 = 2GB (next highest power of
2)
- Record1<Long> recordToFind =
- dslContext.newRecord(
- CONTAINER_COUNT_BY_SIZE.CONTAINER_SIZE)
- .value1(2147483648L);
- assertEquals(1L,
- containerCountBySizeDao.findById(recordToFind.value1()).getCount()
- .longValue());
+ ContainerSizeCountKey key1 = new ContainerSizeCountKey(2147483648L);
+ assertEquals(1L, containerCountTable.get(key1).longValue());
+
// container size upper bound for
// 2500000000L (2.5GB) is 4294967296L = 2^32 = 4GB (next highest power of
2)
- recordToFind.value1(4294967296L);
- assertEquals(1L,
- containerCountBySizeDao.findById(recordToFind.value1()).getCount()
- .longValue());
+ ContainerSizeCountKey key2 = new ContainerSizeCountKey(4294967296L);
+ assertEquals(1L, containerCountTable.get(key2).longValue());
// Add a new container
ContainerInfo omContainerInfo3 = mock(ContainerInfo.class);
@@ -140,36 +157,39 @@ public void testProcess() {
task.processContainers(containers);
// Total size groups added to the database
- assertEquals(5, containerCountBySizeDao.count());
+ // After migration to RocksDB, entries with count=0 are deleted, so we
expect 4 entries
+ // Note: getEstimatedKeyCount() may be inaccurate in RocksDB, so we count
actual entries
+ int actualCount = 0;
+ try
(org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator<ContainerSizeCountKey,
Long> iterator =
+ containerCountTable.iterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ actualCount++;
+ }
+ }
+ assertEquals(4, actualCount);
// Check whether container size upper bound for
// 50000L is 536870912L = 2^29 = 512MB (next highest power of 2)
- recordToFind.value1(536870912L);
- assertEquals(1, containerCountBySizeDao
- .findById(recordToFind.value1())
- .getCount()
- .longValue());
+ ContainerSizeCountKey key3 = new ContainerSizeCountKey(536870912L);
+ assertEquals(1L, containerCountTable.get(key3).longValue());
// Check whether container size of 1000000000L has been successfully
updated
// The previous value upperbound was 4294967296L which is no longer there
- recordToFind.value1(4294967296L);
- assertEquals(0, containerCountBySizeDao
- .findById(recordToFind.value1())
- .getCount()
- .longValue());
+ ContainerSizeCountKey key4 = new ContainerSizeCountKey(4294967296L);
+ Long count = containerCountTable.get(key4);
+ assertEquals(0L, count == null ? 0L : count.longValue());
// Remove the container having size 1.5GB and upperbound 2147483648L
containers.remove(omContainerInfo1);
task.processContainers(containers);
- recordToFind.value1(2147483648L);
- assertEquals(0, containerCountBySizeDao
- .findById(recordToFind.value1())
- .getCount()
- .longValue());
+ ContainerSizeCountKey key5 = new ContainerSizeCountKey(2147483648L);
+ Long count2 = containerCountTable.get(key5);
+ assertEquals(0L, count2 == null ? 0L : count2.longValue());
}
@Test
- public void testProcessDeletedAndNegativeSizedContainers() {
+ public void testProcessDeletedAndNegativeSizedContainers() throws
IOException {
// Create a list of containers, including one that is deleted
ContainerInfo omContainerInfo1 = mock(ContainerInfo.class);
given(omContainerInfo1.containerID()).willReturn(ContainerID.valueOf(1));
@@ -219,7 +239,16 @@ public void testProcessDeletedAndNegativeSizedContainers()
{
task.processContainers(containers);
// Verify that only the valid containers are counted
- assertEquals(3, containerCountBySizeDao.count());
+ // Note: getEstimatedKeyCount() may be inaccurate in RocksDB, so we count
actual entries
+ int count = 0;
+ try
(org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator<ContainerSizeCountKey,
Long> iterator =
+ containerCountTable.iterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ count++;
+ }
+ }
+ assertEquals(3, count);
}
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestEventBufferOverflow.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestEventBufferOverflow.java
index 77fc6cdbb07..adfbbc67b6c 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestEventBufferOverflow.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestEventBufferOverflow.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
@@ -95,7 +96,8 @@ public void testBufferOverflowAndAsyncReinitialization()
throws Exception {
ReconTaskControllerImpl reconTaskController = new ReconTaskControllerImpl(
ozoneConfiguration, new HashSet<>(), reconTaskStatusUpdaterManagerMock,
reconDbProvider, reconContainerMgr, nsSummaryManager,
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Register a mock task for reinitialization
CountDownLatch reinitLatch = new CountDownLatch(1);
@@ -178,7 +180,8 @@ public void testDirectReInitializationEventQueueing()
throws Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Set up properly mocked ReconOMMetadataManager with required dependencies
ReconOMMetadataManager mockOMMetadataManager =
mock(ReconOMMetadataManager.class);
@@ -248,7 +251,8 @@ public void
testCompleteBufferOverflowAndReInitializationCycle() throws Exceptio
ReconTaskControllerImpl reconTaskController = new ReconTaskControllerImpl(
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, reconContainerMgr, nsSummaryManager,
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Set up properly mocked ReconOMMetadataManager with required dependencies
ReconOMMetadataManager mockOMMetadataManager =
mock(ReconOMMetadataManager.class);
@@ -391,7 +395,8 @@ public void testCheckpointCreationFailureAndRetry() throws
Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Set up a mock OM metadata manager
ReconOMMetadataManager mockOMMetadataManager =
mock(ReconOMMetadataManager.class);
@@ -453,7 +458,8 @@ public void testRetryMechanismWithFullSnapshotFallback()
throws Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
ReconOMMetadataManager mockOMMetadataManager =
mock(ReconOMMetadataManager.class);
reconTaskController.updateOMMetadataManager(mockOMMetadataManager);
@@ -523,7 +529,8 @@ public void testSuccessfulCheckpointAfterFailures() throws
Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Set up properly mocked ReconOMMetadataManager with required dependencies
ReconOMMetadataManager mockOMMetadataManager =
mock(ReconOMMetadataManager.class);
@@ -591,7 +598,8 @@ public void testResetEventBufferMethod() throws Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Add some events to the buffer
for (int i = 0; i < 5; i++) {
@@ -629,7 +637,8 @@ public void testResetEventFlagsMethod() throws Exception {
config, new HashSet<>(), taskStatusUpdaterManager,
reconDbProvider, mock(ReconContainerMetadataManager.class),
mock(ReconNamespaceSummaryManager.class),
- mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class));
+ mock(ReconGlobalStatsManager.class),
mock(ReconFileMetadataManager.class), mock(
+ ReconContainerSizeMetadataManager.class));
// Test resetting flags for each reason
reconTaskController.resetEventFlags();
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java
index f5ad9462d12..1bd009eaba9 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
@@ -111,12 +112,13 @@ void setUp() throws Exception {
ReconContainerMetadataManager reconContainerMgr =
mock(ReconContainerMetadataManager.class);
ReconGlobalStatsManager reconGlobalStatsManager =
mock(ReconGlobalStatsManager.class);
ReconFileMetadataManager reconFileMetadataManager =
mock(ReconFileMetadataManager.class);
+ ReconContainerSizeMetadataManager reconContainerSizeMetadataManager =
mock(ReconContainerSizeMetadataManager.class);
ReconDBProvider reconDbProvider = mock(ReconDBProvider.class);
when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class));
when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider);
taskController = new ReconTaskControllerImpl(ozoneConfiguration,
java.util.Collections.emptySet(),
mockTaskStatusUpdaterManager, reconDbProvider, reconContainerMgr,
mockNamespaceSummaryManager,
- reconGlobalStatsManager, reconFileMetadataManager);
+ reconGlobalStatsManager, reconFileMetadataManager,
reconContainerSizeMetadataManager);
taskController.start(); // Initialize the executor service
taskController.registerTask(nsSummaryTask);
taskController.registerTask(mockOtherTask);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index 51763812172..412db266d9c 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerSizeMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
@@ -89,9 +90,10 @@ public void setUp() throws IOException {
ReconNamespaceSummaryManager nsSummaryManager =
mock(ReconNamespaceSummaryManager.class);
ReconGlobalStatsManager reconGlobalStatsManager =
mock(ReconGlobalStatsManager.class);
ReconFileMetadataManager reconFileMetadataManager =
mock(ReconFileMetadataManager.class);
+ ReconContainerSizeMetadataManager reconContainerSizeMetadataManager =
mock(ReconContainerSizeMetadataManager.class);
reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration, new
HashSet<>(),
reconTaskStatusUpdaterManagerMock, reconDbProvider, reconContainerMgr,
nsSummaryManager,
- reconGlobalStatsManager, reconFileMetadataManager);
+ reconGlobalStatsManager, reconFileMetadataManager,
reconContainerSizeMetadataManager);
reconTaskController.start();
}
@@ -397,9 +399,10 @@ public void
testDrainEventBufferAndCleanExistingCheckpoints() throws Exception {
ReconNamespaceSummaryManager nsSummaryManager =
mock(ReconNamespaceSummaryManager.class);
ReconGlobalStatsManager reconGlobalStatsManager =
mock(ReconGlobalStatsManager.class);
ReconFileMetadataManager reconFileMetadataManager =
mock(ReconFileMetadataManager.class);
+ ReconContainerSizeMetadataManager reconContainerSizeMetadataManager =
mock(ReconContainerSizeMetadataManager.class);
ReconTaskControllerImpl testController = new
ReconTaskControllerImpl(ozoneConfiguration, new HashSet<>(),
reconTaskStatusUpdaterManagerMock, reconDbProvider, reconContainerMgr,
nsSummaryManager,
- reconGlobalStatsManager, reconFileMetadataManager);
+ reconGlobalStatsManager, reconFileMetadataManager,
reconContainerSizeMetadataManager);
// Don't start async processing
// Add some events to buffer first
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]