This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b58650658c Allow updating start/end offset for pushed segments (#8653)
b58650658c is described below

commit b58650658c0aac258e1c7932073188ddc5590d89
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed May 11 11:14:58 2022 -0700

    Allow updating start/end offset for pushed segments (#8653)
    
    - Support updating the start/end offset in the segment ZK metadata for pushed segments when provided in the segment metadata
    - Refactor the segment ZK metadata update logic so that adding a new segment and refreshing an existing segment behave consistently
---
 .../broker/broker/HelixBrokerStarterTest.java      |   2 +-
 .../common/metadata/segment/SegmentZKMetadata.java |   3 +-
 .../pinot/controller/api/upload/ZKOperator.java    | 162 +++++++++++----------
 .../helix/core/PinotHelixResourceManager.java      |  61 ++------
 .../helix/core/util/ZKMetadataUtils.java           | 105 +++++++++----
 .../helix/core/retention/RetentionManagerTest.java |  38 ++---
 .../validation/ValidationManagerTest.java          |   3 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |   9 +-
 .../apache/pinot/segment/spi/SegmentMetadata.java  |   4 +
 .../org/apache/pinot/segment/spi/V1Constants.java  |   5 +
 .../spi/index/metadata/SegmentMetadataImpl.java    |  29 +++-
 11 files changed, 228 insertions(+), 193 deletions(-)
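
The sketch below is not part of the commit; it only illustrates the offset-update rule this patch introduces in ZKMetadataUtils.updateSegmentZKMetadata() for realtime (uploaded) segments: an offset from the uploaded segment's metadata overwrites the value in the segment ZK metadata only when it is present, otherwise the existing ZK value is kept. The UploadedSegmentInfo and ZkOffsets classes are simplified, hypothetical stand-ins for Pinot's SegmentMetadata and SegmentZKMetadata.

    public final class OffsetUpdateSketch {
      // Hypothetical stand-in for the offset accessors added to org.apache.pinot.segment.spi.SegmentMetadata
      static final class UploadedSegmentInfo {
        final String _startOffset;  // may be null when the segment metadata carries no start offset
        final String _endOffset;    // may be null when the segment metadata carries no end offset

        UploadedSegmentInfo(String startOffset, String endOffset) {
          _startOffset = startOffset;
          _endOffset = endOffset;
        }
      }

      // Hypothetical stand-in for the offset fields of SegmentZKMetadata
      static final class ZkOffsets {
        String _startOffset;
        String _endOffset;
      }

      // Mirrors the rule added in ZKMetadataUtils.updateSegmentZKMetadata(): overwrite an offset in
      // the ZK metadata only when the uploaded segment provides it; otherwise keep the existing value.
      static void updateOffsets(ZkOffsets zkOffsets, UploadedSegmentInfo uploaded) {
        if (uploaded._startOffset != null) {
          zkOffsets._startOffset = uploaded._startOffset;
        }
        if (uploaded._endOffset != null) {
          zkOffsets._endOffset = uploaded._endOffset;
        }
      }

      public static void main(String[] args) {
        ZkOffsets zkOffsets = new ZkOffsets();
        zkOffsets._startOffset = "100";
        zkOffsets._endOffset = "200";
        // The uploaded segment only carries an end offset, so the existing start offset is preserved.
        updateOffsets(zkOffsets, new UploadedSegmentInfo(null, "250"));
        System.out.println(zkOffsets._startOffset + " / " + zkOffsets._endOffset);  // prints: 100 / 250
      }
    }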

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index a2047ba4a6..ca5cc76de7 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -227,7 +227,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
         _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, 
segmentToRefresh);
     _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
         
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, 
segmentToRefresh, newEndTime),
-        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1);
+        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl");
 
     TestUtils.waitForCondition(aVoid -> 
routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
         .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update 
the time boundary for refreshed segment");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 0b73af486d..169171bb2e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.metadata.segment;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.MapUtils;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadata;
 import org.apache.pinot.spi.utils.CommonConstants.Segment;
@@ -231,7 +232,7 @@ public class SegmentZKMetadata implements ZKMetadata {
 
   public void setCustomMap(Map<String, String> customMap) {
     Map<String, Map<String, String>> mapFields = _znRecord.getMapFields();
-    if (customMap != null) {
+    if (MapUtils.isNotEmpty(customMap)) {
       mapFields.put(Segment.CUSTOM_MAP, customMap);
     } else {
       mapFields.remove(Segment.CUSTOM_MAP);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 28f17536ab..2e7ba5c76e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -31,10 +31,10 @@ import 
org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
  * The ZKOperator is a util class that is used during segment upload to set 
relevant metadata fields in zk. It will
  * currently
  * also perform the data move. In the future when we introduce versioning, we 
will decouple these two steps.
+ * TODO: Merge it into PinotHelixResourceManager
  */
 public class ZKOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZKOperator.class);
@@ -69,6 +70,7 @@ public class ZKOperator {
     boolean refreshOnly =
         
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
     if (segmentMetadataZNRecord == null) {
+      // Add a new segment
       if (refreshOnly) {
         throw new ControllerApplicationException(LOGGER,
             "Cannot refresh non-existing segment, aborted uploading segment: " 
+ segmentName + " of table: "
@@ -78,41 +80,30 @@ public class ZKOperator {
       processNewSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation, zkDownloadURI, headers,
           crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, 
enableParallelPushProtection,
           segmentSizeInBytes);
-      return;
-    }
-
-    // We reach here if a segment with the same name already exists.
-
-    if (!allowRefresh) {
-      // We cannot perform this check up-front in UploadSegment API call. If a 
segment doesn't exist during the check
-      // done up-front but ends up getting created before the check here, we 
could incorrectly refresh an existing
-      // segment.
-      throw new ControllerApplicationException(LOGGER,
-          "Segment: " + segmentName + " already exists in table: " + 
tableNameWithType + ". Refresh not permitted.",
-          Response.Status.CONFLICT);
-    }
-
-    // TODO Allow segment refreshing for realtime tables.
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      throw new ControllerApplicationException(LOGGER,
-          "Refresh existing segment " + segmentName + " for realtime table " + 
tableNameWithType
-              + " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
+    } else {
+      // Refresh an existing segment
+      if (!allowRefresh) {
+        // We cannot perform this check up-front in UploadSegment API call. If 
a segment doesn't exist during the check
+        // done up-front but ends up getting created before the check here, we 
could incorrectly refresh an existing
+        // segment.
+        throw new ControllerApplicationException(LOGGER,
+            "Segment: " + segmentName + " already exists in table: " + 
tableNameWithType + ". Refresh not permitted.",
+            Response.Status.CONFLICT);
+      }
+      LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, tableNameWithType);
+      processExistingSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation,
+          enableParallelPushProtection, headers, zkDownloadURI, crypter, 
tableNameWithType, segmentName,
+          segmentMetadataZNRecord, moveSegmentToFinalLocation, 
segmentSizeInBytes);
     }
-
-    LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, tableNameWithType);
-    processExistingSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, 
tableNameWithType, segmentName,
-        segmentMetadataZNRecord, moveSegmentToFinalLocation, 
segmentSizeInBytes);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
-      File currentSegmentLocation, boolean enableParallelPushProtection, 
HttpHeaders headers, String zkDownloadURI,
+      File currentSegmentLocation, boolean enableParallelPushProtection, 
HttpHeaders headers, String downloadUrl,
       String crypter, String tableNameWithType, String segmentName, ZNRecord 
znRecord,
       boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
       throws Exception {
-
-    SegmentZKMetadata existingSegmentZKMetadata = new 
SegmentZKMetadata(znRecord);
-    long existingCrc = existingSegmentZKMetadata.getCrc();
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
+    long existingCrc = segmentZKMetadata.getCrc();
     int expectedVersion = znRecord.getVersion();
 
     // Check if CRC match when IF-MATCH header is set
@@ -121,7 +112,7 @@ public class ZKOperator {
     // Check segment upload start time when parallel push protection enabled
     if (enableParallelPushProtection) {
       // When segment upload start time is larger than 0, that means another 
upload is in progress
-      long segmentUploadStartTime = 
existingSegmentZKMetadata.getSegmentUploadStartTime();
+      long segmentUploadStartTime = 
segmentZKMetadata.getSegmentUploadStartTime();
       if (segmentUploadStartTime > 0) {
         if (System.currentTimeMillis() - segmentUploadStartTime > 
_controllerConf.getSegmentUploadTimeoutInMillis()) {
           // Last segment upload does not finish properly, replace the segment
@@ -137,9 +128,8 @@ public class ZKOperator {
       }
 
       // Lock the segment by setting the upload start time in ZK
-      
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
-      if (!_pinotHelixResourceManager
-          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
+      segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
+      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
         throw new ControllerApplicationException(LOGGER,
             "Failed to lock the segment: " + segmentName + " of table: " + 
tableNameWithType + ", retry later",
             Response.Status.CONFLICT);
@@ -152,40 +142,37 @@ public class ZKOperator {
     // Reset segment upload start time to unlock the segment later
     // NOTE: reset this value even if parallel push protection is not enabled 
so that segment can recover in case
     // previous segment upload did not finish properly and the parallel push 
protection is turned off
-    existingSegmentZKMetadata.setSegmentUploadStartTime(-1);
+    segmentZKMetadata.setSegmentUploadStartTime(-1);
 
     try {
-      // Modify the custom map in segment ZK metadata
-      String segmentZKMetadataCustomMapModifierStr =
+      // Construct the segment ZK metadata custom map modifier
+      String customMapModifierStr =
           
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER);
-      SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier;
-      if (segmentZKMetadataCustomMapModifierStr != null) {
-        segmentZKMetadataCustomMapModifier =
-            new 
SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
-      } else {
-        // By default, use REPLACE modify mode
-        segmentZKMetadataCustomMapModifier =
-            new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE,
 null);
-      }
-      existingSegmentZKMetadata
-          
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(existingSegmentZKMetadata.getCustomMap()));
+      SegmentZKMetadataCustomMapModifier customMapModifier =
+          customMapModifierStr != null ? new 
SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null;
 
       // Update ZK metadata and refresh the segment if necessary
-      long newCrc = Long.valueOf(segmentMetadata.getCrc());
+      long newCrc = Long.parseLong(segmentMetadata.getCrc());
       if (newCrc == existingCrc) {
         LOGGER.info(
             "New segment crc '{}' is the same as existing segment crc for 
segment '{}'. Updating ZK metadata without "
                 + "refreshing the segment.", newCrc, segmentName);
-        // NOTE: even though we don't need to refresh the segment, we should 
still update creation time and refresh time
-        // (creation time is not included in the crc)
-        
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
-        existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        // NOTE: in rare cases the segment can be deleted before the metadata 
is updated and the expected version won't
-        // match, we should fail the request for such cases
-        if (!_pinotHelixResourceManager
-            .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
+        // NOTE: Even though we don't need to refresh the segment, we should 
still update the following fields:
+        // - Creation time (not included in the crc)
+        // - Refresh time
+        // - Custom map
+        
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+        segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+        if (customMapModifier != null) {
+          
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+        } else {
+          // If no modifier is provided, use the custom map from the segment 
metadata
+          segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
+        }
+        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
           throw new RuntimeException(
-              "Failed to update ZK metadata for segment: " + segmentName + " 
of table: " + tableNameWithType);
+              String.format("Failed to update ZK metadata for segment: %s, 
table: %s, expected version: %d",
+                  segmentName, tableNameWithType, expectedVersion));
         }
       } else {
         // New segment is different with the existing one, update ZK metadata 
and refresh the segment
@@ -198,17 +185,38 @@ public class ZKOperator {
               currentSegmentLocation.getAbsolutePath(), 
finalSegmentLocationURI.getPath());
         } else {
           LOGGER.info("Skipping segment move, keeping segment {} from table {} 
at {}", segmentName, tableNameWithType,
-              zkDownloadURI);
+              downloadUrl);
+        }
+
+        // NOTE: Must first set the segment ZK metadata before trying to 
refresh because servers and brokers rely on
+        // segment ZK metadata to refresh the segment (server will compare the 
segment ZK metadata with the local
+        // metadata to decide whether to download the new segment; broker will 
update the segment partition info & time
+        // boundary based on the segment ZK metadata)
+        if (customMapModifier == null) {
+          // If no modifier is provided, use the custom map from the segment 
metadata
+          segmentZKMetadata.setCustomMap(null);
+          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata, downloadUrl,
+              crypter, segmentSizeInBytes);
+        } else {
+          // If modifier is provided, first set the custom map from the 
segment metadata, then apply the modifier
+          ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata, downloadUrl,
+              crypter, segmentSizeInBytes);
+          
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+        }
+        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+          throw new RuntimeException(
+              String.format("Failed to update ZK metadata for segment: %s, 
table: %s, expected version: %d",
+                  segmentName, tableNameWithType, expectedVersion));
         }
+        LOGGER.info("Updated segment: {} of table: {} to property store", 
segmentName, tableNameWithType);
 
-        _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, 
existingSegmentZKMetadata, expectedVersion,
-                zkDownloadURI, crypter, segmentSizeInBytes);
+        // Send a message to servers and brokers hosting the table to refresh 
the segment
+        
_pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, 
segmentName, true, true);
       }
     } catch (Exception e) {
-      if (!_pinotHelixResourceManager
-          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
-        LOGGER.error("Failed to update ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
+      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+        LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, 
expected version: {}", segmentName,
+            tableNameWithType, expectedVersion);
       }
       throw e;
     }
@@ -239,8 +247,8 @@ public class ZKOperator {
       long segmentSizeInBytes)
       throws Exception {
     SegmentZKMetadata newSegmentZKMetadata =
-        
_pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType, 
segmentMetadata, zkDownloadURI,
-            crypter, segmentSizeInBytes);
+        ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, zkDownloadURI, crypter,
+            segmentSizeInBytes);
 
     // Lock if enableParallelPushProtection is true.
     if (enableParallelPushProtection) {
@@ -248,13 +256,13 @@ public class ZKOperator {
     }
 
     // Update zk metadata customer map
-    String segmentZKMetadataCustomMapModifierStr = headers != null ? headers
-        
.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER)
 : null;
+    String segmentZKMetadataCustomMapModifierStr = headers != null ? 
headers.getHeaderString(
+        
FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) 
: null;
     if (segmentZKMetadataCustomMapModifierStr != null) {
       SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
           new 
SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
-      newSegmentZKMetadata
-          
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
+      newSegmentZKMetadata.setCustomMap(
+          
segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
     }
     if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, 
newSegmentZKMetadata)) {
       throw new RuntimeException(
@@ -265,13 +273,12 @@ public class ZKOperator {
     if (moveSegmentToFinalLocation) {
       try {
         moveSegmentToPermanentDirectory(currentSegmentLocation, 
finalSegmentLocationURI);
-        LOGGER
-            .info("Moved segment {} from temp location {} to {}", segmentName, 
currentSegmentLocation.getAbsolutePath(),
-                finalSegmentLocationURI.getPath());
+        LOGGER.info("Moved segment {} from temp location {} to {}", 
segmentName,
+            currentSegmentLocation.getAbsolutePath(), 
finalSegmentLocationURI.getPath());
       } catch (Exception e) {
         // Cleanup the Zk entry and the segment from the permanent directory 
if it exists.
-        LOGGER
-            .error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType, e);
+        LOGGER.error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType,
+            e);
         _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
         LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
         throw e;
@@ -285,9 +292,8 @@ public class ZKOperator {
       _pinotHelixResourceManager.assignTableSegment(tableNameWithType, 
segmentMetadata.getName());
     } catch (Exception e) {
       // assignTableSegment removes the zk entry. Call deleteSegment to remove 
the segment from permanent location.
-      LOGGER
-          .error("Caught exception while calling assignTableSegment for adding 
segment: {} to table: {}", segmentName,
-              tableNameWithType, e);
+      LOGGER.error("Caught exception while calling assignTableSegment for 
adding segment: {} to table: {}", segmentName,
+          tableNameWithType, e);
       _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
       LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
       throw e;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 860ed3d34d..0334721571 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1988,16 +1988,12 @@ public class PinotHelixResourceManager {
     return instanceSet;
   }
 
+  @VisibleForTesting
   public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl) {
-    addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
-  }
-
-  public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl,
-      @Nullable String crypter) {
     // NOTE: must first set the segment ZK metadata before assigning segment 
to instances because segment assignment
     // might need them to determine the partition of the segment, and server 
will need them to download the segment
     SegmentZKMetadata segmentZkmetadata =
-        constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, 
downloadUrl, crypter, -1);
+        ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, downloadUrl, null, -1);
     ZNRecord znRecord = segmentZkmetadata.toZNRecord();
 
     String segmentName = segmentMetadata.getName();
@@ -2010,40 +2006,6 @@ public class PinotHelixResourceManager {
     assignTableSegment(tableNameWithType, segmentName);
   }
 
-  /**
-   * Construct segmentZkMetadata for new segment of offline or realtime table.
-   *
-   * @param tableNameWithType Table name with type
-   * @param segmentMetadata Segment metadata
-   * @param downloadUrl Download URL
-   * @param crypter Crypter
-   * @param segmentSizeInBytes Size of segment in bytes.
-   * @return SegmentZkMetadata of the input segment
-   */
-  public SegmentZKMetadata constructZkMetadataForNewSegment(String 
tableNameWithType, SegmentMetadata segmentMetadata,
-      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
-    // Construct segment zk metadata with common fields for offline and 
realtime.
-    String segmentName = segmentMetadata.getName();
-    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
-    ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
-    segmentZKMetadata.setDownloadUrl(downloadUrl);
-    segmentZKMetadata.setCrypterName(crypter);
-    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
-
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      Preconditions.checkState(isUpsertTable(tableNameWithType),
-          "Upload segment " + segmentName + " for non upsert enabled realtime 
table " + tableNameWithType
-              + " is not supported");
-      // Set fields specific to realtime segments.
-      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
-    } else {
-      // Set fields specific to offline segments.
-      segmentZKMetadata.setPushTime(System.currentTimeMillis());
-    }
-
-    return segmentZKMetadata;
-  }
-
   public void assignTableSegment(String tableNameWithType, String segmentName) 
{
     String segmentZKMetadataPath =
         
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, 
segmentName);
@@ -2169,22 +2131,19 @@ public class PinotHelixResourceManager {
     }
   }
 
+  @VisibleForTesting
   public void refreshSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata,
-      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String 
downloadUrl, @Nullable String crypter,
-      long segmentSizeInBytes) {
+      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String 
downloadUrl) {
     String segmentName = segmentMetadata.getName();
 
     // NOTE: Must first set the segment ZK metadata before trying to refresh 
because servers and brokers rely on segment
     // ZK metadata to refresh the segment (server will compare the segment ZK 
metadata with the local metadata to decide
-    // whether to download the new segment; broker will update the the segment 
partition info & time boundary based on
-    // the segment ZK metadata)
-    ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
-    segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-    segmentZKMetadata.setDownloadUrl(downloadUrl);
-    segmentZKMetadata.setCrypterName(crypter);
-    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
-    if (!ZKMetadataProvider
-        .setSegmentZKMetadata(_propertyStore, tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+    // whether to download the new segment; broker will update the segment 
partition info & time boundary based on the
+    // segment ZK metadata)
+    ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata, downloadUrl, null,
+        -1);
+    if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentZKMetadata,
+        expectedVersion)) {
       throw new RuntimeException(
           "Failed to update ZK metadata for segment: " + segmentName + " of 
table: " + tableNameWithType);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index bf7b305410..2628d9f6bd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -20,58 +20,105 @@ package org.apache.pinot.controller.helix.core.util;
 
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class ZKMetadataUtils {
   private ZKMetadataUtils() {
   }
 
-  public static void updateSegmentMetadata(SegmentZKMetadata 
segmentZKMetadata, SegmentMetadata segmentMetadata) {
-    SegmentVersion segmentVersion = segmentMetadata.getVersion();
-    if (segmentVersion != null) {
-      segmentZKMetadata.setIndexVersion(segmentVersion.name());
-    }
+  /**
+   * Creates the segment ZK metadata for a new segment.
+   */
+  public static SegmentZKMetadata createSegmentZKMetadata(String 
tableNameWithType, SegmentMetadata segmentMetadata,
+      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+    SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setPushTime(System.currentTimeMillis());
+    return segmentZKMetadata;
+  }
+
+  /**
+   * Refreshes the segment ZK metadata for a segment being replaced.
+   */
+  public static void refreshSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+        segmentSizeInBytes);
+    segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+  }
+
+  private static void updateSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
       segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+    } else {
+      segmentZKMetadata.setStartTime(-1);
+      segmentZKMetadata.setEndTime(-1);
+      segmentZKMetadata.setTimeUnit(null);
+    }
+    if (segmentMetadata.getVersion() != null) {
+      segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
+    } else {
+      segmentZKMetadata.setIndexVersion(null);
     }
     segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
-    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
     segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
-    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
-        new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
-            segmentZKMetadata.getCustomMap());
-    
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    segmentZKMetadata.setDownloadUrl(downloadUrl);
+    segmentZKMetadata.setCrypterName(crypter);
 
-    // Extract column partition metadata (if any), and set it into segment ZK 
metadata.
+    // Set partition metadata
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
-    if (segmentMetadata instanceof SegmentMetadataImpl) {
-      for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
-        String column = entry.getKey();
-        ColumnMetadata columnMetadata = entry.getValue();
-        PartitionFunction partitionFunction = 
columnMetadata.getPartitionFunction();
-
-        if (partitionFunction != null) {
-          ColumnPartitionMetadata columnPartitionMetadata =
-              new ColumnPartitionMetadata(partitionFunction.getName(), 
partitionFunction.getNumPartitions(),
-                  columnMetadata.getPartitions(), 
partitionFunction.getFunctionConfig());
-          columnPartitionMap.put(column, columnPartitionMetadata);
-        }
+    segmentMetadata.getColumnMetadataMap().forEach((column, columnMetadata) -> 
{
+      PartitionFunction partitionFunction = 
columnMetadata.getPartitionFunction();
+      if (partitionFunction != null) {
+        ColumnPartitionMetadata columnPartitionMetadata =
+            new ColumnPartitionMetadata(partitionFunction.getName(), 
partitionFunction.getNumPartitions(),
+                columnMetadata.getPartitions(), 
partitionFunction.getFunctionConfig());
+        columnPartitionMap.put(column, columnPartitionMetadata);
       }
-    }
-
+    });
     if (!columnPartitionMap.isEmpty()) {
       segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(columnPartitionMap));
+    } else {
+      segmentZKMetadata.setPartitionMetadata(null);
+    }
+
+    // Update custom metadata
+    // NOTE: Do not remove existing keys because they can be set by the HTTP 
header from the segment upload request
+    Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+    if (customMap == null) {
+      customMap = segmentMetadata.getCustomMap();
+    } else {
+      customMap.putAll(segmentMetadata.getCustomMap());
+    }
+    segmentZKMetadata.setCustomMap(customMap);
+
+    // Set fields specific to realtime table
+    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+
+      // NOTE:
+      // - If start/end offset is available in the uploaded segment, update 
them in the segment ZK metadata
+      // - If not, keep the existing start/end offset in the segment ZK 
metadata unchanged
+      if (segmentMetadata.getStartOffset() != null) {
+        segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
+      }
+      if (segmentMetadata.getEndOffset() != null) {
+        segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
+      }
     }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ce902636a8..b872f70d06 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -23,7 +23,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
@@ -33,16 +35,12 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
-import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
-import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.Duration;
-import org.joda.time.Interval;
 import org.mockito.ArgumentMatchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -66,18 +64,14 @@ public class RetentionManagerTest {
     final int numOlderSegments = 10;
     List<String> removedSegments = new ArrayList<>();
     for (int i = 0; i < numOlderSegments; i++) {
-      SegmentMetadata segmentMetadata = mockSegmentMetadata(pastTimeStamp, 
pastTimeStamp, timeUnit);
-      SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
-      ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, 
segmentMetadata);
+      SegmentZKMetadata segmentZKMetadata = 
mockSegmentZKMetadata(pastTimeStamp, pastTimeStamp, timeUnit);
       segmentsZKMetadata.add(segmentZKMetadata);
       removedSegments.add(segmentZKMetadata.getSegmentName());
     }
     // Create metadata for 5 segments that will not be removed.
     for (int i = 0; i < 5; i++) {
-      SegmentMetadata segmentMetadata =
-          mockSegmentMetadata(dayAfterTomorrowTimeStamp, 
dayAfterTomorrowTimeStamp, timeUnit);
-      SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
-      ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, 
segmentMetadata);
+      SegmentZKMetadata segmentZKMetadata =
+          mockSegmentZKMetadata(dayAfterTomorrowTimeStamp, 
dayAfterTomorrowTimeStamp, timeUnit);
       segmentsZKMetadata.add(segmentZKMetadata);
     }
     final TableConfig tableConfig = createOfflineTableConfig();
@@ -167,6 +161,9 @@ public class RetentionManagerTest {
     final String tableNameWithType = tableConfig.getTableName();
     
when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
 
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
     SegmentDeletionManager deletionManager = 
mock(SegmentDeletionManager.class);
     // Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we 
only test that the call is made once per
     // run of the retention manager
@@ -308,18 +305,13 @@ public class RetentionManagerTest {
     return segmentMetadata;
   }
 
-  private SegmentMetadata mockSegmentMetadata(long startTime, long endTime, 
TimeUnit timeUnit) {
+  private SegmentZKMetadata mockSegmentZKMetadata(long startTime, long 
endTime, TimeUnit timeUnit) {
     long creationTime = System.currentTimeMillis();
-    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
-    when(segmentMetadata.getName()).thenReturn(TEST_TABLE_NAME + creationTime);
-    when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTime);
-    when(segmentMetadata.getCrc()).thenReturn(Long.toString(creationTime));
-    when(segmentMetadata.getStartTime()).thenReturn(startTime);
-    when(segmentMetadata.getEndTime()).thenReturn(endTime);
-    when(segmentMetadata.getTimeUnit()).thenReturn(timeUnit);
-    when(segmentMetadata.getTimeInterval())
-        .thenReturn(new Interval(timeUnit.toMillis(startTime), 
timeUnit.toMillis(endTime)));
-    when(segmentMetadata.getTimeGranularity()).thenReturn(new 
Duration(timeUnit.toMillis(1)));
-    return segmentMetadata;
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getSegmentName()).thenReturn(TEST_TABLE_NAME + 
creationTime);
+    when(segmentZKMetadata.getCreationTime()).thenReturn(creationTime);
+    
when(segmentZKMetadata.getStartTimeMs()).thenReturn(timeUnit.toMillis(startTime));
+    
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
+    return segmentZKMetadata;
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index fcac9a04fd..cc2dd52faf 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -89,8 +89,7 @@ public class ValidationManagerTest {
     }, 30_000L, "Failed to find the segment in the ExternalView");
     
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     ControllerTestUtils.getHelixResourceManager()
-        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, 
EXPECTED_VERSION, "downloadUrl", null,
-            -1);
+        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, 
EXPECTED_VERSION, "downloadUrl");
 
     segmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
 TEST_SEGMENT_NAME);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 86de96c170..26a4c693dc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -64,7 +64,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
@@ -285,8 +284,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     // Do not create dictionary if index size with dictionary is going to be 
larger than index size without dictionary
     // This is done to reduce the cost of dictionary for high cardinality 
columns
     // Off by default and needs optimizeDictionaryEnabled to be set to true
-    if (config.isOptimizeDictionaryForMetrics() && spec.getFieldType() == 
FieldType.METRIC
-        && spec.isSingleValueField() && spec.getDataType().isFixedWidth()) {
+    if (config.isOptimizeDictionaryForMetrics() && spec.getFieldType() == 
FieldType.METRIC && spec.isSingleValueField()
+        && spec.getDataType().isFixedWidth()) {
       long dictionarySize = info.getDistinctValueCount() * 
spec.getDataType().size();
       long forwardIndexSize =
           ((long) info.getTotalNumberOfEntries() * 
PinotDataBitSet.getNumBitsPerValue(info.getDistinctValueCount() - 1)
@@ -663,8 +662,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
 
     SegmentZKPropsConfig segmentZKPropsConfig = 
_config.getSegmentZKPropsConfig();
     if (segmentZKPropsConfig != null) {
-      properties.setProperty(CommonConstants.Segment.Realtime.START_OFFSET, 
segmentZKPropsConfig.getStartOffset());
-      properties.setProperty(CommonConstants.Segment.Realtime.END_OFFSET, 
segmentZKPropsConfig.getEndOffset());
+      properties.setProperty(Realtime.START_OFFSET, 
segmentZKPropsConfig.getStartOffset());
+      properties.setProperty(Realtime.END_OFFSET, 
segmentZKPropsConfig.getEndOffset());
     }
 
     properties.save();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index 3e977cb88d..d1be4b8624 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -94,6 +94,10 @@ public interface SegmentMetadata {
 
   Map<String, String> getCustomMap();
 
+  String getStartOffset();
+
+  String getEndOffset();
+
   default Set<String> getAllColumns() {
     return getSchema().getColumnNames();
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index fc47d53765..02049438ab 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -77,6 +77,11 @@ public class V1Constants {
       public static final String SEGMENT_PADDING_CHARACTER = 
"segment.padding.character";
 
       public static final String CUSTOM_SUBSET = "custom";
+
+      public static class Realtime {
+        public static final String START_OFFSET = 
"segment.realtime.startOffset";
+        public static final String END_OFFSET = "segment.realtime.endOffset";
+      }
     }
 
     public static class Column {
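
For reference (illustrative, not from the commit), the keys added to V1Constants above mean a realtime segment's metadata.properties can carry entries like the following; the offset values here are made up:

    segment.realtime.startOffset = 0
    segment.realtime.endOffset = 1500

SegmentMetadataImpl (below) reads these keys when loading the segment, and ZKMetadataUtils copies them into the segment ZK metadata on upload when they are present.
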
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 7096f72ece..b17f5d41ea 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -81,13 +81,19 @@ public class SegmentMetadataImpl implements SegmentMetadata 
{
 
   private SegmentVersion _segmentVersion;
   private List<StarTreeV2Metadata> _starTreeV2MetadataList;
-  // Caching properties around can be costly when the number of segments is 
high according to the
-  // finding in PR #2996. So for now, caching is used only when initializing 
from input streams.
-  private PropertiesConfiguration _segmentMetadataPropertiesConfiguration = 
null;
   private String _creatorName;
   private int _totalDocs;
   private final Map<String, String> _customMap = new HashMap<>();
 
+  // Fields specific to realtime table
+  private String _startOffset;
+  private String _endOffset;
+
+  // TODO: No need to cache this. We cannot modify the metadata if it is from 
a input stream
+  // Caching properties around can be costly when the number of segments is 
high according to the
+  // finding in PR #2996. So for now, caching is used only when initializing 
from input streams.
+  private PropertiesConfiguration _segmentMetadataPropertiesConfiguration = 
null;
+
   @Deprecated
   private String _rawTableName;
 
@@ -250,6 +256,10 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
       }
     }
 
+    // Set start/end offset if available
+    _startOffset = 
segmentMetadataPropertiesConfiguration.getString(Segment.Realtime.START_OFFSET, 
null);
+    _endOffset = 
segmentMetadataPropertiesConfiguration.getString(Segment.Realtime.END_OFFSET, 
null);
+
     // Set custom configs from metadata properties
     setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
   }
@@ -378,6 +388,16 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     return _customMap;
   }
 
+  @Override
+  public String getStartOffset() {
+    return _startOffset;
+  }
+
+  @Override
+  public String getEndOffset() {
+    return _endOffset;
+  }
+
   @Override
   public Map<String, ColumnMetadata> getColumnMetadataMap() {
     return _columnMetadataMap;
@@ -427,6 +447,9 @@ public class SegmentMetadataImpl implements SegmentMetadata 
{
     }
     segmentMetadata.set("custom", customConfigs);
 
+    segmentMetadata.put("startOffset", _startOffset);
+    segmentMetadata.put("endOffset", _endOffset);
+
     if (_columnMetadataMap != null) {
       ArrayNode columnsMetadata = JsonUtils.newArrayNode();
       for (Map.Entry<String, ColumnMetadata> entry : 
_columnMetadataMap.entrySet()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
