This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 750af31 Always read start/end time in millis from the segment ZK metadata (#6239) 750af31 is described below commit 750af31133f758f3c13cda7d9b22126eb7d52512 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Nov 10 14:05:12 2020 -0800 Always read start/end time in millis from the segment ZK metadata (#6239) - Adds the APIs to read the start/end time in millis from the SegmentZKMetadata, deprecates the old getters. - Modifies the logic in TimeBoundaryManager to support SDF for the hybrid table. --- .../routing/timeboundary/TimeBoundaryManager.java | 96 +++---- .../common/metadata/segment/SegmentZKMetadata.java | 291 ++++++++++----------- .../common/tier/TimeBasedTierSegmentSelector.java | 7 +- .../apache/pinot/common/utils/CommonConstants.java | 12 +- .../pinot/common/data/DateTimeFormatSpecTest.java | 70 +++-- .../RealtimeToOfflineSegmentsTaskGenerator.java | 21 +- .../retention/strategy/TimeRetentionStrategy.java | 13 +- .../controller/util/TableRetentionValidator.java | 17 +- .../validation/OfflineSegmentIntervalChecker.java | 14 +- .../PinotLLCRealtimeSegmentManagerTest.java | 1 - .../RealtimeToOfflineSegmentsTaskExecutor.java | 16 +- .../apache/pinot/spi/data/DateTimeFormatSpec.java | 58 ++-- .../OfflineSegmentIntervalCheckerCommand.java | 8 +- 13 files changed, 268 insertions(+), 356 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java index c7a64bb..4a8c357 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java @@ -48,15 +48,15 @@ import org.slf4j.LoggerFactory; */ public class TimeBoundaryManager { private static final Logger LOGGER = LoggerFactory.getLogger(TimeBoundaryManager.class); - private static final long INVALID_END_TIME = -1; + private static final long INVALID_END_TIME_MS = -1; private final String _offlineTableName; private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final String _segmentZKMetadataPathPrefix; private final String _timeColumn; - private final TimeUnit _timeUnit; - private final boolean _isHourlyTable; - private final Map<String, Long> _endTimeMap = new HashMap<>(); + private final DateTimeFormatSpec _timeFormatSpec; + private final long _timeOffsetMs; + private final Map<String, Long> _endTimeMsMap = new HashMap<>(); private volatile TimeBoundaryInfo _timeBoundaryInfo; @@ -75,19 +75,20 @@ public class TimeBoundaryManager { DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn); Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s", _timeColumn, _offlineTableName); - DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat()); - _timeUnit = formatSpec.getColumnUnit(); - Preconditions - .checkNotNull(_timeUnit, "Time unit must be configured in the field spec for time column: %s of table: %s", - _timeColumn, _offlineTableName); + _timeFormatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat()); + Preconditions.checkNotNull(_timeFormatSpec.getColumnUnit(), + "Time unit must be configured in the field spec for time column: %s of table: %s", _timeColumn, + _offlineTableName); // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use // (maxEndTime - 1 DAY) - _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY - .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) && _timeUnit != TimeUnit.DAYS; + boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY + .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) + && _timeFormatSpec.getColumnUnit() != TimeUnit.DAYS; + _timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1); - LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}", - _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName); + LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeFormat: {}, isHourlyTable: {} for table: {}", + _timeColumn, _timeFormatSpec.getFormat(), isHourlyTable, _offlineTableName); } /** @@ -108,38 +109,38 @@ public class TimeBoundaryManager { segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); } List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT); - long maxEndTime = INVALID_END_TIME; + long maxEndTimeMs = INVALID_END_TIME_MS; for (int i = 0; i < numSegments; i++) { String segment = segments.get(i); - long endTime = extractEndTimeFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)); - _endTimeMap.put(segment, endTime); - maxEndTime = Math.max(maxEndTime, endTime); + long endTimeMs = extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)); + _endTimeMsMap.put(segment, endTimeMs); + maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs); } - updateTimeBoundaryInfo(maxEndTime); + updateTimeBoundaryInfo(maxEndTimeMs); } - private long extractEndTimeFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) { + private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) { if (znRecord == null) { LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName); - return INVALID_END_TIME; + return INVALID_END_TIME_MS; } - long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, INVALID_END_TIME); - if (endTime <= 0) { + long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1); + if (endTime > 0) { + TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS); + return timeUnit.toMillis(endTime); + } else { LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName); - return INVALID_END_TIME; + return INVALID_END_TIME_MS; } - - TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS); - return _timeUnit.convert(endTime, timeUnit); } - private void updateTimeBoundaryInfo(long maxEndTime) { - if (maxEndTime > 0) { - long timeBoundary = getTimeBoundary(maxEndTime); + private void updateTimeBoundaryInfo(long maxEndTimeMs) { + if (maxEndTimeMs > 0) { + String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs); TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo; - if (currentTimeBoundaryInfo == null || Long.parseLong(currentTimeBoundaryInfo.getTimeValue()) != timeBoundary) { - _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, Long.toString(timeBoundary)); + if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) { + _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary); LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName); } } else { @@ -150,19 +151,6 @@ public class TimeBoundaryManager { } /** - * Returns the time boundary based on the given maximum end time. - * <p>NOTE: For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; - * otherwise, use (maxEndTime - 1 DAY). - */ - private long getTimeBoundary(long maxEndTime) { - if (_isHourlyTable) { - return maxEndTime - _timeUnit.convert(1L, TimeUnit.HOURS); - } else { - return maxEndTime - _timeUnit.convert(1L, TimeUnit.DAYS); - } - } - - /** * Processes the external view change based on the given ideal state and online segments (segments with * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). * <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed @@ -174,28 +162,28 @@ public class TimeBoundaryManager { public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) { for (String segment : onlineSegments) { - _endTimeMap.computeIfAbsent(segment, k -> extractEndTimeFromSegmentZKMetadataZNRecord(segment, + _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))); } - _endTimeMap.keySet().retainAll(onlineSegments); - updateTimeBoundaryInfo(getMaxEndTime()); + _endTimeMsMap.keySet().retainAll(onlineSegments); + updateTimeBoundaryInfo(getMaxEndTimeMs()); } - private long getMaxEndTime() { - long maxEndTime = INVALID_END_TIME; - for (long endTime : _endTimeMap.values()) { - maxEndTime = Math.max(maxEndTime, endTime); + private long getMaxEndTimeMs() { + long maxEndTimeMs = INVALID_END_TIME_MS; + for (long endTimeMs : _endTimeMsMap.values()) { + maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs); } - return maxEndTime; + return maxEndTimeMs; } /** * Refreshes the metadata for the given segment (called when segment is getting refreshed). */ public synchronized void refreshSegment(String segment) { - _endTimeMap.put(segment, extractEndTimeFromSegmentZKMetadataZNRecord(segment, + _endTimeMsMap.put(segment, extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))); - updateTimeBoundaryInfo(getMaxEndTime()); + updateTimeBoundaryInfo(getMaxEndTimeMs()); } @Nullable diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java index f0f00ad..5ee5288 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java @@ -22,11 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; import org.apache.helix.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadata; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Segment; import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType; import org.apache.pinot.spi.utils.JsonUtils; import org.joda.time.Duration; @@ -34,11 +34,6 @@ import org.joda.time.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.spi.utils.EqualityUtils.hashCodeOf; -import static org.apache.pinot.spi.utils.EqualityUtils.isEqual; -import static org.apache.pinot.spi.utils.EqualityUtils.isNullOrNotSameClass; -import static org.apache.pinot.spi.utils.EqualityUtils.isSameReference; - public abstract class SegmentZKMetadata implements ZKMetadata { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadata.class); @@ -50,16 +45,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata { private long _startTime = -1; private long _endTime = -1; private TimeUnit _timeUnit; - private Duration _timeGranularity; - private Interval _timeInterval; private String _indexVersion; private long _totalDocs = -1; private long _crc = -1; private long _creationTime = -1; private SegmentPartitionMetadata _partitionMetadata; private long _segmentUploadStartTime = -1; - private Map<String, String> _customMap; private String _crypterName; + private Map<String, String> _customMap; @Deprecated private String _tableName; @@ -68,23 +61,20 @@ public abstract class SegmentZKMetadata implements ZKMetadata { } public SegmentZKMetadata(ZNRecord znRecord) { - _segmentName = znRecord.getSimpleField(CommonConstants.Segment.SEGMENT_NAME); - _tableName = znRecord.getSimpleField(CommonConstants.Segment.TABLE_NAME); - _crypterName = znRecord.getSimpleField(CommonConstants.Segment.CRYPTER_NAME); - _segmentType = znRecord.getEnumField(CommonConstants.Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE); - _startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1); - _endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1); - if (znRecord.getSimpleFields().containsKey(CommonConstants.Segment.TIME_UNIT) && !znRecord - .getSimpleField(CommonConstants.Segment.TIME_UNIT).equals(NULL)) { - setTimeUnit(znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS)); + _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME); + _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE); + _startTime = znRecord.getLongField(Segment.START_TIME, -1); + _endTime = znRecord.getLongField(Segment.END_TIME, -1); + String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT); + if (timeUnitString != null && !timeUnitString.equals(NULL)) { + _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS); } - _indexVersion = znRecord.getSimpleField(CommonConstants.Segment.INDEX_VERSION); - _totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1); - _crc = znRecord.getLongField(CommonConstants.Segment.CRC, -1); - _creationTime = znRecord.getLongField(CommonConstants.Segment.CREATION_TIME, -1); - + _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION); + _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1); + _crc = znRecord.getLongField(Segment.CRC, -1); + _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1); try { - String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA); + String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA); if (partitionMetadataJson != null) { _partitionMetadata = SegmentPartitionMetadata.fromJsonString(partitionMetadataJson); } @@ -93,8 +83,12 @@ public abstract class SegmentZKMetadata implements ZKMetadata { "Exception caught while reading partition info from zk metadata for segment '{}', partition info dropped.", _segmentName, e); } - _segmentUploadStartTime = znRecord.getLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, -1); - _customMap = znRecord.getMapField(CommonConstants.Segment.CUSTOM_MAP); + _segmentUploadStartTime = znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1); + _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME); + _customMap = znRecord.getMapField(Segment.CUSTOM_MAP); + + // For backward-compatibility + _tableName = znRecord.getSimpleField(Segment.TABLE_NAME); } public String getSegmentName() { @@ -105,54 +99,40 @@ public abstract class SegmentZKMetadata implements ZKMetadata { _segmentName = segmentName; } - @Deprecated - public String getTableName() { - return _tableName; + public SegmentType getSegmentType() { + return _segmentType; } - @Deprecated - public void setTableName(String tableName) { - _tableName = tableName; + public void setSegmentType(SegmentType segmentType) { + _segmentType = segmentType; } - public long getStartTime() { - return _startTime; + public long getStartTimeMs() { + if (_startTime > 0 && _timeUnit != null) { + return _timeUnit.toMillis(_startTime); + } else { + return -1; + } } public void setStartTime(long startTime) { _startTime = startTime; } - public long getEndTime() { - return _endTime; + public long getEndTimeMs() { + if (_endTime > 0 && _timeUnit != null) { + return _timeUnit.toMillis(_endTime); + } else { + return -1; + } } public void setEndTime(long endTime) { _endTime = endTime; } - public TimeUnit getTimeUnit() { - return _timeUnit; - } - - /** - * NOTE: should be called after setting start and end time. - */ - public void setTimeUnit(@Nonnull TimeUnit timeUnit) { + public void setTimeUnit(TimeUnit timeUnit) { _timeUnit = timeUnit; - _timeGranularity = new Duration(_timeUnit.toMillis(1)); - // For consuming segment, end time might not be set - if (_startTime >= 0 && _startTime <= _endTime) { - _timeInterval = new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime)); - } - } - - public Duration getTimeGranularity() { - return _timeGranularity; - } - - public Interval getTimeInterval() { - return _timeInterval; } public String getIndexVersion() { @@ -163,22 +143,6 @@ public abstract class SegmentZKMetadata implements ZKMetadata { _indexVersion = indexVersion; } - public SegmentType getSegmentType() { - return _segmentType; - } - - public void setSegmentType(SegmentType segmentType) { - _segmentType = segmentType; - } - - public String getCrypterName() { - return _crypterName; - } - - public void setCrypterName(String crypterName) { - _crypterName = crypterName; - } - public long getTotalDocs() { return _totalDocs; } @@ -219,6 +183,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata { _segmentUploadStartTime = segmentUploadStartTime; } + public String getCrypterName() { + return _crypterName; + } + + public void setCrypterName(String crypterName) { + _crypterName = crypterName; + } + public Map<String, String> getCustomMap() { return _customMap; } @@ -227,76 +199,87 @@ public abstract class SegmentZKMetadata implements ZKMetadata { _customMap = customMap; } + @Deprecated + public String getTableName() { + return _tableName; + } + + @Deprecated + public void setTableName(String tableName) { + _tableName = tableName; + } + + @Deprecated + public long getStartTime() { + return _startTime; + } + + @Deprecated + public long getEndTime() { + return _endTime; + } + + @Deprecated + public TimeUnit getTimeUnit() { + return _timeUnit; + } + + @Deprecated + public Duration getTimeGranularity() { + return _timeUnit != null ? new Duration(_timeUnit.toMillis(1)) : null; + } + + @Deprecated + public Interval getTimeInterval() { + if (_startTime > 0 && _startTime <= _endTime && _timeUnit != null) { + return new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime)); + } else { + return null; + } + } + @Override - public boolean equals(Object segmentMetadata) { - if (isSameReference(this, segmentMetadata)) { + public boolean equals(Object o) { + if (this == o) { return true; } - - if (isNullOrNotSameClass(this, segmentMetadata)) { + if (o == null || getClass() != o.getClass()) { return false; } - - SegmentZKMetadata metadata = (SegmentZKMetadata) segmentMetadata; - return isEqual(_segmentName, metadata._segmentName) && isEqual(_crypterName, metadata._crypterName) && isEqual( - _tableName, metadata._tableName) && isEqual(_indexVersion, metadata._indexVersion) && isEqual(_timeUnit, - metadata._timeUnit) && isEqual(_startTime, metadata._startTime) && isEqual(_endTime, metadata._endTime) - && isEqual(_segmentType, metadata._segmentType) && isEqual(_totalDocs, metadata._totalDocs) && isEqual(_crc, - metadata._crc) && isEqual(_creationTime, metadata._creationTime) && isEqual(_partitionMetadata, - metadata._partitionMetadata) && isEqual(_segmentUploadStartTime, metadata._segmentUploadStartTime) && isEqual( - _customMap, metadata._customMap); + SegmentZKMetadata that = (SegmentZKMetadata) o; + return _startTime == that._startTime && _endTime == that._endTime && _totalDocs == that._totalDocs + && _crc == that._crc && _creationTime == that._creationTime + && _segmentUploadStartTime == that._segmentUploadStartTime && Objects.equals(_segmentName, that._segmentName) + && _segmentType == that._segmentType && _timeUnit == that._timeUnit && Objects + .equals(_indexVersion, that._indexVersion) && Objects.equals(_partitionMetadata, that._partitionMetadata) + && Objects.equals(_crypterName, that._crypterName) && Objects.equals(_customMap, that._customMap) && Objects + .equals(_tableName, that._tableName); } @Override public int hashCode() { - int result = hashCodeOf(_segmentName); - result = hashCodeOf(result, _tableName); - result = hashCodeOf(result, _crypterName); - result = hashCodeOf(result, _segmentType); - result = hashCodeOf(result, _startTime); - result = hashCodeOf(result, _endTime); - result = hashCodeOf(result, _timeUnit); - result = hashCodeOf(result, _indexVersion); - result = hashCodeOf(result, _totalDocs); - result = hashCodeOf(result, _crc); - result = hashCodeOf(result, _creationTime); - result = hashCodeOf(result, _partitionMetadata); - result = hashCodeOf(result, _segmentUploadStartTime); - result = hashCodeOf(result, _customMap); - return result; + return Objects.hash(_segmentName, _segmentType, _startTime, _endTime, _timeUnit, _indexVersion, _totalDocs, _crc, + _creationTime, _partitionMetadata, _segmentUploadStartTime, _crypterName, _customMap, _tableName); } @Override public ZNRecord toZNRecord() { ZNRecord znRecord = new ZNRecord(_segmentName); - znRecord.setSimpleField(CommonConstants.Segment.SEGMENT_NAME, _segmentName); - - if (_tableName != null) { - znRecord.setSimpleField(CommonConstants.Segment.TABLE_NAME, _tableName); - } - if (_crypterName != null) { - znRecord.setSimpleField(CommonConstants.Segment.CRYPTER_NAME, _crypterName); - } - - znRecord.setEnumField(CommonConstants.Segment.SEGMENT_TYPE, _segmentType); - if (_timeUnit == null) { - znRecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, NULL); - } else { - znRecord.setEnumField(CommonConstants.Segment.TIME_UNIT, _timeUnit); - } - znRecord.setLongField(CommonConstants.Segment.START_TIME, _startTime); - znRecord.setLongField(CommonConstants.Segment.END_TIME, _endTime); - - znRecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, _indexVersion); - znRecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, _totalDocs); - znRecord.setLongField(CommonConstants.Segment.CRC, _crc); - znRecord.setLongField(CommonConstants.Segment.CREATION_TIME, _creationTime); + znRecord.setSimpleField(Segment.SEGMENT_NAME, _segmentName); + znRecord.setEnumField(Segment.SEGMENT_TYPE, _segmentType); + znRecord.setLongField(Segment.START_TIME, _startTime); + znRecord.setLongField(Segment.END_TIME, _endTime); + znRecord.setSimpleField(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : NULL); + znRecord.setSimpleField(Segment.INDEX_VERSION, _indexVersion); + znRecord.setLongField(Segment.TOTAL_DOCS, _totalDocs); + znRecord.setLongField(Segment.CRC, _crc); + znRecord.setLongField(Segment.CREATION_TIME, _creationTime); if (_partitionMetadata != null) { try { String partitionMetadataJson = _partitionMetadata.toJsonString(); - znRecord.setSimpleField(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson); + znRecord.setSimpleField(Segment.PARTITION_METADATA, partitionMetadataJson); } catch (IOException e) { LOGGER .error("Exception caught while writing partition metadata into ZNRecord for segment '{}', will be dropped", @@ -304,59 +287,65 @@ public abstract class SegmentZKMetadata implements ZKMetadata { } } if (_segmentUploadStartTime > 0) { - znRecord.setLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime); + znRecord.setLongField(Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime); + } + if (_crypterName != null) { + znRecord.setSimpleField(Segment.CRYPTER_NAME, _crypterName); } if (_customMap != null) { - znRecord.setMapField(CommonConstants.Segment.CUSTOM_MAP, _customMap); + znRecord.setMapField(Segment.CUSTOM_MAP, _customMap); + } + + // For backward-compatibility + if (_tableName != null) { + znRecord.setSimpleField(Segment.TABLE_NAME, _tableName); } + return znRecord; } public Map<String, String> toMap() { Map<String, String> configMap = new HashMap<>(); - configMap.put(CommonConstants.Segment.SEGMENT_NAME, _segmentName); - if (_tableName != null) { - configMap.put(CommonConstants.Segment.TABLE_NAME, _tableName); - } - configMap.put(CommonConstants.Segment.SEGMENT_TYPE, _segmentType.toString()); - if (_timeUnit == null) { - configMap.put(CommonConstants.Segment.TIME_UNIT, null); - } else { - configMap.put(CommonConstants.Segment.TIME_UNIT, _timeUnit.toString()); - } - configMap.put(CommonConstants.Segment.START_TIME, Long.toString(_startTime)); - configMap.put(CommonConstants.Segment.END_TIME, Long.toString(_endTime)); - configMap.put(CommonConstants.Segment.INDEX_VERSION, _indexVersion); - configMap.put(CommonConstants.Segment.TOTAL_DOCS, Long.toString(_totalDocs)); - configMap.put(CommonConstants.Segment.CRC, Long.toString(_crc)); - configMap.put(CommonConstants.Segment.CREATION_TIME, Long.toString(_creationTime)); + configMap.put(Segment.SEGMENT_NAME, _segmentName); + configMap.put(Segment.SEGMENT_TYPE, _segmentType.toString()); + configMap.put(Segment.START_TIME, Long.toString(_startTime)); + configMap.put(Segment.END_TIME, Long.toString(_endTime)); + configMap.put(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : null); + configMap.put(Segment.INDEX_VERSION, _indexVersion); + configMap.put(Segment.TOTAL_DOCS, Long.toString(_totalDocs)); + configMap.put(Segment.CRC, Long.toString(_crc)); + configMap.put(Segment.CREATION_TIME, Long.toString(_creationTime)); if (_partitionMetadata != null) { try { String partitionMetadataJson = _partitionMetadata.toJsonString(); - configMap.put(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson); + configMap.put(Segment.PARTITION_METADATA, partitionMetadataJson); } catch (IOException e) { LOGGER.error( "Exception caught while converting partition metadata into JSON string for segment '{}', will be dropped", _segmentName, e); } } - if (_segmentUploadStartTime > 0) { - configMap.put(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime)); + configMap.put(Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime)); } - - if (_customMap == null) { - configMap.put(CommonConstants.Segment.CUSTOM_MAP, null); - } else { + if (_crypterName != null) { + configMap.put(Segment.CRYPTER_NAME, _crypterName); + } + if (_customMap != null) { try { - configMap.put(CommonConstants.Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap)); + configMap.put(Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap)); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } + // For backward-compatibility + if (_tableName != null) { + configMap.put(Segment.TABLE_NAME, _tableName); + } + return configMap; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java index f4b201c..3023bbf 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java @@ -19,7 +19,6 @@ package org.apache.pinot.common.tier; import com.google.common.base.Preconditions; -import java.util.concurrent.TimeUnit; import org.apache.helix.HelixManager; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -66,10 +65,10 @@ public class TimeBasedTierSegmentSelector implements TierSegmentSelector { tableNameWithType); // get segment end time to decide if segment gets selected - TimeUnit timeUnit = segmentZKMetadata.getTimeUnit(); + long endTimeMs = segmentZKMetadata.getEndTimeMs(); Preconditions - .checkNotNull(timeUnit, "Time unit is not set for segment: %s of table: %s", segmentName, tableNameWithType); - long endTimeMs = timeUnit.toMillis(segmentZKMetadata.getEndTime()); + .checkState(endTimeMs > 0, "Invalid endTimeMs: %s for segment: %s of table: %s", endTimeMs, segmentName, + tableNameWithType); long now = System.currentTimeMillis(); return (now - endTimeMs) > _segmentAgeMillis; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index e709b15..afb1852 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -364,18 +364,16 @@ public class CommonConstants { public static final String SEGMENT_NAME = "segment.name"; public static final String SEGMENT_TYPE = "segment.type"; - public static final String CRYPTER_NAME = "segment.crypter"; - public static final String INDEX_VERSION = "segment.index.version"; public static final String START_TIME = "segment.start.time"; public static final String END_TIME = "segment.end.time"; public static final String TIME_UNIT = "segment.time.unit"; + public static final String INDEX_VERSION = "segment.index.version"; public static final String TOTAL_DOCS = "segment.total.docs"; public static final String CRC = "segment.crc"; public static final String CREATION_TIME = "segment.creation.time"; public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size"; public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time"; public static final String PARTITION_METADATA = "segment.partition.metadata"; - public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://"; /** * This field is used for parallel push protection to lock the segment globally. * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the @@ -383,12 +381,17 @@ public class CommonConstants { */ public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time"; + public static final String CRYPTER_NAME = "segment.crypter"; public static final String CUSTOM_MAP = "custom.map"; + @Deprecated + public static final String TABLE_NAME = "segment.table.name"; + public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak"; public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp"; public static final String LOCAL_SEGMENT_SCHEME = "file"; + public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://"; public static final String METADATA_URI_FOR_PEER_DOWNLOAD = ""; public enum SegmentType { @@ -405,9 +408,6 @@ public class CommonConstants { public static final String HOSTNAME = "$hostName"; public static final String SEGMENTNAME = "$segmentName"; } - - @Deprecated - public static final String TABLE_NAME = "segment.table.name"; } public static class Query { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java index e253c49..fa9725f 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java @@ -40,21 +40,18 @@ public class DateTimeFormatSpecTest { // Test conversion of a dateTimeColumn value from a format to millis @Test(dataProvider = "testFromFormatToMillisDataProvider") - public void testFromFormatToMillis(String format, Object timeColumnValue, long millisExpected) { - - DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format); - long millisActual = dateTimeFormatSpec.fromFormatToMillis(timeColumnValue); - Assert.assertEquals(millisActual, millisExpected); + public void testFromFormatToMillis(String format, String formattedValue, long expectedTimeMs) { + Assert.assertEquals(new DateTimeFormatSpec(format).fromFormatToMillis(formattedValue), expectedTimeMs); } @DataProvider(name = "testFromFormatToMillisDataProvider") public Object[][] provideTestFromFormatToMillisData() { List<Object[]> entries = new ArrayList<>(); - entries.add(new Object[]{"1:HOURS:EPOCH", 416359L, 1498892400000L}); - entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 1498892400000L}); - entries.add(new Object[]{"1:HOURS:EPOCH", 0L, 0L}); - entries.add(new Object[]{"5:MINUTES:EPOCH", 4996308L, 1498892400000L}); + entries.add(new Object[]{"1:HOURS:EPOCH", "416359", 1498892400000L}); + entries.add(new Object[]{"1:MILLISECONDS:EPOCH", "1498892400000", 1498892400000L}); + entries.add(new Object[]{"1:HOURS:EPOCH", "0", 0L}); + entries.add(new Object[]{"5:MINUTES:EPOCH", "4996308", 1498892400000L}); entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "20170701", DateTimeFormat.forPattern("yyyyMMdd") .withZoneUTC().parseMillis("20170701")}); entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/Chicago)", "20170701", DateTimeFormat @@ -73,44 +70,37 @@ public class DateTimeFormatSpecTest { // Test the conversion of a millis value to date time column value in a format @Test(dataProvider = "testFromMillisToFormatDataProvider") - public void testFromMillisToFormat(String format, long timeColumnValueMS, Class<?> type, - Object timeColumnValueExpected) { - - DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format); - Object timeColumnValueActual = dateTimeFormatSpec.fromMillisToFormat(timeColumnValueMS, type); - Assert.assertEquals(timeColumnValueActual, timeColumnValueExpected); + public void testFromMillisToFormat(String format, long timeMs, String expectedFormattedValue) { + Assert.assertEquals(new DateTimeFormatSpec(format).fromMillisToFormat(timeMs), expectedFormattedValue); } @DataProvider(name = "testFromMillisToFormatDataProvider") public Object[][] provideTestFromMillisToFormatData() { List<Object[]> entries = new ArrayList<>(); - entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, Long.class, 416359L}); - entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, Long.class, 1498892400000L}); - entries.add(new Object[]{"1:HOURS:EPOCH", 0L, Long.class, 0L}); - entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, Long.class, 4996308L}); - entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, String.class, DateTimeFormat - .forPattern("yyyyMMdd").withZoneUTC().print(1498892400000L)}); - entries.add( - new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, String.class, DateTimeFormat - .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print( - 1498892400000L)}); - entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, String.class, DateTimeFormat - .forPattern("yyyyMMdd HH").withZoneUTC().print(1498892400000L)}); - entries.add( - new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, String.class, DateTimeFormat - .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print( - 1498892400000L)}); - entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, String.class, DateTimeFormat + entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, "416359"}); + entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, "1498892400000"}); + entries.add(new Object[]{"1:HOURS:EPOCH", 0L, "0"}); + entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, "4996308"}); + entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd") + .withZoneUTC().print(1498892400000L)}); + entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, DateTimeFormat + .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print( + 1498892400000L)}); + entries.add( + new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd HH") + .withZoneUTC().print(1498892400000L)}); + entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, DateTimeFormat + .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print( + 1498892400000L)}); + entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, DateTimeFormat .forPattern("yyyyMMdd HH Z").withZoneUTC().print(1498892400000L)}); - entries.add( - new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, String.class, DateTimeFormat - .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print( - 1498892400000L)}); - entries.add( - new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, String.class, DateTimeFormat - .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)}); - entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, String.class, DateTimeFormat + entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, DateTimeFormat + .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print( + 1498892400000L)}); + entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, DateTimeFormat + .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)}); + entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, DateTimeFormat .forPattern("M/d/yyyy h a").withZoneUTC().withLocale(Locale.ENGLISH).print(1502066750000L)}); return entries.toArray(new Object[entries.size()][]); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java index b505d28..e27c045 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java @@ -25,11 +25,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; -import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; import org.apache.pinot.common.utils.CommonConstants.Segment; import org.apache.pinot.common.utils.LLCSegmentName; @@ -171,9 +169,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato boolean skipGenerate = false; for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) { String segmentName = realtimeSegmentZKMetadata.getSegmentName(); - TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit(); - long segmentStartTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime()); - long segmentEndTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime()); + long segmentStartTimeMs = realtimeSegmentZKMetadata.getStartTimeMs(); + long segmentEndTimeMs = realtimeSegmentZKMetadata.getEndTimeMs(); // Check overlap with window if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { @@ -287,21 +284,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato long watermarkMs; // Find the smallest time from all segments - RealtimeSegmentZKMetadata minSegmentZkMetadata = null; + long minStartTimeMs = Long.MAX_VALUE; for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) { - if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata - .getStartTime()) { - minSegmentZkMetadata = realtimeSegmentZKMetadata; - } + minStartTimeMs = Math.min(minStartTimeMs, realtimeSegmentZKMetadata.getStartTimeMs()); } - Preconditions.checkState(minSegmentZkMetadata != null); - - // Convert the segment minTime to millis - long minSegmentStartTimeMs = minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime()); + Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE); // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814) - watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs; + watermarkMs = (minStartTimeMs / bucketMs) * bucketMs; // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java index 2690d67..ad6d66f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java @@ -39,19 +39,12 @@ public class TimeRetentionStrategy implements RetentionStrategy { @Override public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) { - TimeUnit timeUnit = segmentZKMetadata.getTimeUnit(); - if (timeUnit == null) { - LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(), - segmentZKMetadata.getSegmentName(), tableNameWithType); - return false; - } - long endTime = segmentZKMetadata.getEndTime(); - long endTimeMs = timeUnit.toMillis(endTime); + long endTimeMs = segmentZKMetadata.getEndTimeMs(); // Check that the end time is between 1971 and 2071 if (!TimeUtils.timeValueInValidRange(endTimeMs)) { - LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(), - segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit); + LOGGER.warn("{} segment: {} of table: {} has invalid end time in millis: {}", segmentZKMetadata.getSegmentType(), + segmentZKMetadata.getSegmentName(), tableNameWithType, endTimeMs); return false; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java index 455066f..c3d3df3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java @@ -162,18 +162,13 @@ public class TableRetentionValidator { List<String> errorMessages = new ArrayList<>(); for (String segmentName : segmentNames) { OfflineSegmentZKMetadata offlineSegmentMetadata = getOfflineSegmentMetadata(tableName, segmentName); - TimeUnit segmentTimeUnit = offlineSegmentMetadata.getTimeUnit(); - if (segmentTimeUnit == null) { - errorMessages.add("Segment: " + segmentName + " has null time unit"); - continue; + long startTimeMs = offlineSegmentMetadata.getStartTimeMs(); + if (!TimeUtils.timeValueInValidRange(startTimeMs)) { + errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeMs); } - long startTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getStartTime()); - if (!TimeUtils.timeValueInValidRange(startTimeInMillis)) { - errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeInMillis); - } - long endTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getEndTime()); - if (!TimeUtils.timeValueInValidRange(endTimeInMillis)) { - errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeInMillis); + long endTimeMs = offlineSegmentMetadata.getEndTimeMs(); + if (!TimeUtils.timeValueInValidRange(endTimeMs)) { + errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeMs); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java index dc98600..7e593b3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java @@ -88,9 +88,10 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> List<Interval> segmentIntervals = new ArrayList<>(numSegments); int numSegmentsWithInvalidIntervals = 0; for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) { - Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval(); - if (timeInterval != null && TimeUtils.isValidTimeInterval(timeInterval)) { - segmentIntervals.add(timeInterval); + long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs(); + long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs(); + if (TimeUtils.timeValueInValidRange(startTimeMs) && TimeUtils.timeValueInValidRange(endTimeMs)) { + segmentIntervals.add(new Interval(startTimeMs, endTimeMs)); } else { numSegmentsWithInvalidIntervals++; } @@ -110,10 +111,9 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> long maxSegmentPushTime = Long.MIN_VALUE; for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) { - Interval segmentInterval = offlineSegmentZKMetadata.getTimeInterval(); - - if (segmentInterval != null && maxSegmentEndTime < segmentInterval.getEndMillis()) { - maxSegmentEndTime = segmentInterval.getEndMillis(); + long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs(); + if (TimeUtils.timeValueInValidRange(endTimeMs) && maxSegmentEndTime < endTimeMs) { + maxSegmentEndTime = endTimeMs; } long segmentPushTime = offlineSegmentZKMetadata.getPushTime(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index e15d28c..4db0266 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -222,7 +222,6 @@ public class PinotLLCRealtimeSegmentManagerTest { assertEquals(committedSegmentZKMetadata.getEndOffset(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString()); assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS); - assertEquals(committedSegmentZKMetadata.getTimeInterval(), INTERVAL); assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC)); assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION); assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java index 82cb2fe..b25bd51 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java @@ -255,18 +255,12 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC filterFunction = getFilterFunctionLong(windowStartMs, windowEndMs, timeColumn); } else { // Convert windowStart and windowEnd to time format of the data - if (dateTimeFormatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) { - long windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, Long.class); - long windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, Long.class); - filterFunction = getFilterFunctionLong(windowStart, windowEnd, timeColumn); + String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs); + String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs); + if (dateTimeFieldSpec.getDataType().isNumeric()) { + filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn); } else { - String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, String.class); - String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, String.class); - if (dateTimeFieldSpec.getDataType().isNumeric()) { - filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn); - } else { - filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn); - } + filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn); } } return new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java index f760778..6918da2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java @@ -121,61 +121,35 @@ public class DateTimeFormatSpec { } /** + * Converts the time in millis to the date time format. * <ul> - * <li>Given a timestamp in millis, convert it to the given format - * This method should not do validation of outputGranularity. - * The validation should be handled by caller using {@link #validateFormat}</li> - * <ul> - * <li>1) given dateTimeColumnValueMS = 1498892400000 and format=1:HOURS:EPOCH, - * dateTimeSpec.fromMillis(1498892400000) = 416359 (i.e. dateTimeColumnValueMS/(1000*60*60))</li> - * <li>2) given dateTimeColumnValueMS = 1498892400000 and format=5:MINUTES:EPOCH, - * dateTimeSpec.fromMillis(1498892400000) = 4996308 (i.e. timeColumnValueMS/(1000*60*5))</li> - * <li>3) given dateTimeColumnValueMS = 1498892400000 and - * format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd, dateTimeSpec.fromMillis(1498892400000) = 20170701</li> - * </ul> + * <li>Given timeMs=1498892400000 and format='1:HOURS:EPOCH', returns 1498892400000/(1000*60*60)='416359'</li> + * <li>Given timeMs=1498892400000 and format='5:MINUTES:EPOCH', returns 1498892400000/(1000*60*5)='4996308'</li> + * <li>Given timeMs=1498892400000 and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns '20170701'</li> * </ul> - * @param type - type of return value (can be int/long or string depending on time format) - * @return dateTime column value in dateTimeFieldSpec */ - public <T extends Object> T fromMillisToFormat(Long dateTimeColumnValueMS, Class<T> type) { - Preconditions.checkNotNull(dateTimeColumnValueMS); - - Object dateTimeColumnValue; - if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) { - dateTimeColumnValue = _unitSpec.getTimeUnit().convert(dateTimeColumnValueMS, TimeUnit.MILLISECONDS) / _size; + public String fromMillisToFormat(long timeMs) { + if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) { + return Long.toString(_unitSpec.getTimeUnit().convert(timeMs, TimeUnit.MILLISECONDS) / _size); } else { - dateTimeColumnValue = _patternSpec.getDateTimeFormatter().print(dateTimeColumnValueMS); + return _patternSpec.getDateTimeFormatter().print(timeMs); } - return type.cast(dateTimeColumnValue); } /** + * Converts the date time value to the time in millis. * <ul> - * <li>Convert a time value in a format, to millis. - * This method should not do validation of outputGranularity. - * The validation should be handled by caller using {@link #validateFormat}</li> - * <ul> - * <li>1) given dateTimeColumnValue = 416359 and format=1:HOURS:EPOCH - * dateTimeSpec.toMillis(416359) = 1498892400000 (i.e. timeColumnValue*60*60*1000)</li> - * <li>2) given dateTimeColumnValue = 4996308 and format=5:MINUTES:EPOCH - * dateTimeSpec.toMillis(4996308) = 1498892400000 (i.e. timeColumnValue*5*60*1000)</li> - * <li>3) given dateTimeColumnValue = 20170701 and format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd - * dateTimeSpec.toMillis(20170701) = 1498892400000</li> + * <li>Given dateTimeValue='416359' and format='1:HOURS:EPOCH', returns 416359*(1000*60*60)=1498892400000</li> + * <li>Given dateTimeValue='4996308' and format='5:MINUTES:EPOCH', returns 4996308*(1000*60*5)=1498892400000</li> + * <li>Given dateTimeValue='20170701' and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns 1498892400000</li> * </ul> - * <ul> - * @param dateTimeColumnValue - datetime Column value to convert to millis - * @return datetime value in millis */ - public Long fromFormatToMillis(Object dateTimeColumnValue) { - Preconditions.checkNotNull(dateTimeColumnValue); - - long timeColumnValueMS; - if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) { - timeColumnValueMS = TimeUnit.MILLISECONDS.convert((Long) dateTimeColumnValue * _size, _unitSpec.getTimeUnit()); + public long fromFormatToMillis(String dateTimeValue) { + if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) { + return TimeUnit.MILLISECONDS.convert(Long.parseLong(dateTimeValue) * _size, _unitSpec.getTimeUnit()); } else { - timeColumnValueMS = _patternSpec.getDateTimeFormatter().parseMillis(String.valueOf(dateTimeColumnValue)); + return _patternSpec.getDateTimeFormatter().parseMillis(dateTimeValue); } - return timeColumnValueMS; } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java index 754fdc7..af34969 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java @@ -35,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.tools.Command; -import org.joda.time.Interval; import org.kohsuke.args4j.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,12 +135,13 @@ public class OfflineSegmentIntervalCheckerCommand extends AbstractBaseAdminComma List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList = ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, offlineTableName); - // collect segments with invalid time intervals + // Collect segments with invalid start/end time List<String> segmentsWithInvalidIntervals = new ArrayList<>(); if (SegmentIntervalUtils.eligibleForSegmentIntervalCheck(tableConfig.getValidationConfig())) { for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) { - Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval(); - if (timeInterval == null || !TimeUtils.isValidTimeInterval(timeInterval)) { + long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs(); + long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs(); + if (!TimeUtils.timeValueInValidRange(startTimeMs) || !TimeUtils.timeValueInValidRange(endTimeMs)) { segmentsWithInvalidIntervals.add(offlineSegmentZKMetadata.getSegmentName()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org