ankitsultana commented on code in PR #15960:
URL: https://github.com/apache/pinot/pull/15960#discussion_r2121953481


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String VIEWS_COLUMN = "viewsCol";
+
+  private static final String[] PLATFORMS = new String[]{"windows", "android", "ios"};
+  private static final long VIEWS_MIN_VALUE = 20L;
+  private static final long VIEWS_MAX_VALUE = 30L;
+  private static final long DATA_START_TIME_SEC = 1747008000L;
+  private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; // 1 minute before start time
+  private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 5 minutes after start time
+
+  @Test
+  public void testGroupByMax() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupByMin() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | min{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupBySum() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | sum{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
+      String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+      long expected = ts <= DATA_START_TIME_SEC ? 0L
+        // If referral is true, views are VIEWS_MIN_VALUE (20); otherwise VIEWS_MAX_VALUE (30)
+        : "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByTwoColumnsAndExpressionValue() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
+        + " | max{%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByThreeColumnsAndConstantValue() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+      // One record per second spread across 30 groups, so each group sums 2 records per minute bucket.
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByWithFilter() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      PLATFORM_COLUMN, TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
+    );
+  }
+
+  @Test
+  public void testTransformNull() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{42} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+      assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) {
+    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC);
+    System.out.println(result);
+    assertEquals(result.get("status").asText(), "success");
+
+    JsonNode series = result.get("data").get("result");
+    assertEquals(series.size(), expectedGroups);
+
+    for (JsonNode row : series) {
+      for (JsonNode point : row.get("values")) {
+        long ts = point.get(0).asLong();
+        long val = point.get(1).asLong();
+        validator.validate(ts, val, row);
+      }
+    }
+  }
+
+  @FunctionalInterface
+  interface TimeSeriesValidator {
+    void validate(long timestamp, long value, JsonNode row);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    addTimeSeriesConfigurations(brokerConf);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    addTimeSeriesConfigurations(serverConf);
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public long getCountStarResult() {
+    return 1000L;

Review Comment:
   nit: let's make it a static final constant
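   
   e.g. something like this (the constant name here is just illustrative):
   
   ```java
   private static final long COUNT_STAR_RESULT = 1000L;
   
   @Override
   public long getCountStarResult() {
     return COUNT_STAR_RESULT;
   }
   ```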



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java:
##########
@@ -549,6 +551,31 @@ public JsonNode postQuery(@Language("sql") String query)
     return queryBrokerHttpEndpoint(query);
   }
 
+  /**
+   * Queries the broker's timeseries query endpoint (/timeseries/api/v1/query_range).
+   * This is used for testing timeseries queries.
+   */
+  public JsonNode getTimeseriesQuery(@Language("sql") String query, long startTime, long endTime) {
+    try {
+      Map<String, String> queryParams = Map.of("language", "m3ql", "query", query, "start",
+        String.valueOf(startTime), "end", String.valueOf(endTime));
+      String url = buildQueryUrl(getTimeSeriesQueryApiUrl(getBrokerBaseApiUrl()), queryParams);
+      JsonNode responseJsonNode = JsonUtils.stringToJsonNode(sendGetRequest(url, Map.of()));
+      return sanitizeResponse(responseJsonNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get timeseries query: " + query, e);
+    }
+  }
+
+  private static String buildQueryUrl(String baseUrl, Map<String, String> params) throws Exception {

Review Comment:
   nit: Can you group this with other private static methods?
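   
   For context (the body isn't quoted above): a minimal sketch of what a helper like
   this typically looks like, assuming it just percent-encodes and joins the params;
   not necessarily the PR's actual implementation:
   
   ```java
   import java.net.URLEncoder;
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   
   private static String buildQueryUrl(String baseUrl, Map<String, String> params) {
     StringBuilder url = new StringBuilder(baseUrl);
     char sep = '?';
     for (Map.Entry<String, String> e : params.entrySet()) {
       // Percent-encode both key and value so queries with quotes/spaces stay valid.
       url.append(sep).append(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8))
           .append('=').append(URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
       sep = '&';
     }
     return url.toString();
   }
   ```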



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";

Review Comment:
   nit: use column names similar to the userAttributes table in the quickstart (I suppose that's what this is based on).
   
   less cognitive overhead
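   
   e.g. something like this (placeholder names only; use whatever the quickstart
   table actually calls these columns):
   
   ```java
   private static final String TS_COLUMN = "ts";
   private static final String CITY_COLUMN = "city";
   private static final String PLATFORM_COLUMN = "platform";
   private static final String REFERRAL_COLUMN = "referral";
   private static final String VIEWS_COLUMN = "views";
   ```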



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java:
##########
@@ -549,6 +551,31 @@ public JsonNode postQuery(@Language("sql") String query)
     return queryBrokerHttpEndpoint(query);
   }
 
+  /**
+   * Queries the broker's timeseries query endpoint (/timeseries/api/v1/query_range).
+   * This is used for testing timeseries queries.
+   */
+  public JsonNode getTimeseriesQuery(@Language("sql") String query, long startTime, long endTime) {

Review Comment:
   "sql" ==> "m3ql" ?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+      .addSingleValueDimension(TS_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(CITY_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(PLATFORM_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(REFERRAL_COLUMN, FieldSpec.DataType.BOOLEAN)
+      .addSingleValueDimension(VIEWS_COLUMN, FieldSpec.DataType.LONG)
+      .build();
+  }
+
+  private org.apache.avro.Schema.Field createAvroField(String name, org.apache.avro.Schema.Type type) {
+    return new org.apache.avro.Schema.Field(name, org.apache.avro.Schema.create(type), null, null);
+  }
+
+  public File createAvroFile()
+    throws Exception {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+      createAvroField(TS_COLUMN, org.apache.avro.Schema.Type.LONG),
+      createAvroField(CITY_COLUMN, org.apache.avro.Schema.Type.LONG),
+      createAvroField(PLATFORM_COLUMN, org.apache.avro.Schema.Type.STRING),
+      createAvroField(REFERRAL_COLUMN, org.apache.avro.Schema.Type.BOOLEAN),
+      createAvroField(VIEWS_COLUMN, org.apache.avro.Schema.Type.LONG)
+    ));
+
+    File avroFile = new File(_tempDir, "data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < getCountStarResult(); i++) {
+        writer.append(getRecord(avroSchema, i));
+      }
+    }
+    return avroFile;
+  }
+
+  private static GenericData.@NotNull Record getRecord(org.apache.avro.Schema avroSchema, int i) {
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    // Start at DATA_START_TIME_SEC + 1 so no record lands exactly on DATA_START_TIME_SEC; keeps assertions simple.
+    record.put(TS_COLUMN, (DATA_START_TIME_SEC + 1 + i) * 1000L);
+    record.put(CITY_COLUMN, i % 5);
+    record.put(PLATFORM_COLUMN, PLATFORMS[i % PLATFORMS.length]);
+    record.put(REFERRAL_COLUMN, (i % 2) == 0);
+    // Alternate between VIEWS_MIN_VALUE and VIEWS_MAX_VALUE.
+    record.put(VIEWS_COLUMN, VIEWS_MIN_VALUE + (VIEWS_MAX_VALUE - VIEWS_MIN_VALUE) * (i % 2));
+    return record;
+  }
+
+  @BeforeClass
+  public void setUp()

Review Comment:
   nit: suggest keeping public methods at the top for readability
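   
   e.g. roughly this layout (illustrative only):
   
   ```java
   public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
     // 1. constants
     // 2. @BeforeClass setUp() / @AfterClass tearDown()
     // 3. @Test methods and public overrides (getTableName, createSchema, ...)
     // 4. private helpers (runGroupedTimeSeriesQuery, createAvroField, getRecord, ...)
   }
   ```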



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+  @BeforeClass
+  public void setUp()
+    throws Exception {
+    LOGGER.warn("Setting up integration test class: {}", 
getClass().getSimpleName());
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    if (_controllerRequestURLBuilder == null) {
+        _controllerRequestURLBuilder =
+          ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort());
+    }
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    // create & upload schema AND table config
+    Schema schema = createSchema();
+    addSchema(schema);
+
+    File avroFile = createAvroFile();
+    // create offline table
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // create & upload segments
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    waitForAllDocsLoaded(60_000);
+    LOGGER.warn("Finished setting up integration test class: {}", 
getClass().getSimpleName());
+  }
+
+  @AfterClass
+  public void tearDown()
+    throws Exception {
+    LOGGER.warn("Tearing down integration test class: {}", 
getClass().getSimpleName());
+    dropOfflineTable(getTableName());
+    FileUtils.deleteDirectory(_tempDir);

Review Comment:
   multiple calls to ensureDirectoriesExistAndEmpty on _tempDir?
   
   Also, you can change the log level to info
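   
   e.g. a sketch with a single call and info-level logging:
   
   ```java
   @BeforeClass
   public void setUp()
       throws Exception {
     LOGGER.info("Setting up integration test class: {}", getClass().getSimpleName());
     // One call is enough; the second call later in the method can be dropped.
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
     // ... start cluster, create schema/table, build and upload segments ...
   }
   ```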



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+  @Test
+  public void testGroupByMin() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | min{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)

Review Comment:
   format: indent should be 2 here? It's 2 in some other functions too
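   
   e.g.:
   
   ```java
   runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
     assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
   );
   ```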



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+    waitForAllDocsLoaded(60_000);
+    LOGGER.warn("Finished setting up integration test class: {}", 
getClass().getSimpleName());

Review Comment:
   log level
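   
   i.e.:
   
   ```java
   LOGGER.info("Finished setting up integration test class: {}", getClass().getSimpleName());
   ```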



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String VIEWS_COLUMN = "viewsCol";
+
+  private static final String[] PLATFORMS = new String[]{"windows", "android", 
"ios"};
+  private static final long VIEWS_MIN_VALUE = 20L;
+  private static final long VIEWS_MAX_VALUE = 30L;
+  private static final long DATA_START_TIME_SEC = 1747008000L;
+  private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; 
// 1 minute before start time
+  private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 
5 minutes after start time
+
+  @Test
+  public void testGroupByMax() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupByMin() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | min{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupBySum() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | sum{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
+      String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+      long expected = ts <= DATA_START_TIME_SEC ? 0L
+        // If referral is true, views are MAX_VALUE, otherwise 20
+        : "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByTwoColumnsAndExpressionValue() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
+        + " | max{%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByThreeColumnsAndConstantValue() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+      // Since there are 30 groups, each minute will have 2 rows.
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByWithFilter() {
+    String query = String.format(
+      
"fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      PLATFORM_COLUMN, TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
+    );
+  }
+
+  @Test
+  public void testTransformNull() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{42} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+      assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) {
+    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC);
+    System.out.println(result);
+    assertEquals(result.get("status").asText(), "success");
+
+    JsonNode series = result.get("data").get("result");
+    assertEquals(series.size(), expectedGroups);
+
+    for (JsonNode row : series) {
+      for (JsonNode point : row.get("values")) {
+        long ts = point.get(0).asLong();
+        long val = point.get(1).asLong();
+        validator.validate(ts, val, row);
+      }
+    }
+  }
+
+  @FunctionalInterface
+  interface TimeSeriesValidator {
+    void validate(long timestamp, long value, JsonNode row);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    addTimeSeriesConfigurations(brokerConf);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    addTimeSeriesConfigurations(serverConf);
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public long getCountStarResult() {
+    return 1000L;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+      .addSingleValueDimension(TS_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(CITY_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(PLATFORM_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(REFERRAL_COLUMN, FieldSpec.DataType.BOOLEAN)
+      .addSingleValueDimension(VIEWS_COLUMN, FieldSpec.DataType.LONG)
+      .build();
+  }
+
+  private org.apache.avro.Schema.Field createAvroField(String name, org.apache.avro.Schema.Type type) {
+    return new org.apache.avro.Schema.Field(name, org.apache.avro.Schema.create(type), null, null);
+  }
+
+  public File createAvroFile()
+    throws Exception {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+      createAvroField(TS_COLUMN, org.apache.avro.Schema.Type.LONG),
+      createAvroField(CITY_COLUMN, org.apache.avro.Schema.Type.LONG),
+      createAvroField(PLATFORM_COLUMN, org.apache.avro.Schema.Type.STRING),
+      createAvroField(REFERRAL_COLUMN, org.apache.avro.Schema.Type.BOOLEAN),
+      createAvroField(VIEWS_COLUMN, org.apache.avro.Schema.Type.LONG)
+    ));
+
+    File avroFile = new File(_tempDir, "data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < getCountStarResult(); i++) {
+        writer.append(getRecord(avroSchema, i));
+      }
+    }
+    return avroFile;
+  }
+
+  private static GenericData.@NotNull Record getRecord(org.apache.avro.Schema avroSchema, int i) {
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    // Timestamps start at DATA_START_TIME_SEC + 1, never exactly at DATA_START_TIME_SEC, for easier assertion of values.
+    record.put(TS_COLUMN, (DATA_START_TIME_SEC + 1 + i) * 1000L);
+    record.put(CITY_COLUMN, i % 5);
+    record.put(PLATFORM_COLUMN, PLATFORMS[i % PLATFORMS.length]);
+    record.put(REFERRAL_COLUMN, (i % 2) == 0);
+    // Alternate between VIEWS_MIN_VALUE and VIEWS_MAX_VALUE.
+    record.put(VIEWS_COLUMN, VIEWS_MIN_VALUE + (VIEWS_MAX_VALUE - VIEWS_MIN_VALUE) * (i % 2));
+    return record;
+  }
+
+  @BeforeClass
+  public void setUp()
+    throws Exception {
+    LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName());
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    if (_controllerRequestURLBuilder == null) {
+      _controllerRequestURLBuilder =
+        ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort());

Review Comment:
   nit: format. I think the argument can go on a new line while the preceding tokens stay on the same line.
   
   I don't know if we have a clear definition of Pinot style; we really should define and enforce one at some point.
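
   For illustration, the shape I have in mind is roughly this (just a sketch, not verified against any checkstyle rules):
   
   ```java
   // Assignment and call stay on one line; only the argument wraps.
   _controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(
       "http://localhost:" + getControllerPort());
   ```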



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String VIEWS_COLUMN = "viewsCol";

Review Comment:
   instead of views, perhaps you can change it to totalTrips, in keeping with the column names used in the quickstart
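
   e.g., a sketch of the rename (`totalTrips` is just the suggested name, keeping the existing `*Col` suffix convention):
   
   ```java
   private static final String TOTAL_TRIPS_COLUMN = "totalTripsCol";
   ```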



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String VIEWS_COLUMN = "viewsCol";
+
+  private static final String[] PLATFORMS = new String[]{"windows", "android", "ios"};
+  private static final long VIEWS_MIN_VALUE = 20L;
+  private static final long VIEWS_MAX_VALUE = 30L;
+  private static final long DATA_START_TIME_SEC = 1747008000L;
+  private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; // 1 minute before start time
+  private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 5 minutes after start time
+
+  @Test
+  public void testGroupByMax() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupByMin() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | min{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupBySum() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | sum{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
+      String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+      long expected = ts <= DATA_START_TIME_SEC ? 0L
+        // If referral is true ("1"), views are VIEWS_MIN_VALUE (20); otherwise VIEWS_MAX_VALUE (30)
+        : "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByTwoColumnsAndExpressionValue() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
+        + " | max{%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByThreeColumnsAndConstantValue() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+      // Since there are 30 groups and 60 rows per minute, each group gets 2 rows per minute.
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByWithFilter() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+        + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+      PLATFORM_COLUMN, TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN, CITY_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
+    );
+  }
+
+  @Test
+  public void testTransformNull() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{42} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+      assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) {
+    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC);
+    System.out.println(result);
+    assertEquals(result.get("status").asText(), "success");
+
+    JsonNode series = result.get("data").get("result");
+    assertEquals(series.size(), expectedGroups);
+
+    for (JsonNode row : series) {
+      for (JsonNode point : row.get("values")) {
+        long ts = point.get(0).asLong();
+        long val = point.get(1).asLong();
+        validator.validate(ts, val, row);
+      }
+    }
+  }
+
+  @FunctionalInterface
+  interface TimeSeriesValidator {

Review Comment:
   nit: type declarations at the bottom might be better
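
   i.e., something like this (a sketch; only the placement changes, not the declaration itself):
   
   ```java
   public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
     // ... tests and helper methods ...
   
     // Nested type declared at the bottom of the class.
     @FunctionalInterface
     interface TimeSeriesValidator {
       void validate(long timestamp, long value, JsonNode row);
     }
   }
   ```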



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java:
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "tsCol";
+  private static final String CITY_COLUMN = "cityCol";
+  private static final String PLATFORM_COLUMN = "platformCol";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String VIEWS_COLUMN = "viewsCol";
+
+  private static final String[] PLATFORMS = new String[]{"windows", "android", "ios"};
+  private static final long VIEWS_MIN_VALUE = 20L;
+  private static final long VIEWS_MAX_VALUE = 30L;
+  private static final long DATA_START_TIME_SEC = 1747008000L;
+  private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; // 1 minute before start time
+  private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 5 minutes after start time
+
+  @Test
+  public void testGroupByMax() {
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+        + " | max{%s} | transformNull{0} | keepLastValue{}",
+      TS_COLUMN, VIEWS_COLUMN, PLATFORM_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)

Review Comment:
   format: indent
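
   e.g., one possible indentation (a sketch; the exact continuation indent depends on whatever we settle on for Pinot style):
   
   ```java
   runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE));
   ```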



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

