KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941501560


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String 
segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, 
URIUtils.encode(segmentName));
   }
 
+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
+   * will call the server reIngestSegment API
+   * on one of the alive servers that are supposed to host that segment 
according to IdealState.
+   *
+   * API signature:
+   *   POST http://[serverURL]/reIngestSegment
+   *   Request body (JSON):
+   *   {
+   *     "tableNameWithType": [tableName],
+   *     "segmentName": [segmentName]
+   *   }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+    // Step 1: Fetch the ExternalView and all segments
+    ExternalView externalView = getExternalView(tableNameWithType);
+    IdealState idealState = getIdealState(tableNameWithType);
+    Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = 
externalView.getRecord().getMapFields();
+    Map<String, Map<String, String>> segmentToInstanceIdealStateMap = 
idealState.getRecord().getMapFields();
+
+    // find segments in ERROR state in externalView
+    List<String> segmentsInErrorState = new ArrayList<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
segmentToInstanceCurrentStateMap.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      boolean allReplicasInError = true;
+      for (String state : instanceStateMap.values()) {
+        if (!SegmentStateModel.ERROR.equals(state)) {
+          allReplicasInError = false;
+          break;
+        }
+      }
+      if (allReplicasInError) {
+        segmentsInErrorState.add(segmentName);
+      }
+    }
+
+    if (segmentsInErrorState.isEmpty()) {
+      LOGGER.info("No segments found in ERROR state for table {}", 
tableNameWithType);
+      return;
+    }
+
+    // filter out segments that are not ONLINE in IdealState
+    for (String segmentName : segmentsInErrorState) {
+      Map<String, String> instanceIdealStateMap = 
segmentToInstanceIdealStateMap.get(segmentName);
+      boolean isOnline = true;
+      for (String state : instanceIdealStateMap.values()) {
+        if (!SegmentStateModel.ONLINE.equals(state)) {
+          isOnline = false;
+          break;
+        }
+      }
+      if (!isOnline) {
+        segmentsInErrorState.remove(segmentName);
+      }
+    }
+
+    // Step 2: For each segment, check the ZK metadata for conditions
+    for (String segmentName : segmentsInErrorState) {
+      // Skip non-LLC segments or segments missing from the ideal state 
altogether
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName == null) {
+        continue;
+      }
+
+      SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName);
+      // We only consider segments that are in COMMITTING state
+      if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+        Map<String, String> instanceStateMap = 
segmentToInstanceIdealStateMap.get(segmentName);
+
+        // Step 3: “No peer has that segment.” => Re-ingest from one server 
that is supposed to host it and is alive
+        LOGGER.info(
+            "Segment {} in table {} is COMMITTING with missing download URL 
and no peer copy. Triggering re-ingestion.",
+            segmentName, tableNameWithType);
+
+        // Find at least one server that should host this segment and is alive
+        String aliveServer = 
findAliveServerToReIngest(instanceStateMap.keySet());
+        if (aliveServer == null) {
+          LOGGER.warn("No alive server found to re-ingest segment {} in table 
{}", segmentName, tableNameWithType);
+          continue;
+        }
+
+        try {
+          _fileUploadDownloadClient.triggerReIngestion(aliveServer, 
tableNameWithType, segmentName);

Review Comment:
   I kept it here so that I can use the correct HTTP client with all the SSL 
settings. What would be an appropriate class to move this to?
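
   One possible shape, as a rough sketch rather than a proposal for the actual 
Pinot code: a small dedicated class that owns the re-ingestion call but 
receives an already configured HTTP client, so the SSL settings are still 
reused. Everything named here is hypothetical (`ReIngestionClient`, 
`buildRequest`, `buildRequestBody`), and `java.net.http.HttpClient` stands in 
for whatever client the controller already configures. Only the 
`/reIngestSegment` path and the two JSON fields come from the Javadoc in the 
diff above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import javax.net.ssl.SSLContext;

/**
 * Hypothetical helper (not an existing Pinot class): keeps the re-ingestion
 * call in one place while reusing a client that carries the TLS settings.
 */
final class ReIngestionClient {
  private final HttpClient _httpClient;

  /** The caller passes in a client that is already configured (SSL, timeouts, ...). */
  ReIngestionClient(HttpClient httpClient) {
    _httpClient = httpClient;
  }

  /** Convenience factory: builds a JDK client from an existing SSLContext. */
  static ReIngestionClient withSslContext(SSLContext sslContext) {
    return new ReIngestionClient(HttpClient.newBuilder().sslContext(sslContext).build());
  }

  /** Escapes backslashes and double quotes so the value is safe inside a JSON string. */
  private static String escape(String value) {
    return value.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  /** JSON body with the two fields listed in the Javadoc of the diff. */
  static String buildRequestBody(String tableNameWithType, String segmentName) {
    return "{\"tableNameWithType\":\"" + escape(tableNameWithType)
        + "\",\"segmentName\":\"" + escape(segmentName) + "\"}";
  }

  /** Builds the POST request without sending it, which keeps this part unit-testable. */
  static HttpRequest buildRequest(String serverUrl, String tableNameWithType, String segmentName) {
    return HttpRequest.newBuilder(URI.create(serverUrl + "/reIngestSegment"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(buildRequestBody(tableNameWithType, segmentName)))
        .build();
  }

  /** Sends the request through the shared client and returns the HTTP status code. */
  int triggerReIngestion(String serverUrl, String tableNameWithType, String segmentName)
      throws IOException, InterruptedException {
    HttpResponse<String> response = _httpClient.send(
        buildRequest(serverUrl, tableNameWithType, segmentName),
        HttpResponse.BodyHandlers.ofString());
    return response.statusCode();
  }
}
```

   With something like this, the segment manager would only hold a reference 
to the helper, and the decision about which client and SSL configuration to 
use stays wherever the client is constructed.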



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


