sajjad-moradi commented on code in PR #8653:
URL: https://github.com/apache/pinot/pull/8653#discussion_r869500732


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -20,58 +20,106 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class ZKMetadataUtils {
   private ZKMetadataUtils() {
   }
 
-  public static void updateSegmentMetadata(SegmentZKMetadata 
segmentZKMetadata, SegmentMetadata segmentMetadata) {
-    SegmentVersion segmentVersion = segmentMetadata.getVersion();
-    if (segmentVersion != null) {
-      segmentZKMetadata.setIndexVersion(segmentVersion.name());
-    }
+  /**
+   * Creates the segment ZK metadata for a new segment.
+   */
+  public static SegmentZKMetadata createSegmentZKMetadata(String 
tableNameWithType, SegmentMetadata segmentMetadata,
+      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+    SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setPushTime(System.currentTimeMillis());
+    return segmentZKMetadata;
+  }
+
+  /**
+   * Refreshes the segment ZK metadata for a segment being replaced.
+   */
+  public static void refreshSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+  }
+
+  private static void updateSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
       segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+    } else {
+      segmentZKMetadata.setStartTime(-1);
+      segmentZKMetadata.setEndTime(-1);
+      segmentZKMetadata.setTimeUnit(null);
+    }
+    if (segmentMetadata.getVersion() != null) {
+      segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
+    } else {
+      segmentZKMetadata.setIndexVersion(null);
     }
     segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
-    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
     segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
-    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
-        new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
-            segmentZKMetadata.getCustomMap());
-    
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setDownloadUrl(downloadUrl);
+    segmentZKMetadata.setCrypterName(crypter);
 
-    // Extract column partition metadata (if any), and set it into segment ZK 
metadata.
+    // Set partition metadata
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
-    if (segmentMetadata instanceof SegmentMetadataImpl) {
-      for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
-        String column = entry.getKey();
-        ColumnMetadata columnMetadata = entry.getValue();
-        PartitionFunction partitionFunction = 
columnMetadata.getPartitionFunction();
-
-        if (partitionFunction != null) {
-          ColumnPartitionMetadata columnPartitionMetadata =
-              new ColumnPartitionMetadata(partitionFunction.getName(), 
partitionFunction.getNumPartitions(),
-                  columnMetadata.getPartitions(), 
partitionFunction.getFunctionConfig());
-          columnPartitionMap.put(column, columnPartitionMetadata);
-        }
+    for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata columnMetadata = entry.getValue();

Review Comment:
   Java 8 syntax is easier to read:
   ```java
   segmentMetadata.getColumnMetadataMap().forEach((column, columnMetadata) -> {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -20,58 +20,106 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class ZKMetadataUtils {
   private ZKMetadataUtils() {
   }
 
-  public static void updateSegmentMetadata(SegmentZKMetadata 
segmentZKMetadata, SegmentMetadata segmentMetadata) {
-    SegmentVersion segmentVersion = segmentMetadata.getVersion();
-    if (segmentVersion != null) {
-      segmentZKMetadata.setIndexVersion(segmentVersion.name());
-    }
+  /**
+   * Creates the segment ZK metadata for a new segment.
+   */
+  public static SegmentZKMetadata createSegmentZKMetadata(String 
tableNameWithType, SegmentMetadata segmentMetadata,
+      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+    SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setPushTime(System.currentTimeMillis());
+    return segmentZKMetadata;
+  }
+
+  /**
+   * Refreshes the segment ZK metadata for a segment being replaced.
+   */
+  public static void refreshSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+  }
+
+  private static void updateSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
       segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+    } else {
+      segmentZKMetadata.setStartTime(-1);
+      segmentZKMetadata.setEndTime(-1);
+      segmentZKMetadata.setTimeUnit(null);
+    }
+    if (segmentMetadata.getVersion() != null) {
+      segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
+    } else {
+      segmentZKMetadata.setIndexVersion(null);
     }
     segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
-    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
     segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
-    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
-        new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
-            segmentZKMetadata.getCustomMap());
-    
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setDownloadUrl(downloadUrl);
+    segmentZKMetadata.setCrypterName(crypter);
 
-    // Extract column partition metadata (if any), and set it into segment ZK 
metadata.
+    // Set partition metadata
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
-    if (segmentMetadata instanceof SegmentMetadataImpl) {
-      for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
-        String column = entry.getKey();
-        ColumnMetadata columnMetadata = entry.getValue();
-        PartitionFunction partitionFunction = 
columnMetadata.getPartitionFunction();
-
-        if (partitionFunction != null) {
-          ColumnPartitionMetadata columnPartitionMetadata =
-              new ColumnPartitionMetadata(partitionFunction.getName(), 
partitionFunction.getNumPartitions(),
-                  columnMetadata.getPartitions(), 
partitionFunction.getFunctionConfig());
-          columnPartitionMap.put(column, columnPartitionMetadata);
-        }
+    for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata columnMetadata = entry.getValue();
+      PartitionFunction partitionFunction = 
columnMetadata.getPartitionFunction();
+      if (partitionFunction != null) {
+        ColumnPartitionMetadata columnPartitionMetadata =
+            new ColumnPartitionMetadata(partitionFunction.getName(), 
partitionFunction.getNumPartitions(),
+                columnMetadata.getPartitions(), 
partitionFunction.getFunctionConfig());
+        columnPartitionMap.put(column, columnPartitionMetadata);
       }
     }
-
     if (!columnPartitionMap.isEmpty()) {
       segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(columnPartitionMap));
+    } else {
+      segmentZKMetadata.setPartitionMetadata(null);
+    }
+
+    // Update custom metadata
+    // NOTE: Do not remove existing keys because they can be set by the HTTP 
header from the segment upload request
+    Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+    if (customMap == null) {
+      customMap = segmentMetadata.getCustomMap();
+    } else {
+      customMap.putAll(segmentMetadata.getCustomMap());
+    }
+    segmentZKMetadata.setCustomMap(customMap);
+
+    // Set fields specific to realtime table
+    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+
+      // NOTE: Keep offset as is if it is not explicitly set in the segment 
metadata
+      if (segmentMetadata.getStartOffset() != null) {
+        segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
+      }
+      if (segmentMetadata.getEndOffset() != null) {
+        segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
+      }

Review Comment:
   This might be misleading. For example, if someone for some reason replaces 
the first segment of a partition with more data from an offline source, then 
the start offset might be much smaller than the existing segment's start 
offset. What do you think of using the new segment's start & end offsets, 
whatever they are? If they're not provided, the uploaded segment doesn't have 
those.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to