This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dce2e5e8c0d HDDS-13187. Extend Recon events handling to 
MultipartInfoTable (#9250)
dce2e5e8c0d is described below

commit dce2e5e8c0d9af02f37663cf667f2a3d97d0fb46
Author: Priyesh Karatha <[email protected]>
AuthorDate: Fri Nov 7 07:24:20 2025 +0530

    HDDS-13187. Extend Recon events handling to MultipartInfoTable (#9250)
    
    Co-authored-by: tanvipenumudy 
<[email protected]>
---
 .../ozone/recon/api/OMDBInsightEndpoint.java       |  55 +++++++
 .../recon/tasks/MultipartInfoInsightHandler.java   | 173 +++++++++++++++++++++
 .../ozone/recon/tasks/OmTableInsightTask.java      |   2 +
 .../ozone/recon/tasks/TestOmTableInsightTask.java  | 159 ++++++++++++++++++-
 4 files changed, 384 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index ae8e9bdb0b2..57cac7ec23c 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
 import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
@@ -377,6 +378,60 @@ private Long getValueFromId(GlobalStatsValue record) {
     return record != null ? record.getValue() : 0L;
   }
 
+  /**
+   * Retrieves the summary of open MPU keys.
+   *
+   * @return The HTTP response body includes a map with the following entries:
+   * - "totalOpenMPUKeys": the total number of open MPU keys
+   * - "totalReplicatedDataSize": the total replicated size for open MPU keys
+   * - "totalDataSize": the total unreplicated size for open MPU keys
+   *
+   * Example response:
+   *   {
+   *    "totalOpenMPUKeys": 2,
+   *    "totalReplicatedDataSize": 90000,
+   *    "totalDataSize": 30000
+   *   }
+   */
+  @GET
+  @Path("/open/mpu/summary")
+  public Response getOpenMPUKeySummary() {
+    // Create a HashMap for the keysSummary
+    Map<String, Long> keysSummary = new HashMap<>();
+    // Create a keys summary for open MPU keys
+    createKeysSummaryForOpenMPUKey(keysSummary);
+    return Response.ok(keysSummary).build();
+  }
+
+  /**
+   * Creates a keys summary for open MPU keys and updates the provided 
keysSummary map.
+   * Looks up the total number of open MPU keys, replicated data size, and unreplicated data size from the global stats.
+   *
+   * @param keysSummary A map to store the keys summary information.
+   */
+  private void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
+    try {
+      Long replicatedSizeOpenMPUKey = 
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+          
OmTableInsightTask.getReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
+      Long unreplicatedSizeOpenMPUKey = 
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+          
OmTableInsightTask.getUnReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
+      Long openMPUKeyCount = 
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+          OmTableInsightTask.getTableCountKeyFromTable(MULTIPART_INFO_TABLE)));
+      // Calculate the total number of open MPU keys
+      keysSummary.put("totalOpenMPUKeys", openMPUKeyCount);
+      // Calculate the total replicated and unreplicated sizes of open MPU keys
+      keysSummary.put("totalReplicatedDataSize", replicatedSizeOpenMPUKey);
+      keysSummary.put("totalDataSize", unreplicatedSizeOpenMPUKey);
+    } catch (IOException ex) {
+      LOG.error("Error retrieving open mpu key summary from RocksDB", ex);
+      // Return zeros in case of error
+      keysSummary.put("totalOpenMPUKeys", 0L);
+      // Report zero replicated and unreplicated sizes as well
+      keysSummary.put("totalReplicatedDataSize", 0L);
+      keysSummary.put("totalDataSize", 0L);
+    }
+  }
+
   /** Retrieves the summary of deleted keys.
    *
    * This method calculates and returns a summary of deleted keys.
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
new file mode 100644
index 00000000000..2f501a33ad9
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages records in the MultipartInfo Table, updating counts and sizes of
+ * multipart upload keys in the backend.
+ */
+public class MultipartInfoInsightHandler implements OmTableHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipartInfoInsightHandler.class);
+
+  /**
+   * Invoked by the process method to add information on those keys that have
+   * been initiated for multipart upload in the backend.
+   */
+  @Override
+  public void handlePutEvent(OMDBUpdateEvent<String, Object> event, String 
tableName, Map<String, Long> objectCountMap,
+      Map<String, Long> unReplicatedSizeMap, Map<String, Long> 
replicatedSizeMap) {
+
+    if (event.getValue() != null) {
+      OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo) 
event.getValue();
+      objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
+          (k, count) -> count + 1L);
+
+      for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) {
+        OmKeyInfo omKeyInfo = 
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+        
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size + omKeyInfo.getDataSize());
+        
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size + omKeyInfo.getReplicatedSize());
+      }
+    } else {
+      LOG.warn("Put event does not have the Multipart Key Info for {}.", 
event.getKey());
+    }
+  }
+
+  /**
+   * Invoked by the process method to delete information on those multipart 
uploads that
+   * have been completed or aborted in the backend.
+   */
+  @Override
+  public void handleDeleteEvent(OMDBUpdateEvent<String, Object> event, String 
tableName,
+      Map<String, Long> objectCountMap, Map<String, Long> unReplicatedSizeMap, 
Map<String, Long> replicatedSizeMap) {
+
+    if (event.getValue() != null) {
+      OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo) 
event.getValue();
+      objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
+          (k, count) -> count > 0 ? count - 1L : 0L);
+
+      for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) {
+        OmKeyInfo omKeyInfo = 
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+        
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> {
+              long newSize = size > omKeyInfo.getDataSize() ? size - 
omKeyInfo.getDataSize() : 0L;
+              if (newSize < 0) {
+                LOG.warn("Negative unreplicated size for key: {}. Original: 
{}, Part: {}",
+                    k, size, omKeyInfo.getDataSize());
+              }
+              return newSize;
+            });
+        
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> {
+              long newSize = size > omKeyInfo.getReplicatedSize() ? size - 
omKeyInfo.getReplicatedSize() : 0L;
+              if (newSize < 0) {
+                LOG.warn("Negative replicated size for key: {}. Original: {}, 
Part: {}",
+                    k, size, omKeyInfo.getReplicatedSize());
+              }
+              return newSize;
+            });
+      }
+    } else {
+      LOG.warn("Delete event does not have the Multipart Key Info for {}.", 
event.getKey());
+    }
+  }
+
+  /**
+   * Invoked by the process method to update information on those multipart 
uploads that
+   * have been updated in the backend.
+   */
+  @Override
+  public void handleUpdateEvent(OMDBUpdateEvent<String, Object> event, String 
tableName,
+      Map<String, Long> objectCountMap, Map<String, Long> unReplicatedSizeMap, 
Map<String, Long> replicatedSizeMap) {
+
+    if (event.getValue() != null) {
+      if (event.getOldValue() == null) {
+        LOG.warn("Update event does not have the old Multipart Key Info for 
{}.", event.getKey());
+        return;
+      }
+
+      // In Update event the count for the multipart info table will not 
change. So we
+      // don't need to update the count.
+      OmMultipartKeyInfo oldMultipartKeyInfo = (OmMultipartKeyInfo) 
event.getOldValue();
+      OmMultipartKeyInfo newMultipartKeyInfo = (OmMultipartKeyInfo) 
event.getValue();
+
+      // Calculate old sizes
+      for (PartKeyInfo partKeyInfo : oldMultipartKeyInfo.getPartKeyInfoMap()) {
+        OmKeyInfo omKeyInfo = 
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+        
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size - omKeyInfo.getDataSize());
+        
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size - omKeyInfo.getReplicatedSize());
+      }
+
+      // Calculate new sizes
+      for (PartKeyInfo partKeyInfo : newMultipartKeyInfo.getPartKeyInfoMap()) {
+        OmKeyInfo omKeyInfo = 
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+        
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size + omKeyInfo.getDataSize());
+        
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+            (k, size) -> size + omKeyInfo.getReplicatedSize());
+      }
+    } else {
+      LOG.warn("Update event does not have the Multipart Key Info for {}.", 
event.getKey());
+    }
+  }
+
+  /**
+   * This method is called by the reprocess method. It calculates the record
+   * counts for the multipart info table. Additionally, it computes the sizes
+   * of both replicated and unreplicated parts that are currently in multipart
+   * uploads in the backend.
+   */
+  @Override
+  public Triple<Long, Long, Long> getTableSizeAndCount(
+      TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator) {
+    long count = 0;
+    long unReplicatedSize = 0;
+    long replicatedSize = 0;
+
+    if (iterator != null) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, ?> kv = iterator.next();
+        if (kv != null && kv.getValue() != null) {
+          OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo) 
kv.getValue();
+          for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) 
{
+            OmKeyInfo omKeyInfo = 
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+            unReplicatedSize += omKeyInfo.getDataSize();
+            replicatedSize += omKeyInfo.getReplicatedSize();
+          }
+          count++;
+        }
+      }
+    }
+    return Triple.of(count, unReplicatedSize, replicatedSize);
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 0ea225e12fb..341912b5d2e 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
 
@@ -70,6 +71,7 @@ public OmTableInsightTask(ReconGlobalStatsManager 
reconGlobalStatsManager,
     tableHandlers.put(OPEN_KEY_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
+    tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
   }
 
   @Override
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 659ece4b398..faa158bfab3 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -21,6 +21,7 @@
 import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
@@ -47,7 +48,9 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -57,14 +60,18 @@
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.recon.ReconTestInjector;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
+import org.apache.hadoop.util.Time;
 import org.jooq.DSLContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -357,6 +364,16 @@ public void testReprocessForCount() throws Exception {
         when(keyInfo.getOmKeyInfoList()).thenReturn(
             Arrays.asList(mock(OmKeyInfo.class)));
         when(mockKeyValue.getValue()).thenReturn(keyInfo);
+      } else if (tableName.equals(MULTIPART_INFO_TABLE)) {
+        String uploadID = UUID.randomUUID().toString();
+        OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo.Builder()
+            .setUploadID(uploadID)
+            .build();
+        PartKeyInfo partKeyInfo =
+            createPartKeyInfo(UUID.randomUUID().toString(), 
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+                uploadID, 1, 100L);
+        multipartKeyInfo.addPartKeyInfo(partKeyInfo);
+        when(mockKeyValue.getValue()).thenReturn(multipartKeyInfo);
       } else {
         when(mockKeyValue.getValue()).thenReturn(mock(OmKeyInfo.class));
       }
@@ -373,6 +390,7 @@ public void testReprocessForCount() throws Exception {
     assertEquals(5L, getCountForTable(BUCKET_TABLE));
     assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
     assertEquals(5L, getCountForTable(DELETED_TABLE));
+    assertEquals(5L, getCountForTable(MULTIPART_INFO_TABLE));
   }
 
   @Test
@@ -443,8 +461,8 @@ public void testProcessForCount() {
 
     // Creating events for each table except the deleted table
     for (String tableName : omTableInsightTask.getTaskTables()) {
-      if (tableName.equals(DELETED_TABLE)) {
-        continue; // Skipping deleted table as it has a separate test
+      if (tableName.equals(DELETED_TABLE) || 
tableName.equals(MULTIPART_INFO_TABLE)) {
+        continue; // Skipping deleted and multipartInfo tables as they have 
separate tests
       }
 
       // Adding 5 PUT events per table
@@ -471,7 +489,7 @@ public void testProcessForCount() {
 
     // Verifying the count in each table
     for (String tableName : omTableInsightTask.getTaskTables()) {
-      if (tableName.equals(DELETED_TABLE)) {
+      if (tableName.equals(DELETED_TABLE) || 
tableName.equals(MULTIPART_INFO_TABLE)) {
         continue;
       }
       assertEquals(4L, getCountForTable(
@@ -481,7 +499,7 @@ public void testProcessForCount() {
     List<OMDBUpdateEvent> additionalEvents = new ArrayList<>();
     // Simulating new PUT and DELETE events
     for (String tableName : omTableInsightTask.getTaskTables()) {
-      if (tableName.equals(DELETED_TABLE)) {
+      if (tableName.equals(DELETED_TABLE) || 
tableName.equals(MULTIPART_INFO_TABLE)) {
         continue;
       }
       // Adding 1 new PUT event
@@ -499,7 +517,7 @@ public void testProcessForCount() {
     omTableInsightTask.process(additionalBatch, Collections.emptyMap());
     // Verifying the final count in each table
     for (String tableName : omTableInsightTask.getTaskTables()) {
-      if (tableName.equals(DELETED_TABLE)) {
+      if (tableName.equals(DELETED_TABLE) || 
tableName.equals(MULTIPART_INFO_TABLE)) {
         continue;
       }
       // 5 items expected after processing the additional events.
@@ -627,6 +645,137 @@ public void testProcessForDeletedTable() {
     assertEquals(12000L, getReplicatedSizeForTable(DELETED_TABLE));
   }
 
+  @Test
+  public void testProcessForMultipartInfoTable() {
+    // Prepare 5 MPU key PUT events.
+    ArrayList<OMDBUpdateEvent> putEvents = new ArrayList<>();
+    String[] multipartKeys = new String[5];
+    OmMultipartKeyInfo[] mpuInfos = new OmMultipartKeyInfo[5];
+    String uploadID = UUID.randomUUID().toString();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    for (int i = 0; i < 5; i++) {
+      OmMultipartKeyInfo mpu = new OmMultipartKeyInfo.Builder()
+          .setObjectID(i + 1)
+          .setUploadID(uploadID)
+          .setCreationTime(Time.now())
+          .setReplicationConfig(RatisReplicationConfig.getInstance(
+              HddsProtos.ReplicationFactor.THREE))
+          .build();
+
+      // Each MPU has 2 parts, each part is 100 bytes.
+      mpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 1, 100L));
+      mpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 2, 100L));
+      String multipartKey = reconOMMetadataManager.getMultipartKey(volumeName, 
bucketName, keyName, uploadID);
+      multipartKeys[i] = multipartKey;
+      mpuInfos[i] = mpu;
+      putEvents.add(getOMUpdateEvent(multipartKey, mpu, MULTIPART_INFO_TABLE, 
PUT, null));
+    }
+    OMUpdateEventBatch putBatch = new OMUpdateEventBatch(putEvents, 0L);
+    omTableInsightTask.process(putBatch, Collections.emptyMap());
+
+    // After 5 MPU key PUTs, each with 2 parts of 100 bytes, total 
unreplicated size = 5 * 2 * 100 bytes = 1000 bytes.
+    // Replicated size (with RATIS THREE replication) = 1000 bytes * 3 = 3000 
bytes.
+    assertEquals(5L, getCountForTable(MULTIPART_INFO_TABLE));
+    assertEquals(1000L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+    assertEquals(3000L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+
+    // DELETE the last MPU key.
+    ArrayList<OMDBUpdateEvent> deleteEvents = new ArrayList<>();
+    deleteEvents.add(getOMUpdateEvent(multipartKeys[4], mpuInfos[4], 
MULTIPART_INFO_TABLE, DELETE, null));
+    OMUpdateEventBatch deleteBatch = new OMUpdateEventBatch(deleteEvents, 0L);
+    omTableInsightTask.process(deleteBatch, Collections.emptyMap());
+
+    // After DELETE: 4 MPU keys left, 4 * 2 * 100 = 800 bytes unreplicated 
size, 800 bytes * 3 = 2400 bytes
+    // replicated size.
+    assertEquals(4L, getCountForTable(MULTIPART_INFO_TABLE));
+    assertEquals(800L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+    assertEquals(2400L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+
+    // UPDATE the first MPU key: change part 1 to 200 bytes, part 2 stays 100 
bytes.
+    OmMultipartKeyInfo newMpu = new OmMultipartKeyInfo.Builder()
+        .setObjectID(1L)
+        .setUploadID(uploadID)
+        .setCreationTime(Time.now())
+        .setReplicationConfig(RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.THREE))
+        .build();
+
+    newMpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 1, 200L));
+    newMpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 2, 100L));
+
+    ArrayList<OMDBUpdateEvent> updateEvents = new ArrayList<>();
+    updateEvents.add(getOMUpdateEvent(multipartKeys[0], newMpu, 
MULTIPART_INFO_TABLE, UPDATE, mpuInfos[0]));
+    OMUpdateEventBatch updateBatch = new OMUpdateEventBatch(updateEvents, 0L);
+    omTableInsightTask.process(updateBatch, Collections.emptyMap());
+
+    // After UPDATE: 3 MPU keys unchanged (2*100 bytes each), 1 MPU with 
200+100 bytes.
+    // Total unreplicated size = 3*2*100 + 200+100 = 600+300 = 900 bytes.
+    // Total replicated size (with RATIS THREE replication) = 900 * 3 = 2700 
bytes.
+    assertEquals(4L, getCountForTable(MULTIPART_INFO_TABLE));
+    assertEquals(900L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+    assertEquals(2700L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+  }
+
+  @Test
+  public void testReprocessForMultipartInfoTable() throws Exception {
+    String uploadID = UUID.randomUUID().toString();
+    OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo.Builder()
+        .setObjectID(1L)
+        .setUploadID(uploadID)
+        .setCreationTime(Time.now())
+        .setReplicationConfig(RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.THREE))
+        .build();
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    PartKeyInfo part1 = createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 1, 100L);
+    omMultipartKeyInfo.addPartKeyInfo(part1);
+
+    PartKeyInfo part2 = createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 2, 100L);
+    omMultipartKeyInfo.addPartKeyInfo(part2);
+
+    PartKeyInfo part3 = createPartKeyInfo(volumeName, bucketName, keyName, 
uploadID, 3, 100L);
+    omMultipartKeyInfo.addPartKeyInfo(part3);
+
+    String multipartKey = reconOMMetadataManager.getMultipartKey(volumeName, 
bucketName, keyName, uploadID);
+    reconOMMetadataManager.getMultipartInfoTable().put(multipartKey, 
omMultipartKeyInfo);
+
+    ReconOmTask.TaskResult result = 
omTableInsightTask.reprocess(reconOMMetadataManager);
+    assertTrue(result.isTaskSuccess());
+
+    assertEquals(1L, getCountForTable(MULTIPART_INFO_TABLE));
+    // each MPU part size is 100 bytes * 3 MPU parts = 300 bytes.
+    assertEquals(300L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+    // each MPU part is replicated using RATIS THREE, total replicated size = 
300 bytes * 3 = 900 bytes.
+    assertEquals(900L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+  }
+
+  public PartKeyInfo createPartKeyInfo(String volumeName, String bucketName,
+                                       String keyName, String uploadID, int 
partNumber, long dataSize) {
+    return PartKeyInfo.newBuilder()
+        .setPartNumber(partNumber)
+        .setPartName(reconOMMetadataManager.getMultipartKey(volumeName,
+            bucketName, keyName, uploadID))
+        .setPartKeyInfo(KeyInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setDataSize(dataSize)
+            .setCreationTime(Time.now())
+            .setModificationTime(Time.now())
+            .setObjectID(UUID.randomUUID().hashCode())
+            .setType(HddsProtos.ReplicationType.RATIS)
+            .setFactor(HddsProtos.ReplicationFactor.THREE)
+            .build())
+        .build();
+  }
+
   private OMDBUpdateEvent getOMUpdateEvent(
       String name, Object value,
       String table,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to