xiaohui-sun commented on a change in pull request #4190: [TE] pass predicted time series throughout the detection pipeline URL: https://github.com/apache/incubator-pinot/pull/4190#discussion_r282195747
########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java ########## @@ -201,8 +206,58 @@ public DetectionPipelineResult run() throws Exception { anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName); } long lastTimeStamp = this.getLastTimeStamp(); - return new DetectionPipelineResult(anomalies.stream().filter(anomaly -> anomaly.getEndTime() <= lastTimeStamp).collect( - Collectors.toList()), lastTimeStamp); + List<MergedAnomalyResultDTO> anomalyResults = anomalies.stream().filter(anomaly -> anomaly.getEndTime() <= lastTimeStamp).collect( + Collectors.toList()); + PredictionResult predictedTimeSeries = new PredictionResult(this.detectorName, this.metricUrn, predictedResult.getDataFrame()); + return new DetectionPipelineResult(anomalyResults, lastTimeStamp, Collections.singletonList(predictedTimeSeries)); + } + + /** + * Join two time series, including current, baseline, lower bound and upper bound. 
If two time series have overlapped region, take the average + * @param ts1 timeseries 1 + * @param ts2 timeseries 2 + * @return the consolidated time series + */ + static TimeSeries consolidateTimeSeries(TimeSeries ts1, TimeSeries ts2) { + DataFrame df1 = ts1.getDataFrame(); + DataFrame df2 = ts2.getDataFrame(); + DataFrame joinedDf = df1.joinOuter(df2, COL_TIME); + consolidateJoinedDf(joinedDf, COL_VALUE); + consolidateJoinedDf(joinedDf, COL_CURRENT); + consolidateJoinedDf(joinedDf, COL_LOWER_BOUND); + consolidateJoinedDf(joinedDf, COL_UPPER_BOUND); + return TimeSeries.fromDataFrame(joinedDf); + } + + private static void consolidateJoinedDf(DataFrame joinedDf, String columnName) { + String columnNameLeft = columnName + DataFrame.COLUMN_JOIN_LEFT; + String columnNameRight = columnName + DataFrame.COLUMN_JOIN_RIGHT; + if (joinedDf.contains(columnNameLeft) && joinedDf.contains(columnNameRight)) { + joinedDf.addSeries(columnName, robustAverage(joinedDf.getDoubles(columnNameLeft), joinedDf.getDoubles(columnNameRight))); + } + } + + /** + * Calculate the average of two double time series. If the value in either one is missing, take the available value as the result + * @param s1 series 1 + * @param s2 series 2 + * @return the averaged time series + */ + private static DoubleSeries robustAverage(DoubleSeries s1, DoubleSeries s2) { Review comment: Normally we don't need to merge the windows. Merging is only used when the detection windows overlap. It would also be better to check with the AI team on what the best approach is. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org