This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 750af31  Always read start/end time in millis from the segment ZK 
metadata (#6239)
750af31 is described below

commit 750af31133f758f3c13cda7d9b22126eb7d52512
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Nov 10 14:05:12 2020 -0800

    Always read start/end time in millis from the segment ZK metadata (#6239)
    
    - Adds the APIs to read the start/end time in millis from the 
SegmentZKMetadata, deprecates the old getters.
    - Modifies the logic in TimeBoundaryManager to support SDF for the hybrid 
table.
---
 .../routing/timeboundary/TimeBoundaryManager.java  |  96 +++----
 .../common/metadata/segment/SegmentZKMetadata.java | 291 ++++++++++-----------
 .../common/tier/TimeBasedTierSegmentSelector.java  |   7 +-
 .../apache/pinot/common/utils/CommonConstants.java |  12 +-
 .../pinot/common/data/DateTimeFormatSpecTest.java  |  70 +++--
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |  21 +-
 .../retention/strategy/TimeRetentionStrategy.java  |  13 +-
 .../controller/util/TableRetentionValidator.java   |  17 +-
 .../validation/OfflineSegmentIntervalChecker.java  |  14 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   1 -
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  16 +-
 .../apache/pinot/spi/data/DateTimeFormatSpec.java  |  58 ++--
 .../OfflineSegmentIntervalCheckerCommand.java      |   8 +-
 13 files changed, 268 insertions(+), 356 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index c7a64bb..4a8c357 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -48,15 +48,15 @@ import org.slf4j.LoggerFactory;
  */
 public class TimeBoundaryManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeBoundaryManager.class);
-  private static final long INVALID_END_TIME = -1;
+  private static final long INVALID_END_TIME_MS = -1;
 
   private final String _offlineTableName;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final String _segmentZKMetadataPathPrefix;
   private final String _timeColumn;
-  private final TimeUnit _timeUnit;
-  private final boolean _isHourlyTable;
-  private final Map<String, Long> _endTimeMap = new HashMap<>();
+  private final DateTimeFormatSpec _timeFormatSpec;
+  private final long _timeOffsetMs;
+  private final Map<String, Long> _endTimeMsMap = new HashMap<>();
 
   private volatile TimeBoundaryInfo _timeBoundaryInfo;
 
@@ -75,19 +75,20 @@ public class TimeBoundaryManager {
     DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
     Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in 
schema for time column: %s of table: %s",
         _timeColumn, _offlineTableName);
-    DateTimeFormatSpec formatSpec = new 
DateTimeFormatSpec(dateTimeSpec.getFormat());
-    _timeUnit = formatSpec.getColumnUnit();
-    Preconditions
-        .checkNotNull(_timeUnit, "Time unit must be configured in the field 
spec for time column: %s of table: %s",
-            _timeColumn, _offlineTableName);
+    _timeFormatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
+    Preconditions.checkNotNull(_timeFormatSpec.getColumnUnit(),
+        "Time unit must be configured in the field spec for time column: %s of 
table: %s", _timeColumn,
+        _offlineTableName);
 
     // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 
HOUR) as the time boundary; otherwise, use
     // (maxEndTime - 1 DAY)
-    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
-        
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) 
&& _timeUnit != TimeUnit.DAYS;
+    boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
+        
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
+        && _timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;
+    _timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : 
TimeUnit.DAYS.toMillis(1);
 
-    LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, 
timeUnit: {}, isHourlyTable: {} for table: {}",
-        _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName);
+    LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, 
timeFormat: {}, isHourlyTable: {} for table: {}",
+        _timeColumn, _timeFormatSpec.getFormat(), isHourlyTable, 
_offlineTableName);
   }
 
   /**
@@ -108,38 +109,38 @@ public class TimeBoundaryManager {
       segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
     }
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT);
-    long maxEndTime = INVALID_END_TIME;
+    long maxEndTimeMs = INVALID_END_TIME_MS;
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      long endTime = extractEndTimeFromSegmentZKMetadataZNRecord(segment, 
znRecords.get(i));
-      _endTimeMap.put(segment, endTime);
-      maxEndTime = Math.max(maxEndTime, endTime);
+      long endTimeMs = extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, 
znRecords.get(i));
+      _endTimeMsMap.put(segment, endTimeMs);
+      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
     }
-    updateTimeBoundaryInfo(maxEndTime);
+    updateTimeBoundaryInfo(maxEndTimeMs);
   }
 
-  private long extractEndTimeFromSegmentZKMetadataZNRecord(String segment, 
@Nullable ZNRecord znRecord) {
+  private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, 
@Nullable ZNRecord znRecord) {
     if (znRecord == null) {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: 
{}", segment, _offlineTableName);
-      return INVALID_END_TIME;
+      return INVALID_END_TIME_MS;
     }
 
-    long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, 
INVALID_END_TIME);
-    if (endTime <= 0) {
+    long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
+    if (endTime > 0) {
+      TimeUnit timeUnit = 
znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, 
TimeUnit.DAYS);
+      return timeUnit.toMillis(endTime);
+    } else {
       LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", 
segment, _offlineTableName);
-      return INVALID_END_TIME;
+      return INVALID_END_TIME_MS;
     }
-
-    TimeUnit timeUnit = 
znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, 
TimeUnit.DAYS);
-    return _timeUnit.convert(endTime, timeUnit);
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTime) {
-    if (maxEndTime > 0) {
-      long timeBoundary = getTimeBoundary(maxEndTime);
+  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
+    if (maxEndTimeMs > 0) {
+      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - 
_timeOffsetMs);
       TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || 
Long.parseLong(currentTimeBoundaryInfo.getTimeValue()) != timeBoundary) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, 
Long.toString(timeBoundary));
+      if (currentTimeBoundaryInfo == null || 
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
+        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
         LOGGER.info("Updated time boundary to: {} for table: {}", 
timeBoundary, _offlineTableName);
       }
     } else {
@@ -150,19 +151,6 @@ public class TimeBoundaryManager {
   }
 
   /**
-   * Returns the time boundary based on the given maximum end time.
-   * <p>NOTE: For HOURLY table with time unit other than DAYS, use (maxEndTime 
- 1 HOUR) as the time boundary;
-   * otherwise, use (maxEndTime - 1 DAY).
-   */
-  private long getTimeBoundary(long maxEndTime) {
-    if (_isHourlyTable) {
-      return maxEndTime - _timeUnit.convert(1L, TimeUnit.HOURS);
-    } else {
-      return maxEndTime - _timeUnit.convert(1L, TimeUnit.DAYS);
-    }
-  }
-
-  /**
    * Processes the external view change based on the given ideal state and 
online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
    * <p>NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
@@ -174,28 +162,28 @@ public class TimeBoundaryManager {
   public synchronized void onExternalViewChange(ExternalView externalView, 
IdealState idealState,
       Set<String> onlineSegments) {
     for (String segment : onlineSegments) {
-      _endTimeMap.computeIfAbsent(segment, k -> 
extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+      _endTimeMsMap.computeIfAbsent(segment, k -> 
extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
           _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT)));
     }
-    _endTimeMap.keySet().retainAll(onlineSegments);
-    updateTimeBoundaryInfo(getMaxEndTime());
+    _endTimeMsMap.keySet().retainAll(onlineSegments);
+    updateTimeBoundaryInfo(getMaxEndTimeMs());
   }
 
-  private long getMaxEndTime() {
-    long maxEndTime = INVALID_END_TIME;
-    for (long endTime : _endTimeMap.values()) {
-      maxEndTime = Math.max(maxEndTime, endTime);
+  private long getMaxEndTimeMs() {
+    long maxEndTimeMs = INVALID_END_TIME_MS;
+    for (long endTimeMs : _endTimeMsMap.values()) {
+      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
     }
-    return maxEndTime;
+    return maxEndTimeMs;
   }
 
   /**
    * Refreshes the metadata for the given segment (called when segment is 
getting refreshed).
    */
   public synchronized void refreshSegment(String segment) {
-    _endTimeMap.put(segment, 
extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+    _endTimeMsMap.put(segment, 
extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
         _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT)));
-    updateTimeBoundaryInfo(getMaxEndTime());
+    updateTimeBoundaryInfo(getMaxEndTimeMs());
   }
 
   @Nullable
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index f0f00ad..5ee5288 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -22,11 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.joda.time.Duration;
@@ -34,11 +34,6 @@ import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.spi.utils.EqualityUtils.hashCodeOf;
-import static org.apache.pinot.spi.utils.EqualityUtils.isEqual;
-import static org.apache.pinot.spi.utils.EqualityUtils.isNullOrNotSameClass;
-import static org.apache.pinot.spi.utils.EqualityUtils.isSameReference;
-
 
 public abstract class SegmentZKMetadata implements ZKMetadata {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentZKMetadata.class);
@@ -50,16 +45,14 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
   private long _startTime = -1;
   private long _endTime = -1;
   private TimeUnit _timeUnit;
-  private Duration _timeGranularity;
-  private Interval _timeInterval;
   private String _indexVersion;
   private long _totalDocs = -1;
   private long _crc = -1;
   private long _creationTime = -1;
   private SegmentPartitionMetadata _partitionMetadata;
   private long _segmentUploadStartTime = -1;
-  private Map<String, String> _customMap;
   private String _crypterName;
+  private Map<String, String> _customMap;
 
   @Deprecated
   private String _tableName;
@@ -68,23 +61,20 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
   }
 
   public SegmentZKMetadata(ZNRecord znRecord) {
-    _segmentName = 
znRecord.getSimpleField(CommonConstants.Segment.SEGMENT_NAME);
-    _tableName = znRecord.getSimpleField(CommonConstants.Segment.TABLE_NAME);
-    _crypterName = 
znRecord.getSimpleField(CommonConstants.Segment.CRYPTER_NAME);
-    _segmentType = znRecord.getEnumField(CommonConstants.Segment.SEGMENT_TYPE, 
SegmentType.class, SegmentType.OFFLINE);
-    _startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1);
-    _endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
-    if 
(znRecord.getSimpleFields().containsKey(CommonConstants.Segment.TIME_UNIT) && 
!znRecord
-        .getSimpleField(CommonConstants.Segment.TIME_UNIT).equals(NULL)) {
-      setTimeUnit(znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, 
TimeUnit.class, TimeUnit.DAYS));
+    _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME);
+    _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE, 
SegmentType.class, SegmentType.OFFLINE);
+    _startTime = znRecord.getLongField(Segment.START_TIME, -1);
+    _endTime = znRecord.getLongField(Segment.END_TIME, -1);
+    String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT);
+    if (timeUnitString != null && !timeUnitString.equals(NULL)) {
+      _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class, 
TimeUnit.DAYS);
     }
-    _indexVersion = 
znRecord.getSimpleField(CommonConstants.Segment.INDEX_VERSION);
-    _totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
-    _crc = znRecord.getLongField(CommonConstants.Segment.CRC, -1);
-    _creationTime = 
znRecord.getLongField(CommonConstants.Segment.CREATION_TIME, -1);
-
+    _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION);
+    _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1);
+    _crc = znRecord.getLongField(Segment.CRC, -1);
+    _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1);
     try {
-      String partitionMetadataJson = 
znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+      String partitionMetadataJson = 
znRecord.getSimpleField(Segment.PARTITION_METADATA);
       if (partitionMetadataJson != null) {
         _partitionMetadata = 
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
       }
@@ -93,8 +83,12 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
           "Exception caught while reading partition info from zk metadata for 
segment '{}', partition info dropped.",
           _segmentName, e);
     }
-    _segmentUploadStartTime = 
znRecord.getLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, -1);
-    _customMap = znRecord.getMapField(CommonConstants.Segment.CUSTOM_MAP);
+    _segmentUploadStartTime = 
znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1);
+    _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME);
+    _customMap = znRecord.getMapField(Segment.CUSTOM_MAP);
+
+    // For backward-compatibility
+    _tableName = znRecord.getSimpleField(Segment.TABLE_NAME);
   }
 
   public String getSegmentName() {
@@ -105,54 +99,40 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
     _segmentName = segmentName;
   }
 
-  @Deprecated
-  public String getTableName() {
-    return _tableName;
+  public SegmentType getSegmentType() {
+    return _segmentType;
   }
 
-  @Deprecated
-  public void setTableName(String tableName) {
-    _tableName = tableName;
+  public void setSegmentType(SegmentType segmentType) {
+    _segmentType = segmentType;
   }
 
-  public long getStartTime() {
-    return _startTime;
+  public long getStartTimeMs() {
+    if (_startTime > 0 && _timeUnit != null) {
+      return _timeUnit.toMillis(_startTime);
+    } else {
+      return -1;
+    }
   }
 
   public void setStartTime(long startTime) {
     _startTime = startTime;
   }
 
-  public long getEndTime() {
-    return _endTime;
+  public long getEndTimeMs() {
+    if (_endTime > 0 && _timeUnit != null) {
+      return _timeUnit.toMillis(_endTime);
+    } else {
+      return -1;
+    }
   }
 
   public void setEndTime(long endTime) {
     _endTime = endTime;
   }
 
-  public TimeUnit getTimeUnit() {
-    return _timeUnit;
-  }
-
-  /**
-   * NOTE: should be called after setting start and end time.
-   */
-  public void setTimeUnit(@Nonnull TimeUnit timeUnit) {
+  public void setTimeUnit(TimeUnit timeUnit) {
     _timeUnit = timeUnit;
-    _timeGranularity = new Duration(_timeUnit.toMillis(1));
-    // For consuming segment, end time might not be set
-    if (_startTime >= 0 && _startTime <= _endTime) {
-      _timeInterval = new Interval(_timeUnit.toMillis(_startTime), 
_timeUnit.toMillis(_endTime));
-    }
-  }
-
-  public Duration getTimeGranularity() {
-    return _timeGranularity;
-  }
-
-  public Interval getTimeInterval() {
-    return _timeInterval;
   }
 
   public String getIndexVersion() {
@@ -163,22 +143,6 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
     _indexVersion = indexVersion;
   }
 
-  public SegmentType getSegmentType() {
-    return _segmentType;
-  }
-
-  public void setSegmentType(SegmentType segmentType) {
-    _segmentType = segmentType;
-  }
-
-  public String getCrypterName() {
-    return _crypterName;
-  }
-
-  public void setCrypterName(String crypterName) {
-    _crypterName = crypterName;
-  }
-
   public long getTotalDocs() {
     return _totalDocs;
   }
@@ -219,6 +183,14 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
     _segmentUploadStartTime = segmentUploadStartTime;
   }
 
+  public String getCrypterName() {
+    return _crypterName;
+  }
+
+  public void setCrypterName(String crypterName) {
+    _crypterName = crypterName;
+  }
+
   public Map<String, String> getCustomMap() {
     return _customMap;
   }
@@ -227,76 +199,87 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
     _customMap = customMap;
   }
 
+  @Deprecated
+  public String getTableName() {
+    return _tableName;
+  }
+
+  @Deprecated
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  @Deprecated
+  public long getStartTime() {
+    return _startTime;
+  }
+
+  @Deprecated
+  public long getEndTime() {
+    return _endTime;
+  }
+
+  @Deprecated
+  public TimeUnit getTimeUnit() {
+    return _timeUnit;
+  }
+
+  @Deprecated
+  public Duration getTimeGranularity() {
+    return _timeUnit != null ? new Duration(_timeUnit.toMillis(1)) : null;
+  }
+
+  @Deprecated
+  public Interval getTimeInterval() {
+    if (_startTime > 0 && _startTime <= _endTime && _timeUnit != null) {
+      return new Interval(_timeUnit.toMillis(_startTime), 
_timeUnit.toMillis(_endTime));
+    } else {
+      return null;
+    }
+  }
+
   @Override
-  public boolean equals(Object segmentMetadata) {
-    if (isSameReference(this, segmentMetadata)) {
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
     }
-
-    if (isNullOrNotSameClass(this, segmentMetadata)) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    SegmentZKMetadata metadata = (SegmentZKMetadata) segmentMetadata;
-    return isEqual(_segmentName, metadata._segmentName) && 
isEqual(_crypterName, metadata._crypterName) && isEqual(
-        _tableName, metadata._tableName) && isEqual(_indexVersion, 
metadata._indexVersion) && isEqual(_timeUnit,
-        metadata._timeUnit) && isEqual(_startTime, metadata._startTime) && 
isEqual(_endTime, metadata._endTime)
-        && isEqual(_segmentType, metadata._segmentType) && isEqual(_totalDocs, 
metadata._totalDocs) && isEqual(_crc,
-        metadata._crc) && isEqual(_creationTime, metadata._creationTime) && 
isEqual(_partitionMetadata,
-        metadata._partitionMetadata) && isEqual(_segmentUploadStartTime, 
metadata._segmentUploadStartTime) && isEqual(
-        _customMap, metadata._customMap);
+    SegmentZKMetadata that = (SegmentZKMetadata) o;
+    return _startTime == that._startTime && _endTime == that._endTime && 
_totalDocs == that._totalDocs
+        && _crc == that._crc && _creationTime == that._creationTime
+        && _segmentUploadStartTime == that._segmentUploadStartTime && 
Objects.equals(_segmentName, that._segmentName)
+        && _segmentType == that._segmentType && _timeUnit == that._timeUnit && 
Objects
+        .equals(_indexVersion, that._indexVersion) && 
Objects.equals(_partitionMetadata, that._partitionMetadata)
+        && Objects.equals(_crypterName, that._crypterName) && 
Objects.equals(_customMap, that._customMap) && Objects
+        .equals(_tableName, that._tableName);
   }
 
   @Override
   public int hashCode() {
-    int result = hashCodeOf(_segmentName);
-    result = hashCodeOf(result, _tableName);
-    result = hashCodeOf(result, _crypterName);
-    result = hashCodeOf(result, _segmentType);
-    result = hashCodeOf(result, _startTime);
-    result = hashCodeOf(result, _endTime);
-    result = hashCodeOf(result, _timeUnit);
-    result = hashCodeOf(result, _indexVersion);
-    result = hashCodeOf(result, _totalDocs);
-    result = hashCodeOf(result, _crc);
-    result = hashCodeOf(result, _creationTime);
-    result = hashCodeOf(result, _partitionMetadata);
-    result = hashCodeOf(result, _segmentUploadStartTime);
-    result = hashCodeOf(result, _customMap);
-    return result;
+    return Objects.hash(_segmentName, _segmentType, _startTime, _endTime, 
_timeUnit, _indexVersion, _totalDocs, _crc,
+        _creationTime, _partitionMetadata, _segmentUploadStartTime, 
_crypterName, _customMap, _tableName);
   }
 
   @Override
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_segmentName);
-    znRecord.setSimpleField(CommonConstants.Segment.SEGMENT_NAME, 
_segmentName);
-
-    if (_tableName != null) {
-      znRecord.setSimpleField(CommonConstants.Segment.TABLE_NAME, _tableName);
-    }
 
-    if (_crypterName != null) {
-      znRecord.setSimpleField(CommonConstants.Segment.CRYPTER_NAME, 
_crypterName);
-    }
-
-    znRecord.setEnumField(CommonConstants.Segment.SEGMENT_TYPE, _segmentType);
-    if (_timeUnit == null) {
-      znRecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, NULL);
-    } else {
-      znRecord.setEnumField(CommonConstants.Segment.TIME_UNIT, _timeUnit);
-    }
-    znRecord.setLongField(CommonConstants.Segment.START_TIME, _startTime);
-    znRecord.setLongField(CommonConstants.Segment.END_TIME, _endTime);
-
-    znRecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, 
_indexVersion);
-    znRecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, _totalDocs);
-    znRecord.setLongField(CommonConstants.Segment.CRC, _crc);
-    znRecord.setLongField(CommonConstants.Segment.CREATION_TIME, 
_creationTime);
+    znRecord.setSimpleField(Segment.SEGMENT_NAME, _segmentName);
+    znRecord.setEnumField(Segment.SEGMENT_TYPE, _segmentType);
+    znRecord.setLongField(Segment.START_TIME, _startTime);
+    znRecord.setLongField(Segment.END_TIME, _endTime);
+    znRecord.setSimpleField(Segment.TIME_UNIT, _timeUnit != null ? 
_timeUnit.name() : NULL);
+    znRecord.setSimpleField(Segment.INDEX_VERSION, _indexVersion);
+    znRecord.setLongField(Segment.TOTAL_DOCS, _totalDocs);
+    znRecord.setLongField(Segment.CRC, _crc);
+    znRecord.setLongField(Segment.CREATION_TIME, _creationTime);
 
     if (_partitionMetadata != null) {
       try {
         String partitionMetadataJson = _partitionMetadata.toJsonString();
-        znRecord.setSimpleField(CommonConstants.Segment.PARTITION_METADATA, 
partitionMetadataJson);
+        znRecord.setSimpleField(Segment.PARTITION_METADATA, 
partitionMetadataJson);
       } catch (IOException e) {
         LOGGER
             .error("Exception caught while writing partition metadata into 
ZNRecord for segment '{}', will be dropped",
@@ -304,59 +287,65 @@ public abstract class SegmentZKMetadata implements 
ZKMetadata {
       }
     }
     if (_segmentUploadStartTime > 0) {
-      znRecord.setLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, 
_segmentUploadStartTime);
+      znRecord.setLongField(Segment.SEGMENT_UPLOAD_START_TIME, 
_segmentUploadStartTime);
+    }
+    if (_crypterName != null) {
+      znRecord.setSimpleField(Segment.CRYPTER_NAME, _crypterName);
     }
     if (_customMap != null) {
-      znRecord.setMapField(CommonConstants.Segment.CUSTOM_MAP, _customMap);
+      znRecord.setMapField(Segment.CUSTOM_MAP, _customMap);
+    }
+
+    // For backward-compatibility
+    if (_tableName != null) {
+      znRecord.setSimpleField(Segment.TABLE_NAME, _tableName);
     }
+
     return znRecord;
   }
 
   public Map<String, String> toMap() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put(CommonConstants.Segment.SEGMENT_NAME, _segmentName);
-    if (_tableName != null) {
-      configMap.put(CommonConstants.Segment.TABLE_NAME, _tableName);
-    }
-    configMap.put(CommonConstants.Segment.SEGMENT_TYPE, 
_segmentType.toString());
-    if (_timeUnit == null) {
-      configMap.put(CommonConstants.Segment.TIME_UNIT, null);
-    } else {
-      configMap.put(CommonConstants.Segment.TIME_UNIT, _timeUnit.toString());
-    }
-    configMap.put(CommonConstants.Segment.START_TIME, 
Long.toString(_startTime));
-    configMap.put(CommonConstants.Segment.END_TIME, Long.toString(_endTime));
 
-    configMap.put(CommonConstants.Segment.INDEX_VERSION, _indexVersion);
-    configMap.put(CommonConstants.Segment.TOTAL_DOCS, 
Long.toString(_totalDocs));
-    configMap.put(CommonConstants.Segment.CRC, Long.toString(_crc));
-    configMap.put(CommonConstants.Segment.CREATION_TIME, 
Long.toString(_creationTime));
+    configMap.put(Segment.SEGMENT_NAME, _segmentName);
+    configMap.put(Segment.SEGMENT_TYPE, _segmentType.toString());
+    configMap.put(Segment.START_TIME, Long.toString(_startTime));
+    configMap.put(Segment.END_TIME, Long.toString(_endTime));
+    configMap.put(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : 
null);
+    configMap.put(Segment.INDEX_VERSION, _indexVersion);
+    configMap.put(Segment.TOTAL_DOCS, Long.toString(_totalDocs));
+    configMap.put(Segment.CRC, Long.toString(_crc));
+    configMap.put(Segment.CREATION_TIME, Long.toString(_creationTime));
 
     if (_partitionMetadata != null) {
       try {
         String partitionMetadataJson = _partitionMetadata.toJsonString();
-        configMap.put(CommonConstants.Segment.PARTITION_METADATA, 
partitionMetadataJson);
+        configMap.put(Segment.PARTITION_METADATA, partitionMetadataJson);
       } catch (IOException e) {
         LOGGER.error(
             "Exception caught while converting partition metadata into JSON 
string for segment '{}', will be dropped",
             _segmentName, e);
       }
     }
-
     if (_segmentUploadStartTime > 0) {
-      configMap.put(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, 
Long.toString(_segmentUploadStartTime));
+      configMap.put(Segment.SEGMENT_UPLOAD_START_TIME, 
Long.toString(_segmentUploadStartTime));
     }
-
-    if (_customMap == null) {
-      configMap.put(CommonConstants.Segment.CUSTOM_MAP, null);
-    } else {
+    if (_crypterName != null) {
+      configMap.put(Segment.CRYPTER_NAME, _crypterName);
+    }
+    if (_customMap != null) {
       try {
-        configMap.put(CommonConstants.Segment.CUSTOM_MAP, 
JsonUtils.objectToString(_customMap));
+        configMap.put(Segment.CUSTOM_MAP, 
JsonUtils.objectToString(_customMap));
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
     }
 
+    // For backward-compatibility
+    if (_tableName != null) {
+      configMap.put(Segment.TABLE_NAME, _tableName);
+    }
+
     return configMap;
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
index f4b201c..3023bbf 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.common.tier;
 
 import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -66,10 +65,10 @@ public class TimeBasedTierSegmentSelector implements 
TierSegmentSelector {
             tableNameWithType);
 
     // get segment end time to decide if segment gets selected
-    TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
+    long endTimeMs = segmentZKMetadata.getEndTimeMs();
     Preconditions
-        .checkNotNull(timeUnit, "Time unit is not set for segment: %s of 
table: %s", segmentName, tableNameWithType);
-    long endTimeMs = timeUnit.toMillis(segmentZKMetadata.getEndTime());
+        .checkState(endTimeMs > 0, "Invalid endTimeMs: %s for segment: %s of 
table: %s", endTimeMs, segmentName,
+            tableNameWithType);
     long now = System.currentTimeMillis();
     return (now - endTimeMs) > _segmentAgeMillis;
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index e709b15..afb1852 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -364,18 +364,16 @@ public class CommonConstants {
 
     public static final String SEGMENT_NAME = "segment.name";
     public static final String SEGMENT_TYPE = "segment.type";
-    public static final String CRYPTER_NAME = "segment.crypter";
-    public static final String INDEX_VERSION = "segment.index.version";
     public static final String START_TIME = "segment.start.time";
     public static final String END_TIME = "segment.end.time";
     public static final String TIME_UNIT = "segment.time.unit";
+    public static final String INDEX_VERSION = "segment.index.version";
     public static final String TOTAL_DOCS = "segment.total.docs";
     public static final String CRC = "segment.crc";
     public static final String CREATION_TIME = "segment.creation.time";
     public static final String FLUSH_THRESHOLD_SIZE = 
"segment.flush.threshold.size";
     public static final String FLUSH_THRESHOLD_TIME = 
"segment.flush.threshold.time";
     public static final String PARTITION_METADATA = 
"segment.partition.metadata";
-    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     /**
      * This field is used for parallel push protection to lock the segment 
globally.
      * We put the segment upload start timestamp so that if the previous push 
failed without unlock the segment, the
@@ -383,12 +381,17 @@ public class CommonConstants {
      */
     public static final String SEGMENT_UPLOAD_START_TIME = 
"segment.upload.start.time";
 
+    public static final String CRYPTER_NAME = "segment.crypter";
     public static final String CUSTOM_MAP = "custom.map";
 
+    @Deprecated
+    public static final String TABLE_NAME = "segment.table.name";
+
     public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak";
     public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";
 
     public static final String LOCAL_SEGMENT_SCHEME = "file";
+    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     public static final String METADATA_URI_FOR_PEER_DOWNLOAD = "";
 
     public enum SegmentType {
@@ -405,9 +408,6 @@ public class CommonConstants {
       public static final String HOSTNAME = "$hostName";
       public static final String SEGMENTNAME = "$segmentName";
     }
-
-    @Deprecated
-    public static final String TABLE_NAME = "segment.table.name";
   }
 
   public static class Query {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
index e253c49..fa9725f 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
@@ -40,21 +40,18 @@ public class DateTimeFormatSpecTest {
 
   // Test conversion of a dateTimeColumn value from a format to millis
   @Test(dataProvider = "testFromFormatToMillisDataProvider")
-  public void testFromFormatToMillis(String format, Object timeColumnValue, 
long millisExpected) {
-
-    DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
-    long millisActual = dateTimeFormatSpec.fromFormatToMillis(timeColumnValue);
-    Assert.assertEquals(millisActual, millisExpected);
+  public void testFromFormatToMillis(String format, String formattedValue, 
long expectedTimeMs) {
+    Assert.assertEquals(new 
DateTimeFormatSpec(format).fromFormatToMillis(formattedValue), expectedTimeMs);
   }
 
   @DataProvider(name = "testFromFormatToMillisDataProvider")
   public Object[][] provideTestFromFormatToMillisData() {
 
     List<Object[]> entries = new ArrayList<>();
-    entries.add(new Object[]{"1:HOURS:EPOCH", 416359L, 1498892400000L});
-    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 
1498892400000L});
-    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, 0L});
-    entries.add(new Object[]{"5:MINUTES:EPOCH", 4996308L, 1498892400000L});
+    entries.add(new Object[]{"1:HOURS:EPOCH", "416359", 1498892400000L});
+    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", "1498892400000", 
1498892400000L});
+    entries.add(new Object[]{"1:HOURS:EPOCH", "0", 0L});
+    entries.add(new Object[]{"5:MINUTES:EPOCH", "4996308", 1498892400000L});
     entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "20170701", 
DateTimeFormat.forPattern("yyyyMMdd")
         .withZoneUTC().parseMillis("20170701")});
     entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd 
tz(America/Chicago)", "20170701", DateTimeFormat
@@ -73,44 +70,37 @@ public class DateTimeFormatSpecTest {
 
   // Test the conversion of a millis value to date time column value in a 
format
   @Test(dataProvider = "testFromMillisToFormatDataProvider")
-  public void testFromMillisToFormat(String format, long timeColumnValueMS, 
Class<?> type,
-      Object timeColumnValueExpected) {
-
-    DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
-    Object timeColumnValueActual = 
dateTimeFormatSpec.fromMillisToFormat(timeColumnValueMS, type);
-    Assert.assertEquals(timeColumnValueActual, timeColumnValueExpected);
+  public void testFromMillisToFormat(String format, long timeMs, String 
expectedFormattedValue) {
+    Assert.assertEquals(new 
DateTimeFormatSpec(format).fromMillisToFormat(timeMs), expectedFormattedValue);
   }
 
   @DataProvider(name = "testFromMillisToFormatDataProvider")
   public Object[][] provideTestFromMillisToFormatData() {
 
     List<Object[]> entries = new ArrayList<>();
-    entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, Long.class, 
416359L});
-    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 
Long.class, 1498892400000L});
-    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, Long.class, 0L});
-    entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, Long.class, 
4996308L});
-    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 
1498892400000L, String.class, DateTimeFormat
-        .forPattern("yyyyMMdd").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd 
tz(America/New_York)", 1498892400000L, String.class, DateTimeFormat
-            
.forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
-            1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 
1498892400000L, String.class, DateTimeFormat
-        .forPattern("yyyyMMdd HH").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 
1498892400000L, String.class, DateTimeFormat
-            .forPattern("yyyyMMdd 
HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
-            1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 
1498892400000L, String.class, DateTimeFormat
+    entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, "416359"});
+    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 
"1498892400000"});
+    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, "0"});
+    entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, "4996308"});
+    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 
1498892400000L, DateTimeFormat.forPattern("yyyyMMdd")
+        .withZoneUTC().print(1498892400000L)});
+    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd 
tz(America/New_York)", 1498892400000L, DateTimeFormat
+        
.forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
+        1498892400000L)});
+    entries.add(
+        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, 
DateTimeFormat.forPattern("yyyyMMdd HH")
+            .withZoneUTC().print(1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 
1498892400000L, DateTimeFormat
+        .forPattern("yyyyMMdd 
HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
+        1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 
1498892400000L, DateTimeFormat
         .forPattern("yyyyMMdd HH Z").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 
1498892400000L, String.class, DateTimeFormat
-            .forPattern("yyyyMMdd HH 
Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
-            1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 
1498892400000L, String.class, DateTimeFormat
-            .forPattern("M/d/yyyy h:mm:ss 
a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 
1502066750000L, String.class, DateTimeFormat
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z 
tz(GMT+0500)", 1498892400000L, DateTimeFormat
+        .forPattern("yyyyMMdd HH 
Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
+        1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 
1498892400000L, DateTimeFormat
+        .forPattern("M/d/yyyy h:mm:ss 
a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 
1502066750000L, DateTimeFormat
         .forPattern("M/d/yyyy h 
a").withZoneUTC().withLocale(Locale.ENGLISH).print(1502066750000L)});
     return entries.toArray(new Object[entries.size()][]);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index b505d28..e27c045 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -25,11 +25,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -171,9 +169,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator 
implements PinotTaskGenerato
       boolean skipGenerate = false;
       for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
completedSegmentsMetadata) {
         String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
-        long segmentStartTimeMs = 
timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
-        long segmentEndTimeMs = 
timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+        long segmentStartTimeMs = realtimeSegmentZKMetadata.getStartTimeMs();
+        long segmentEndTimeMs = realtimeSegmentZKMetadata.getEndTimeMs();
 
         // Check overlap with window
         if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
@@ -287,21 +284,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator 
implements PinotTaskGenerato
       long watermarkMs;
 
       // Find the smallest time from all segments
-      RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+      long minStartTimeMs = Long.MAX_VALUE;
       for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
completedSegmentsMetadata) {
-        if (minSegmentZkMetadata == null || 
realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
-            .getStartTime()) {
-          minSegmentZkMetadata = realtimeSegmentZKMetadata;
-        }
+        minStartTimeMs = Math.min(minStartTimeMs, 
realtimeSegmentZKMetadata.getStartTimeMs());
       }
-      Preconditions.checkState(minSegmentZkMetadata != null);
-
-      // Convert the segment minTime to millis
-      long minSegmentStartTimeMs = 
minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+      Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
 
       // Round off according to the bucket. This ensures we align the offline 
segments to proper time boundaries
       // For example, if start time millis is 20200813T12:34:59, we want to 
create the first segment for window [20200813, 20200814)
-      watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+      watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
 
       // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark 
calculated above
       realtimeToOfflineSegmentsTaskMetadata = new 
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 2690d67..ad6d66f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,19 +39,12 @@ public class TimeRetentionStrategy implements 
RetentionStrategy {
 
   @Override
   public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata 
segmentZKMetadata) {
-    TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
-    if (timeUnit == null) {
-      LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", 
segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), tableNameWithType);
-      return false;
-    }
-    long endTime = segmentZKMetadata.getEndTime();
-    long endTimeMs = timeUnit.toMillis(endTime);
+    long endTimeMs = segmentZKMetadata.getEndTimeMs();
 
     // Check that the end time is between 1971 and 2071
     if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
-      LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", 
segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, 
timeUnit);
+      LOGGER.warn("{} segment: {} of table: {} has invalid end time in millis: 
{}", segmentZKMetadata.getSegmentType(),
+          segmentZKMetadata.getSegmentName(), tableNameWithType, endTimeMs);
       return false;
     }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
index 455066f..c3d3df3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
@@ -162,18 +162,13 @@ public class TableRetentionValidator {
       List<String> errorMessages = new ArrayList<>();
       for (String segmentName : segmentNames) {
         OfflineSegmentZKMetadata offlineSegmentMetadata = 
getOfflineSegmentMetadata(tableName, segmentName);
-        TimeUnit segmentTimeUnit = offlineSegmentMetadata.getTimeUnit();
-        if (segmentTimeUnit == null) {
-          errorMessages.add("Segment: " + segmentName + " has null time unit");
-          continue;
+        long startTimeMs = offlineSegmentMetadata.getStartTimeMs();
+        if (!TimeUtils.timeValueInValidRange(startTimeMs)) {
+          errorMessages.add("Segment: " + segmentName + " has invalid start 
time in millis: " + startTimeMs);
         }
-        long startTimeInMillis = 
segmentTimeUnit.toMillis(offlineSegmentMetadata.getStartTime());
-        if (!TimeUtils.timeValueInValidRange(startTimeInMillis)) {
-          errorMessages.add("Segment: " + segmentName + " has invalid start 
time in millis: " + startTimeInMillis);
-        }
-        long endTimeInMillis = 
segmentTimeUnit.toMillis(offlineSegmentMetadata.getEndTime());
-        if (!TimeUtils.timeValueInValidRange(endTimeInMillis)) {
-          errorMessages.add("Segment: " + segmentName + " has invalid end time 
in millis: " + endTimeInMillis);
+        long endTimeMs = offlineSegmentMetadata.getEndTimeMs();
+        if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
+          errorMessages.add("Segment: " + segmentName + " has invalid end time 
in millis: " + endTimeMs);
         }
       }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index dc98600..7e593b3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -88,9 +88,10 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask<Void>
       List<Interval> segmentIntervals = new ArrayList<>(numSegments);
       int numSegmentsWithInvalidIntervals = 0;
       for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
offlineSegmentZKMetadataList) {
-        Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
-        if (timeInterval != null && 
TimeUtils.isValidTimeInterval(timeInterval)) {
-          segmentIntervals.add(timeInterval);
+        long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+        long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+        if (TimeUtils.timeValueInValidRange(startTimeMs) && 
TimeUtils.timeValueInValidRange(endTimeMs)) {
+          segmentIntervals.add(new Interval(startTimeMs, endTimeMs));
         } else {
           numSegmentsWithInvalidIntervals++;
         }
@@ -110,10 +111,9 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask<Void>
     long maxSegmentPushTime = Long.MIN_VALUE;
 
     for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
offlineSegmentZKMetadataList) {
-      Interval segmentInterval = offlineSegmentZKMetadata.getTimeInterval();
-
-      if (segmentInterval != null && maxSegmentEndTime < 
segmentInterval.getEndMillis()) {
-        maxSegmentEndTime = segmentInterval.getEndMillis();
+      long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+      if (TimeUtils.timeValueInValidRange(endTimeMs) && maxSegmentEndTime < 
endTimeMs) {
+        maxSegmentEndTime = endTimeMs;
       }
 
       long segmentPushTime = offlineSegmentZKMetadata.getPushTime();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index e15d28c..4db0266 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -222,7 +222,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(committedSegmentZKMetadata.getEndOffset(),
         new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
     assertEquals(committedSegmentZKMetadata.getCreationTime(), 
CURRENT_TIME_MS);
-    assertEquals(committedSegmentZKMetadata.getTimeInterval(), INTERVAL);
     assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
     assertEquals(committedSegmentZKMetadata.getIndexVersion(), 
SEGMENT_VERSION);
     assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 82cb2fe..b25bd51 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -255,18 +255,12 @@ public class RealtimeToOfflineSegmentsTaskExecutor 
extends BaseMultipleSegmentsC
       filterFunction = getFilterFunctionLong(windowStartMs, windowEndMs, 
timeColumn);
     } else {
       // Convert windowStart and windowEnd to time format of the data
-      if 
(dateTimeFormatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) 
{
-        long windowStart = 
dateTimeFormatSpec.fromMillisToFormat(windowStartMs, Long.class);
-        long windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, 
Long.class);
-        filterFunction = getFilterFunctionLong(windowStart, windowEnd, 
timeColumn);
+      String windowStart = 
dateTimeFormatSpec.fromMillisToFormat(windowStartMs);
+      String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs);
+      if (dateTimeFieldSpec.getDataType().isNumeric()) {
+        filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), 
Long.parseLong(windowEnd), timeColumn);
       } else {
-        String windowStart = 
dateTimeFormatSpec.fromMillisToFormat(windowStartMs, String.class);
-        String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, 
String.class);
-        if (dateTimeFieldSpec.getDataType().isNumeric()) {
-          filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), 
Long.parseLong(windowEnd), timeColumn);
-        } else {
-          filterFunction = getFilterFunctionString(windowStart, windowEnd, 
timeColumn);
-        }
+        filterFunction = getFilterFunctionString(windowStart, windowEnd, 
timeColumn);
       }
     }
     return new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
index f760778..6918da2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
@@ -121,61 +121,35 @@ public class DateTimeFormatSpec {
   }
 
   /**
+   * Converts the time in millis to the date time format.
    * <ul>
-   * <li>Given a timestamp in millis, convert it to the given format
-   * This method should not do validation of outputGranularity.
-   * The validation should be handled by caller using {@link 
#validateFormat}</li>
-   * <ul>
-   * <li>1) given dateTimeColumnValueMS = 1498892400000 and 
format=1:HOURS:EPOCH,
-   * dateTimeSpec.fromMillis(1498892400000) = 416359 (i.e. 
dateTimeColumnValueMS/(1000*60*60))</li>
-   * <li>2) given dateTimeColumnValueMS = 1498892400000 and 
format=5:MINUTES:EPOCH,
-   * dateTimeSpec.fromMillis(1498892400000) = 4996308 (i.e. 
timeColumnValueMS/(1000*60*5))</li>
-   * <li>3) given dateTimeColumnValueMS = 1498892400000 and
-   * format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd, 
dateTimeSpec.fromMillis(1498892400000) = 20170701</li>
-   * </ul>
+   *   <li>Given timeMs=1498892400000 and format='1:HOURS:EPOCH', returns 
1498892400000/(1000*60*60)='416359'</li>
+   *   <li>Given timeMs=1498892400000 and format='5:MINUTES:EPOCH', returns 
1498892400000/(1000*60*5)='4996308'</li>
+   *   <li>Given timeMs=1498892400000 and 
format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns '20170701'</li>
    * </ul>
-   * @param type - type of return value (can be int/long or string depending 
on time format)
-   * @return dateTime column value in dateTimeFieldSpec
    */
-  public <T extends Object> T fromMillisToFormat(Long dateTimeColumnValueMS, 
Class<T> type) {
-    Preconditions.checkNotNull(dateTimeColumnValueMS);
-
-    Object dateTimeColumnValue;
-    if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
-      dateTimeColumnValue = 
_unitSpec.getTimeUnit().convert(dateTimeColumnValueMS, TimeUnit.MILLISECONDS) / 
_size;
+  public String fromMillisToFormat(long timeMs) {
+    if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+      return Long.toString(_unitSpec.getTimeUnit().convert(timeMs, 
TimeUnit.MILLISECONDS) / _size);
     } else {
-      dateTimeColumnValue = 
_patternSpec.getDateTimeFormatter().print(dateTimeColumnValueMS);
+      return _patternSpec.getDateTimeFormatter().print(timeMs);
     }
-    return type.cast(dateTimeColumnValue);
   }
 
   /**
+   * Converts the date time value to the time in millis.
    * <ul>
-   * <li>Convert a time value in a format, to millis.
-   * This method should not do validation of outputGranularity.
-   * The validation should be handled by caller using {@link 
#validateFormat}</li>
-   * <ul>
-   * <li>1) given dateTimeColumnValue = 416359 and format=1:HOURS:EPOCH
-   * dateTimeSpec.toMillis(416359) = 1498892400000 (i.e. 
timeColumnValue*60*60*1000)</li>
-   * <li>2) given dateTimeColumnValue = 4996308 and format=5:MINUTES:EPOCH
-   * dateTimeSpec.toMillis(4996308) = 1498892400000 (i.e. 
timeColumnValue*5*60*1000)</li>
-   * <li>3) given dateTimeColumnValue = 20170701 and 
format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd
-   * dateTimeSpec.toMillis(20170701) = 1498892400000</li>
+   *   <li>Given dateTimeValue='416359' and format='1:HOURS:EPOCH', returns 
416359*(1000*60*60)=1498892400000</li>
+   *   <li>Given dateTimeValue='4996308' and format='5:MINUTES:EPOCH', returns 
4996308*(1000*60*5)=1498892400000</li>
+   *   <li>Given dateTimeValue='20170701' and 
format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns 1498892400000</li>
    * </ul>
-   * <ul>
-   * @param dateTimeColumnValue - datetime Column value to convert to millis
-   * @return datetime value in millis
    */
-  public Long fromFormatToMillis(Object dateTimeColumnValue) {
-    Preconditions.checkNotNull(dateTimeColumnValue);
-
-    long timeColumnValueMS;
-    if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
-      timeColumnValueMS = TimeUnit.MILLISECONDS.convert((Long) 
dateTimeColumnValue * _size, _unitSpec.getTimeUnit());
+  public long fromFormatToMillis(String dateTimeValue) {
+    if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+      return TimeUnit.MILLISECONDS.convert(Long.parseLong(dateTimeValue) * 
_size, _unitSpec.getTimeUnit());
     } else {
-      timeColumnValueMS = 
_patternSpec.getDateTimeFormatter().parseMillis(String.valueOf(dateTimeColumnValue));
+      return _patternSpec.getDateTimeFormatter().parseMillis(dateTimeValue);
     }
-    return timeColumnValueMS;
   }
 
   /**
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
index 754fdc7..af34969 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
@@ -35,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.Command;
-import org.joda.time.Interval;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,12 +135,13 @@ public class OfflineSegmentIntervalCheckerCommand extends 
AbstractBaseAdminComma
     List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList =
         
ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, 
offlineTableName);
 
-    // collect segments with invalid time intervals
+    // Collect segments with invalid start/end time
     List<String> segmentsWithInvalidIntervals = new ArrayList<>();
     if 
(SegmentIntervalUtils.eligibleForSegmentIntervalCheck(tableConfig.getValidationConfig()))
 {
       for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
offlineSegmentZKMetadataList) {
-        Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
-        if (timeInterval == null || 
!TimeUtils.isValidTimeInterval(timeInterval)) {
+        long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+        long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+        if (!TimeUtils.timeValueInValidRange(startTimeMs) || 
!TimeUtils.timeValueInValidRange(endTimeMs)) {
           
segmentsWithInvalidIntervals.add(offlineSegmentZKMetadata.getSegmentName());
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to