xiaohui-sun commented on a change in pull request #4190: [TE] pass predicted 
time series thought out the detection pipeline
URL: https://github.com/apache/incubator-pinot/pull/4190#discussion_r282163346
 
 

 ##########
 File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
 ##########
 @@ -201,8 +206,58 @@ public DetectionPipelineResult run() throws Exception {
      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName);
     }
     long lastTimeStamp = this.getLastTimeStamp();
-    return new DetectionPipelineResult(anomalies.stream().filter(anomaly -> anomaly.getEndTime() <= lastTimeStamp).collect(
-        Collectors.toList()), lastTimeStamp);
+    List<MergedAnomalyResultDTO> anomalyResults = anomalies.stream().filter(anomaly -> anomaly.getEndTime() <= lastTimeStamp).collect(
+        Collectors.toList());
+    PredictionResult predictedTimeSeries = new PredictionResult(this.detectorName, this.metricUrn, predictedResult.getDataFrame());
+    return new DetectionPipelineResult(anomalyResults, lastTimeStamp, Collections.singletonList(predictedTimeSeries));
+  }
+
+  /**
+   * Join two time series, including current, baseline, lower bound and upper bound. If two time series have overlapped region, take the average
+   * @param ts1 timeseries 1
+   * @param ts2 timeseries 2
+   * @return the consolidated time series
+   */
+  static TimeSeries consolidateTimeSeries(TimeSeries ts1, TimeSeries ts2) {
+    DataFrame df1 = ts1.getDataFrame();
+    DataFrame df2 = ts2.getDataFrame();
+    DataFrame joinedDf = df1.joinOuter(df2, COL_TIME);
+    consolidateJoinedDf(joinedDf, COL_VALUE);
+    consolidateJoinedDf(joinedDf, COL_CURRENT);
+    consolidateJoinedDf(joinedDf, COL_LOWER_BOUND);
+    consolidateJoinedDf(joinedDf, COL_UPPER_BOUND);
+    return TimeSeries.fromDataFrame(joinedDf);
+  }
+
+  private static void consolidateJoinedDf(DataFrame joinedDf, String columnName) {
+    String columnNameLeft = columnName + DataFrame.COLUMN_JOIN_LEFT;
+    String columnNameRight = columnName + DataFrame.COLUMN_JOIN_RIGHT;
+    if (joinedDf.contains(columnNameLeft) && joinedDf.contains(columnNameRight)) {
+      joinedDf.addSeries(columnName, robustAverage(joinedDf.getDoubles(columnNameLeft), joinedDf.getDoubles(columnNameRight)));
+    }
+  }
+
+  /**
+   * Calculate the average of two double time series. If the value in either one is missing, take the available value as the result
+   * @param s1 series 1
+   * @param s2 series 2
+   * @return the averaged time series
+   */
+  private static DoubleSeries robustAverage(DoubleSeries s1, DoubleSeries s2) {
 
 Review comment:
   Averaging the overlapping values would make the result hard to debug.
   How about always using the latest window to overwrite instead?
   
   E.g., if we want to merge two windows:
   1. t1 to t2
   2. t3 to t4, where t1 < t3 < t2
   then use the values from t3 to t4 to overwrite the values in the overlapping range t3 to t2.
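
A minimal sketch of the overwrite idea, for illustration only. It does not use ThirdEye's `DataFrame` or `TimeSeries` API; the class `OverwriteMerge`, the method `overwriteWithLatest`, and the `TreeMap<Long, Double>` representation (timestamp to value) are all hypothetical stand-ins. Where both windows contain a timestamp, the value from the later window replaces the earlier one.

```java
import java.util.Map;
import java.util.TreeMap;

public class OverwriteMerge {

  /**
   * Merges two windows keyed by timestamp (hypothetical helper, not part of ThirdEye).
   * On overlapping timestamps the value from the later window wins.
   */
  static TreeMap<Long, Double> overwriteWithLatest(Map<Long, Double> earlier, Map<Long, Double> later) {
    TreeMap<Long, Double> merged = new TreeMap<>(earlier);
    merged.putAll(later); // putAll replaces values for keys present in both maps
    return merged;
  }

  public static void main(String[] args) {
    // Window 1 covers timestamps 1..3, window 2 covers 2..4, so 2 and 3 overlap.
    TreeMap<Long, Double> window1 = new TreeMap<>();
    window1.put(1L, 10.0);
    window1.put(2L, 20.0);
    window1.put(3L, 30.0);

    TreeMap<Long, Double> window2 = new TreeMap<>();
    window2.put(2L, 21.0);
    window2.put(3L, 31.0);
    window2.put(4L, 41.0);

    // prints {1=10.0, 2=21.0, 3=31.0, 4=41.0}
    System.out.println(overwriteWithLatest(window1, window2));
  }
}
```

With this approach every merged value can be traced back to exactly one source window, which is the debugging advantage over averaging.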
   

