bxji commented on a change in pull request #5778: URL: https://github.com/apache/incubator-pinot/pull/5778#discussion_r463314454
########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java ########## @@ -0,0 +1,61 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import java.util.List; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; Review comment: add header ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java ########## @@ -0,0 +1,132 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import java.util.Collections; +import java.util.List; + + Review comment: add header ########## File path: thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcherTest.java ########## @@ -0,0 +1,68 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; Review comment: header ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java ########## @@ -0,0 +1,142 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; Review comment: add header ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java ########## @@ -0,0 +1,142 @@ +package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType; +import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; +import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; +import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager; +import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean; +import org.apache.pinot.thirdeye.datalayer.util.Predicate; +import org.apache.pinot.thirdeye.datasource.DAORegistry; + +import static org.apache.pinot.thirdeye.constant.AnomalyFeedbackType.*; + + +/** + * The type Anomaly searcher. + */ +public class AnomalySearcher { + private final MergedAnomalyResultManager anomalyDAO; + private final DetectionConfigManager detectionConfigDAO; + private final DetectionAlertConfigManager detectionAlertConfigDAO; + + /** + * Instantiates a new Anomaly searcher. + */ + public AnomalySearcher() { + this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO(); + this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager(); + this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager(); + } + + /** + * Search and retrieve all the anomalies matching to the search filter and limits. + * + * @param searchFilter the search filter + * @param limit the limit + * @param offset the offset + * @return the result + */ + public Map<String, Object> search(AnomalySearchFilter searchFilter, int limit, int offset) { + Predicate predicate = Predicate.EQ("child", false); + if (searchFilter.getStartTime() != null) { + predicate = Predicate.AND(predicate, Predicate.LT("startTime", searchFilter.getEndTime())); + } + if (searchFilter.getEndTime() != null) { + predicate = Predicate.AND(predicate, Predicate.GT("endTime", searchFilter.getStartTime())); + } + // search by detections or subscription groups + Set<Long> detectionConfigIds = new HashSet<>(); + Set<Long> subscribedDetectionConfigIds = new HashSet<>(); + if (!searchFilter.getDetectionNames().isEmpty()) { + detectionConfigIds = + this.detectionConfigDAO.findByPredicate(Predicate.IN("name", searchFilter.getDetectionNames().toArray())) + .stream() + .map(DetectionConfigBean::getId) + .collect(Collectors.toSet()); + } + if (!searchFilter.getSubscriptionGroups().isEmpty()) { + subscribedDetectionConfigIds = this.detectionAlertConfigDAO.findByPredicate( + Predicate.IN("name", searchFilter.getSubscriptionGroups().toArray())) + .stream() + .map(detectionAlertConfigDTO -> detectionAlertConfigDTO.getVectorClocks().keySet()) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + if (!searchFilter.getDetectionNames().isEmpty() && !searchFilter.getSubscriptionGroups().isEmpty()) { + // intersect the detection config ids if searching by both + detectionConfigIds.retainAll(subscribedDetectionConfigIds); + } else { + detectionConfigIds.addAll(subscribedDetectionConfigIds); + } + if (!searchFilter.getDetectionNames().isEmpty() || !searchFilter.getSubscriptionGroups().isEmpty()) { + // add the predicate using detection config id + if (detectionConfigIds.isEmpty()) { + // if detection not found, return empty result + return ImmutableMap.of("count", 0, "limit", limit, "offset", offset, "elements", Collections.emptyList()); + } + predicate = Predicate.AND(predicate, Predicate.IN("detectionConfigId", detectionConfigIds.toArray())); + } + + // search by datasets + if (!searchFilter.getDatasets().isEmpty()) { + List<Predicate> datasetPredicates = new ArrayList<>(); + for (String dataset : searchFilter.getDatasets()) { + datasetPredicates.add(Predicate.LIKE("collection", "%" + dataset + "%")); + } + predicate = Predicate.AND(predicate, Predicate.OR(datasetPredicates.toArray(new Predicate[0]))); + } + // search by metrics + if (!searchFilter.getMetrics().isEmpty()) { + predicate = Predicate.AND(predicate, Predicate.IN("metric", searchFilter.getMetrics().toArray())); + } + // search by ids + if (!searchFilter.getAnomalyIds().isEmpty()) { + predicate = Predicate.AND(predicate, Predicate.IN("baseId", searchFilter.getAnomalyIds().toArray())); + } + + long count; + List<MergedAnomalyResultDTO> results; + if (searchFilter.getFeedbacks().isEmpty()) { + List<Long> anomalyIds = this.anomalyDAO.findIdsByPredicate(predicate) + .stream() + .sorted(Comparator.reverseOrder()) + .collect(Collectors.toList()); + count = anomalyIds.size(); + results = anomalyIds.isEmpty() ? Collections.emptyList() + : this.anomalyDAO.findByIds(paginateResults(anomalyIds, offset, limit)); + } else { + // filter by feedback types if requested + List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(predicate); + Set<AnomalyFeedbackType> feedbackFilters = + searchFilter.getFeedbacks().stream().map(AnomalyFeedbackType::valueOf).collect(Collectors.toSet()); + results = anomalies.stream() + .filter(anomaly -> (anomaly.getFeedback() == null && feedbackFilters.contains(NO_FEEDBACK)) || ( + anomaly.getFeedback() != null && feedbackFilters.contains(anomaly.getFeedback().getFeedbackType()))) + .sorted(Comparator.comparingLong(AbstractDTO::getId).reversed()) + .collect(Collectors.toList()); + count = results.size(); + results = paginateResults(results, offset, limit); + } + return ImmutableMap.of("count", count, "limit", limit, "offset", offset, "elements", results); + } + + private <T> List<T> paginateResults(List<T> list, int offset, int limit) { Review comment: Just want to check my understanding; I see that we are starting with offset of 0. So my understanding is that if we have limit 25, the anomalies returned should be 0-24? If so, since we are starting with offset of 0, do we need >= as the equality check here? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org