This is an automated email from the ASF dual-hosted git repository. jihao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0dee4d1 [TE] fix changing createdTime of anomalies (#6269) 0dee4d1 is described below commit 0dee4d1bf2a2fb828ada437e65d86e3f186f9020 Author: Vincent Chen <jianc...@linkedin.com> AuthorDate: Mon Nov 30 10:31:45 2020 -0800 [TE] fix changing createdTime of anomalies (#6269) This PR fixes a bug where the createdTime of anomalies is updated when new anomalies with an earlier startTime are generated, which causes the same anomalies to be sent out twice because of the updated createdTime. The fix is to merge new anomalies into existing anomalies if possible, regardless of their startTime. --- .../dashboard/resources/SummaryResourceTest.java | 8 +++++ .../detection/algorithm/MergeWrapperTest.java | 19 ++++++++-- .../pinot/resources/PinotDataSourceResource.java | 10 ++---- .../thirdeye/detection/algorithm/MergeWrapper.java | 42 ++++++++++++---------- 4 files changed, 50 insertions(+), 29 deletions(-) diff --git a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java index 8b5dfe7..4b7e46d 100644 --- a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java +++ b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java @@ -1,11 +1,19 @@ package org.apache.pinot.thirdeye.dashboard.resources; +import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase; import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; import org.testng.Assert; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class SummaryResourceTest { + private DAOTestBase testDAOProvider; + + @BeforeMethod + public void setUp() { + testDAOProvider = DAOTestBase.getInstance(); + } @Test public void 
testIsSimpleRatioMetric() { // False: null metric config diff --git a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java index c5e98f5..db26454 100644 --- a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java +++ b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java @@ -49,6 +49,7 @@ public class MergeWrapperTest { private List<MockPipeline> runs; private List<MockPipelineOutput> outputs; private MockPipelineLoader mockLoader; + private List<MergedAnomalyResultDTO> existing; private static final Long PROP_ID_VALUE = 1000L; private static final String PROP_NAME_VALUE = "myName"; @@ -99,10 +100,10 @@ public class MergeWrapperTest { this.config.setName(PROP_NAME_VALUE); this.config.setProperties(this.properties); - List<MergedAnomalyResultDTO> existing = new ArrayList<>(); + this.existing = new ArrayList<>(); // For existing anomalies add ids. 
- existing.add(setAnomalyId(makeAnomaly(100, 1000), 0)); - existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1)); + this.existing.add(setAnomalyId(makeAnomaly(100, 1000), 0)); + this.existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1)); this.outputs = new ArrayList<>(); @@ -141,6 +142,12 @@ public class MergeWrapperTest { Assert.assertEquals(output.getLastTimestamp(), 3000); // anomalies [100, 1000] and [1150,1250] are merged into [50, 1200] Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); + // ensure that the createdTime of anomalies is not changed + Assert.assertEquals( + output.getAnomalies().stream() + .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime(), + existing.stream() + .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime()); // anomalies [2200, 2300] and [2400, 2800] are merged Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800))); } @@ -156,6 +163,12 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 3); Assert.assertEquals(output.getLastTimestamp(), 3000); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); + // ensure that the createdTime of anomalies is not changed + Assert.assertEquals( + output.getAnomalies().stream() + .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime(), + existing.stream() + .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime()); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java index bbcabd1..b0d2fed 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java @@ -30,8 +30,6 @@ import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource; import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSet; import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSetGroup; import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSetSerializer; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; import java.util.concurrent.ExecutionException; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -67,14 +65,10 @@ public class PinotDataSourceResource { */ @GET @Path("/query") - public String executePQL(@QueryParam("pql") String pql, @QueryParam("tableName") String tableName) - throws UnsupportedEncodingException { + public String executePQL(@QueryParam("pql") String pql, @QueryParam("tableName") String tableName) { initPinotDataSource(); - String resultString; - String decodedPql = URLDecoder.decode(pql, URL_ENCODING); - String decodedTableName = URLDecoder.decode(tableName, URL_ENCODING); - PinotQuery pinotQuery = new PinotQuery(decodedPql, decodedTableName); + PinotQuery pinotQuery = new PinotQuery(pql, tableName); try { ThirdEyeResultSetGroup thirdEyeResultSetGroup = pinotDataSource.executePQL(pinotQuery); resultString = OBJECT_MAPPER.writeValueAsString(thirdEyeResultSetGroup); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java index ee67927..c254b2d 100644 --- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java @@ -66,12 +66,12 @@ public class MergeWrapper extends DetectionPipeline { protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() { @Override public int compare(MergedAnomalyResultDTO o1, MergedAnomalyResultDTO o2) { - // earlier for start time + // first order anomalies from earliest startTime to latest int res = Long.compare(o1.getStartTime(), o2.getStartTime()); if (res != 0) return res; - // later for end time - res = Long.compare(o2.getEndTime(), o1.getEndTime()); + // order anomalies from earliest createdTime to latest, if startTime are the same + res = Long.compare(o1.getCreatedTime(), o2.getCreatedTime()); if (res != 0) return res; // pre-existing @@ -190,25 +190,31 @@ public class MergeWrapper extends DetectionPipeline { // parent |-------------------| // anomaly |-------------| // - parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime())); - - // merge the anomaly's properties into parent - ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties()); + // merge new anomaly to existing anomaly if (isExistingAnomaly(parent)) { + // parent (existing) |---------------------| + // anomaly (new) |-------------------| + parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime())); + ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties()); + mergeChildren(parent, anomaly); modifiedExistingAnomalies.add(parent); + } else if (isExistingAnomaly(anomaly)) { + // parent (new) |---------------------| + // anomaly (existing) |-------------------| + anomaly.setStartTime(Math.min(parent.getStartTime(), anomaly.getStartTime())); + anomaly.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime())); + 
ThirdEyeUtils.mergeAnomalyProperties(anomaly.getProperties(), parent.getProperties()); + mergeChildren(anomaly, parent); + modifiedExistingAnomalies.add(anomaly); + retainedNewAnomalies.remove(parent); + parents.put(key, anomaly); } else { - // merge existing anomaly to new anomaly, set id to new anomaly - // parent (new) |-------------------| - // anomaly (existing) |-------------| - if (isExistingAnomaly(anomaly)) { - parent.setId(anomaly.getId()); - anomaly.setId(null); - } + // parent (new) |---------------------| + // anomaly (new) |-------------------| + parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime())); + ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties()); + mergeChildren(parent, anomaly); } - - // merge the anomaly's children into the parent - mergeChildren(parent, anomaly); - } else if (parent.getEndTime() >= anomaly.getStartTime()) { // mergeable but exceeds maxDuration, then truncate // parent |---------------------| --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org