This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch pushdown_topk_filter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 91205ed8e341907b89a63a98f52d2d025b1d77c6 Author: Xiaohui Sun <xh...@xhsun-mn1.linkedin.biz> AuthorDate: Wed May 6 14:03:55 2020 -0700 [TE] Push down top k filter to data provider --- .../pinot/thirdeye/detection/DataProvider.java | 3 +- .../thirdeye/detection/DefaultDataProvider.java | 10 +--- .../detection/DefaultInputDataFetcher.java | 2 +- .../detection/StaticDetectionPipeline.java | 2 +- .../algorithm/BaselineRuleFilterWrapper.java | 4 +- .../detection/algorithm/DimensionWrapper.java | 3 +- .../detection/algorithm/LegacyMergeWrapper.java | 2 +- .../algorithm/ThresholdRuleFilterWrapper.java | 2 +- .../pinot/thirdeye/detection/DataProviderTest.java | 64 ++++++++++++++++++++-- .../pinot/thirdeye/detection/MockDataProvider.java | 2 +- 10 files changed, 74 insertions(+), 20 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java index 5d1361d..4bd0ab3 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java @@ -69,9 +69,10 @@ public interface DataProvider { * * @param slices metric slices * @param dimensions dimensions to group by + * @param limit max number of records to return. No limitation if it is a non-positive number. * @return map of aggregation values (keyed by slice) */ - Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions); + Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions, int limit); /** * Returns a multimap of anomalies (keyed by slice) for a given set of slices. diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java index d24fbe3..4d82342 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java @@ -118,16 +118,12 @@ public class DefaultDataProvider implements DataProvider { } @Override - public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) { + public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions, int limit) { try { Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>(); for (final MetricSlice slice : slices) { - futures.put(slice, this.executor.submit(new Callable<DataFrame>() { - @Override - public DataFrame call() throws Exception { - return DefaultDataProvider.this.aggregationLoader.loadAggregate(slice, dimensions, -1); - } - })); + futures.put(slice, this.executor.submit( + () -> DefaultDataProvider.this.aggregationLoader.loadAggregate(slice, dimensions, limit))); } final long deadline = System.currentTimeMillis() + TIMEOUT; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java index 2dd1efc..c5072a9 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java @@ -57,7 +57,7 @@ public class DefaultInputDataFetcher implements InputDataFetcher { */ public InputData fetchData(InputDataSpec inputDataSpec) { Map<MetricSlice, DataFrame> timeseries = provider.fetchTimeseries(inputDataSpec.getTimeseriesSlices()); - Map<MetricSlice, DataFrame> aggregates = provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList()); + Map<MetricSlice, DataFrame> aggregates = provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList(), -1); Collection<AnomalySlice> slicesWithConfigId = new HashSet<>(); for (AnomalySlice slice : inputDataSpec.getAnomalySlices()) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java index d068685..42dea97 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java @@ -81,7 +81,7 @@ public abstract class StaticDetectionPipeline extends DetectionPipeline { public final DetectionPipelineResult run() throws Exception { InputDataSpec dataSpec = this.getInputDataSpec(); Map<MetricSlice, DataFrame> timeseries = this.provider.fetchTimeseries(dataSpec.getTimeseriesSlices()); - Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.getAggregateSlices(), Collections.<String>emptyList()); + Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.getAggregateSlices(), Collections.<String>emptyList(), -1); Collection<AnomalySlice> updatedSlices = new HashSet<>(); for (AnomalySlice slice : dataSpec.getAnomalySlices()) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java index 686bd30..ea43d58 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java @@ -87,7 +87,7 @@ public class BaselineRuleFilterWrapper extends RuleBasedFilterWrapper { MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters()); MetricSlice baselineSlice = this.baseline.scatter(currentSlice).get(0); - Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Arrays.asList(currentSlice, baselineSlice), Collections.<String>emptyList()); + Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Arrays.asList(currentSlice, baselineSlice), Collections.<String>emptyList(), -1); double currentValue = getValueFromAggregates(currentSlice, aggregates); double baselineValue = getValueFromAggregates(baselineSlice, aggregates); if (!Double.isNaN(this.difference) && Math.abs(currentValue - baselineValue) < this.difference) { @@ -102,7 +102,7 @@ public class BaselineRuleFilterWrapper extends RuleBasedFilterWrapper { MetricSlice siteWideSlice = this.baseline.scatter( MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters())).get(0); double siteWideBaselineValue = getValueFromAggregates(siteWideSlice, - this.provider.fetchAggregates(Collections.singleton(siteWideSlice), Collections.<String>emptyList())); + this.provider.fetchAggregates(Collections.singleton(siteWideSlice), Collections.<String>emptyList(), -1)); if (siteWideBaselineValue != 0 && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.siteWideImpactThreshold) { return false; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java index 8eeb7fb..757f4f8 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java @@ -170,7 +170,8 @@ public class DimensionWrapper extends DetectionPipeline { MetricEntity metric = MetricEntity.fromURN(this.metricUrn); MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters()); - DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice); + // Here we only pull the top k records, this is safe since the result is sorted by default in Pinot + DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, this.k).get(slice); if (aggregates.isEmpty()) { return nestedMetrics; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java index 48473b7..7d3435e 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java @@ -256,7 +256,7 @@ public class LegacyMergeWrapper extends DetectionPipeline { if (!StringUtils.isBlank(anomalyFunctionSpec.getGlobalMetric())) { MetricSlice slice = makeGlobalSlice(anomalyFunctionSpec, mergedAnomalyResult); - double valGlobal = this.provider.fetchAggregates(Collections.singleton(slice), Collections.<String>emptyList()).get(slice).getDouble(COL_VALUE, 0); + double valGlobal = this.provider.fetchAggregates(Collections.singleton(slice), Collections.<String>emptyList(), -1).get(slice).getDouble(COL_VALUE, 0); double diffLocal = mergedAnomalyResult.getAvgCurrentVal() - mergedAnomalyResult.getAvgBaselineVal(); mergedAnomalyResult.setImpactToGlobal(diffLocal / valGlobal); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java index dbc907a..71676de 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java @@ -56,7 +56,7 @@ public class ThresholdRuleFilterWrapper extends RuleBasedFilterWrapper { MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn()); MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters()); - Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singleton(currentSlice), Collections.<String>emptyList()); + Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singleton(currentSlice), Collections.<String>emptyList(), -1); double currentValue = getValueFromAggregates(currentSlice, aggregates); if (!Double.isNaN(this.min) && currentValue < this.min) { return false; diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java index b261c95..26668e1 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java @@ -17,6 +17,8 @@ package org.apache.pinot.thirdeye.detection; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import java.io.InputStreamReader; @@ -29,9 +31,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.pinot.thirdeye.anomaly.AnomalyType; import org.apache.pinot.thirdeye.dataframe.DataFrame; +import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase; import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; @@ -45,13 +49,20 @@ import org.apache.pinot.thirdeye.datalayer.dto.EventDTO; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; import org.apache.pinot.thirdeye.datasource.DAORegistry; +import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry; +import org.apache.pinot.thirdeye.datasource.ThirdEyeDataSource; +import org.apache.pinot.thirdeye.datasource.cache.MetricDataset; import org.apache.pinot.thirdeye.datasource.cache.QueryCache; +import org.apache.pinot.thirdeye.datasource.csv.CSVThirdEyeDataSource; +import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader; +import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader; import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader; import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader; import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder; import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder; import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice; import org.apache.pinot.thirdeye.detection.spi.model.EventSlice; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeMethod; @@ -69,8 +80,9 @@ public class DataProviderTest { private DatasetConfigManager datasetDAO; private EvaluationManager evaluationDAO; private DetectionConfigManager detectionDAO; - private QueryCache cache; + private QueryCache queryCache; private TimeSeriesLoader timeseriesLoader; + private AggregationLoader aggregationLoader; private DataFrame data; @@ -82,6 +94,8 @@ public class DataProviderTest { private List<Long> datasetIds; private List<Long> detectionIds; + private static final MetricDataset METRIC = new MetricDataset("metric", "collection1"); + @BeforeMethod public void beforeMethod() throws Exception { this.testBase = DAOTestBase.getInstance(); @@ -139,12 +153,47 @@ public class DataProviderTest { this.data.addSeries(COL_TIME, this.data.getLongs(COL_TIME).multiply(1000)); } - // loaders - this.timeseriesLoader = new DefaultTimeSeriesLoader(this.metricDAO, this.datasetDAO, this.cache, null); + // register caches + + LoadingCache<String, DatasetConfigDTO> mockDatasetConfigCache = Mockito.mock(LoadingCache.class); + DatasetConfigDTO datasetConfig = this.datasetDAO.findByDataset("myDataset2"); + Mockito.when(mockDatasetConfigCache.get("myDataset2")).thenReturn(datasetConfig); + + + LoadingCache<String, Long> mockDatasetMaxDataTimeCache = Mockito.mock(LoadingCache.class); + Mockito.when(mockDatasetMaxDataTimeCache.get("myDataset2")) + .thenReturn(Long.MAX_VALUE); + + MetricDataset metricDataset = new MetricDataset("myMetric2", "myDataset2"); + LoadingCache<MetricDataset, MetricConfigDTO> mockMetricConfigCache = Mockito.mock(LoadingCache.class); + MetricConfigDTO metricConfig = this.metricDAO.findByMetricAndDataset("myMetric2", "myDataset2"); + Mockito.when(mockMetricConfigCache.get(metricDataset)).thenReturn(metricConfig); + + Map<String, DataFrame> datasets = new HashMap<>(); + datasets.put("myDataset1", data); + datasets.put("myDataset2", data); + + Map<Long, String> id2name = new HashMap<>(); + id2name.put(this.metricIds.get(1), "value"); + Map<String, ThirdEyeDataSource> dataSourceMap = new HashMap<>(); + dataSourceMap.put("myDataSource", CSVThirdEyeDataSource.fromDataFrame(datasets, id2name)); + this.queryCache = new QueryCache(dataSourceMap, Executors.newSingleThreadExecutor()); + + ThirdEyeCacheRegistry cacheRegistry = ThirdEyeCacheRegistry.getInstance(); + cacheRegistry.registerMetricConfigCache(mockMetricConfigCache); + cacheRegistry.registerDatasetConfigCache(mockDatasetConfigCache); + cacheRegistry.registerQueryCache(this.queryCache); + cacheRegistry.registerDatasetMaxDataTimeCache(mockDatasetMaxDataTimeCache); + + // time series loader + this.timeseriesLoader = new DefaultTimeSeriesLoader(this.metricDAO, this.datasetDAO, this.queryCache, null); + + // aggregation loader + this.aggregationLoader = new DefaultAggregationLoader(this.metricDAO, this.datasetDAO, this.queryCache, mockDatasetMaxDataTimeCache); // provider this.provider = new DefaultDataProvider(this.metricDAO, this.datasetDAO, this.eventDAO, this.anomalyDAO, - this.evaluationDAO, this.timeseriesLoader, null, null, + this.evaluationDAO, this.timeseriesLoader, aggregationLoader, null, TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance()); } @@ -179,6 +228,13 @@ public class DataProviderTest { Assert.assertTrue(metrics.contains(makeMetric(this.metricIds.get(2), "myMetric3", "myDataset1"))); } + @Test + public void testFetchAggregation() { + MetricSlice metricSlice = MetricSlice.from(this.metricIds.get(1), 0L, 32400000L, ArrayListMultimap.create()); + Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), -1); + Assert.assertEquals(aggregates.keySet().size(), 1); + } + // // datasets // diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java index 364c80c..74fa54d 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java @@ -111,7 +111,7 @@ public class MockDataProvider implements DataProvider { } @Override - public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) { + public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions, int limit) { Map<MetricSlice, DataFrame> result = new HashMap<>(); for (MetricSlice slice : slices) { List<String> expr = new ArrayList<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org