rajagopr commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1890927616


##########
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java:
##########
@@ -18,57 +18,180 @@
  */
 package org.apache.pinot.common.minion;
 
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 
 /**
  * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
- * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed.
+ * The <code>_windowStartMs</code> denotes the time (exclusive) until which it's certain that tasks have been
+ *   completed successfully.
+ * The <code>_expectedRealtimeToOfflineSegmentsTaskResultList</code> denotes the expected RTO tasks result info.
+ *   This list can contain both completed and in-completed Tasks expected Results. This list is used by
+ *   generator to validate whether a potential segment (for RTO task) has already been successfully
+ *   processed as a RTO task in the past or not.
+ * The <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time
+ *  of currently not successfully completed minion task. bucket: [_windowStartMs, _windowEndMs)
+ *  The window is updated by generator when it's certain that prev minon task run is successful.
  *
  * This gets serialized and stored in zookeeper under the path
  * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  *
  * PinotTaskGenerator:
- * The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
- * to determine the window of execution for the task it is generating.
- * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
+ * The <code>_windowStartMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution of the prev task based on which it generates new task.
  *
  * PinotTaskExecutor:
- * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
- * - Verify that is is running the latest task scheduled by the task generator
- * - Update the watermark as the end of the window that it executed for
+ * The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it's running the latest task scheduled by the task generator.
+ * - The ExpectedRealtimeToOfflineSegmentsTaskResultList is updated before the offline segments
+ *   are uploaded to the table.
  */
 public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {
 
-  private static final String WATERMARK_KEY = "watermarkMs";
+  private static final String WINDOW_START_KEY = "watermarkMs";

Review Comment:
   Should the value be `windowStartMs`, to match the renamed constant?
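
   One caveat if the value does change: the string is the field name under which the value is persisted, so metadata already written under `watermarkMs` would no longer be found by a reader that only looks for the new name. A minimal sketch of a fallback read, assuming the persisted fields can be treated as a plain string map (the class `WindowStartKeyFallback`, the constant `LEGACY_WATERMARK_KEY` and the method `readWindowStartMs` are hypothetical and not part of this PR):

```java
import java.util.HashMap;
import java.util.Map;

public class WindowStartKeyFallback {
  // New key name suggested in the review comment (assumption: not yet in the PR).
  public static final String WINDOW_START_KEY = "windowStartMs";
  // Key name under which existing metadata was written.
  public static final String LEGACY_WATERMARK_KEY = "watermarkMs";

  // Reads the window start from the persisted fields, preferring the new key
  // and falling back to the legacy key for metadata written before the rename.
  public static long readWindowStartMs(Map<String, String> simpleFields) {
    String value = simpleFields.get(WINDOW_START_KEY);
    if (value == null) {
      value = simpleFields.get(LEGACY_WATERMARK_KEY);
    }
    if (value == null) {
      throw new IllegalStateException("No window start found in task metadata");
    }
    return Long.parseLong(value);
  }

  public static void main(String[] args) {
    // Metadata written before the rename only carries the legacy key.
    Map<String, String> legacy = new HashMap<>();
    legacy.put(LEGACY_WATERMARK_KEY, "1700000000000");
    System.out.println(readWindowStartMs(legacy));
  }
}
```

   If keeping `"watermarkMs"` is intentional for compatibility, a short comment on the constant saying so would avoid the question.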



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java:
##########
@@ -18,57 +18,180 @@
  */
 package org.apache.pinot.common.minion;
 
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 
 /**
  * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
- * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed.
+ * The <code>_windowStartMs</code> denotes the time (exclusive) until which it's certain that tasks have been
+ *   completed successfully.
+ * The <code>_expectedRealtimeToOfflineSegmentsTaskResultList</code> denotes the expected RTO tasks result info.
+ *   This list can contain both completed and in-completed Tasks expected Results. This list is used by
+ *   generator to validate whether a potential segment (for RTO task) has already been successfully
+ *   processed as a RTO task in the past or not.
+ * The <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time
+ *  of currently not successfully completed minion task. bucket: [_windowStartMs, _windowEndMs)
+ *  The window is updated by generator when it's certain that prev minon task run is successful.
  *
  * This gets serialized and stored in zookeeper under the path
  * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  *
  * PinotTaskGenerator:
- * The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
- * to determine the window of execution for the task it is generating.
- * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
+ * The <code>_windowStartMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution of the prev task based on which it generates new task.
  *
  * PinotTaskExecutor:
- * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
- * - Verify that is is running the latest task scheduled by the task generator
- * - Update the watermark as the end of the window that it executed for
+ * The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it's running the latest task scheduled by the task generator.
+ * - The ExpectedRealtimeToOfflineSegmentsTaskResultList is updated before the offline segments
+ *   are uploaded to the table.
  */
 public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {
 
-  private static final String WATERMARK_KEY = "watermarkMs";
+  private static final String WINDOW_START_KEY = "watermarkMs";
+  private static final String WINDOW_END_KEY = "windowEndMs";
+  private static final String COMMA_SEPARATOR = ",";
+  private static final String SEGMENT_NAME_VS_EXPECTED_RTO_RESULT_ID_KEY = "segmentVsExpectedRTOResultId";
 
   private final String _tableNameWithType;
-  private final long _watermarkMs;
+  private long _windowStartMs;
+  private long _windowEndMs;
+  private final Map<String, ExpectedRealtimeToOfflineTaskResultInfo> _idVsExpectedRealtimeToOfflineTaskResultInfo;
+  private final Map<String, String> _segmentNameVsExpectedRealtimeToOfflineTaskResultInfoId;
 
-  public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
+  public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs) {
+    _windowStartMs = windowStartMs;
     _tableNameWithType = tableNameWithType;
-    _watermarkMs = watermarkMs;
+    _idVsExpectedRealtimeToOfflineTaskResultInfo = new HashMap<>();

Review Comment:
   A variable name of the form `taskToResults` is easier to follow than `taskVsResults`.



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java:
##########
@@ -18,57 +18,180 @@
  */
 package org.apache.pinot.common.minion;
 
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 
 /**
  * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
- * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed.
+ * The <code>_windowStartMs</code> denotes the time (exclusive) until which it's certain that tasks have been
+ *   completed successfully.
+ * The <code>_expectedRealtimeToOfflineSegmentsTaskResultList</code> denotes the expected RTO tasks result info.
+ *   This list can contain both completed and in-completed Tasks expected Results. This list is used by
+ *   generator to validate whether a potential segment (for RTO task) has already been successfully
+ *   processed as a RTO task in the past or not.
+ * The <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time
+ *  of currently not successfully completed minion task. bucket: [_windowStartMs, _windowEndMs)
+ *  The window is updated by generator when it's certain that prev minon task run is successful.
  *
  * This gets serialized and stored in zookeeper under the path
  * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  *
  * PinotTaskGenerator:
- * The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
- * to determine the window of execution for the task it is generating.
- * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
+ * The <code>_windowStartMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution of the prev task based on which it generates new task.
  *
  * PinotTaskExecutor:
- * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
- * - Verify that is is running the latest task scheduled by the task generator
- * - Update the watermark as the end of the window that it executed for
+ * The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it's running the latest task scheduled by the task generator.
+ * - The ExpectedRealtimeToOfflineSegmentsTaskResultList is updated before the offline segments
+ *   are uploaded to the table.
  */
 public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {
 
-  private static final String WATERMARK_KEY = "watermarkMs";
+  private static final String WINDOW_START_KEY = "watermarkMs";
+  private static final String WINDOW_END_KEY = "windowEndMs";
+  private static final String COMMA_SEPARATOR = ",";
+  private static final String SEGMENT_NAME_VS_EXPECTED_RTO_RESULT_ID_KEY = "segmentVsExpectedRTOResultId";
 
   private final String _tableNameWithType;
-  private final long _watermarkMs;
+  private long _windowStartMs;
+  private long _windowEndMs;
+  private final Map<String, ExpectedRealtimeToOfflineTaskResultInfo> _idVsExpectedRealtimeToOfflineTaskResultInfo;
+  private final Map<String, String> _segmentNameVsExpectedRealtimeToOfflineTaskResultInfoId;

Review Comment:
   Rename to `_taskResultsMap` instead? Consider simplifying the variable names throughout.



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedRealtimeToOfflineTaskResultInfo.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+
+/**
+ * ExpectedRealtimeOfflineTaskResultInfo is created in
+ * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
+ * before uploading offline segment(s) to the offline table.
+ *
+ *  The <code>_segmentsFrom</code> denotes the input RealtimeSegments.
+ *  The <code>_segmentsTo</code> denotes the expected offline segemnts.
+ *  The <code>_id</code> denotes the unique identifier of object.
+ *  The <code>_taskID</code> denotes the minion taskId.
+ *  The <code>_taskFailure</code> denotes the status of minion task handling the
+ *    current ExpectedResult. This is modified in
+ *    {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator}
+ *    when a prev minion task is failed.
+ *
+ */
+public class ExpectedRealtimeToOfflineTaskResultInfo {

Review Comment:
   Use `RTOTaskResult` instead?



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedRealtimeToOfflineTaskResultInfo.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+
+/**
+ * ExpectedRealtimeOfflineTaskResultInfo is created in
+ * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
+ * before uploading offline segment(s) to the offline table.
+ *
+ *  The <code>_segmentsFrom</code> denotes the input RealtimeSegments.
+ *  The <code>_segmentsTo</code> denotes the expected offline segemnts.
+ *  The <code>_id</code> denotes the unique identifier of object.
+ *  The <code>_taskID</code> denotes the minion taskId.
+ *  The <code>_taskFailure</code> denotes the status of minion task handling the

Review Comment:
   `_taskStatus` would be more apt.
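
   A rough sketch of what a status-style field could look like, assuming the current field only records whether the task failed. Every name here (`RtoTaskResultSketch`, the `TaskStatus` enum and its values, `markFailed`, `markSucceeded`) is hypothetical and not taken from the PR:

```java
public class RtoTaskResultSketch {
  // Hypothetical states; the PR may only need to distinguish failed from not failed.
  public enum TaskStatus {
    IN_PROGRESS, SUCCEEDED, FAILED
  }

  private final String _taskId;
  // A status reads naturally in the Javadoc ("denotes the status of the minion task")
  // and leaves room for more states than a failure flag does.
  private TaskStatus _taskStatus = TaskStatus.IN_PROGRESS;

  public RtoTaskResultSketch(String taskId) {
    _taskId = taskId;
  }

  public String getTaskId() {
    return _taskId;
  }

  public TaskStatus getTaskStatus() {
    return _taskStatus;
  }

  // Would be called by the generator when it detects that a previous task failed.
  public void markFailed() {
    _taskStatus = TaskStatus.FAILED;
  }

  public void markSucceeded() {
    _taskStatus = TaskStatus.SUCCEEDED;
  }

  public boolean isFailed() {
    return _taskStatus == TaskStatus.FAILED;
  }
}
```

   A plain boolean renamed to match the Javadoc wording would also address the comment; the enum is only one option.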



