This is an automated email from the ASF dual-hosted git repository. pratik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b64bd81de1 [timeseries] Part-3.1: Add Support for Partial Aggregate and Complex Intermediate Type (#14631) b64bd81de1 is described below commit b64bd81de15859407f18a802573d667070de8768 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Tue Dec 17 09:45:29 2024 -0600 [timeseries] Part-3.1: Add Support for Partial Aggregate and Complex Intermediate Type (#14631) * [timeseries] Add Support for Partial Aggregation and Complex Intermediate Type * Fix tests + add tests + cleanup * address feedback * add missing todo --- .../response/PinotBrokerTimeSeriesResponse.java | 2 +- .../TimeSeriesAggregationOperatorTest.java | 2 +- .../core/query/executor/QueryExecutorTest.java | 14 +-- .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 3 +- .../tsdb/m3ql/operator/KeepLastValueOperator.java | 2 +- .../tsdb/m3ql/operator/TransformNullOperator.java | 2 +- .../timeseries/serde/TimeSeriesBlockSerde.java | 124 +++++++++++++++++++-- .../PhysicalTimeSeriesServerPlanVisitorTest.java | 2 +- .../TimeSeriesExchangeReceiveOperatorTest.java | 12 +- .../timeseries/serde/TimeSeriesBlockSerdeTest.java | 43 ++++++- .../tsdb/planner/TimeSeriesPlanFragmenter.java | 13 ++- .../java/org/apache/pinot/tsdb/spi/AggInfo.java | 30 ++++- .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 5 + .../tsdb/spi/series/BaseTimeSeriesBuilder.java | 18 +-- .../apache/pinot/tsdb/spi/series/TimeSeries.java | 19 +++- .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 9 +- .../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 4 +- .../pinot/tsdb/spi/series/TimeSeriesTest.java | 54 +++++++++ 18 files changed, 301 insertions(+), 57 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java index 96320b8326..4a1f347d16 100644 --- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java @@ -118,7 +118,7 @@ public class PinotBrokerTimeSeriesResponse { for (TimeSeries timeSeries : listOfTimeSeries) { Object[][] values = new Object[timeValues.length][]; for (int i = 0; i < timeValues.length; i++) { - Object nullableValue = timeSeries.getValues()[i]; + Object nullableValue = timeSeries.getDoubleValues()[i]; values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()}; } result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values)); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java index eea81a4ba1..b6e97c849f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java @@ -44,7 +44,7 @@ public class TimeSeriesAggregationOperatorTest { private static final Random RANDOM = new Random(); private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; private static final String GROUP_BY_COLUMN = "city"; - private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap()); + private static final AggInfo AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap()); private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10); private static final int NUM_DOCS_IN_DUMMY_DATA = 1000; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 4a171128c8..0b59468e0d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -223,7 +223,7 @@ public class QueryExecutorTest { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false, Collections.emptyMap())); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList()); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -232,8 +232,8 @@ public class QueryExecutorTest { TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); - assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]); - assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1], 29885544.0); + assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]); + assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1], 29885544.0); } @Test @@ -242,7 +242,7 @@ public class QueryExecutorTest { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* 
offsetSeconds */, valueExpression, new AggInfo("MAX", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false, Collections.emptyMap())); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -260,7 +260,7 @@ public class QueryExecutorTest { assertFalse(foundNewYork, "Found multiple time-series for New York"); foundNewYork = true; Optional<Double> maxValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); + Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); assertTrue(maxValue.isPresent()); assertEquals(maxValue.get().longValue(), 4L); } @@ -274,7 +274,7 @@ public class QueryExecutorTest { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false, Collections.emptyMap())); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -292,7 +292,7 @@ public class QueryExecutorTest { assertFalse(foundChicago, "Found multiple time-series for Chicago"); foundChicago = true; Optional<Double> minValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); + Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); assertTrue(minValue.isPresent()); assertEquals(minValue.get().longValue(), 0L); } diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java index 53844048a7..42515083c0 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -20,6 +20,7 @@ package org.apache.pinot.tsdb.m3ql; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; @@ -84,7 +85,7 @@ public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { case "max": Preconditions.checkState(commandId == 1, "Aggregation should be the second command (fetch should be first)"); Preconditions.checkState(aggInfo == null, "Aggregation already set. 
Only single agg allowed."); - aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null); + aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), false, Collections.emptyMap()); if (commands.get(commandId).size() > 1) { String[] cols = commands.get(commandId).get(1).split(","); groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList()); diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java index 0330dff13b..cef90b69af 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java @@ -34,7 +34,7 @@ public class KeepLastValueOperator extends BaseTimeSeriesOperator { TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock(); seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { for (TimeSeries series : unionOfSeries) { - Double[] values = series.getValues(); + Double[] values = series.getDoubleValues(); Double lastValue = null; for (int index = 0; index < values.length; index++) { if (values[index] != null) { diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java index ca971c932c..661e4de498 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java +++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java @@ -37,7 +37,7 @@ public class TransformNullOperator extends BaseTimeSeriesOperator { TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock(); seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { for (TimeSeries series : unionOfSeries) { - Double[] values = series.getValues(); + Double[] values = series.getDoubleValues(); for (int index = 0; index < values.length; index++) { values[index] = values[index] == null ? _defaultValue : values[index]; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java index cdbf668123..5978e29507 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.query.runtime.timeseries.serde; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -29,10 +31,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import 
org.apache.pinot.core.common.datablock.DataBlockBuilder; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -51,7 +57,7 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; * the last column. As an example, consider the following, where FBV represents the first bucket value of TimeBuckets. * <pre> * +-------------+------------+-------------+---------------------------------+ - * | tag-0 | tag-1 | tag-n | values | + * | tag-0 | tag-1 | tag-n | values (String[] or double[]) | * +-------------+------------+-------------+---------------------------------+ * | null | null | null | [FBV, bucketSize, numBuckets] | * +-------------+------------+-------------+---------------------------------+ @@ -74,6 +80,7 @@ public class TimeSeriesBlockSerde { * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN can help detect divide by 0. * TODO(timeseries): Check if we can get rid of boxed Doubles altogether. 
*/ + private static final String VALUES_COLUMN_NAME = "__ts_serde_values"; private static final double NULL_PLACEHOLDER = Double.MIN_VALUE; private TimeSeriesBlockSerde() { @@ -85,12 +92,13 @@ public class TimeSeriesBlockSerde { TransferableBlock transferableBlock = TransferableBlockUtils.wrap(dataBlock); List<String> tagNames = generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(), "Missing data schema in TransferableBlock")); + final DataSchema dataSchema = transferableBlock.getDataSchema(); List<Object[]> container = transferableBlock.getContainer(); - TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0)); + TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0), dataSchema); Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); for (int index = 1; index < container.size(); index++) { Object[] row = container.get(index); - TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets); + TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets, dataSchema); long seriesId = Long.parseLong(timeSeries.getId()); seriesMap.computeIfAbsent(seriesId, x -> new ArrayList<>()).add(timeSeries); } @@ -112,17 +120,77 @@ public class TimeSeriesBlockSerde { return DataBlockUtils.toByteString(transferableBlock.getDataBlock()); } + /** + * This method is only used for encoding time-bucket-values to byte arrays, when the TimeSeries value type + * is byte[][]. + */ + @VisibleForTesting + static byte[][] toBytesArray(double[] values) { + byte[][] result = new byte[values.length][8]; + for (int index = 0; index < values.length; index++) { + ByteBuffer byteBuffer = ByteBuffer.wrap(result[index]); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + byteBuffer.putDouble(values[index]); + } + return result; + } + + /** + * This method is only used for decoding time-bucket-values from byte arrays, when the TimeSeries value type + * is byte[][]. 
+ */ + @VisibleForTesting + static double[] fromBytesArray(byte[][] bytes) { + double[] result = new double[bytes.length]; + for (int index = 0; index < bytes.length; index++) { + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes[index]); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + result[index] = byteBuffer.getDouble(); + } + return result; + } + + /** + * Since {@link DataBlockBuilder} does not support {@link ColumnDataType#BYTES_ARRAY}, we have to encode the + * transmitted bytes as Hex to use String[]. + */ + @VisibleForTesting + static String[] encodeAsHex(byte[][] byteValues) { + String[] result = new String[byteValues.length]; + for (int index = 0; index < result.length; index++) { + result[index] = Hex.encodeHexString(byteValues[index]); + } + return result; + } + + /** + * Used for decoding Hex strings. See {@link TimeSeriesBlockSerde#encodeAsHex} for more. + */ + @VisibleForTesting + static byte[][] decodeFromHex(String[] hexEncodedValues) { + byte[][] result = new byte[hexEncodedValues.length][]; + for (int index = 0; index < hexEncodedValues.length; index++) { + try { + result[index] = Hex.decodeHex(hexEncodedValues[index]); + } catch (DecoderException e) { + throw new RuntimeException("Error decoding byte[] value from encoded hex string", e); + } + } + return result; + } + private static DataSchema generateDataSchema(TimeSeriesBlock timeSeriesBlock) { TimeSeries sampledTimeSeries = sampleTimeSeries(timeSeriesBlock).orElse(null); int numTags = sampledTimeSeries == null ? 
0 : sampledTimeSeries.getTagNames().size(); ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1]; + final ColumnDataType valueDataType = inferValueDataType(sampledTimeSeries); String[] columnNames = new String[numTags + 1]; for (int tagIndex = 0; tagIndex < numTags; tagIndex++) { columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex); dataTypes[tagIndex] = ColumnDataType.STRING; } - columnNames[numTags] = "__ts_values"; - dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY; + columnNames[numTags] = VALUES_COLUMN_NAME; + dataTypes[numTags] = valueDataType; return new DataSchema(columnNames, dataTypes); } @@ -144,6 +212,14 @@ public class TimeSeriesBlockSerde { return Optional.of(timeSeriesList.get(0)); } + private static ColumnDataType inferValueDataType(@Nullable TimeSeries timeSeries) { + if (timeSeries == null || timeSeries.getValues() instanceof Double[]) { + return ColumnDataType.DOUBLE_ARRAY; + } + // Byte values are encoded as hex array + return ColumnDataType.STRING_ARRAY; + } + private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) { int numColumns = dataSchema.getColumnNames().length; Object[] result = new Object[numColumns]; @@ -153,12 +229,27 @@ public class TimeSeriesBlockSerde { double firstBucketValue = timeBuckets.getTimeBuckets()[0]; double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds(); double numBuckets = timeBuckets.getNumBuckets(); - result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets}; + final ColumnDataType valuesDataType = dataSchema.getColumnDataTypes()[numColumns - 1]; + final double[] bucketsEncodedAsDouble = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets}; + if (valuesDataType == ColumnDataType.DOUBLE_ARRAY) { + result[numColumns - 1] = bucketsEncodedAsDouble; + } else { + Preconditions.checkState(valuesDataType == ColumnDataType.STRING_ARRAY, + "Expected bytes_array column type. 
Found: %s", valuesDataType); + result[numColumns - 1] = encodeAsHex(toBytesArray(bucketsEncodedAsDouble)); + } return result; } - private static TimeBuckets timeBucketsFromRow(Object[] row) { - double[] values = (double[]) row[row.length - 1]; + private static TimeBuckets timeBucketsFromRow(Object[] row, DataSchema dataSchema) { + int numColumns = dataSchema.getColumnDataTypes().length; + double[] values; + if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.STRING_ARRAY) { + byte[][] byteValues = decodeFromHex((String[]) row[row.length - 1]); + values = fromBytesArray(byteValues); + } else { + values = (double[]) row[row.length - 1]; + } long fbv = (long) values[0]; Duration window = Duration.ofSeconds((long) values[1]); int numBuckets = (int) values[2]; @@ -172,14 +263,25 @@ public class TimeSeriesBlockSerde { Object tagValue = timeSeries.getTagValues()[index]; result[index] = tagValue == null ? "null" : tagValue.toString(); } - result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues()); + if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) { + result[numColumns - 1] = unboxDoubleArray(timeSeries.getDoubleValues()); + } else { + result[numColumns - 1] = encodeAsHex(timeSeries.getBytesValues()); + } return result; } - private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] row, TimeBuckets timeBuckets) { - Double[] values = boxDoubleArray((double[]) row[row.length - 1]); + private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] row, TimeBuckets timeBuckets, + DataSchema dataSchema) { + int numColumns = dataSchema.getColumnDataTypes().length; Object[] tagValues = new Object[row.length - 1]; System.arraycopy(row, 0, tagValues, 0, row.length - 1); + Object[] values; + if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) { + values = boxDoubleArray((double[]) row[row.length - 1]); + } else { + values = decodeFromHex((String[]) row[row.length 
- 1]); + } return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, timeBuckets, values, tagNames, tagValues); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java index e85d17cf6c..43b3496dfb 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java @@ -45,7 +45,7 @@ public class PhysicalTimeSeriesServerPlanVisitorTest { final String planId = "id"; final String tableName = "orderTable"; final String timeColumn = "orderTime"; - final AggInfo aggInfo = new AggInfo("SUM", null); + final AggInfo aggInfo = new AggInfo("SUM", false, Collections.emptyMap()); final String filterExpr = "cityName = 'Chicago'"; PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor( mock(QueryExecutor.class), mock(ExecutorService.class), mock(ServerMetrics.class)); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java index c9fd929333..5a9079de2c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java @@ -39,7 +39,7 @@ import static org.testng.Assert.*; public class TimeSeriesExchangeReceiveOperatorTest { private static final int NUM_SERVERS_QUERIED = 3; - private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null); + private static 
final AggInfo SUM_AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap()); private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(200), 4); private static final List<String> TAG_NAMES = ImmutableList.of("city", "zip"); private static final Object[] CHICAGO_SERIES_VALUES = new Object[]{"Chicago", "60605"}; @@ -65,10 +65,10 @@ public class TimeSeriesExchangeReceiveOperatorTest { assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1, "Expected 1 series for Chicago"); assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF"); // Ensure Chicago had series addition performed - Double[] chicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues(); + Double[] chicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues(); assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0}); // Ensure SF had input series unmodified - Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues(); + Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues(); assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0}); } @@ -89,12 +89,12 @@ public class TimeSeriesExchangeReceiveOperatorTest { assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2, "Expected 2 series for Chicago"); assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF"); // Ensure Chicago has unmodified series values - Double[] firstChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues(); - Double[] secondChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues(); + Double[] firstChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues(); + Double[] secondChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getDoubleValues(); assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0}); assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0}); // Ensure SF has input unmodified series values - Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues(); + Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues(); assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0}); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java index f08d39ca0a..d488d8fbd0 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java @@ -47,7 +47,7 @@ public class TimeSeriesBlockSerdeTest { // 4. Compare ByteString-1 and ByteString-2. // 5. Compare values of Block-1 and Block-2. List<TimeSeriesBlock> blocks = List.of(buildBlockWithNoTags(), buildBlockWithSingleTag(), - buildBlockWithMultipleTags()); + buildBlockWithMultipleTags(), buildBlockWithByteValues()); for (TimeSeriesBlock block1 : blocks) { // Serialize, deserialize and serialize again ByteString byteString1 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1); @@ -61,6 +61,31 @@ public class TimeSeriesBlockSerdeTest { } } + @Test + public void testFromToBytesArray() { + // Encode and decode a double[] array to confirm the values turn out to be the same. 
+ double[][] inputs = new double[][]{ + {131.0, 1.31, 0.0}, + {1.0, 1231.0, 1.0} + }; + for (double[] input : inputs) { + byte[][] encodedBytes = TimeSeriesBlockSerde.toBytesArray(input); + double[] decodedValues = TimeSeriesBlockSerde.fromBytesArray(encodedBytes); + assertEquals(decodedValues, input); + } + } + + @Test + public void testFromToHex() { + byte[][] input = new byte[][]{ + {0x1a}, {0x00}, {0x77}, {Byte.MIN_VALUE}, + {Byte.MAX_VALUE}, {0x13}, {0x19}, {0x77} + }; + String[] encodedValues = TimeSeriesBlockSerde.encodeAsHex(input); + byte[][] decodedValues = TimeSeriesBlockSerde.decodeFromHex(encodedValues); + assertEquals(decodedValues, input); + } + /** * Compares time series blocks in a way which makes it easy to debug test failures when/if they happen in CI. */ @@ -132,4 +157,20 @@ public class TimeSeriesBlockSerdeTest { new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames, seriesTwoValues))); return new TimeSeriesBlock(timeBuckets, seriesMap); } + + private static TimeSeriesBlock buildBlockWithByteValues() { + TimeBuckets timeBuckets = TIME_BUCKETS; + // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, zip=94107] + List<String> tagNames = ImmutableList.of("cityId", "zip"); + Object[] seriesOneValues = new Object[]{"Chicago", "60605"}; + Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"}; + long seriesOneHash = TimeSeries.hash(seriesOneValues); + long seriesTwoHash = TimeSeries.hash(seriesTwoValues); + Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); + seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, timeBuckets, + new byte[][]{{0x13}, {0x1b}, {0x12}, {0x00}}, tagNames, seriesOneValues))); + seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets, + new byte[][]{{0x00}, {0x00}, {Byte.MIN_VALUE}, {0x7f}}, tagNames, seriesTwoValues))); + return new TimeSeriesBlock(timeBuckets, seriesMap); + } } diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java index 46a3f68c31..32287f4d83 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.tsdb.planner; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; @@ -102,8 +104,15 @@ public class TimeSeriesPlanFragmenter { private static BaseTimeSeriesPlanNode fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) { if (planNode instanceof LeafTimeSeriesPlanNode) { LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode; - context._fragments.add(leafNode.withInputs(Collections.emptyList())); - return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), leafNode.getAggInfo()); + AggInfo currentAggInfo = leafNode.getAggInfo(); + if (currentAggInfo == null) { + context._fragments.add(leafNode.withInputs(Collections.emptyList())); + } else { + Preconditions.checkState(!currentAggInfo.getIsPartial(), + "Leaf node in the logical plan should not have partial agg"); + context._fragments.add(leafNode.withAggInfo(currentAggInfo.withPartialAggregation())); + } + return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), currentAggInfo); } List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>(); for (BaseTimeSeriesPlanNode input : planNode.getInputs()) { diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java index 0dc3e0502d..33b66bff1f 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.Map; -import javax.annotation.Nullable; /** @@ -41,24 +40,47 @@ import javax.annotation.Nullable; * Example usage: * Map<String, String> params = new HashMap<>(); * params.put("window", "5m"); - * AggInfo aggInfo = new AggInfo("rate", params); + * AggInfo aggInfo = new AggInfo("rate", true, params); */ public class AggInfo { private final String _aggFunction; + /** + * Denotes whether an aggregate is partial or full. When returning the logical plan, language developers must not + * set this to true. This is used during Physical planning, and Pinot may set this to true if the corresponding + * aggregate node is not guaranteed to have the full data. In such cases, the physical plan will always add a + * complimentary full aggregate. + * <p> + * TODO(timeseries): Ideally we should remove this from the logical plan completely. + * </p> + */ + private final boolean _isPartial; private final Map<String, String> _params; @JsonCreator - public AggInfo(@JsonProperty("aggFunction") String aggFunction, - @JsonProperty("params") @Nullable Map<String, String> params) { + public AggInfo(@JsonProperty("aggFunction") String aggFunction, @JsonProperty("isPartial") boolean isPartial, + @JsonProperty("params") Map<String, String> params) { Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo"); _aggFunction = aggFunction; + _isPartial = isPartial; _params = params != null ? 
params : Collections.emptyMap(); } + public AggInfo withPartialAggregation() { + return new AggInfo(_aggFunction, true, _params); + } + + public AggInfo withFullAggregation() { + return new AggInfo(_aggFunction, false, _params); + } + public String getAggFunction() { return _aggFunction; } + public boolean getIsPartial() { + return _isPartial; + } + public Map<String, String> getParams() { return Collections.unmodifiableMap(_params); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index 1986f4713d..3deb4c68e6 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -64,6 +64,11 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { _groupByExpressions = groupByExpressions; } + public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) { + return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds, + _filterExpression, _valueExpression, newAggInfo, _groupByExpressions); + } + @Override public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) { return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds, diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index 20ac1714a8..9cca55ebcb 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -19,7 +19,6 @@ package org.apache.pinot.tsdb.spi.series; import java.util.List; -import java.util.Objects; import javax.annotation.Nullable; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -61,19 +60,14 @@ public abstract class BaseTimeSeriesBuilder { public abstract void addValue(long timeValue, Double value); - public void mergeSeries(TimeSeries series) { - int numDataPoints = series.getValues().length; - Long[] timeValues = Objects.requireNonNull(series.getTimeValues(), - "Cannot merge series: found null timeValues"); - for (int i = 0; i < numDataPoints; i++) { - addValue(timeValues[i], series.getValues()[i]); - } - } - + /** + * Assumes Double[] values and attempts to merge the given series with this builder. Implementations are + * recommended to override this to either optimize it, or to handle byte[][] values from the input series. + */ public void mergeAlignedSeries(TimeSeries series) { - int numDataPoints = series.getValues().length; + int numDataPoints = series.getDoubleValues().length; for (int i = 0; i < numDataPoints; i++) { - addValueAtIndex(i, series.getValues()[i]); + addValueAtIndex(i, series.getDoubleValues()[i]); } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java index 55e2a9a730..4a2e452116 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.tsdb.spi.series; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -67,12 +68,16 @@ public class TimeSeries { private final String _id; private
final Long[] _timeValues; private final TimeBuckets _timeBuckets; - private final Double[] _values; + private final Object[] _values; private final List<String> _tagNames; private final Object[] _tagValues; - public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Double[] values, + // TODO(timeseries): Time series may also benefit from storing extremal/outlier value traces, similar to Monarch. + // TODO(timeseries): It may make sense to allow types other than Double and byte[] arrays. + public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Object[] values, List<String> tagNames, Object[] tagValues) { + Preconditions.checkArgument(values instanceof Double[] || values instanceof byte[][], + "Time Series can only take Double[] or byte[][] values"); _id = id; _timeValues = timeValues; _timeBuckets = timeBuckets; @@ -95,10 +100,18 @@ public class TimeSeries { return _timeBuckets; } - public Double[] getValues() { + public Object[] getValues() { return _values; } + public Double[] getDoubleValues() { + return (Double[]) _values; + } + + public byte[][] getBytesValues() { + return (byte[][]) _values; + } + public List<String> getTagNames() { return _tagNames; } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java index 011cb6fbc6..d326ed49b5 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java @@ -44,7 +44,7 @@ public class LeafTimeSeriesPlanNodeTest { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col", - new AggInfo("SUM", 
null), Collections.singletonList("cityName")); + new AggInfo("SUM", false, null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter); } @@ -52,7 +52,7 @@ public class LeafTimeSeriesPlanNodeTest { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col", - new AggInfo("SUM", null), Collections.singletonList("cityName")); + new AggInfo("SUM", false, null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123)); } @@ -60,7 +60,7 @@ public class LeafTimeSeriesPlanNodeTest { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter, - "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); + "value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123))); @@ -69,7 +69,8 @@ public class LeafTimeSeriesPlanNodeTest { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L, - nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); + nonEmptyFilter, "value_col", new AggInfo("SUM", false, Collections.emptyMap()), + Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter * 1000 - 123 * 1000), 
(expectedEndTimeInFilter * 1000 - 123 * 1000))); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java index 4bd5c37a5a..71bf2323fd 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java @@ -28,6 +28,7 @@ import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -40,7 +41,7 @@ public class TimeSeriesPlanSerdeTest { LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L, - "myFilterExpression", "myValueExpression", new AggInfo("SUM", aggParams), new ArrayList<>()); + "myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>()); BaseTimeSeriesPlanNode planNode = TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode)); assertTrue(planNode instanceof LeafTimeSeriesPlanNode); @@ -52,6 +53,7 @@ public class TimeSeriesPlanSerdeTest { assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression"); assertEquals(deserializedNode.getValueExpression(), "myValueExpression"); assertNotNull(deserializedNode.getAggInfo()); + assertFalse(deserializedNode.getAggInfo().getIsPartial()); assertNotNull(deserializedNode.getAggInfo().getParams()); assertEquals(deserializedNode.getAggInfo().getParams().get("window"), "5m"); assertEquals(deserializedNode.getGroupByExpressions().size(), 0); diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java new file mode 100644 index 0000000000..db651785e8 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.spi.series; + +import java.time.Duration; +import java.util.Collections; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class TimeSeriesTest { + private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(100, Duration.ofSeconds(10), 10); + + @Test + public void testTimeSeriesAcceptsDoubleValues() { + Double[] values = new Double[10]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, values, Collections.emptyList(), + new Object[0]); + assertEquals(timeSeries.getDoubleValues(), values); + } + + @Test + public void testTimeSeriesAcceptsBytesValues() { + byte[][] byteValues = new byte[10][1231]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, byteValues, Collections.emptyList(), + new Object[0]); + assertEquals(timeSeries.getBytesValues(), byteValues); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testTimeSeriesDeniesWhenValuesNotDoubleOrBytes() { + Object[] someValues = new Long[10]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, someValues, Collections.emptyList(), + new Object[0]); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org