This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a0a2171d34 Fix the time segment pruner on TIMESTAMP data type (#12789)
a0a2171d34 is described below

commit a0a2171d349979461c34e58b1ec7f98f6e8e615b
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Apr 7 18:22:10 2024 -0700

    Fix the time segment pruner on TIMESTAMP data type (#12789)
---
 .../segmentpruner/SegmentPrunerFactory.java        |  44 +-
 .../routing/segmentpruner/TimeSegmentPruner.java   | 136 ++---
 .../routing/segmentpruner/SegmentPrunerTest.java   | 621 +++++++++------------
 .../org/apache/pinot/common/data/SchemaTest.java   |  29 +-
 .../apache/pinot/spi/data/DateTimeFieldSpec.java   |  17 +-
 5 files changed, 375 insertions(+), 472 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 6135982e18..423eb527fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +35,6 @@ import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +65,7 @@ public class SegmentPrunerFactory {
         List<SegmentPruner> configuredSegmentPruners = new 
ArrayList<>(segmentPrunerTypes.size());
         for (String segmentPrunerType : segmentPrunerTypes) {
           if 
(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType))
 {
-            SegmentPruner partitionSegmentPruner = 
getPartitionSegmentPruner(tableConfig, propertyStore);
+            SegmentPruner partitionSegmentPruner = 
getPartitionSegmentPruner(tableConfig);
             if (partitionSegmentPruner != null) {
               configuredSegmentPruners.add(partitionSegmentPruner);
             }
@@ -91,7 +88,7 @@ public class SegmentPrunerFactory {
         if ((tableType == TableType.OFFLINE && 
LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
             routingTableBuilderName)) || (tableType == TableType.REALTIME
             && 
LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName)))
 {
-          SegmentPruner partitionSegmentPruner = 
getPartitionSegmentPruner(tableConfig, propertyStore);
+          SegmentPruner partitionSegmentPruner = 
getPartitionSegmentPruner(tableConfig);
           if (partitionSegmentPruner != null) {
             segmentPruners.add(partitionSegmentPruner);
           }
@@ -102,8 +99,7 @@ public class SegmentPrunerFactory {
   }
 
   @Nullable
-  private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig) {
     String tableNameWithType = tableConfig.getTableName();
     SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (segmentPartitionConfig == null) {
@@ -137,26 +133,20 @@ public class SegmentPrunerFactory {
       LOGGER.warn("Cannot enable time range pruning without time column for 
table: {}", tableNameWithType);
       return null;
     }
-    return createTimeSegmentPruner(tableConfig, propertyStore);
-  }
-
-  @VisibleForTesting
-  static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    String tableNameWithType = tableConfig.getTableName();
-    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
-    Preconditions.checkNotNull(timeColumn, "Time column must be configured in 
table config for table: %s",
-        tableNameWithType);
-    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, 
tableNameWithType);
-    Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", 
tableNameWithType);
-    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
-    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in 
schema for time column: %s of table: %s",
-        timeColumn, tableNameWithType);
-    DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
-
-    LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with 
DateTimeFormatSpec: {}",
-        timeColumn, tableNameWithType, timeFormatSpec);
-    return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
+    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig);
+    if (schema == null) {
+      LOGGER.warn("Cannot enable time range pruning without schema for table: {}", tableNameWithType);
+      return null;
+    }
+    DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
+    if (timeFieldSpec == null) {
+      LOGGER.warn("Cannot enable time range pruning without field spec for table: {}, time column: {}",
+          tableNameWithType, timeColumn);
+      return null;
+    }
+    LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFieldSpec: {}", timeColumn,
+        tableNameWithType, timeFieldSpec);
+    return new TimeSegmentPruner(tableConfig, timeFieldSpec);
   }
 
   private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> 
pruners) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index a7ac4fce4b..c2e6b20cce 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,7 +38,9 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
@@ -64,10 +67,10 @@ public class TimeSegmentPruner implements SegmentPruner {
   private volatile IntervalTree<String> _intervalTree;
   private final Map<String, Interval> _intervalMap = new HashMap<>();
 
-  public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, DateTimeFormatSpec timeFormatSpec) {
+  public TimeSegmentPruner(TableConfig tableConfig, DateTimeFieldSpec timeFieldSpec) {
     _tableNameWithType = tableConfig.getTableName();
-    _timeColumn = timeColumn;
-    _timeFormatSpec = timeFormatSpec;
+    _timeColumn = timeFieldSpec.getName();
+    _timeFormatSpec = timeFieldSpec.getFormatSpec();
   }
 
   @Override
@@ -206,97 +209,53 @@ public class TimeSegmentPruner implements SegmentPruner {
         } else {
           return getComplementSortedIntervals(childIntervals);
         }
-      case EQUALS: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = 
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp, timeStamp));
-        } else {
-          return null;
+      case EQUALS:
+        if (isTimeColumn(operands.get(0))) {
+          long timestamp = toMillisSinceEpoch(operands.get(1));
+          return List.of(new Interval(timestamp, timestamp));
         }
-      }
-      case IN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
+        return null;
+      case IN:
+        if (isTimeColumn(operands.get(0))) {
           int numOperands = operands.size();
           List<Interval> intervals = new ArrayList<>(numOperands - 1);
           for (int i = 1; i < numOperands; i++) {
-            long timeStamp =
-                
_timeFormatSpec.fromFormatToMillis(operands.get(i).getLiteral().getFieldValue().toString());
-            intervals.add(new Interval(timeStamp, timeStamp));
+            long timestamp = toMillisSinceEpoch(operands.get(i));
+            intervals.add(new Interval(timestamp, timestamp));
           }
           return intervals;
-        } else {
-          return null;
         }
-      }
-      case GREATER_THAN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = 
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp + 1, 
MAX_END_TIME));
-        } else {
-          return null;
+        return null;
+      case GREATER_THAN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)) + 1, MAX_END_TIME);
         }
-      }
-      case GREATER_THAN_OR_EQUAL: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = 
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp, 
MAX_END_TIME));
-        } else {
-          return null;
+        return null;
+      case GREATER_THAN_OR_EQUAL:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)), MAX_END_TIME);
         }
-      }
-      case LESS_THAN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = 
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          if (timeStamp > MIN_START_TIME) {
-            return Collections.singletonList(new Interval(MIN_START_TIME, 
timeStamp - 1));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
+        return null;
+      case LESS_THAN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)) - 1);
         }
-      }
-      case LESS_THAN_OR_EQUAL: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = 
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          if (timeStamp >= MIN_START_TIME) {
-            return Collections.singletonList(new Interval(MIN_START_TIME, 
timeStamp));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
+        return null;
+      case LESS_THAN_OR_EQUAL:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)));
         }
-      }
-      case BETWEEN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long startTimestamp =
-              
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          long endTimestamp =
-              
_timeFormatSpec.fromFormatToMillis(operands.get(2).getLiteral().getFieldValue().toString());
-          if (endTimestamp >= startTimestamp) {
-            return Collections.singletonList(new Interval(startTimestamp, 
endTimestamp));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
+        return null;
+      case BETWEEN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)), toMillisSinceEpoch(operands.get(2)));
         }
-      }
-      case RANGE: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
+        return null;
+      case RANGE:
+        if (isTimeColumn(operands.get(0))) {
           return parseInterval(operands.get(1).getLiteral().getFieldValue().toString());
         }
         return null;
-      }
       default:
         return null;
     }
@@ -408,6 +367,17 @@ public class TimeSegmentPruner implements SegmentPruner {
     return res;
   }
 
+  private boolean isTimeColumn(Expression expression) {
+    Identifier identifier = expression.getIdentifier();
+    return identifier != null && identifier.getName().equals(_timeColumn);
+  }
+
+  private long toMillisSinceEpoch(Expression expression) {
+    Literal literal = expression.getLiteral();
+    Preconditions.checkArgument(literal != null, "Literal is required for time column filter, got: %s", expression);
+    return _timeFormatSpec.fromFormatToMillis(literal.getFieldValue().toString());
+  }
+
   /**
    * Parse interval to millisecond as [min, max] with both sides included.
    * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
@@ -432,10 +402,10 @@ public class TimeSegmentPruner implements SegmentPruner {
         endTime--;
       }
     }
+    return getInterval(startTime, endTime);
+  }
 
-    if (startTime > endTime) {
-      return Collections.emptyList();
-    }
-    return Collections.singletonList(new Interval(startTime, endTime));
+  private static List<Interval> getInterval(long inclusiveStart, long inclusiveEnd) {
+    return inclusiveStart <= inclusiveEnd ? List.of(new Interval(inclusiveStart, inclusiveEnd)) : List.of();
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index feaad35169..5e48a981cc 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import java.util.Arrays;
-import java.util.Collections;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,11 +49,11 @@ import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.mockito.Mockito;
@@ -78,29 +77,45 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String SDF_PATTERN = "yyyyMMdd";
 
   private static final String QUERY_1 = "SELECT * FROM testTable";
-  private static final String QUERY_2 = "SELECT * FROM testTable where 
memberId = 0";
-  private static final String QUERY_3 = "SELECT * FROM testTable where 
memberId IN (1, 2)";
-  private static final String QUERY_4 = "SELECT * FROM testTable where 
memberId = 0 AND memberName='xyz'";
-
-  private static final String TIME_QUERY_1 = "SELECT * FROM testTable where 
timeColumn = 40";
-  private static final String TIME_QUERY_2 = "SELECT * FROM testTable where 
timeColumn BETWEEN 20 AND 30";
-  private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 
< timeColumn AND timeColumn <= 50";
-  private static final String TIME_QUERY_4 = "SELECT * FROM testTable where 
timeColumn < 15 OR timeColumn > 45";
+  private static final String QUERY_2 = "SELECT * FROM testTable WHERE 
memberId = 0";
+  private static final String QUERY_3 = "SELECT * FROM testTable WHERE 
memberId IN (1, 2)";
+  private static final String QUERY_4 = "SELECT * FROM testTable WHERE 
memberId = 0 AND memberName = 'xyz'";
+
+  private static final String TIME_QUERY_1 = "SELECT * FROM testTable WHERE 
timeColumn = 40";
+  private static final String TIME_QUERY_2 = "SELECT * FROM testTable WHERE 
timeColumn BETWEEN 20 AND 30";
+  private static final String TIME_QUERY_3 = "SELECT * FROM testTable WHERE 30 
< timeColumn AND timeColumn <= 50";
+  private static final String TIME_QUERY_4 = "SELECT * FROM testTable WHERE 
timeColumn < 15 OR timeColumn > 45";
   private static final String TIME_QUERY_5 =
-      "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND 
timeColumn < 70)";
-  private static final String TIME_QUERY_6 = "SELECT * FROM testTable where 
timeColumn < 0 AND timeColumn > 0";
+      "SELECT * FROM testTable WHERE timeColumn < 15 OR (60 < timeColumn AND 
timeColumn < 70)";
+  private static final String TIME_QUERY_6 = "SELECT * FROM testTable WHERE 
timeColumn NOT BETWEEN 20 AND 30";
+  private static final String TIME_QUERY_7 = "SELECT * FROM testTable WHERE 
NOT timeColumn > 30";
+  private static final String TIME_QUERY_8 = "SELECT * FROM testTable WHERE 
timeColumn < 0 AND timeColumn > 0";
 
-  private static final String SDF_QUERY_1 = "SELECT * FROM testTable where 
timeColumn = 20200131";
-  private static final String SDF_QUERY_2 = "SELECT * FROM testTable where 
timeColumn BETWEEN 20200101 AND 20200331";
+  private static final String SDF_QUERY_1 = "SELECT * FROM testTable WHERE 
timeColumn = 20200131";
+  private static final String SDF_QUERY_2 = "SELECT * FROM testTable WHERE 
timeColumn BETWEEN 20200101 AND 20200331";
   private static final String SDF_QUERY_3 =
-      "SELECT * FROM testTable where 20200430 < timeColumn AND timeColumn < 
20200630";
+      "SELECT * FROM testTable WHERE 20200430 < timeColumn AND timeColumn < 
20200630";
   private static final String SDF_QUERY_4 =
-      "SELECT * FROM testTable where timeColumn <= 20200101 OR timeColumn in 
(20200201, 20200401)";
+      "SELECT * FROM testTable WHERE timeColumn <= 20200101 OR timeColumn IN 
(20200201, 20200401)";
   private static final String SDF_QUERY_5 =
-      "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND 
timeColumn >= 20200530";
-
-  private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable 
WHERE timeColumn NOT BETWEEN 20 AND 30";
-  private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable 
WHERE NOT timeColumn > 30";
+      "SELECT * FROM testTable WHERE timeColumn IN (20200101, 20200102) AND 
timeColumn >= 20200530";
+
+  // Timestamp can be passed as string or long
+  private static final String TIMESTAMP_QUERY_1 = "SELECT * FROM testTable 
WHERE timeColumn = '2020-01-31 00:00:00'";
+  private static final String TIMESTAMP_QUERY_2 = String.format("SELECT * FROM 
testTable WHERE timeColumn = %d",
+      Timestamp.valueOf("2020-01-31 00:00:00").getTime());
+  private static final String TIMESTAMP_QUERY_3 =
+      "SELECT * FROM testTable WHERE timeColumn BETWEEN '2020-01-01 00:00:00' 
AND '2020-03-31 00:00:00'";
+  private static final String TIMESTAMP_QUERY_4 =
+      String.format("SELECT * FROM testTable WHERE timeColumn BETWEEN %d AND 
%d",
+          Timestamp.valueOf("2020-01-01 00:00:00").getTime(), 
Timestamp.valueOf("2020-03-31 00:00:00").getTime());
+  private static final String TIMESTAMP_QUERY_5 =
+      "SELECT * FROM testTable WHERE timeColumn <= '2020-01-01 00:00:00' OR 
timeColumn IN ('2020-02-01 00:00:00', "
+          + "'2020-04-01 00:00:00')";
+  private static final String TIMESTAMP_QUERY_6 =
+      String.format("SELECT * FROM testTable WHERE timeColumn <= %d OR 
timeColumn IN (%d, %d)",
+          Timestamp.valueOf("2020-01-01 00:00:00").getTime(), 
Timestamp.valueOf("2020-02-01 00:00:00").getTime(),
+          Timestamp.valueOf("2020-04-01 00:00:00").getTime());
 
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use 
KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module 
as dependency.
@@ -127,6 +142,7 @@ public class SegmentPrunerTest extends ControllerTest {
   @Test
   public void testSegmentPrunerFactoryForPartitionPruner() {
     TableConfig tableConfig = mock(TableConfig.class);
+    when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
     IndexingConfig indexingConfig = mock(IndexingConfig.class);
     when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
 
@@ -141,8 +157,7 @@ public class SegmentPrunerTest extends ControllerTest {
     assertEquals(segmentPruners.size(), 0);
 
     // Segment partition config is missing
-    when(routingConfig.getSegmentPrunerTypes()).thenReturn(
-        
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
+    
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
     assertEquals(segmentPruners.size(), 0);
 
@@ -189,8 +204,7 @@ public class SegmentPrunerTest extends ControllerTest {
   @Test
   public void testSegmentPrunerFactoryForTimeRangePruner() {
     TableConfig tableConfig = mock(TableConfig.class);
-    when(tableConfig.getTableName()).thenReturn(RAW_TABLE_NAME);
-    setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS);
+    when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
 
     // Routing config is missing
     List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
@@ -203,8 +217,7 @@ public class SegmentPrunerTest extends ControllerTest {
     assertEquals(segmentPruners.size(), 0);
 
     // Validation config is missing
-    when(routingConfig.getSegmentPrunerTypes()).thenReturn(
-        Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
+    
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
     assertEquals(segmentPruners.size(), 0);
 
@@ -214,41 +227,54 @@ public class SegmentPrunerTest extends ControllerTest {
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
     assertEquals(segmentPruners.size(), 0);
 
-    // Time range pruner should be returned
+    // Schema is missing
     when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
+    assertEquals(segmentPruners.size(), 0);
+
+    // Field spec is missing
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+    ZKMetadataProvider.setSchema(_propertyStore, schema);
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
+    assertEquals(segmentPruners.size(), 0);
+
+    // Time range pruner should be returned
+    schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addDateTimeField(TIME_COLUMN, DataType.TIMESTAMP, "TIMESTAMP", 
"1:MILLISECONDS").build();
+    ZKMetadataProvider.setSchema(_propertyStore, schema);
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
     assertEquals(segmentPruners.size(), 1);
     assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
   }
 
   @Test
-  public void testEnablingEmptySegmentPruner() {
+  public void testSegmentPrunerFactoryForEmptySegmentPruner() {
     TableConfig tableConfig = mock(TableConfig.class);
+    when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
     IndexingConfig indexingConfig = mock(IndexingConfig.class);
+    when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
     RoutingConfig routingConfig = mock(RoutingConfig.class);
-    StreamIngestionConfig streamIngestionConfig = 
mock(StreamIngestionConfig.class);
+    when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
 
     // When routingConfig is configured with EmptySegmentPruner, 
EmptySegmentPruner should be returned.
-    when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    when(routingConfig.getSegmentPrunerTypes()).thenReturn(
-        Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
-    List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
+    List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore);
     assertEquals(segmentPruners.size(), 1);
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // When indexingConfig is configured with Kinesis streaming, 
EmptySegmentPruner should be returned.
-    when(indexingConfig.getStreamConfigs()).thenReturn(
-        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, 
KINESIS_STREAM_TYPE));
-    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
+    
when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE,
 KINESIS_STREAM_TYPE));
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
propertyStore);
     assertEquals(segmentPruners.size(), 1);
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // When streamIngestionConfig is configured with Kinesis streaming, 
EmptySegmentPruner should be returned.
+    StreamIngestionConfig streamIngestionConfig = 
mock(StreamIngestionConfig.class);
     when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
-        
Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE,
 KINESIS_STREAM_TYPE)));
-    when(indexingConfig.getStreamConfigs()).thenReturn(
-        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, 
KINESIS_STREAM_TYPE));
-    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
+        List.of(Map.of(StreamConfigProperties.STREAM_TYPE, 
KINESIS_STREAM_TYPE)));
+    
when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE,
 KINESIS_STREAM_TYPE));
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
propertyStore);
     assertEquals(segmentPruners.size(), 1);
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
   }
@@ -259,95 +285,76 @@ public class SegmentPrunerTest extends ControllerTest {
     BrokerRequest brokerRequest2 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
     BrokerRequest brokerRequest3 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
     BrokerRequest brokerRequest4 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_4);
+
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
     SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner =
         new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN_1);
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
-        _propertyStore);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner);
     Set<String> onlineSegments = new HashSet<>();
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
Collections.emptySet()),
-        Collections.emptySet());
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
Collections.emptySet()),
-        Collections.emptySet());
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
Collections.emptySet()),
-        Collections.emptySet());
+
+    Set<String> input = Set.of();
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, 
input), input);
 
     // Segments without metadata (not updated yet) should not be pruned
     String newSegment = "newSegment";
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
+    onlineSegments.add(newSegment);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    input = Set.of(newSegment);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, 
input), input);
 
     // Segments without partition metadata should not be pruned
     String segmentWithoutPartitionMetadata = "segmentWithoutPartitionMetadata";
-    onlineSegments.add(segmentWithoutPartitionMetadata);
-    SegmentZKMetadata segmentZKMetadataWithoutPartitionMetadata =
-        new SegmentZKMetadata(segmentWithoutPartitionMetadata);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
-        segmentZKMetadataWithoutPartitionMetadata);
+        new SegmentZKMetadata(segmentWithoutPartitionMetadata));
+    onlineSegments.add(segmentWithoutPartitionMetadata);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
-            new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
-        Collections.singletonList(segmentWithoutPartitionMetadata));
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
-            new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
-        Collections.singletonList(segmentWithoutPartitionMetadata));
-    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
-            new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
-        Collections.singletonList(segmentWithoutPartitionMetadata));
+    input = Set.of(segmentWithoutPartitionMetadata);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, 
input), input);
 
     // Test different partition functions and number of partitions
     // 0 % 5 = 0; 1 % 5 = 1; 2 % 5 = 2
     String segment0 = "segment0";
-    onlineSegments.add(segment0);
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 5, 
0);
+    onlineSegments.add(segment0);
     // Murmur(0) % 4 = 0; Murmur(1) % 4 = 3; Murmur(2) % 4 = 0
     String segment1 = "segment1";
-    onlineSegments.add(segment1);
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4, 
0);
+    onlineSegments.add(segment1);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment1)));
+    input = Set.of(segment0, segment1);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), Set.of(segment1));
 
     // Update partition metadata without refreshing should have no effect
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4, 
1);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment1)));
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), Set.of(segment1));
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, 
input), input);
 
     // Refresh the changed segment should update the segment pruner
     segmentZkMetadataFetcher.refreshSegment(segment0);
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment1)));
-    assertEquals(
-        singlePartitionColumnSegmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
input), Set.of(segment1));
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, 
input), input);
+    assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, 
input), Set.of(segment1));
 
     // Multi-column partitioned segment.
     MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner =
@@ -356,38 +363,25 @@ public class SegmentPrunerTest extends ControllerTest {
     segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner);
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, 
Collections.emptySet()),
-        Collections.emptySet());
-    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, 
Collections.emptySet()),
-        Collections.emptySet());
-    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, 
Collections.emptySet()),
-        Collections.emptySet());
-    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, 
Collections.emptySet()),
-        Collections.emptySet());
+
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, 
input), Set.of(segment1));
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, 
input), input);
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, 
input), Set.of(segment1));
 
     String segment2 = "segment2";
-    onlineSegments.add(segment2);
     Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new 
HashMap<>();
-    columnPartitionMetadataMap.put(PARTITION_COLUMN_1,
-        new ColumnPartitionMetadata("Modulo", 4, Collections.singleton(0), 
null));
-    Map<String, String> partitionColumn2FunctionConfig = new HashMap<>();
-    partitionColumn2FunctionConfig.put("columnValues", "xyz|abc");
-    partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|");
-    columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new 
ColumnPartitionMetadata(
-        "BoundedColumnValue", 3, Collections.singleton(1), 
partitionColumn2FunctionConfig));
+    columnPartitionMetadataMap.put(PARTITION_COLUMN_1, new 
ColumnPartitionMetadata("Modulo", 4, Set.of(0), null));
+    columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new 
ColumnPartitionMetadata("BoundedColumnValue", 3, Set.of(1),
+        Map.of("columnValues", "xyz|abc", "columnValuesDelimiter", "|")));
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2, 
columnPartitionMetadataMap);
+    onlineSegments.add(segment2);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(
-        multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(
-        multiPartitionColumnsSegmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment1)));
-    assertEquals(
-        multiPartitionColumnsSegmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new 
HashSet<>(Arrays.asList(segment1, segment2)));
+    input = Set.of(segment0, segment1, segment2);
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, 
input), input);
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, 
input), Set.of(segment1, segment2));
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, 
input), Set.of(segment0, segment1));
+    assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, 
input), Set.of(segment1, segment2));
   }
 
   @Test
@@ -399,143 +393,112 @@ public class SegmentPrunerTest extends ControllerTest {
     BrokerRequest brokerRequest5 = 
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_4);
     BrokerRequest brokerRequest6 = 
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_5);
     BrokerRequest brokerRequest7 = 
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_6);
+    BrokerRequest brokerRequest8 = 
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_7);
+    BrokerRequest brokerRequest9 = 
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_8);
+
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
-    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
-    setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
-        _propertyStore);
-    Set<String> onlineSegments = new HashSet<>();
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
-        _propertyStore);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    DateTimeFieldSpec timeFieldSpec = new DateTimeFieldSpec(TIME_COLUMN, 
DataType.INT, "EPOCH|DAYS", "1:DAYS");
+    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
timeFieldSpec);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(segmentPruner);
+    Set<String> onlineSegments = new HashSet<>();
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest4, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest5, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest6, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest7, Collections.emptySet()), 
Collections.emptySet());
-
-    // Initialize with non-empty onlineSegments
+
+    Set<String> input = Set.of();
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest9, input), input);
+
     // Segments without metadata (not updated yet) should not be pruned
-    segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, 
_propertyStore);
-    segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
-    segmentZkMetadataFetcher.register(segmentPruner);
     String newSegment = "newSegment";
     onlineSegments.add(newSegment);
-    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest3, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest4, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest5, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest6, 
Collections.singleton(newSegment)),
-        Collections.singletonList(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest7, 
Collections.singleton(newSegment)),
-        Collections.emptySet()); // query with invalid range will always have 
empty filtered result
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    input = Set.of(newSegment);
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // 
Query with invalid range
 
     // Segments without time range metadata should not be pruned
     String segmentWithoutTimeRangeMetadata = "segmentWithoutTimeRangeMetadata";
-    onlineSegments.add(segmentWithoutTimeRangeMetadata);
     SegmentZKMetadata segmentZKMetadataWithoutTimeRangeMetadata =
         new SegmentZKMetadata(segmentWithoutTimeRangeMetadata);
-    
segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
REALTIME_TABLE_NAME,
         segmentZKMetadataWithoutTimeRangeMetadata);
+    onlineSegments.add(segmentWithoutTimeRangeMetadata);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(
-        segmentPruner.prune(brokerRequest1, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest2, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest3, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest4, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest5, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest6, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.singletonList(segmentWithoutTimeRangeMetadata));
-    assertEquals(
-        segmentPruner.prune(brokerRequest7, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
-        Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // 
Query with invalid range
 
     // Test different time range
     String segment0 = "segment0";
-    onlineSegments.add(segment0);
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, 
TimeUnit.DAYS);
-
+    onlineSegments.add(segment0);
     String segment1 = "segment1";
-    onlineSegments.add(segment1);
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, 
TimeUnit.DAYS);
-
+    onlineSegments.add(segment1);
     String segment2 = "segment2";
-    onlineSegments.add(segment2);
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, 
TimeUnit.DAYS);
-
+    onlineSegments.add(segment2);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(segmentPruner.prune(brokerRequest4, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest5, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest6, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest7, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.emptySet());
+    input = Set.of(segment0, segment1, segment2);
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // 
Query with invalid range
 
     // Update metadata without external view change or refreshing should have no effect
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, 
TimeUnit.DAYS);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(segmentPruner.prune(brokerRequest4, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest5, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest6, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest7, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, 
segment2));
+    assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // 
Query with invalid range
 
     // Refresh the changed segment should update the segment pruner
     segmentZkMetadataFetcher.refreshSegment(segment2);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest4, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest5, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest6, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest7, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // 
Query with invalid range
   }
 
   @Test
@@ -545,215 +508,175 @@ public class SegmentPrunerTest extends ControllerTest {
     BrokerRequest brokerRequest3 = 
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_3);
     BrokerRequest brokerRequest4 = 
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_4);
     BrokerRequest brokerRequest5 = 
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_5);
+
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
-    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
-    setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN);
-
-    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
-        _propertyStore);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    DateTimeFieldSpec timeFieldSpec =
+        new DateTimeFieldSpec(TIME_COLUMN, DataType.STRING, 
"SIMPLE_DATE_FORMAT|" + SDF_PATTERN, "1:DAYS");
+    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
timeFieldSpec);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(segmentPruner);
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
RAW_TABLE_NAME);
-    DateTimeFormatSpec dateTimeFormatSpec = 
schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec();
-
+    DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 
timeFormatSpec.fromFormatToMillis("20200101"),
+        timeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS);
     onlineSegments.add(segment0);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 
dateTimeFormatSpec.fromFormatToMillis("20200101"),
-        dateTimeFormatSpec.fromFormatToMillis("20200228"), 
TimeUnit.MILLISECONDS);
-
     String segment1 = "segment1";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 
timeFormatSpec.fromFormatToMillis("20200201"),
+        timeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS);
     onlineSegments.add(segment1);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 
dateTimeFormatSpec.fromFormatToMillis("20200201"),
-        dateTimeFormatSpec.fromFormatToMillis("20200530"), 
TimeUnit.MILLISECONDS);
-
     String segment2 = "segment2";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 
timeFormatSpec.fromFormatToMillis("20200401"),
+        timeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS);
     onlineSegments.add(segment2);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 
dateTimeFormatSpec.fromFormatToMillis("20200401"),
-        dateTimeFormatSpec.fromFormatToMillis("20200430"), 
TimeUnit.MILLISECONDS);
-
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Collections.singleton(segment0));
-    assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment1)));
-    assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments), 
Collections.singleton(segment1));
-    assertEquals(segmentPruner.prune(brokerRequest4, onlineSegments),
-        new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), 
Collections.emptySet());
+
+    Set<String> input = Set.of(segment0, segment1, segment2);
+    assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment1));
+    assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of());
   }
 
   @Test
-  public void testTimeSegmentPrunerSql() {
-    BrokerRequest brokerRequest1 = 
CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
-    BrokerRequest brokerRequest2 = 
CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_2);
+  public void testTimeSegmentPrunerTimestampFormat() {
+    BrokerRequest brokerRequest1 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_1);
+    BrokerRequest brokerRequest2 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_2);
+    BrokerRequest brokerRequest3 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_3);
+    BrokerRequest brokerRequest4 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_4);
+    BrokerRequest brokerRequest5 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_5);
+    BrokerRequest brokerRequest6 = 
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_6);
+
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
-    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
-    setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-
-    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
-        _propertyStore);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    // Intentionally put EPOCH as the format which Pinot should handle
+    DateTimeFieldSpec timeFieldSpec =
+        new DateTimeFieldSpec(TIME_COLUMN, DataType.TIMESTAMP, 
"EPOCH|MILLISECONDS", "1:DAYS");
+    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
timeFieldSpec);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(segmentPruner);
+    DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0,
+        timeFormatSpec.fromFormatToMillis("2020-01-01 00:00:00"),
+        timeFormatSpec.fromFormatToMillis("2020-02-28 00:00:00"), 
TimeUnit.MILLISECONDS);
     onlineSegments.add(segment0);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, 
TimeUnit.DAYS);
     String segment1 = "segment1";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1,
+        timeFormatSpec.fromFormatToMillis("2020-02-01 00:00:00"),
+        timeFormatSpec.fromFormatToMillis("2020-05-30 00:00:00"), 
TimeUnit.MILLISECONDS);
     onlineSegments.add(segment1);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, 
TimeUnit.DAYS);
     String segment2 = "segment2";
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
+        timeFormatSpec.fromFormatToMillis("2020-04-01 00:00:00"),
+        timeFormatSpec.fromFormatToMillis("2020-04-30 00:00:00"), 
TimeUnit.MILLISECONDS);
     onlineSegments.add(segment2);
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, 
TimeUnit.DAYS);
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
 
-    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment1)));
+    Set<String> input = Set.of(segment0, segment1, segment2);
+    assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+    assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, 
segment1));
+    assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+    assertEquals(segmentPruner.prune(brokerRequest6, input), input);
   }
 
   @Test
   public void testEmptySegmentPruner() {
     BrokerRequest brokerRequest1 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_1);
-    BrokerRequest brokerRequest2 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
-    BrokerRequest brokerRequest3 = 
CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
+
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
-    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
 
-    // init with list of segments
+    // Init with a list of segments
     EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig);
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
-        _propertyStore);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(segmentPruner);
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
-    onlineSegments.add(segment0);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
+    onlineSegments.add(segment0);
     String segment1 = "segment1";
-    onlineSegments.add(segment1);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+    onlineSegments.add(segment1);
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment0)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment0)));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
-        new HashSet<>(Collections.singletonList(segment0)));
-
-    // init with empty list of segments
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Set.of(segment0));
+
+    // Init with no segment
     segmentPruner = new EmptySegmentPruner(tableConfig);
     segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
     segmentZkMetadataFetcher.register(segmentPruner);
     onlineSegments.clear();
     segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), 
Collections.emptySet());
-    assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), 
Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
onlineSegments);
 
     // Segments without metadata (not updated yet) should not be pruned
     String newSegment = "newSegment";
     onlineSegments.add(newSegment);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(newSegment)),
-        Collections.singleton(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(newSegment)),
-        Collections.singleton(newSegment));
-    assertEquals(segmentPruner.prune(brokerRequest3, 
Collections.singleton(newSegment)),
-        Collections.singleton(newSegment));
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
onlineSegments);
 
     // Segments without totalDocs metadata should not be pruned
-    onlineSegments.clear();
     String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata";
-    onlineSegments.add(segmentWithoutTotalDocsMetadata);
     SegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata =
         new SegmentZKMetadata(segmentWithoutTotalDocsMetadata);
-    
segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
REALTIME_TABLE_NAME,
         segmentZKMetadataWithoutTotalDocsMetadata);
+    onlineSegments.add(segmentWithoutTotalDocsMetadata);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(segmentWithoutTotalDocsMetadata)),
-        Collections.singleton(segmentWithoutTotalDocsMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(segmentWithoutTotalDocsMetadata)),
-        Collections.singleton(segmentWithoutTotalDocsMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest3, 
Collections.singleton(segmentWithoutTotalDocsMetadata)),
-        Collections.singleton(segmentWithoutTotalDocsMetadata));
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
onlineSegments);
 
     // Segments with -1 totalDocs should not be pruned
-    onlineSegments.clear();
     String segmentWithNegativeTotalDocsMetadata = 
"segmentWithNegativeTotalDocsMetadata";
-    onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, 
segmentWithNegativeTotalDocsMetadata, -1);
+    onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
-        Collections.singleton(segmentWithNegativeTotalDocsMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
-        Collections.singleton(segmentWithNegativeTotalDocsMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest3, 
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
-        Collections.singleton(segmentWithNegativeTotalDocsMetadata));
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
onlineSegments);
 
     // Prune segments with 0 total docs
     onlineSegments.clear();
-    onlineSegments.add(segment0);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
-    onlineSegments.add(segment1);
+    onlineSegments.add(segment0);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+    onlineSegments.add(segment1);
     String segment2 = "segment2";
-    onlineSegments.add(segment2);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1);
-
+    onlineSegments.add(segment2);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Set.of(segment0, segment2));
 
     // Update metadata without external view change or refreshing should have no effect
-    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, 
TimeUnit.DAYS);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, 0);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Set.of(segment0, segment2));
 
     // Refresh the changed segment should update the segment pruner
     segmentZkMetadataFetcher.refreshSegment(segment2);
-    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Collections.singletonList(segment0)));
-    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Collections.singletonList(segment0)));
-    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
-        new HashSet<>(Collections.singletonList(segment0)));
-  }
-
-  private TableConfig getTableConfig(String rawTableName, TableType type) {
-    return new 
TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build();
-  }
-
-  private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit 
timeUnit) {
-    ZKMetadataProvider.setSchema(_propertyStore, new 
Schema.SchemaBuilder().setSchemaName(rawTableName)
-        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:" + timeUnit + 
":EPOCH", "1:" + timeUnit).build());
-  }
-
-  private void setSchemaDateTimeFieldSpecSDF(String rawTableName, String 
format) {
-    ZKMetadataProvider.setSchema(_propertyStore, new 
Schema.SchemaBuilder().setSchemaName(rawTableName)
-        .addDateTime(TIME_COLUMN, FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:" + format, "1:DAYS").build());
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Set.of(segment0));
   }
 
   private void setSegmentZKPartitionMetadata(String tableNameWithType, String 
segment, String partitionFunction,
       int numPartitions, int partitionId) {
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
-    segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN_1,
-        new ColumnPartitionMetadata(partitionFunction, numPartitions, 
Collections.singleton(partitionId), null))));
+    segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(Map.of(PARTITION_COLUMN_1,
+        new ColumnPartitionMetadata(partitionFunction, numPartitions, 
Set.of(partitionId), null))));
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, 
segmentZKMetadata);
   }
 
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java 
b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index 626a091005..e8fd128729 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -221,8 +221,7 @@ public class SchemaTest {
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.DAYS, "time"), null)
         .addDateTime("dateTime0", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", 
"1:HOURS")
         .addDateTime("dateTime1", FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-        .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH", 
"1:DAYS")
-        .build();
+        .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH", 
"1:DAYS").build();
 
     // Test method which fetches the DateTimeFieldSpec given the timeColumnName
     // Test is on TIME
@@ -254,7 +253,7 @@ public class SchemaTest {
     Assert.assertEquals(dateTimeFieldSpec.getDataType(), 
FieldSpec.DataType.TIMESTAMP);
     Assert.assertTrue(dateTimeFieldSpec.isSingleValueField());
     Assert.assertEquals(dateTimeFieldSpec.getDefaultNullValue(), 0L);
-    Assert.assertEquals(dateTimeFieldSpec.getFormat(), "1:MILLISECONDS:EPOCH");
+    Assert.assertEquals(dateTimeFieldSpec.getFormat(), "TIMESTAMP");
     Assert.assertEquals(dateTimeFieldSpec.getGranularity(), "1:MILLISECONDS");
 
     dateTimeFieldSpec = schema.getSpecForTimeColumn("dateTime2");
@@ -326,15 +325,10 @@ public class SchemaTest {
   @Test
   public void testSerializeDeserializeOptions()
       throws IOException {
-    String json = "{\n"
-        + "  \"primaryKeyColumns\" : null,\n"
-        + "  \"timeFieldSpec\" : null,\n"
-        + "  \"schemaName\" : null,\n"
-        + "  \"enableColumnBasedNullHandling\" : true,\n"
-        + "  \"dimensionFieldSpecs\" : [ ],\n"
-        + "  \"metricFieldSpecs\" : [ ],\n"
-        + "  \"dateTimeFieldSpecs\" : [ ]\n"
-        + "}";
+    String json =
+        "{\n" + "  \"primaryKeyColumns\" : null,\n" + "  \"timeFieldSpec\" : 
null,\n" + "  \"schemaName\" : null,\n"
+            + "  \"enableColumnBasedNullHandling\" : true,\n" + "  
\"dimensionFieldSpecs\" : [ ],\n"
+            + "  \"metricFieldSpecs\" : [ ],\n" + "  \"dateTimeFieldSpecs\" : 
[ ]\n" + "}";
     JsonNode expectedNode = JsonUtils.stringToJsonNode(json);
 
     Schema schema = JsonUtils.jsonNodeToObject(expectedNode, Schema.class);
@@ -363,6 +357,17 @@ public class SchemaTest {
     Assert.assertEquals(schemaFromJson.hashCode(), schema.hashCode());
   }
 
+  @Test
+  public void testTimestampFormatOverride()
+      throws Exception {
+    URL resourceUrl = 
getClass().getClassLoader().getResource("schemaTest.schema");
+    Assert.assertNotNull(resourceUrl);
+    Schema schema = Schema.fromFile(new File(resourceUrl.getFile()));
+    DateTimeFieldSpec fieldSpec = schema.getDateTimeSpec("dateTime3");
+    Assert.assertNotNull(fieldSpec);
+    Assert.assertEquals(fieldSpec.getFormat(), "TIMESTAMP");
+  }
+
   @Test
   public void testByteType()
       throws Exception {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
index ea9285a104..dbb92090d1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.EqualityUtils;
 
+
 @SuppressWarnings("unused")
 @JsonIgnoreProperties(ignoreUnknown = true)
 public final class DateTimeFieldSpec extends FieldSpec {
@@ -74,6 +75,10 @@ public final class DateTimeFieldSpec extends FieldSpec {
       @Nullable Object sampleValue) {
     super(name, dataType, true);
 
+    // Override format to be "TIMESTAMP" for TIMESTAMP data type because the format is implicit
+    if (dataType == DataType.TIMESTAMP) {
+      format = TimeFormat.TIMESTAMP.name();
+    }
     _format = format;
     _granularity = granularity;
     _formatSpec = new DateTimeFormatSpec(format);
@@ -119,13 +124,23 @@ public final class DateTimeFieldSpec extends FieldSpec {
     Preconditions.checkArgument(isSingleValueField, "Unsupported multi-value 
for date time field.");
   }
 
+  @Override
+  public void setDataType(DataType dataType) {
+    super.setDataType(dataType);
+    if (dataType == DataType.TIMESTAMP) {
+      _format = TimeFormat.TIMESTAMP.name();
+    }
+  }
+
   public String getFormat() {
     return _format;
   }
 
   // Required by JSON de-serializer. DO NOT REMOVE.
   public void setFormat(String format) {
-    _format = format;
+    if (_dataType != DataType.TIMESTAMP) {
+      _format = format;
+    }
   }
 
   @JsonIgnore


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to