haibow commented on a change in pull request #5399: URL: https://github.com/apache/incubator-pinot/pull/5399#discussion_r426114279
########## File path: pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java ########## @@ -75,85 +75,112 @@ public void testTimeBoundaryManager() { ExternalView externalView = Mockito.mock(ExternalView.class); for (TimeUnit timeUnit : TimeUnit.values()) { - // Test DAILY push table + // Test DAILY push table, with timeFieldSpec String rawTableName = "testTable_" + timeUnit + "_DAILY"; - TableConfig tableConfig = getTableConfig(rawTableName, timeUnit, "DAILY"); - setSchema(rawTableName, timeUnit); - - // Start with no segment - TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore); - Set<String> onlineSegments = new HashSet<>(); - timeBoundaryManager.init(externalView, onlineSegments); - assertNull(timeBoundaryManager.getTimeBoundaryInfo()); - - // Add the first segment should update the time boundary - String segment0 = "segment0"; - onlineSegments.add(segment0); - setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit); - timeBoundaryManager.init(externalView, onlineSegments); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS)); - - // Add a new segment with larger end time should update the time boundary - String segment1 = "segment1"; - onlineSegments.add(segment1); - setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit); - timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); - - // Add a new segment with smaller end time should not change the time boundary - String segment2 = "segment2"; - onlineSegments.add(segment2); - setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit); - timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); - - // Remove the segment with largest end 
time should update the time boundary - onlineSegments.remove(segment1); - timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS)); - - // Change segment ZK metadata without refreshing should not update the time boundary - setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit); - timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS)); - - // Refresh the changed segment should update the time boundary - timeBoundaryManager.refreshSegment(segment2); - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS)); + TableConfig tableConfig = getTableConfig(rawTableName, "DAILY"); + setSchemaTimeFieldSpec(rawTableName, timeUnit); + testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView); - // Test HOURLY push table + // Test HOURLY push table, with timeFieldSpec rawTableName = "testTable_" + timeUnit + "_HOURLY"; - tableConfig = getTableConfig(rawTableName, timeUnit, "HOURLY"); - setSchema(rawTableName, timeUnit); - timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore); - onlineSegments = new HashSet<>(); - onlineSegments.add(segment0); - setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit); - timeBoundaryManager.init(externalView, onlineSegments); - long expectedTimeValue; - if (timeUnit == TimeUnit.DAYS) { - // Time boundary should be endTime - 1 DAY when time unit is DAYS - expectedTimeValue = timeUnit.convert(1, TimeUnit.DAYS); - } else { - // Time boundary should be endTime - 1 HOUR when time unit is other than DAYS - expectedTimeValue = timeUnit.convert(47, TimeUnit.HOURS); - } - verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue); + tableConfig = getTableConfig(rawTableName, "HOURLY"); + 
setSchemaTimeFieldSpec(rawTableName, timeUnit); + testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView); + + // Test DAILY push table with dateTimeFieldSpec + rawTableName = "testTableDateTime_" + timeUnit + "_DAILY"; + tableConfig = getTableConfig(rawTableName, "DAILY"); + setSchemaDateTimeFieldSpec(rawTableName, timeUnit); + testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView); + + // Test HOURLY push table + rawTableName = "testTableDateTime_" + timeUnit + "_HOURLY"; + tableConfig = getTableConfig(rawTableName, "HOURLY"); + setSchemaDateTimeFieldSpec(rawTableName, timeUnit); + testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView); + } + } + + private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) { + // Start with no segment + TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore); + Set<String> onlineSegments = new HashSet<>(); + timeBoundaryManager.init(externalView, onlineSegments); + assertNull(timeBoundaryManager.getTimeBoundaryInfo()); + + // Add the first segment should update the time boundary + String segment0 = "segment0"; + onlineSegments.add(segment0); + setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit); + timeBoundaryManager.init(externalView, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS)); + + // Add a new segment with larger end time should update the time boundary + String segment1 = "segment1"; + onlineSegments.add(segment1); + setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit); + timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); + + // Add a new segment with smaller end time should not change the time boundary + String segment2 = "segment2"; + onlineSegments.add(segment2); + 
setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit); + timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); + + // Remove the segment with largest end time should update the time boundary + onlineSegments.remove(segment1); + timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS)); + + // Change segment ZK metadata without refreshing should not update the time boundary + setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit); + timeBoundaryManager.onExternalViewChange(externalView, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS)); + + // Refresh the changed segment should update the time boundary + timeBoundaryManager.refreshSegment(segment2); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS)); + } + + private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) { + TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore); + Set<String> onlineSegments = new HashSet<>(); + String segment0 = "segment0"; + onlineSegments.add(segment0); + setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit); + timeBoundaryManager.init(externalView, onlineSegments); + long expectedTimeValue; + if (timeUnit == TimeUnit.DAYS) { + // Time boundary should be endTime - 1 DAY when time unit is DAYS + expectedTimeValue = timeUnit.convert(1, TimeUnit.DAYS); + } else { + // Time boundary should be endTime - 1 HOUR when time unit is other than DAYS + expectedTimeValue = timeUnit.convert(47, TimeUnit.HOURS); } + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue); } - private TableConfig 
getTableConfig(String rawTableName, TimeUnit timeUnit, String pushFrequency) { + private TableConfig getTableConfig(String rawTableName, String pushFrequency) { return new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN) - .setTimeType(timeUnit.name()).setSegmentPushFrequency(pushFrequency).build(); + .setSegmentPushFrequency(pushFrequency).build(); } - private void setSchema(String rawTableName, TimeUnit timeUnit) { + private void setSchemaTimeFieldSpec(String rawTableName, TimeUnit timeUnit) { ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, timeUnit, TIME_COLUMN), null) .build()); } + private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit timeUnit) { + ZKMetadataProvider.setSchema(_propertyStore, + new Schema.SchemaBuilder().setSchemaName(rawTableName) + .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:"+timeUnit+":EPOCH", "1:"+timeUnit) Review comment: nit: please reformat this file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org