This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b64bd81de1 [timeseries] Part-3.1: Add Support for Partial Aggregate 
and Complex Intermediate Type (#14631)
b64bd81de1 is described below

commit b64bd81de15859407f18a802573d667070de8768
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Tue Dec 17 09:45:29 2024 -0600

    [timeseries] Part-3.1: Add Support for Partial Aggregate and Complex 
Intermediate Type (#14631)
    
    * [timeseries] Add Support for Partial Aggregation and Complex Intermediate 
Type
    
    * Fix tests + add tests + cleanup
    
    * address feedback
    
    * add missing todo
---
 .../response/PinotBrokerTimeSeriesResponse.java    |   2 +-
 .../TimeSeriesAggregationOperatorTest.java         |   2 +-
 .../core/query/executor/QueryExecutorTest.java     |  14 +--
 .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java       |   3 +-
 .../tsdb/m3ql/operator/KeepLastValueOperator.java  |   2 +-
 .../tsdb/m3ql/operator/TransformNullOperator.java  |   2 +-
 .../timeseries/serde/TimeSeriesBlockSerde.java     | 124 +++++++++++++++++++--
 .../PhysicalTimeSeriesServerPlanVisitorTest.java   |   2 +-
 .../TimeSeriesExchangeReceiveOperatorTest.java     |  12 +-
 .../timeseries/serde/TimeSeriesBlockSerdeTest.java |  43 ++++++-
 .../tsdb/planner/TimeSeriesPlanFragmenter.java     |  13 ++-
 .../java/org/apache/pinot/tsdb/spi/AggInfo.java    |  30 ++++-
 .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java      |   5 +
 .../tsdb/spi/series/BaseTimeSeriesBuilder.java     |  18 +--
 .../apache/pinot/tsdb/spi/series/TimeSeries.java   |  19 +++-
 .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java  |   9 +-
 .../spi/plan/serde/TimeSeriesPlanSerdeTest.java    |   4 +-
 .../pinot/tsdb/spi/series/TimeSeriesTest.java      |  54 +++++++++
 18 files changed, 301 insertions(+), 57 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 96320b8326..4a1f347d16 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -118,7 +118,7 @@ public class PinotBrokerTimeSeriesResponse {
       for (TimeSeries timeSeries : listOfTimeSeries) {
         Object[][] values = new Object[timeValues.length][];
         for (int i = 0; i < timeValues.length; i++) {
-          Object nullableValue = timeSeries.getValues()[i];
+          Object nullableValue = timeSeries.getDoubleValues()[i];
           values[i] = new Object[]{timeValues[i], nullableValue == null ? null 
: nullableValue.toString()};
         }
         result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
index eea81a4ba1..b6e97c849f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -44,7 +44,7 @@ public class TimeSeriesAggregationOperatorTest {
   private static final Random RANDOM = new Random();
   private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
   private static final String GROUP_BY_COLUMN = "city";
-  private static final AggInfo AGG_INFO = new AggInfo("SUM", 
Collections.emptyMap());
+  private static final AggInfo AGG_INFO = new AggInfo("SUM", false, 
Collections.emptyMap());
   private static final ExpressionContext VALUE_EXPRESSION = 
ExpressionContext.forIdentifier("someValueColumn");
   private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(100), 10);
   private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 4a171128c8..0b59468e0d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -223,7 +223,7 @@ public class QueryExecutorTest {
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderAmount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, 
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false, 
Collections.emptyMap()));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), 
ServerMetrics.get());
@@ -232,8 +232,8 @@ public class QueryExecutorTest {
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
instanceResponse.getResultsBlock();
     TimeSeriesBlock timeSeriesBlock = 
resultsBlock.getTimeSeriesBuilderBlock().build();
     assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
-    
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]);
-    
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1],
 29885544.0);
+    
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
+    
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1],
 29885544.0);
   }
 
   @Test
@@ -242,7 +242,7 @@ public class QueryExecutorTest {
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderItemCount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, 
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false, 
Collections.emptyMap()));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), 
ServerMetrics.get());
@@ -260,7 +260,7 @@ public class QueryExecutorTest {
         assertFalse(foundNewYork, "Found multiple time-series for New York");
         foundNewYork = true;
         Optional<Double> maxValue =
-            
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
+            
Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
         assertTrue(maxValue.isPresent());
         assertEquals(maxValue.get().longValue(), 4L);
       }
@@ -274,7 +274,7 @@ public class QueryExecutorTest {
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderItemCount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, 
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false, 
Collections.emptyMap()));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), 
ServerMetrics.get());
@@ -292,7 +292,7 @@ public class QueryExecutorTest {
         assertFalse(foundChicago, "Found multiple time-series for Chicago");
         foundChicago = true;
         Optional<Double> minValue =
-            
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
+            
Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
         assertTrue(minValue.isPresent());
         assertEquals(minValue.get().longValue(), 0L);
       }
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 53844048a7..42515083c0 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.tsdb.m3ql;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
@@ -84,7 +85,7 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
         case "max":
           Preconditions.checkState(commandId == 1, "Aggregation should be the 
second command (fetch should be first)");
           Preconditions.checkState(aggInfo == null, "Aggregation already set. 
Only single agg allowed.");
-          aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null);
+          aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), false, 
Collections.emptyMap());
           if (commands.get(commandId).size() > 1) {
             String[] cols = commands.get(commandId).get(1).split(",");
             groupByColumns = 
Stream.of(cols).map(String::trim).collect(Collectors.toList());
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
index 0330dff13b..cef90b69af 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
@@ -34,7 +34,7 @@ public class KeepLastValueOperator extends 
BaseTimeSeriesOperator {
     TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
     seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries 
-> {
       for (TimeSeries series : unionOfSeries) {
-        Double[] values = series.getValues();
+        Double[] values = series.getDoubleValues();
         Double lastValue = null;
         for (int index = 0; index < values.length; index++) {
           if (values[index] != null) {
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
index ca971c932c..661e4de498 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
@@ -37,7 +37,7 @@ public class TransformNullOperator extends 
BaseTimeSeriesOperator {
     TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
     seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries 
-> {
       for (TimeSeries series : unionOfSeries) {
-        Double[] values = series.getValues();
+        Double[] values = series.getDoubleValues();
         for (int index = 0; index < values.length; index++) {
           values[index] = values[index] == null ? _defaultValue : 
values[index];
         }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index cdbf668123..5978e29507 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.query.runtime.timeseries.serde;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,10 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -51,7 +57,7 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
  *   the last column. As an example, consider the following, where FBV 
represents the first bucket value of TimeBuckets.
  *   <pre>
  *     
+-------------+------------+-------------+---------------------------------+
- *     | tag-0       | tag-1      | tag-n       | values                       
   |
+ *     | tag-0       | tag-1      | tag-n       | values (String[] or 
double[])  |
  *     
+-------------+------------+-------------+---------------------------------+
  *     | null        | null       | null        | [FBV, bucketSize, 
numBuckets]   |
  *     
+-------------+------------+-------------+---------------------------------+
@@ -74,6 +80,7 @@ public class TimeSeriesBlockSerde {
    * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN 
can help detect divide by 0.
    * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
    */
+  private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
   private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
 
   private TimeSeriesBlockSerde() {
@@ -85,12 +92,13 @@ public class TimeSeriesBlockSerde {
     TransferableBlock transferableBlock = 
TransferableBlockUtils.wrap(dataBlock);
     List<String> tagNames = 
generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
         "Missing data schema in TransferableBlock"));
+    final DataSchema dataSchema = transferableBlock.getDataSchema();
     List<Object[]> container = transferableBlock.getContainer();
-    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0), dataSchema);
     Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
     for (int index = 1; index < container.size(); index++) {
       Object[] row = container.get(index);
-      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets, 
dataSchema);
       long seriesId = Long.parseLong(timeSeries.getId());
       seriesMap.computeIfAbsent(seriesId, x -> new 
ArrayList<>()).add(timeSeries);
     }
@@ -112,17 +120,77 @@ public class TimeSeriesBlockSerde {
     return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
   }
 
+  /**
+   * This method is only used for encoding time-bucket-values to byte arrays, 
when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static byte[][] toBytesArray(double[] values) {
+    byte[][] result = new byte[values.length][8];
+    for (int index = 0; index < values.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(result[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      byteBuffer.putDouble(values[index]);
+    }
+    return result;
+  }
+
+  /**
+   * This method is only used for decoding time-bucket-values from byte 
arrays, when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static double[] fromBytesArray(byte[][] bytes) {
+    double[] result = new double[bytes.length];
+    for (int index = 0; index < bytes.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      result[index] = byteBuffer.getDouble();
+    }
+    return result;
+  }
+
+  /**
+   * Since {@link DataBlockBuilder} does not support {@link 
ColumnDataType#BYTES_ARRAY}, we have to encode the
+   * transmitted bytes as Hex to use String[].
+   */
+  @VisibleForTesting
+  static String[] encodeAsHex(byte[][] byteValues) {
+    String[] result = new String[byteValues.length];
+    for (int index = 0; index < result.length; index++) {
+      result[index] = Hex.encodeHexString(byteValues[index]);
+    }
+    return result;
+  }
+
+  /**
+   * Used for decoding Hex strings. See {@link 
TimeSeriesBlockSerde#encodeAsHex} for more.
+   */
+  @VisibleForTesting
+  static byte[][] decodeFromHex(String[] hexEncodedValues) {
+    byte[][] result = new byte[hexEncodedValues.length][];
+    for (int index = 0; index < hexEncodedValues.length; index++) {
+      try {
+        result[index] = Hex.decodeHex(hexEncodedValues[index]);
+      } catch (DecoderException e) {
+        throw new RuntimeException("Error decoding byte[] value from encoded 
hex string", e);
+      }
+    }
+    return result;
+  }
+
   private static DataSchema generateDataSchema(TimeSeriesBlock 
timeSeriesBlock) {
     TimeSeries sampledTimeSeries = 
sampleTimeSeries(timeSeriesBlock).orElse(null);
     int numTags = sampledTimeSeries == null ? 0 : 
sampledTimeSeries.getTagNames().size();
     ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+    final ColumnDataType valueDataType = inferValueDataType(sampledTimeSeries);
     String[] columnNames = new String[numTags + 1];
     for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
       columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
       dataTypes[tagIndex] = ColumnDataType.STRING;
     }
-    columnNames[numTags] = "__ts_values";
-    dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+    columnNames[numTags] = VALUES_COLUMN_NAME;
+    dataTypes[numTags] = valueDataType;
     return new DataSchema(columnNames, dataTypes);
   }
 
@@ -144,6 +212,14 @@ public class TimeSeriesBlockSerde {
     return Optional.of(timeSeriesList.get(0));
   }
 
+  private static ColumnDataType inferValueDataType(@Nullable TimeSeries 
timeSeries) {
+    if (timeSeries == null || timeSeries.getValues() instanceof Double[]) {
+      return ColumnDataType.DOUBLE_ARRAY;
+    }
+    // Byte values are encoded as hex array
+    return ColumnDataType.STRING_ARRAY;
+  }
+
   private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema 
dataSchema) {
     int numColumns = dataSchema.getColumnNames().length;
     Object[] result = new Object[numColumns];
@@ -153,12 +229,27 @@ public class TimeSeriesBlockSerde {
     double firstBucketValue = timeBuckets.getTimeBuckets()[0];
     double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
     double numBuckets = timeBuckets.getNumBuckets();
-    result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, 
numBuckets};
+    final ColumnDataType valuesDataType = 
dataSchema.getColumnDataTypes()[numColumns - 1];
+    final double[] bucketsEncodedAsDouble = new double[]{firstBucketValue, 
bucketSizeSeconds, numBuckets};
+    if (valuesDataType == ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = bucketsEncodedAsDouble;
+    } else {
+      Preconditions.checkState(valuesDataType == ColumnDataType.STRING_ARRAY,
+          "Expected bytes_array column type. Found: %s", valuesDataType);
+      result[numColumns - 1] = 
encodeAsHex(toBytesArray(bucketsEncodedAsDouble));
+    }
     return result;
   }
 
-  private static TimeBuckets timeBucketsFromRow(Object[] row) {
-    double[] values = (double[]) row[row.length - 1];
+  private static TimeBuckets timeBucketsFromRow(Object[] row, DataSchema 
dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
+    double[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == 
ColumnDataType.STRING_ARRAY) {
+      byte[][] byteValues = decodeFromHex((String[]) row[row.length - 1]);
+      values = fromBytesArray(byteValues);
+    } else {
+      values = (double[]) row[row.length - 1];
+    }
     long fbv = (long) values[0];
     Duration window = Duration.ofSeconds((long) values[1]);
     int numBuckets = (int) values[2];
@@ -172,14 +263,25 @@ public class TimeSeriesBlockSerde {
       Object tagValue = timeSeries.getTagValues()[index];
       result[index] = tagValue == null ? "null" : tagValue.toString();
     }
-    result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == 
ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = unboxDoubleArray(timeSeries.getDoubleValues());
+    } else {
+      result[numColumns - 1] = encodeAsHex(timeSeries.getBytesValues());
+    }
     return result;
   }
 
-  private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] 
row, TimeBuckets timeBuckets) {
-    Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+  private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] 
row, TimeBuckets timeBuckets,
+      DataSchema dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
     Object[] tagValues = new Object[row.length - 1];
     System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+    Object[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == 
ColumnDataType.DOUBLE_ARRAY) {
+      values = boxDoubleArray((double[]) row[row.length - 1]);
+    } else {
+      values = decodeFromHex((String[]) row[row.length - 1]);
+    }
     return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, 
timeBuckets, values, tagNames, tagValues);
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index e85d17cf6c..43b3496dfb 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -45,7 +45,7 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
     final String planId = "id";
     final String tableName = "orderTable";
     final String timeColumn = "orderTime";
-    final AggInfo aggInfo = new AggInfo("SUM", null);
+    final AggInfo aggInfo = new AggInfo("SUM", false, Collections.emptyMap());
     final String filterExpr = "cityName = 'Chicago'";
     PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new 
PhysicalTimeSeriesServerPlanVisitor(
         mock(QueryExecutor.class), mock(ExecutorService.class), 
mock(ServerMetrics.class));
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
index c9fd929333..5a9079de2c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -39,7 +39,7 @@ import static org.testng.Assert.*;
 
 public class TimeSeriesExchangeReceiveOperatorTest {
   private static final int NUM_SERVERS_QUERIED = 3;
-  private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+  private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", false, 
Collections.emptyMap());
   private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(200), 4);
   private static final List<String> TAG_NAMES = ImmutableList.of("city", 
"zip");
   private static final Object[] CHICAGO_SERIES_VALUES = new 
Object[]{"Chicago", "60605"};
@@ -65,10 +65,10 @@ public class TimeSeriesExchangeReceiveOperatorTest {
     assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1, 
"Expected 1 series for Chicago");
     assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 
1 series for SF");
     // Ensure Chicago had series addition performed
-    Double[] chicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+    Double[] chicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
     // Ensure SF had input series unmodified
-    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
   }
 
@@ -89,12 +89,12 @@ public class TimeSeriesExchangeReceiveOperatorTest {
     assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2, 
"Expected 2 series for Chicago");
     assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 
1 series for SF");
     // Ensure Chicago has unmodified series values
-    Double[] firstChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
-    Double[] secondChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+    Double[] firstChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
+    Double[] secondChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getDoubleValues();
     assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
     assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
     // Ensure SF has input unmodified series values
-    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
index f08d39ca0a..d488d8fbd0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -47,7 +47,7 @@ public class TimeSeriesBlockSerdeTest {
     // 4. Compare ByteString-1 and ByteString-2.
     // 5. Compare values of Block-1 and Block-2.
     List<TimeSeriesBlock> blocks = List.of(buildBlockWithNoTags(), 
buildBlockWithSingleTag(),
-        buildBlockWithMultipleTags());
+        buildBlockWithMultipleTags(), buildBlockWithByteValues());
     for (TimeSeriesBlock block1 : blocks) {
       // Serialize, deserialize and serialize again
       ByteString byteString1 = 
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
@@ -61,6 +61,31 @@ public class TimeSeriesBlockSerdeTest {
     }
   }
 
+  @Test
+  public void testFromToBytesArray() {
+    // Encode and decode a double[] array to confirm the values turn out to be 
the same.
+    double[][] inputs = new double[][]{
+        {131.0, 1.31, 0.0},
+        {1.0, 1231.0, 1.0}
+    };
+    for (double[] input : inputs) {
+      byte[][] encodedBytes = TimeSeriesBlockSerde.toBytesArray(input);
+      double[] decodedValues = 
TimeSeriesBlockSerde.fromBytesArray(encodedBytes);
+      assertEquals(decodedValues, input);
+    }
+  }
+
+  @Test
+  public void testFromToHex() {
+    byte[][] input = new byte[][]{
+        {0x1a}, {0x00}, {0x77}, {Byte.MIN_VALUE},
+        {Byte.MAX_VALUE}, {0x13}, {0x19}, {0x77}
+    };
+    String[] encodedValues = TimeSeriesBlockSerde.encodeAsHex(input);
+    byte[][] decodedValues = TimeSeriesBlockSerde.decodeFromHex(encodedValues);
+    assertEquals(decodedValues, input);
+  }
+
   /**
    * Compares time series blocks in a way which makes it easy to debug test 
failures when/if they happen in CI.
    */
@@ -132,4 +157,20 @@ public class TimeSeriesBlockSerdeTest {
         new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames, 
seriesTwoValues)));
     return new TimeSeriesBlock(timeBuckets, seriesMap);
   }
+
+  private static TimeSeriesBlock buildBlockWithByteValues() {
+    TimeBuckets timeBuckets = TIME_BUCKETS;
+    // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, 
zip=94107]
+    List<String> tagNames = ImmutableList.of("cityId", "zip");
+    Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+        new byte[][]{{0x13}, {0x1b}, {0x12}, {0x00}}, tagNames, 
seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+        new byte[][]{{0x00}, {0x00}, {Byte.MIN_VALUE}, {0x7f}}, tagNames, 
seriesTwoValues)));
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
index 46a3f68c31..32287f4d83 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.tsdb.planner;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
 import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
@@ -102,8 +104,15 @@ public class TimeSeriesPlanFragmenter {
   private static BaseTimeSeriesPlanNode 
fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
     if (planNode instanceof LeafTimeSeriesPlanNode) {
       LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
-      context._fragments.add(leafNode.withInputs(Collections.emptyList()));
-      return new TimeSeriesExchangeNode(planNode.getId(), 
Collections.emptyList(), leafNode.getAggInfo());
+      AggInfo currentAggInfo = leafNode.getAggInfo();
+      if (currentAggInfo == null) {
+        context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+      } else {
+        Preconditions.checkState(!currentAggInfo.getIsPartial(),
+            "Leaf node in the logical plan should not have partial agg");
+        
context._fragments.add(leafNode.withAggInfo(currentAggInfo.withPartialAggregation()));
+      }
+      return new TimeSeriesExchangeNode(planNode.getId(), 
Collections.emptyList(), currentAggInfo);
     }
     List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
     for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
index 0dc3e0502d..33b66bff1f 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.Map;
-import javax.annotation.Nullable;
 
 
 /**
@@ -41,24 +40,47 @@ import javax.annotation.Nullable;
  * Example usage:
  * Map<String, String> params = new HashMap<>();
  * params.put("window", "5m");
- * AggInfo aggInfo = new AggInfo("rate", params);
+ * AggInfo aggInfo = new AggInfo("rate", true, params);
  */
 public class AggInfo {
   private final String _aggFunction;
+  /**
+   * Denotes whether an aggregate is partial or full. When returning the 
logical plan, language developers must not
+   * set this to true. This is used during Physical planning, and Pinot may 
set this to true if the corresponding
+   * aggregate node is not guaranteed to have the full data. In such cases, 
the physical plan will always add a
+   * complimentary full aggregate.
+   * <p>
+   *  TODO(timeseries): Ideally we should remove this from the logical plan 
completely.
+   * </p>
+   */
+  private final boolean _isPartial;
   private final Map<String, String> _params;
 
   @JsonCreator
-  public AggInfo(@JsonProperty("aggFunction") String aggFunction,
-      @JsonProperty("params") @Nullable Map<String, String> params) {
+  public AggInfo(@JsonProperty("aggFunction") String aggFunction, 
@JsonProperty("isPartial") boolean isPartial,
+      @JsonProperty("params") Map<String, String> params) {
     Preconditions.checkNotNull(aggFunction, "Received null aggFunction in 
AggInfo");
     _aggFunction = aggFunction;
+    _isPartial = isPartial;
     _params = params != null ? params : Collections.emptyMap();
   }
 
+  public AggInfo withPartialAggregation() {
+    return new AggInfo(_aggFunction, true, _params);
+  }
+
+  public AggInfo withFullAggregation() {
+    return new AggInfo(_aggFunction, false, _params);
+  }
+
   public String getAggFunction() {
     return _aggFunction;
   }
 
+  public boolean getIsPartial() {
+    return _isPartial;
+  }
+
   public Map<String, String> getParams() {
     return Collections.unmodifiableMap(_params);
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 1986f4713d..3deb4c68e6 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -64,6 +64,11 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
     _groupByExpressions = groupByExpressions;
   }
 
+  public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
+    return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, 
_timeUnit, _offsetSeconds,
+        _filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
+  }
+
   @Override
   public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> 
newInputs) {
     return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, 
_timeUnit, _offsetSeconds,
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
index 20ac1714a8..9cca55ebcb 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.tsdb.spi.series;
 
 import java.util.List;
-import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 
@@ -61,19 +60,14 @@ public abstract class BaseTimeSeriesBuilder {
 
   public abstract void addValue(long timeValue, Double value);
 
-  public void mergeSeries(TimeSeries series) {
-    int numDataPoints = series.getValues().length;
-    Long[] timeValues = Objects.requireNonNull(series.getTimeValues(),
-        "Cannot merge series: found null timeValues");
-    for (int i = 0; i < numDataPoints; i++) {
-      addValue(timeValues[i], series.getValues()[i]);
-    }
-  }
-
+  /**
+   * Assumes Double[] values and attempts to merge the given series with this 
builder. Implementations are
+   * recommended to override this to either optimize, or add bytes[][] values 
from the input Series.
+   */
   public void mergeAlignedSeries(TimeSeries series) {
-    int numDataPoints = series.getValues().length;
+    int numDataPoints = series.getDoubleValues().length;
     for (int i = 0; i < numDataPoints; i++) {
-      addValueAtIndex(i, series.getValues()[i]);
+      addValueAtIndex(i, series.getDoubleValues()[i]);
     }
   }
 
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
index 55e2a9a730..4a2e452116 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tsdb.spi.series;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -67,12 +68,16 @@ public class TimeSeries {
   private final String _id;
   private final Long[] _timeValues;
   private final TimeBuckets _timeBuckets;
-  private final Double[] _values;
+  private final Object[] _values;
   private final List<String> _tagNames;
   private final Object[] _tagValues;
 
-  public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable 
TimeBuckets timeBuckets, Double[] values,
+  // TODO(timeseries): Time series may also benefit from storing 
extremal/outlier value traces, similar to Monarch.
+  // TODO(timeseries): It may make sense to allow types other than Double and 
byte[] arrays.
+  public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable 
TimeBuckets timeBuckets, Object[] values,
       List<String> tagNames, Object[] tagValues) {
+    Preconditions.checkArgument(values instanceof Double[] || values 
instanceof byte[][],
+        "Time Series can only take Double[] or byte[][] values");
     _id = id;
     _timeValues = timeValues;
     _timeBuckets = timeBuckets;
@@ -95,10 +100,18 @@ public class TimeSeries {
     return _timeBuckets;
   }
 
-  public Double[] getValues() {
+  public Object[] getValues() {
     return _values;
   }
 
+  public Double[] getDoubleValues() {
+    return (Double[]) _values;
+  }
+
+  public byte[][] getBytesValues() {
+    return (byte[][]) _values;
+  }
+
   public List<String> getTagNames() {
     return _tagNames;
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index 011cb6fbc6..d326ed49b5 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -44,7 +44,7 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
-              new AggInfo("SUM", null), Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + 
expectedEndTimeInFilter);
     }
@@ -52,7 +52,7 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
-              new AggInfo("SUM", null), Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime 
<= " + (expectedEndTimeInFilter - 123));
     }
@@ -60,7 +60,7 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
-              "value_col", new AggInfo("SUM", null), 
Collections.singletonList("cityName"));
+              "value_col", new AggInfo("SUM", false, Collections.emptyMap()), 
Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 
123)));
@@ -69,7 +69,8 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
-              nonEmptyFilter, "value_col", new AggInfo("SUM", null), 
Collections.singletonList("cityName"));
+              nonEmptyFilter, "value_col", new AggInfo("SUM", false, 
Collections.emptyMap()),
+              Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter * 1000 - 123 * 1000), 
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index 4bd5c37a5a..71bf2323fd 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -40,7 +41,7 @@ public class TimeSeriesPlanSerdeTest {
 
     LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
         new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", 
"myTimeColumn", TimeUnit.MILLISECONDS, 0L,
-            "myFilterExpression", "myValueExpression", new AggInfo("SUM", 
aggParams), new ArrayList<>());
+            "myFilterExpression", "myValueExpression", new AggInfo("SUM", 
false, aggParams), new ArrayList<>());
     BaseTimeSeriesPlanNode planNode =
         
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
     assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
@@ -52,6 +53,7 @@ public class TimeSeriesPlanSerdeTest {
     assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
     assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
     assertNotNull(deserializedNode.getAggInfo());
+    assertFalse(deserializedNode.getAggInfo().getIsPartial());
     assertNotNull(deserializedNode.getAggInfo().getParams());
     assertEquals(deserializedNode.getAggInfo().getParams().get("window"), 
"5m");
     assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
new file mode 100644
index 0000000000..db651785e8
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.time.Duration;
+import java.util.Collections;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesTest {
+  private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(100, 
Duration.ofSeconds(10), 10);
+
+  @Test
+  public void testTimeSeriesAcceptsDoubleValues() {
+    Double[] values = new Double[10];
+    TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, 
values, Collections.emptyList(),
+        new Object[0]);
+    assertEquals(timeSeries.getDoubleValues(), values);
+  }
+
+  @Test
+  public void testTimeSeriesAcceptsBytesValues() {
+    byte[][] byteValues = new byte[10][1231];
+    TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, 
byteValues, Collections.emptyList(),
+        new Object[0]);
+    assertEquals(timeSeries.getBytesValues(), byteValues);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testTimeSeriesDeniesWhenValuesNotDoubleOrBytes() {
+    Object[] someValues = new Long[10];
+    TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, 
someValues, Collections.emptyList(),
+        new Object[0]);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to