This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch pushdown_topk_filter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5c802425d8abfa064c4c0557f214d5dce05466a9 Author: Xiaohui Sun <xh...@xhsun-mn1.linkedin.biz> AuthorDate: Mon May 11 16:10:50 2020 -0700 add top n support for scv and sql data source --- .../pinot/thirdeye/datasource/sql/SqlUtils.java | 22 ++-- .../pinot/thirdeye/detection/DataProvider.java | 3 +- .../detection/algorithm/DimensionWrapper.java | 9 +- .../thirdeye/datasource/sql/TestSqlUtils.java | 121 +++++++++++++++++++++ .../pinot/thirdeye/detection/DataProviderTest.java | 2 +- .../pinot/thirdeye/detection/MockDataProvider.java | 10 +- 6 files changed, 153 insertions(+), 14 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java index 28f6272..f8b98a2 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java @@ -258,16 +258,17 @@ public class SqlUtils { sb.append(" AND ").append(dimensionWhereClause); } - if (limit <= 0) { - limit = DEFAULT_LIMIT; - } - String groupByClause = getDimensionGroupByClause(groupBy, timeGranularity, dataTimeSpec); if (StringUtils.isNotBlank(groupByClause)) { sb.append(" ").append(groupByClause); - sb.append(" LIMIT " + limit); } + if (limit > 0 ){ + sb.append(" ORDER BY " + getSelectMetricClause(metricConfig, metricFunction) + " DESC"); + } + + limit = limit > 0 ? 
limit : DEFAULT_LIMIT; + sb.append(" LIMIT " + limit); return sb.toString(); } @@ -290,12 +291,20 @@ public class SqlUtils { } else { //timeFormat case builder.append(dateTimeSpec.getColumnName()).append(", "); } - } + } for (String groupByKey: groupByKeys) { builder.append(groupByKey).append(", "); } + String selectMetricClause = getSelectMetricClause(metricConfig, metricFunction); + builder.append(selectMetricClause); + + return builder.toString(); + } + + private static String getSelectMetricClause(MetricConfigDTO metricConfig, MetricFunction metricFunction) { + StringBuilder builder = new StringBuilder(); String metricName = null; if (metricFunction.getMetricName().equals("*")) { metricName = "*"; @@ -303,7 +312,6 @@ public class SqlUtils { metricName = metricConfig.getName(); } builder.append(convertAggFunction(metricFunction.getFunctionName())).append("(").append(metricName).append(")"); - return builder.toString(); } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java index 4bd0ab3..3ad473c 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java @@ -69,7 +69,8 @@ public interface DataProvider { * * @param slices metric slices * @param dimensions dimensions to group by - * @param limit max number of records to return. No limitation if it is a non-positive number. 
+ * @param limit max number of records to return ordered by metric value + * no limitation if it is a non-positive number * @return map of aggregation values (keyed by slice) */ Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions, int limit); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java index 757f4f8..cc252e8 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java @@ -170,8 +170,13 @@ public class DimensionWrapper extends DetectionPipeline { MetricEntity metric = MetricEntity.fromURN(this.metricUrn); MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters()); - // Here we only pull the top k records, this is safe since the result is sorted by default in Pinot - DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, this.k).get(slice); + // We can push down the top k filter if min contribution is not defined. + // Otherwise it is not accurate to calculate the contribution. 
+ int limit = -1; + if (Double.isNaN(this.minContribution) && this.k > 0) { + limit = this.k; + } + DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, limit).get(slice); if (aggregates.isEmpty()) { return nestedMetrics; diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java new file mode 100644 index 0000000..2106254 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pinot.thirdeye.datasource.sql; + +import com.google.common.cache.LoadingCache; +import com.google.common.collect.HashMultimap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.thirdeye.common.time.TimeGranularity; +import org.apache.pinot.thirdeye.common.time.TimeSpec; +import org.apache.pinot.thirdeye.constant.MetricAggFunction; +import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase; +import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; +import org.apache.pinot.thirdeye.datasource.DAORegistry; +import org.apache.pinot.thirdeye.datasource.MetricFunction; +import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry; +import org.apache.pinot.thirdeye.datasource.ThirdEyeRequest; +import org.apache.pinot.thirdeye.datasource.cache.MetricDataset; +import org.joda.time.DateTime; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestSqlUtils { + + private final String dataset = "mysql.db.table"; + private final String metric = "metric"; + + private MetricDataset metricDataset; + private MetricFunction metricFunction; + private DAOTestBase daoTestBase; + + @BeforeMethod + public void beforeMethod() throws Exception { + this.daoTestBase = DAOTestBase.getInstance(); + this.metricDataset = new MetricDataset(metric, dataset); + + LoadingCache<String, DatasetConfigDTO> mockDatasetConfigCache = Mockito.mock(LoadingCache.class); + Mockito.when(mockDatasetConfigCache.get(this.dataset)).thenReturn(new DatasetConfigDTO()); + + LoadingCache<MetricDataset, MetricConfigDTO> mockMetricConfigCache = Mockito.mock(LoadingCache.class); + Mockito.when(mockMetricConfigCache.get(this.metricDataset)).thenReturn(new MetricConfigDTO()); + + 
ThirdEyeCacheRegistry.getInstance().registerDatasetConfigCache(mockDatasetConfigCache); + ThirdEyeCacheRegistry.getInstance().registerMetricConfigCache(mockMetricConfigCache); + + MetricConfigDTO metricConfigDTO = new MetricConfigDTO(); + metricConfigDTO.setDataset(this.dataset); + metricConfigDTO.setName(this.metricDataset.getMetricName()); + metricConfigDTO.setAlias(this.metricDataset.getDataset() + "::" + this.metricDataset.getMetricName()); + + metricFunction = new MetricFunction(); + metricFunction.setDataset(dataset); + metricFunction.setMetricId(1L); + metricFunction.setMetricName(metric); + metricFunction.setFunctionName(MetricAggFunction.SUM); + + DAORegistry.getInstance().getMetricConfigDAO().save(metricConfigDTO); + } + + @AfterMethod + public void afterMethod() { + try { this.daoTestBase.cleanup(); } catch (Exception ignore) {} + } + + @Test + public void testSqlWithExplicitLimit() { + TimeGranularity timeGranularity = new TimeGranularity(1, TimeUnit.DAYS); + + ThirdEyeRequest request = ThirdEyeRequest.newBuilder() + .setDataSource(this.dataset) + .setLimit(100) + .setGroupBy("country") + .setStartTimeInclusive(DateTime.parse("2020-05-01")) + .setEndTimeExclusive(DateTime.parse("2020-05-01")) + .setGroupByTimeGranularity(timeGranularity) + .build(""); + + String timeFormat = TimeSpec.SINCE_EPOCH_FORMAT; + TimeSpec timeSpec = new TimeSpec("date", timeGranularity, timeFormat); + String actualSql = SqlUtils.getSql(request, this.metricFunction, HashMultimap.create(), timeSpec, this.dataset); + String expected = "SELECT date, country, SUM(metric) FROM table WHERE date = 18384 GROUP BY date, country ORDER BY SUM(metric) DESC LIMIT 100"; + Assert.assertEquals(actualSql, expected); + } + + @Test + public void testSqlWithoutExplicitLimit() { + TimeGranularity timeGranularity = new TimeGranularity(1, TimeUnit.DAYS); + + ThirdEyeRequest request = ThirdEyeRequest.newBuilder() + .setDataSource(this.dataset) + .setGroupBy("country") + 
.setStartTimeInclusive(DateTime.parse("2020-05-01")) + .setEndTimeExclusive(DateTime.parse("2020-05-01")) + .setGroupByTimeGranularity(timeGranularity) + .build(""); + + String timeFormat = TimeSpec.SINCE_EPOCH_FORMAT; + TimeSpec timeSpec = new TimeSpec("date", timeGranularity, timeFormat); + String actual = SqlUtils.getSql(request, this.metricFunction, HashMultimap.create(), timeSpec, this.dataset); + String expected = "SELECT date, country, SUM(metric) FROM table WHERE date = 18384 GROUP BY date, country LIMIT 100000"; + Assert.assertEquals(actual, expected); + } +} diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java index 26668e1..51533f7 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java @@ -231,7 +231,7 @@ public class DataProviderTest { @Test public void testFetchAggregation() { MetricSlice metricSlice = MetricSlice.from(this.metricIds.get(1), 0L, 32400000L, ArrayListMultimap.create()); - Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), -1); + Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), 1); Assert.assertEquals(aggregates.keySet().size(), 1); } diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java index 74fa54d..0b8f625 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java +++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java @@ -124,9 +124,13 @@ public class MockDataProvider implements DataProvider { result.put(slice, this.aggregates.get(slice.withFilters(NO_FILTERS))); } else { - result.put(slice, this.aggregates.get(slice.withFilters(NO_FILTERS)) - .groupByValue(new ArrayList<>(dimensions)).aggregate(expr) - .dropSeries(COL_KEY).setIndex(dimensions)); + DataFrame aggResult = this.aggregates.get(slice.withFilters(NO_FILTERS)) + .groupByValue(new ArrayList<>(dimensions)).aggregate(expr); + + if (limit > 0) { + aggResult = aggResult.sortedBy(COL_VALUE).reverse().head(limit); + } + result.put(slice, aggResult.dropSeries(COL_KEY).setIndex(dimensions)); } } return result; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org