This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f0c6bba73c [timeseries] Response Size Limit, Metrics and Series Limit (#14501)
f0c6bba73c is described below

commit f0c6bba73c0e4de43115d69ffc4ffdc010848fee
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Fri Nov 22 13:45:41 2024 -0600

    [timeseries] Response Size Limit, Metrics and Series Limit (#14501)
---
 .../requesthandler/TimeSeriesRequestHandler.java   |  38 +++++---
 .../apache/pinot/common/metrics/BrokerMeter.java   |   8 ++
 .../blocks/results/TimeSeriesResultsBlock.java     |  14 ++-
 .../merger/TimeSeriesAggResultsBlockMerger.java    |  13 +++
 .../timeseries/TimeSeriesAggregationOperator.java  |  25 ++++-
 .../apache/pinot/core/plan/TimeSeriesPlanNode.java |   3 +-
 .../TimeSeriesAggregationOperatorTest.java         | 103 ++++++++++++++++++++-
 .../runtime/timeseries/LeafTimeSeriesOperator.java |   4 +
 .../timeseries/TimeSeriesDispatchClient.java       |   7 +-
 .../spi/series/SimpleTimeSeriesBuilderFactory.java |  19 ++++
 .../tsdb/spi/series/TimeSeriesBuilderFactory.java  |  14 +++
 11 files changed, 221 insertions(+), 27 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 1773fca957..52cf63f562 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.utils.HumanReadableDuration;
@@ -90,19 +93,32 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
       RequestContext requestContext) {
-    requestContext.setBrokerId(_brokerId);
-    requestContext.setRequestId(_requestIdGenerator.get());
-    RangeTimeSeriesRequest timeSeriesRequest = null;
+    PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
+    long queryStartTime = System.currentTimeMillis();
     try {
-      timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
+      requestContext.setBrokerId(_brokerId);
+      requestContext.setRequestId(_requestIdGenerator.get());
+      RangeTimeSeriesRequest timeSeriesRequest = null;
+      try {
+        timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+      } catch (URISyntaxException e) {
+        return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", "Error building RangeTimeSeriesRequest");
+      }
+      TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+      TimeSeriesDispatchablePlan dispatchablePlan =
+          _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, logicalPlanResult);
+      timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext, dispatchablePlan,
+          timeSeriesRequest.getTimeout().toMillis(), new HashMap<>());
+      return timeSeriesResponse;
+    } finally {
+      _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, System.currentTimeMillis() - queryStartTime,
+          TimeUnit.MILLISECONDS);
+      if (timeSeriesResponse == null
+          || timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) {
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
+      }
     }
-    TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
-    TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
-        logicalPlanResult);
-    return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
-        new HashMap<>());
   }
 
   @Override
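
The handler now wraps the whole plan/dispatch pipeline in try/finally so the total-time timer and the failure meter fire on every exit path, including exceptions. A minimal, self-contained sketch of that pattern follows; the plain counters and the runQuery() helper are stand-ins for BrokerMetrics and the buildLogicalPlan/buildPhysicalPlan/submitAndGet calls, not Pinot APIs:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class MeteredQueryExample {
      // Plain counters standing in for TIME_SERIES_GLOBAL_QUERIES and TIME_SERIES_GLOBAL_QUERIES_FAILED.
      static final AtomicLong QUERIES = new AtomicLong();
      static final AtomicLong QUERIES_FAILED = new AtomicLong();

      static String handle() {
        String response = null;
        long queryStartTime = System.currentTimeMillis();
        try {
          QUERIES.incrementAndGet();
          response = runQuery(); // stand-in for the plan + dispatch pipeline
          return response;
        } finally {
          // Runs on success, early return, and exception alike, so the timer
          // and the failure meter are never skipped.
          long elapsedMs = System.currentTimeMillis() - queryStartTime;
          System.out.println("QUERY_TOTAL_TIME_MS: " + elapsedMs + " " + TimeUnit.MILLISECONDS);
          if (response == null || response.startsWith("error:")) {
            QUERIES_FAILED.incrementAndGet();
          }
        }
      }

      static String runQuery() {
        return "ok";
      }

      public static void main(String[] args) {
        System.out.println(handle() + ", queries=" + QUERIES.get() + ", failed=" + QUERIES_FAILED.get());
      }
    }

Note that a query that throws before assigning timeSeriesResponse leaves it null, which is exactly what the failure check in the finally block relies on.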
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 086d5ffd9e..c36b4ab504 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -60,6 +60,14 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   * Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is.
   */
  SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
+  /**
+   * Number of time-series queries. This metric is not grouped on the table name.
+   */
+  TIME_SERIES_GLOBAL_QUERIES("queries", true),
+  /**
+   * Number of time-series queries that failed. This metric is not grouped on the table name.
+   */
+  TIME_SERIES_GLOBAL_QUERIES_FAILED("queries", true),
  // These metrics track the exceptions caught during query execution in broker side.
   // Query rejected by Jersey thread pool executor
   QUERY_REJECTED_EXCEPTIONS("exceptions", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
index 30a66bd624..f8e7fac944 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
+// TODO(timeseries): Implement unsupported functions when merging with MSE.
 public class TimeSeriesResultsBlock extends BaseResultsBlock {
   private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock;
 
@@ -36,34 +37,31 @@ public class TimeSeriesResultsBlock extends BaseResultsBlock {
 
   @Override
   public int getNumRows() {
-    // TODO: Unused right now.
-    return 0;
+    return _timeSeriesBuilderBlock.getSeriesBuilderMap().size();
   }
 
   @Nullable
   @Override
   public QueryContext getQueryContext() {
-    // TODO: Unused right now.
-    return null;
+    throw new UnsupportedOperationException("Time series results block does not support getting QueryContext yet");
   }
 
   @Nullable
   @Override
   public DataSchema getDataSchema() {
-    // TODO: Unused right now.
-    return null;
+    throw new UnsupportedOperationException("Time series results block does not support getting DataSchema yet");
   }
 
   @Nullable
   @Override
   public List<Object[]> getRows() {
-    return null;
+    throw new UnsupportedOperationException("Time series results block does not support getRows yet");
   }
 
   @Override
   public DataTable getDataTable()
       throws IOException {
-    return null;
+    throw new UnsupportedOperationException("Time series results block does not support returning DataTable");
   }
 
   public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
index 17f22a1737..428cfde555 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import com.google.common.base.Preconditions;
 import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
 import org.apache.pinot.tsdb.spi.AggInfo;
@@ -28,10 +29,14 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
 public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeSeriesResultsBlock> {
   private final TimeSeriesBuilderFactory _seriesBuilderFactory;
   private final AggInfo _aggInfo;
+  private final int _maxSeriesLimit;
+  private final long _maxDataPointsLimit;
 
  public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) {
     _seriesBuilderFactory = seriesBuilderFactory;
     _aggInfo = aggInfo;
+    _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
+    _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
   }
 
   @Override
@@ -44,6 +49,14 @@ public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeS
       BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
       if (currentTimeSeriesBuilder == null) {
        currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge);
+        final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size();
+        final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets();
+        Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit,
+            "Max data points limit reached in combine operator. Limit: %s. Current count: %s",
+            _maxDataPointsLimit, currentUniqueSeries * numBuckets);
+        Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit,
+            "Max series limit reached in combine operator. Limit: %s. Current Size: %s",
+            _maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size());
      } else {
        currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge);
       }
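
The merger enforces both limits with Guava's Preconditions.checkState, which throws IllegalStateException and fills the %s placeholders lazily (only when the check fails). A small standalone sketch of that failure path, using hypothetical limit values:

    import com.google.common.base.Preconditions;

    public class SeriesLimitExample {
      public static void main(String[] args) {
        long maxSeriesLimit = 1;       // hypothetical tight limit
        long currentUniqueSeries = 2;  // e.g. two distinct group-by values
        try {
          // Same call shape as the merger above; Guava formats %s lazily.
          Preconditions.checkState(currentUniqueSeries <= maxSeriesLimit,
              "Max series limit reached in combine operator. Limit: %s. Current Size: %s",
              maxSeriesLimit, currentUniqueSeries);
        } catch (IllegalStateException e) {
          // Prints: Max series limit reached in combine operator. Limit: 1. Current Size: 2
          System.out.println(e.getMessage());
        }
      }
    }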
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index fd895a4607..a39c996f4f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.timeseries;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.HashMap;
@@ -37,6 +38,7 @@ import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
@@ -59,6 +61,10 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   private final BaseProjectOperator<? extends ValueBlock> _projectOperator;
   private final TimeBuckets _timeBuckets;
   private final TimeSeriesBuilderFactory _seriesBuilderFactory;
+  private final int _maxSeriesLimit;
+  private final long _maxDataPointsLimit;
+  private final long _numTotalDocs;
+  private long _numDocsScanned = 0;
 
   public TimeSeriesAggregationOperator(
       String timeColumn,
@@ -69,7 +75,8 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
       List<String> groupByExpressions,
       TimeBuckets timeBuckets,
       BaseProjectOperator<? extends ValueBlock> projectOperator,
-      TimeSeriesBuilderFactory seriesBuilderFactory) {
+      TimeSeriesBuilderFactory seriesBuilderFactory,
+      SegmentMetadata segmentMetadata) {
     _timeColumn = timeColumn;
     _storedTimeUnit = timeUnit;
     _timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
@@ -79,6 +86,9 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
     _projectOperator = projectOperator;
     _timeBuckets = timeBuckets;
     _seriesBuilderFactory = seriesBuilderFactory;
+    _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
+    _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
+    _numTotalDocs = segmentMetadata.getTotalDocs();
   }
 
   @Override
@@ -87,6 +97,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
       int numDocs = valueBlock.getNumDocs();
+      _numDocsScanned += numDocs;
       // TODO: This is quite unoptimized and allocates liberally
       BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
       long[] timeValues = blockValSet.getLongValuesSV();
@@ -129,6 +140,12 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
           throw new IllegalStateException(
               "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
       }
+      Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit,
+          "Exceeded max data point limit per server. Limit: %s. Data points in current segment so far: %s",
+          _maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets());
+      Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit,
+          "Exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s",
+          _maxSeriesLimit, seriesBuilderMap.size());
     }
     return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap));
   }
@@ -147,8 +164,10 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    // TODO: Implement this.
-    return new ExecutionStatistics(0, 0, 0, 0);
+    long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = _numDocsScanned * _projectOperator.getNumColumnsProjected();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        _numTotalDocs);
   }
 
   @VisibleForTesting
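
The per-segment data-point budget checked above is simply the product of unique series and time buckets. A quick standalone check of that arithmetic, using the numbers the new unit tests use (2 series, 10 buckets, limit 11):

    public class DataPointBudgetExample {
      public static void main(String[] args) {
        long numUniqueSeries = 2;      // the tests build two series (Chicago, San Francisco)
        long numBuckets = 10;          // TIME_BUCKETS in the tests spans 10 buckets of 100s
        long maxDataPointsLimit = 11;  // the tightened limit from the data-points test
        long dataPoints = numUniqueSeries * numBuckets;
        // 2 * 10 = 20 > 11, so the operator's checkState fires and the query
        // fails with "Exceeded max data point limit per server. Limit: 11. ...".
        System.out.println(dataPoints + " <= " + maxDataPointsLimit + " ? " + (dataPoints <= maxDataPointsLimit));
      }
    }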
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
index 22e3d7b912..dae0479ebb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
@@ -66,7 +66,8 @@ public class TimeSeriesPlanNode implements PlanNode {
         getGroupByColumns(),
         _timeSeriesContext.getTimeBuckets(),
         projectionOperator,
-        _seriesBuilderFactory);
+        _seriesBuilderFactory,
+        _segmentContext.getIndexSegment().getSegmentMetadata());
   }
 
   private List<ExpressionContext> getProjectPlanNodeExpressions() {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
index 888d0658e9..eea81a4ba1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -20,11 +20,19 @@ package org.apache.pinot.core.operator.timeseries;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
 import org.testng.annotations.Test;
 
@@ -33,9 +41,49 @@ import static org.testng.Assert.*;
 
 
 public class TimeSeriesAggregationOperatorTest {
+  private static final Random RANDOM = new Random();
   private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
-  private static final AggInfo AGG_INFO = new AggInfo("", Collections.emptyMap());
+  private static final String GROUP_BY_COLUMN = "city";
+  private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap());
   private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn");
+  private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10);
+  private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;
+
+  @Test
+  public void testTimeSeriesAggregationOperator() {
+    TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
+        new SimpleTimeSeriesBuilderFactory());
+    TimeSeriesResultsBlock resultsBlock = timeSeriesAggregationOperator.getNextBlock();
+    // Expect 2 series: Chicago and San Francisco
+    assertNotNull(resultsBlock);
+    assertEquals(2, resultsBlock.getNumRows());
+  }
+
+  @Test
+  public void testTimeSeriesAggregationOperatorWhenSeriesLimit() {
+    // Since we test with 2 series, use 1 as the limit.
+    TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
+        new SimpleTimeSeriesBuilderFactory(1, 100_000_000L));
+    try {
+      timeSeriesAggregationOperator.getNextBlock();
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Limit: 1. Series in current"));
+    }
+  }
+
+  @Test
+  public void testTimeSeriesAggregationOperatorWhenDataPoints() {
+    // Since we test with 2 series, use 1 as the limit.
+    TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
+        new SimpleTimeSeriesBuilderFactory(1000, 11));
+    try {
+      timeSeriesAggregationOperator.getNextBlock();
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Limit: 11. Data points in current"));
+    }
+  }
 
   @Test
   public void testGetTimeValueIndexForSeconds() {
@@ -88,9 +136,60 @@ public class TimeSeriesAggregationOperatorTest {
     assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range");
   }
 
+  private TimeSeriesAggregationOperator buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) {
+    BaseProjectOperator<ValueBlock> mockProjectOperator = mock(BaseProjectOperator.class);
+    ValueBlock valueBlock = buildValueBlockForProjectOperator();
+    when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock) null);
+    return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN,
+        TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.singletonList(GROUP_BY_COLUMN),
+        TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory, mock(SegmentMetadata.class));
+  }
+
+  private static ValueBlock buildValueBlockForProjectOperator() {
+    ValueBlock valueBlock = mock(ValueBlock.class);
+    doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs();
+    doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN);
+    doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION);
+    doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN);
+    return valueBlock;
+  }
+
+  private static BlockValSet buildBlockValSetForGroupByColumns() {
+    BlockValSet blockValSet = mock(BlockValSet.class);
+    String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+      stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco";
+    }
+    doReturn(stringArray).when(blockValSet).getStringValuesSV();
+    doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType();
+    return blockValSet;
+  }
+
+  private static BlockValSet buildBlockValSetForValues() {
+    BlockValSet blockValSet = mock(BlockValSet.class);
+    long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+      valuesArray[index] = index;
+    }
+    doReturn(valuesArray).when(blockValSet).getLongValuesSV();
+    doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType();
+    return blockValSet;
+  }
+
+  private static BlockValSet buildBlockValSetForTime() {
+    BlockValSet blockValSet = mock(BlockValSet.class);
+    long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+      timeValueArray[index] = 901 + RANDOM.nextInt(1000);
+    }
+    doReturn(timeValueArray).when(blockValSet).getLongValuesSV();
+    return blockValSet;
+  }
+
  private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) {
     return new TimeSeriesAggregationOperator(
         DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(),
-        timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class));
+        timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class),
+        mock(SegmentMetadata.class));
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index ca119ebf1d..8b577105d3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -25,6 +25,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -34,6 +35,7 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
   private final ServerQueryRequest _request;
   private final QueryExecutor _queryExecutor;
   private final ExecutorService _executorService;
+  private final ServerQueryLogger _queryLogger;
 
   public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor,
       ExecutorService executorService) {
@@ -41,6 +43,7 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
     _request = serverQueryRequest;
     _queryExecutor = queryExecutor;
     _executorService = executorService;
+    _queryLogger = ServerQueryLogger.getInstance();
   }
 
   @Override
@@ -48,6 +51,7 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
     Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized");
     InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService);
     assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock;
+    _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
     if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
       // TODO: Return error in the TimeSeriesBlock instead?
       String oneException = instanceResponseBlock.getExceptions().values().iterator().next();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
index a68e636b96..df77344665 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
@@ -29,16 +29,19 @@ import org.apache.pinot.query.routing.QueryServerInstance;
 
 /**
  * Dispatch client used to dispatch a runnable plan to the server.
- * TODO: This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage
+ * TODO(timeseries): This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage
  *   engine integration.
  */
 public class TimeSeriesDispatchClient {
+  // TODO(timeseries): Note that time-series engine at present uses QueryServer for data transfer from server to broker.
+  //   This will be fixed as we integrate with MSE.
+  private static final int INBOUND_SIZE_LIMIT = 256 * 1024 * 1024;
   private final ManagedChannel _channel;
   private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
 
   public TimeSeriesDispatchClient(String host, int port) {
     _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
-    _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
+    _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel).withMaxInboundMessageSize(INBOUND_SIZE_LIMIT);
   }
 
   public ManagedChannel getChannel() {
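
The stub-level withMaxInboundMessageSize(...) above raises grpc-java's default inbound limit (4 MiB) so large time-series responses fit in a single message. A minimal sketch of the equivalent channel-level setting; the host and port here are placeholders, not Pinot defaults:

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    public class InboundLimitExample {
      public static void main(String[] args) {
        int inboundSizeLimit = 256 * 1024 * 1024; // 256 MiB, matching INBOUND_SIZE_LIMIT above
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8090) // placeholder host/port
            .usePlaintext()
            .maxInboundMessageSize(inboundSizeLimit) // channel-level analogue of the stub setting
            .build();
        System.out.println("channel target: " + channel.authority());
        channel.shutdownNow();
      }
    }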
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
index 9ed40954e5..98882d19a7 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
@@ -28,8 +28,17 @@ import org.apache.pinot.tsdb.spi.series.builders.SummingTimeSeriesBuilder;
 
 
 public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
+  private final int _maxSeriesLimit;
+  private final long _maxDataPointsLimit;
+
   public SimpleTimeSeriesBuilderFactory() {
+    this(DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT, DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT);
+  }
+
+  public SimpleTimeSeriesBuilderFactory(int maxSeriesLimit, long maxDataPointsLimit) {
     super();
+    _maxSeriesLimit = maxSeriesLimit;
+    _maxDataPointsLimit = maxDataPointsLimit;
   }
 
   @Override
@@ -50,4 +59,14 @@ public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
   @Override
   public void init(PinotConfiguration pinotConfiguration) {
   }
+
+  @Override
+  public int getMaxUniqueSeriesPerServerLimit() {
+    return _maxSeriesLimit;
+  }
+
+  @Override
+  public long getMaxDataPointsPerServerLimit() {
+    return _maxDataPointsLimit;
+  }
 }
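
SimpleTimeSeriesBuilderFactory now exposes both limits through its two-arg constructor, while the no-arg constructor keeps the SPI defaults. A short usage sketch; the tightened values below are arbitrary, chosen only for illustration:

    import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
    import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;

    public class FactoryLimitsExample {
      public static void main(String[] args) {
        // No-arg constructor keeps the defaults (100_000 series, 100_000_000 data points).
        TimeSeriesBuilderFactory defaults = new SimpleTimeSeriesBuilderFactory();
        // Two-arg constructor tightens the per-server bounds, as the new unit tests do.
        TimeSeriesBuilderFactory strict = new SimpleTimeSeriesBuilderFactory(1_000, 1_000_000L);
        System.out.println(defaults.getMaxUniqueSeriesPerServerLimit()); // 100000
        System.out.println(strict.getMaxDataPointsPerServerLimit());     // 1000000
      }
    }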
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
index 088f9b3c85..c48307efea 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
@@ -25,6 +25,12 @@ import org.apache.pinot.tsdb.spi.TimeBuckets;
 
 
 public abstract class TimeSeriesBuilderFactory {
+  protected static final int DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT = 100_000;
+  /**
+   * Default limit for the total number of values across all series.
+   */
+  protected static final long DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT = 100_000_000;
+
   public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(
       AggInfo aggInfo,
       String id,
@@ -32,5 +38,13 @@ public abstract class TimeSeriesBuilderFactory {
       List<String> tagNames,
       Object[] tagValues);
 
+  public int getMaxUniqueSeriesPerServerLimit() {
+    return DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT;
+  }
+
+  public long getMaxDataPointsPerServerLimit() {
+    return DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT;
+  }
+
   public abstract void init(PinotConfiguration pinotConfiguration);
 }
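
Because the two getters are non-abstract with defaults, plugin factories can override them selectively. A hypothetical factory that delegates series creation to SimpleTimeSeriesBuilderFactory and only tightens the bounds; the class name, limit values, and the PinotConfiguration import path are assumptions, not part of this commit:

    import java.util.List;
    import org.apache.pinot.spi.env.PinotConfiguration;
    import org.apache.pinot.tsdb.spi.AggInfo;
    import org.apache.pinot.tsdb.spi.TimeBuckets;
    import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
    import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
    import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;

    // Hypothetical plugin factory: delegates building and only tunes the limits.
    public class BoundedTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
      private final SimpleTimeSeriesBuilderFactory _delegate = new SimpleTimeSeriesBuilderFactory();

      @Override
      public BaseTimeSeriesBuilder newTimeSeriesBuilder(AggInfo aggInfo, String id, TimeBuckets timeBuckets,
          List<String> tagNames, Object[] tagValues) {
        return _delegate.newTimeSeriesBuilder(aggInfo, id, timeBuckets, tagNames, tagValues);
      }

      @Override
      public int getMaxUniqueSeriesPerServerLimit() {
        return 50_000; // hypothetical: half the default
      }

      @Override
      public long getMaxDataPointsPerServerLimit() {
        return 10_000_000L; // hypothetical: a tenth of the default
      }

      @Override
      public void init(PinotConfiguration pinotConfiguration) {
      }
    }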


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
