This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch anomalies-pagination
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2a2be355b869110ebcfcf8e0c8af335a03c39415
Author: Jihao Zhang <jihzh...@linkedin.com>
AuthorDate: Wed Jul 22 14:22:33 2020 -0700

    new anomalies endpoint
---
 .../thirdeye/constant/AnomalyFeedbackType.java     |   2 +-
 .../dashboard/ThirdEyeDashboardApplication.java    |   2 +
 .../v2/anomalies/AnomalySearchFilter.java          | 114 +++++++++++++++++
 .../v2/anomalies/AnomalySearchResource.java        |  61 +++++++++
 .../resources/v2/anomalies/AnomalySearcher.java    | 142 +++++++++++++++++++++
 .../datalayer/entity/MergedAnomalyResultIndex.java |   9 ++
 .../src/main/resources/schema/create-schema.sql    |   1 +
 7 files changed, 330 insertions(+), 1 deletion(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
index c6c7bfa..93675dd 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
@@ -47,4 +47,4 @@ public enum AnomalyFeedbackType {
   public boolean isUnresolved() {
     return this.equals(NO_FEEDBACK);
   }
-}
+}
\ No newline at end of file
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index c3decbc..20e547a 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -59,6 +59,7 @@ import 
org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseMetricResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseResource;
 import 
org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseSessionResource;
 import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import 
org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies.AnomalySearchResource;
 import 
org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.DefaultEntityFormatter;
 import 
org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.FormatterLoader;
 import org.apache.pinot.thirdeye.dataset.DatasetAutoOnboardResource;
@@ -188,6 +189,7 @@ public class ThirdEyeDashboardApplication
         config.getAlertOnboardingPermitPerSecond()));
     env.jersey().register(new SqlDataSourceResource());
     env.jersey().register(new AlertResource());
+    env.jersey().register(new AnomalySearchResource());
 
     TimeSeriesLoader timeSeriesLoader = new DefaultTimeSeriesLoader(
         DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getDatasetConfigDAO(),
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java
new file mode 100644
index 0000000..e995e5a
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java
@@ -0,0 +1,114 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import java.util.List;
+
+
+/**
+ * The type Anomaly search filter.
+ */
+public class AnomalySearchFilter {
+  private final List<String> feedbacks;
+  private final List<String> subscriptionGroups;
+  private final List<String> detectionNames;
+  private final List<String> metrics;
+  private final List<String> datasets;
+  private final List<Long> anomalyIds;
+  private final Long startTime;
+  private final Long endTime;
+
+  /**
+   * Instantiates a new Anomaly search filter.
+   *
+   * @param startTime the start time
+   * @param endTime the end time
+   * @param feedbacks the feedbacks
+   * @param subscriptionGroups the subscription groups
+   * @param detectionNames the detection names
+   * @param metrics the metrics
+   * @param datasets the datasets
+   * @param anomalyIds the anomaly ids
+   */
+  public AnomalySearchFilter(Long startTime, Long endTime, List<String> 
feedbacks, List<String> subscriptionGroups, List<String> detectionNames,
+      List<String> metrics, List<String> datasets, List<Long> anomalyIds) {
+    this.feedbacks = feedbacks;
+    this.subscriptionGroups = subscriptionGroups;
+    this.detectionNames = detectionNames;
+    this.metrics = metrics;
+    this.datasets = datasets;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.anomalyIds = anomalyIds;
+  }
+
+  /**
+   * Gets start time.
+   *
+   * @return the start time
+   */
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Gets end time.
+   *
+   * @return the end time
+   */
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  /**
+   * Gets feedbacks.
+   *
+   * @return the feedbacks
+   */
+  public List<String> getFeedbacks() {
+    return feedbacks;
+  }
+
+  /**
+   * Gets subscription groups.
+   *
+   * @return the subscription groups
+   */
+  public List<String> getSubscriptionGroups() {
+    return subscriptionGroups;
+  }
+
+  /**
+   * Gets detection names.
+   *
+   * @return the detection names
+   */
+  public List<String> getDetectionNames() {
+    return detectionNames;
+  }
+
+  /**
+   * Gets metrics.
+   *
+   * @return the metrics
+   */
+  public List<String> getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Gets datasets.
+   *
+   * @return the datasets
+   */
+  public List<String> getDatasets() {
+    return datasets;
+  }
+
+  /**
+   * Gets anomaly ids.
+   *
+   * @return the anomaly ids
+   */
+  public List<Long> getAnomalyIds() {
+    return anomalyIds;
+  }
+}
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java
new file mode 100644
index 0000000..81b51a9
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java
@@ -0,0 +1,61 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.thirdeye.api.Constants;
+
+
+/**
+ * The type Anomaly search resource.
+ */
+@Path(value = "/anomaly-search")
+@Produces(MediaType.APPLICATION_JSON)
+@Api(tags = {Constants.DETECTION_TAG})
+public class AnomalySearchResource {
+
+  private final AnomalySearcher anomalySearcher;
+
+  /**
+   * Instantiates a new Anomaly search resource.
+   */
+  public AnomalySearchResource() {
+    this.anomalySearcher = new AnomalySearcher();
+  }
+
+  /**
+   * Search and paginate the anomalies according to the parameters.
+   *
+   * @param limit the limit
+   * @param offset the offset
+   * @param startTime the start time
+   * @param endTime the end time
+   * @param feedbacks the feedback types, e.g. ANOMALY, NOT_ANOMALY
+   * @param subscriptionGroups the subscription groups
+   * @param detectionNames the detection names
+   * @param metrics the metrics
+   * @param datasets the datasets
+   * @param anomalyIds the anomaly ids
+   * @return the response
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Search and paginate anomalies according to the parameters")
+  public Response findAlerts(@QueryParam("limit") @DefaultValue("10") int 
limit,
+      @QueryParam("offset") @DefaultValue("0") int offset, 
@QueryParam("startTime") Long startTime,
+      @QueryParam("endTime") Long endTime, @QueryParam("feedbackStatus") 
List<String> feedbacks,
+      @QueryParam("subscriptionGroup") List<String> subscriptionGroups,
+      @QueryParam("detectionName") List<String> detectionNames, 
@QueryParam("metric") List<String> metrics,
+      @QueryParam("dataset") List<String> datasets, @QueryParam("anomalyId") 
List<Long> anomalyIds) {
+    AnomalySearchFilter searchFilter =
+        new AnomalySearchFilter(startTime, endTime, feedbacks, 
subscriptionGroups, detectionNames, metrics, datasets, anomalyIds);
+    return Response.ok().entity(this.anomalySearcher.search(searchFilter, 
limit, offset)).build();
+  }
+}
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java
new file mode 100644
index 0000000..463cbe3
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java
@@ -0,0 +1,142 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+
+import static org.apache.pinot.thirdeye.constant.AnomalyFeedbackType.*;
+
+
+/**
+ * The type Anomaly searcher.
+ */
+public class AnomalySearcher {
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DetectionAlertConfigManager detectionAlertConfigDAO;
+
+  /**
+   * Instantiates a new Anomaly searcher.
+   */
+  public AnomalySearcher() {
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.detectionConfigDAO = 
DAORegistry.getInstance().getDetectionConfigManager();
+    this.detectionAlertConfigDAO = 
DAORegistry.getInstance().getDetectionAlertConfigManager();
+  }
+
+  /**
+   * Search and retrieve all the anomalies matching to the search filter and 
limits.
+   *
+   * @param searchFilter the search filter
+   * @param limit the limit
+   * @param offset the offset
+   * @return the result
+   */
+  public Map<String, Object> search(AnomalySearchFilter searchFilter, int 
limit, int offset) {
+    Predicate predicate = Predicate.EQ("child", false);
+    if (searchFilter.getStartTime() != null) {
+      predicate = Predicate.AND(predicate, Predicate.LT("startTime", 
searchFilter.getEndTime()));
+    }
+    if (searchFilter.getEndTime() != null) {
+      predicate = Predicate.AND(predicate, Predicate.GT("endTime", 
searchFilter.getStartTime()));
+    }
+    // search by detections or subscription groups
+    Set<Long> detectionConfigIds = new HashSet<>();
+    Set<Long> subscribedDetectionConfigIds = new HashSet<>();
+    if (!searchFilter.getDetectionNames().isEmpty()) {
+      detectionConfigIds =
+          this.detectionConfigDAO.findByPredicate(Predicate.IN("name", 
searchFilter.getDetectionNames().toArray()))
+              .stream()
+              .map(DetectionConfigBean::getId)
+              .collect(Collectors.toSet());
+    }
+    if (!searchFilter.getSubscriptionGroups().isEmpty()) {
+      subscribedDetectionConfigIds = 
this.detectionAlertConfigDAO.findByPredicate(
+          Predicate.IN("name", searchFilter.getSubscriptionGroups().toArray()))
+          .stream()
+          .map(detectionAlertConfigDTO -> 
detectionAlertConfigDTO.getVectorClocks().keySet())
+          .flatMap(Collection::stream)
+          .collect(Collectors.toSet());
+    }
+    if (!searchFilter.getDetectionNames().isEmpty() && 
!searchFilter.getSubscriptionGroups().isEmpty()) {
+      // intersect the detection config ids if searching by both
+      detectionConfigIds.retainAll(subscribedDetectionConfigIds);
+    } else {
+      detectionConfigIds.addAll(subscribedDetectionConfigIds);
+    }
+    if (!searchFilter.getDetectionNames().isEmpty() || 
!searchFilter.getSubscriptionGroups().isEmpty()) {
+      // add the predicate using detection config id
+      if (detectionConfigIds.isEmpty()) {
+        // if detection not found, return empty result
+        return ImmutableMap.of("count", 0, "limit", limit, "offset", offset, 
"elements", Collections.emptyList());
+      }
+      predicate = Predicate.AND(predicate, Predicate.IN("detectionConfigId", 
detectionConfigIds.toArray()));
+    }
+
+    // search by datasets
+    if (!searchFilter.getDatasets().isEmpty()) {
+      List<Predicate> datasetPredicates = new ArrayList<>();
+      for (String dataset : searchFilter.getDatasets()) {
+        datasetPredicates.add(Predicate.LIKE("collection", "%" + dataset + 
"%"));
+      }
+      predicate = Predicate.AND(predicate, 
Predicate.OR(datasetPredicates.toArray(new Predicate[0])));
+    }
+    // search by metrics
+    if (!searchFilter.getMetrics().isEmpty()) {
+      predicate = Predicate.AND(predicate, Predicate.IN("metric", 
searchFilter.getMetrics().toArray()));
+    }
+    // search by ids
+    if (!searchFilter.getAnomalyIds().isEmpty()) {
+      predicate = Predicate.AND(predicate, Predicate.IN("baseId", 
searchFilter.getAnomalyIds().toArray()));
+    }
+
+    long count;
+    List<MergedAnomalyResultDTO> results;
+    if (searchFilter.getFeedbacks().isEmpty()) {
+      List<Long> anomalyIds = this.anomalyDAO.findIdsByPredicate(predicate)
+          .stream()
+          .sorted(Comparator.reverseOrder())
+          .collect(Collectors.toList());
+      count = anomalyIds.size();
+      results = anomalyIds.isEmpty() ? Collections.emptyList()
+          : this.anomalyDAO.findByIds(paginateResults(anomalyIds, offset, 
limit));
+    } else {
+      // filter by feedback types if requested
+      List<MergedAnomalyResultDTO> anomalies = 
this.anomalyDAO.findByPredicate(predicate);
+      Set<AnomalyFeedbackType> feedbackFilters =
+          
searchFilter.getFeedbacks().stream().map(AnomalyFeedbackType::valueOf).collect(Collectors.toSet());
+      results = anomalies.stream()
+          .filter(anomaly -> (anomaly.getFeedback() == null && 
feedbackFilters.contains(NO_FEEDBACK)) || (
+              anomaly.getFeedback() != null && 
feedbackFilters.contains(anomaly.getFeedback().getFeedbackType())))
+          .sorted(Comparator.comparingLong(AbstractDTO::getId).reversed())
+          .collect(Collectors.toList());
+      count = results.size();
+      results = paginateResults(results, offset, limit);
+    }
+    return ImmutableMap.of("count", count, "limit", limit, "offset", offset, 
"elements", results);
+  }
+
+  private <T> List<T> paginateResults(List<T> list, int offset, int limit) {
+    if (offset > list.size()) {
+      // requested page is out of bound
+      return Collections.emptyList();
+    }
+    return list.subList(offset, Math.min(offset + limit, list.size()));
+  }
+}
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
index 8e3c768..cfa5c68 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
@@ -35,6 +35,7 @@ public class MergedAnomalyResultIndex extends 
AbstractIndexEntity {
   String metric;
   DimensionMap dimensions;
   boolean notified;
+  boolean child;
 
   public long getDetectionConfigId() {
     return detectionConfigId;
@@ -115,4 +116,12 @@ public class MergedAnomalyResultIndex extends 
AbstractIndexEntity {
   public void setNotified(boolean notified) {
     this.notified = notified;
   }
+
+  public boolean isChild() {
+    return child;
+  }
+
+  public void setChild(boolean child) {
+    this.child = child;
+  }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql 
b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
index c93b922..8454348 100644
--- a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
+++ b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
@@ -110,6 +110,7 @@ create table if not exists merged_anomaly_result_index (
     base_id bigint(20) not null,
     create_time timestamp,
     update_time timestamp default current_timestamp,
+    child boolean,
     version int(10)
 ) ENGINE=InnoDB;
 create index merged_anomaly_result_function_idx on 
merged_anomaly_result_index(function_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to