xiaohui-sun commented on a change in pull request #4190: [TE] pass predicted 
time series throughout the detection pipeline
URL: https://github.com/apache/incubator-pinot/pull/4190#discussion_r282211262
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
 ##########
 @@ -201,8 +206,58 @@ public DetectionPipelineResult run() throws Exception {
       anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, 
this.detectorName);
     }
     long lastTimeStamp = this.getLastTimeStamp();
-    return new DetectionPipelineResult(anomalies.stream().filter(anomaly -> 
anomaly.getEndTime() <= lastTimeStamp).collect(
-        Collectors.toList()), lastTimeStamp);
+    List<MergedAnomalyResultDTO> anomalyResults = 
anomalies.stream().filter(anomaly -> anomaly.getEndTime() <= 
lastTimeStamp).collect(
+        Collectors.toList());
+    PredictionResult predictedTimeSeries = new 
PredictionResult(this.detectorName, this.metricUrn, 
predictedResult.getDataFrame());
+    return new DetectionPipelineResult(anomalyResults, lastTimeStamp, 
Collections.singletonList(predictedTimeSeries));
+  }
+
+  /**
+   * Join two time series, including current, baseline, lower bound and upper 
bound. If two time series have overlapped region, take the average
+   * @param ts1 timeseries 1
+   * @param ts2 timeseries 2
+   * @return the consolidated time series
+   */
+  static TimeSeries consolidateTimeSeries(TimeSeries ts1, TimeSeries ts2) {
+    DataFrame df1 = ts1.getDataFrame();
+    DataFrame df2 = ts2.getDataFrame();
+    DataFrame joinedDf = df1.joinOuter(df2, COL_TIME);
+    consolidateJoinedDf(joinedDf, COL_VALUE);
+    consolidateJoinedDf(joinedDf, COL_CURRENT);
+    consolidateJoinedDf(joinedDf, COL_LOWER_BOUND);
+    consolidateJoinedDf(joinedDf, COL_UPPER_BOUND);
+    return TimeSeries.fromDataFrame(joinedDf);
+  }
+
+  private static void consolidateJoinedDf(DataFrame joinedDf, String 
columnName) {
+    String columnNameLeft = columnName + DataFrame.COLUMN_JOIN_LEFT;
+    String columnNameRight = columnName + DataFrame.COLUMN_JOIN_RIGHT;
+    if (joinedDf.contains(columnNameLeft) && 
joinedDf.contains(columnNameRight)) {
+      joinedDf.addSeries(columnName, 
robustAverage(joinedDf.getDoubles(columnNameLeft), 
joinedDf.getDoubles(columnNameRight)));
+    }
+  }
+
+  /**
+   * Calculate the average of two double time series. If the value in either 
one is missing, take the available value as the result
+   * @param s1 series 1
+   * @param s2 series 2
+   * @return the averaged time series
+   */
+  private static DoubleSeries robustAverage(DoubleSeries s1, DoubleSeries s2) {
 
 Review comment:
   OK, let's just take the latest values. Technically, they should be the same.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to