This is an automated email from the ASF dual-hosted git repository. jihao pushed a commit to branch anomalies-pagination in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 2a2be355b869110ebcfcf8e0c8af335a03c39415 Author: Jihao Zhang <jihzh...@linkedin.com> AuthorDate: Wed Jul 22 14:22:33 2020 -0700 new anomalies endpoint --- .../thirdeye/constant/AnomalyFeedbackType.java | 2 +- .../dashboard/ThirdEyeDashboardApplication.java | 2 + .../v2/anomalies/AnomalySearchFilter.java | 114 +++++++++++++++++ .../v2/anomalies/AnomalySearchResource.java | 61 +++++++++ .../resources/v2/anomalies/AnomalySearcher.java | 142 +++++++++++++++++++++ .../datalayer/entity/MergedAnomalyResultIndex.java | 9 ++ .../src/main/resources/schema/create-schema.sql | 1 + 7 files changed, 330 insertions(+), 1 deletion(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java index c6c7bfa..93675dd 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java @@ -47,4 +47,4 @@ public enum AnomalyFeedbackType { public boolean isUnresolved() { return this.equals(NO_FEEDBACK); } -} +} \ No newline at end of file diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java index c3decbc..20e547a 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java @@ -59,6 +59,7 @@ import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseMetricResource; import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseResource; import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseSessionResource; import 
org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource; +import org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies.AnomalySearchResource; import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.DefaultEntityFormatter; import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.FormatterLoader; import org.apache.pinot.thirdeye.dataset.DatasetAutoOnboardResource; @@ -188,6 +189,7 @@ public class ThirdEyeDashboardApplication config.getAlertOnboardingPermitPerSecond())); env.jersey().register(new SqlDataSourceResource()); env.jersey().register(new AlertResource()); + env.jersey().register(new AnomalySearchResource()); TimeSeriesLoader timeSeriesLoader = new DefaultTimeSeriesLoader( DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getDatasetConfigDAO(), diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java new file mode 100644 index 0000000..e995e5a --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java @@ -0,0 +1,114 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import java.util.List; + + +/** + * The type Anomaly search filter. + */ +public class AnomalySearchFilter { + private final List<String> feedbacks; + private final List<String> subscriptionGroups; + private final List<String> detectionNames; + private final List<String> metrics; + private final List<String> datasets; + private final List<Long> anomalyIds; + private final Long startTime; + private final Long endTime; + + /** + * Instantiates a new Anomaly search filter. 
+ * + * @param startTime the start time + * @param endTime the end time + * @param feedbacks the feedbacks + * @param subscriptionGroups the subscription groups + * @param detectionNames the detection names + * @param metrics the metrics + * @param datasets the datasets + * @param anomalyIds the anomaly ids + */ + public AnomalySearchFilter(Long startTime, Long endTime, List<String> feedbacks, List<String> subscriptionGroups, List<String> detectionNames, + List<String> metrics, List<String> datasets, List<Long> anomalyIds) { + this.feedbacks = feedbacks; + this.subscriptionGroups = subscriptionGroups; + this.detectionNames = detectionNames; + this.metrics = metrics; + this.datasets = datasets; + this.startTime = startTime; + this.endTime = endTime; + this.anomalyIds = anomalyIds; + } + + /** + * Gets start time. + * + * @return the start time + */ + public Long getStartTime() { + return startTime; + } + + /** + * Gets end time. + * + * @return the end time + */ + public Long getEndTime() { + return endTime; + } + + /** + * Gets feedbacks. + * + * @return the feedbacks + */ + public List<String> getFeedbacks() { + return feedbacks; + } + + /** + * Gets subscription groups. + * + * @return the subscription groups + */ + public List<String> getSubscriptionGroups() { + return subscriptionGroups; + } + + /** + * Gets detection names. + * + * @return the detection names + */ + public List<String> getDetectionNames() { + return detectionNames; + } + + /** + * Gets metrics. + * + * @return the metrics + */ + public List<String> getMetrics() { + return metrics; + } + + /** + * Gets datasets. + * + * @return the datasets + */ + public List<String> getDatasets() { + return datasets; + } + + /** + * Gets anomaly ids. 
+ * + * @return the anomaly ids + */ + public List<Long> getAnomalyIds() { + return anomalyIds; + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java new file mode 100644 index 0000000..81b51a9 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java @@ -0,0 +1,61 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import java.util.List; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.thirdeye.api.Constants; + + +/** + * The type Anomaly search resource. + */ +@Path(value = "/anomaly-search") +@Produces(MediaType.APPLICATION_JSON) +@Api(tags = {Constants.DETECTION_TAG}) +public class AnomalySearchResource { + + private final AnomalySearcher anomalySearcher; + + /** + * Instantiates a new Anomaly search resource. + */ + public AnomalySearchResource() { + this.anomalySearcher = new AnomalySearcher(); + } + + /** + * Search and paginate the anomalies according to the parameters. + * + * @param limit the limit + * @param offset the offset + * @param startTime the start time + * @param endTime the end time + * @param feedbacks the feedback types, e.g. 
ANOMALY, NOT_ANOMALY + * @param subscriptionGroups the subscription groups + * @param detectionNames the detection names + * @param metrics the metrics + * @param datasets the datasets + * @param anomalyIds the anomaly ids + * @return the response + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation("Search and paginate anomalies according to the parameters") + public Response findAlerts(@QueryParam("limit") @DefaultValue("10") int limit, + @QueryParam("offset") @DefaultValue("0") int offset, @QueryParam("startTime") Long startTime, + @QueryParam("endTime") Long endTime, @QueryParam("feedbackStatus") List<String> feedbacks, + @QueryParam("subscriptionGroup") List<String> subscriptionGroups, + @QueryParam("detectionName") List<String> detectionNames, @QueryParam("metric") List<String> metrics, + @QueryParam("dataset") List<String> datasets, @QueryParam("anomalyId") List<Long> anomalyIds) { + AnomalySearchFilter searchFilter = + new AnomalySearchFilter(startTime, endTime, feedbacks, subscriptionGroups, detectionNames, metrics, datasets, anomalyIds); + return Response.ok().entity(this.anomalySearcher.search(searchFilter, limit, offset)).build(); + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java new file mode 100644 index 0000000..463cbe3 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java @@ -0,0 +1,142 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.stream.Collectors; +import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType; +import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; +import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; +import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager; +import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean; +import org.apache.pinot.thirdeye.datalayer.util.Predicate; +import org.apache.pinot.thirdeye.datasource.DAORegistry; + +import static org.apache.pinot.thirdeye.constant.AnomalyFeedbackType.*; + + +/** + * The type Anomaly searcher. + */ +public class AnomalySearcher { + private final MergedAnomalyResultManager anomalyDAO; + private final DetectionConfigManager detectionConfigDAO; + private final DetectionAlertConfigManager detectionAlertConfigDAO; + + /** + * Instantiates a new Anomaly searcher. + */ + public AnomalySearcher() { + this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO(); + this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager(); + this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager(); + } + + /** + * Search and retrieve all the anomalies matching to the search filter and limits. 
+ * + * @param searchFilter the search filter; its start and end time bounds are optional and are applied independently + * @param limit the limit (page size) + * @param offset the offset of the first element of the page + * @return the result map with the keys count, limit, offset and elements + */ + public Map<String, Object> search(AnomalySearchFilter searchFilter, int limit, int offset) { + Predicate predicate = Predicate.EQ("child", false); + if (searchFilter.getEndTime() != null) { // guard on the value actually used: anomaly must start before the window ends + predicate = Predicate.AND(predicate, Predicate.LT("startTime", searchFilter.getEndTime())); + } + if (searchFilter.getStartTime() != null) { // guard on the value actually used: anomaly must end after the window starts + predicate = Predicate.AND(predicate, Predicate.GT("endTime", searchFilter.getStartTime())); + } + // search by detections or subscription groups + Set<Long> detectionConfigIds = new HashSet<>(); + Set<Long> subscribedDetectionConfigIds = new HashSet<>(); + if (!searchFilter.getDetectionNames().isEmpty()) { + detectionConfigIds = + this.detectionConfigDAO.findByPredicate(Predicate.IN("name", searchFilter.getDetectionNames().toArray())) + .stream() + .map(DetectionConfigBean::getId) + .collect(Collectors.toSet()); + } + if (!searchFilter.getSubscriptionGroups().isEmpty()) { + subscribedDetectionConfigIds = this.detectionAlertConfigDAO.findByPredicate( + Predicate.IN("name", searchFilter.getSubscriptionGroups().toArray())) + .stream() + .map(detectionAlertConfigDTO -> detectionAlertConfigDTO.getVectorClocks().keySet()) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + if (!searchFilter.getDetectionNames().isEmpty() && !searchFilter.getSubscriptionGroups().isEmpty()) { + // intersect the detection config ids if searching by both + detectionConfigIds.retainAll(subscribedDetectionConfigIds); + } else { + detectionConfigIds.addAll(subscribedDetectionConfigIds); + } + if (!searchFilter.getDetectionNames().isEmpty() || !searchFilter.getSubscriptionGroups().isEmpty()) { + // add the predicate using detection config id + if (detectionConfigIds.isEmpty()) { + // if detection not found, return empty result + return ImmutableMap.of("count", 0, "limit", limit, "offset", offset, "elements", 
Collections.emptyList()); + } + predicate = Predicate.AND(predicate, Predicate.IN("detectionConfigId", detectionConfigIds.toArray())); + } + + // search by datasets + if (!searchFilter.getDatasets().isEmpty()) { + List<Predicate> datasetPredicates = new ArrayList<>(); + for (String dataset : searchFilter.getDatasets()) { + datasetPredicates.add(Predicate.LIKE("collection", "%" + dataset + "%")); + } + predicate = Predicate.AND(predicate, Predicate.OR(datasetPredicates.toArray(new Predicate[0]))); + } + // search by metrics + if (!searchFilter.getMetrics().isEmpty()) { + predicate = Predicate.AND(predicate, Predicate.IN("metric", searchFilter.getMetrics().toArray())); + } + // search by ids + if (!searchFilter.getAnomalyIds().isEmpty()) { + predicate = Predicate.AND(predicate, Predicate.IN("baseId", searchFilter.getAnomalyIds().toArray())); + } + + long count; + List<MergedAnomalyResultDTO> results; + if (searchFilter.getFeedbacks().isEmpty()) { + List<Long> anomalyIds = this.anomalyDAO.findIdsByPredicate(predicate) + .stream() + .sorted(Comparator.reverseOrder()) + .collect(Collectors.toList()); + count = anomalyIds.size(); + results = anomalyIds.isEmpty() ? 
Collections.emptyList() + : this.anomalyDAO.findByIds(paginateResults(anomalyIds, offset, limit)); + } else { + // filter by feedback types if requested + List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(predicate); + Set<AnomalyFeedbackType> feedbackFilters = + searchFilter.getFeedbacks().stream().map(AnomalyFeedbackType::valueOf).collect(Collectors.toSet()); + results = anomalies.stream() + .filter(anomaly -> (anomaly.getFeedback() == null && feedbackFilters.contains(NO_FEEDBACK)) || ( + anomaly.getFeedback() != null && feedbackFilters.contains(anomaly.getFeedback().getFeedbackType()))) + .sorted(Comparator.comparingLong(AbstractDTO::getId).reversed()) + .collect(Collectors.toList()); + count = results.size(); + results = paginateResults(results, offset, limit); + } + return ImmutableMap.of("count", count, "limit", limit, "offset", offset, "elements", results); + } + + private <T> List<T> paginateResults(List<T> list, int offset, int limit) { + if (offset < 0 || limit <= 0 || offset >= list.size()) { + // requested page is out of bound (negative offset, non-positive limit, or offset at/past the end) + return Collections.emptyList(); + } + return list.subList(offset, offset + Math.min(limit, list.size() - offset)); // clamp the page length first so offset + limit cannot overflow int + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java index 8e3c768..cfa5c68 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java @@ -35,6 +35,7 @@ public class MergedAnomalyResultIndex extends AbstractIndexEntity { String metric; DimensionMap dimensions; boolean notified; + boolean child; public long getDetectionConfigId() { return detectionConfigId; @@ -115,4 +116,12 @@ public class MergedAnomalyResultIndex extends AbstractIndexEntity { public void 
setNotified(boolean notified) { this.notified = notified; } + + public boolean isChild() { + return child; + } + + public void setChild(boolean child) { + this.child = child; + } } diff --git a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql index c93b922..8454348 100644 --- a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql +++ b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql @@ -110,6 +110,7 @@ create table if not exists merged_anomaly_result_index ( base_id bigint(20) not null, create_time timestamp, update_time timestamp default current_timestamp, + child boolean, version int(10) ) ENGINE=InnoDB; create index merged_anomaly_result_function_idx on merged_anomaly_result_index(function_id); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org