This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f80b5ea9da Add new /uploadFromServerToDeepstore endpoint (#17197)
1f80b5ea9da is described below

commit 1f80b5ea9dab2ca4cbb21dc7e5cd2375a33a177d
Author: tarun11Mavani <[email protected]>
AuthorDate: Fri Nov 21 23:52:24 2025 +0530

    Add new /uploadFromServerToDeepstore endpoint (#17197)
---
 .../api/resources/PinotSegmentRestletResource.java | 83 ++++++++++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 38 ++++++----
 2 files changed, 109 insertions(+), 12 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 0aca6417f04..6355371273c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -466,6 +466,89 @@ public class PinotSegmentRestletResource {
     }
   }
 
+  @POST
+  @Path("segments/{tableNameWithType}/uploadFromServerToDeepstore")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.UPLOAD_SEGMENT)
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Upload realtime segments from server to deep store",
+      notes = "Uploads realtime segments from servers (with online replicas) 
to deep store. "
+          + "When forceMode=false (default), only uploads segments missing 
from deep store. "
+          + "When forceMode=true, bypasses all checks and reuploads segments 
even if they already exist.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Bad request - table must be a 
realtime table"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse uploadFromServerToDeepstore(
+      @ApiParam(value = "Name of the realtime table with type", required = 
true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "List of segment names to upload. If not provided or empty, no segments are uploaded.",
+          allowMultiple = true) @QueryParam("segmentNames") @Nullable 
List<String> segmentNames,
+      @ApiParam(value = "Force mode: when true, bypasses checks and reuploads 
even if segments exist in deep store",
+          defaultValue = "false") @QueryParam("forceMode") 
@DefaultValue("false") boolean forceMode,
+      @Context HttpHeaders headers) {
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+
+    // Validate this is a realtime table
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType != TableType.REALTIME) {
+      throw new ControllerApplicationException(LOGGER,
+          "Upload from server to deep store is only supported for REALTIME 
tables. Provided: " + tableNameWithType,
+          Status.BAD_REQUEST);
+    }
+
+    // Get table config
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Table config not found for table: " + tableNameWithType, 
Status.NOT_FOUND);
+    }
+
+    // Get segments to upload
+    List<SegmentZKMetadata> segmentsToUpload;
+    if (segmentNames == null || segmentNames.isEmpty()) {
+      LOGGER.info("Empty/null segment list provided for table: {}. Not 
uploading anything.", tableNameWithType);
+      return new SuccessResponse("No segments to upload for table: "
+          + tableNameWithType);
+    } else {
+      // Upload specific segments
+      segmentsToUpload = new ArrayList<>();
+      for (String segmentName : segmentNames) {
+        SegmentZKMetadata segmentZKMetadata =
+            _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, 
segmentName);
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Segment not found: {} in table: {} while reuploading to 
deepstore",
+              segmentName, tableNameWithType);
+          continue;
+        }
+        segmentsToUpload.add(segmentZKMetadata);
+      }
+    }
+
+    if (segmentsToUpload.isEmpty()) {
+      return new SuccessResponse("No segments found to upload for table: " + 
tableNameWithType);
+    }
+
+    LOGGER.info("Uploading {} segments from server to deep store for table: {} 
(forceMode: {})",
+        segmentsToUpload.size(), tableNameWithType, forceMode);
+
+    // Trigger upload (with or without force mode)
+    try {
+      _pinotHelixResourceManager.getRealtimeSegmentManager()
+          .uploadToDeepStoreIfMissing(tableConfig, segmentsToUpload, 
forceMode);
+      String mode = forceMode ? "force upload" : "upload";
+      return new SuccessResponse(
+          String.format("Successfully queued %d segment(s) for %s to deep 
store for table: %s",
+              segmentsToUpload.size(), mode, tableNameWithType));
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to upload segments to deep store for table: 
%s. Error: %s",
+              tableNameWithType, e.getMessage()), 
Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @DELETE
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/segments/{tableName}/{segmentName}")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index ebbd8c9e5c2..a9ce6b0263c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1961,13 +1961,14 @@ public class PinotLLCRealtimeSegmentManager {
    * Since uploading to deep store involves expensive compression step (first 
tar up the segment and then upload),
    * we don't want to retry the uploading. Segment without deep store copy can 
still be downloaded from peer servers.
    *
+   * @param forceUpload if true, bypasses checks and forces upload even if 
deep store copy exists
+   *
    * @see <a href="
    * 
https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
    * "> By-passing deep-store requirement for Realtime segment 
completion:Failure cases and handling</a>
-   *
-   * TODO: Add an on-demand way to upload LLC segment to deep store for a 
specific table.
    */
-  public void uploadToDeepStoreIfMissing(TableConfig tableConfig, 
List<SegmentZKMetadata> segmentsZKMetadata) {
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig, 
List<SegmentZKMetadata> segmentsZKMetadata,
+      boolean forceUpload) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -1993,15 +1994,21 @@ public class PinotLLCRealtimeSegmentManager {
     //  2. Update the LLC segment ZK metadata by adding deep store download 
url.
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
       String segmentName = segmentZKMetadata.getSegmentName();
-      if (shouldSkipSegmentForDeepStoreUpload(tableConfig, segmentZKMetadata, 
retentionStrategy)) {
-        continue;
-      }
-      // Skip the fix if an upload is already queued for this segment
-      if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
-        int queueSize = _deepStoreUploadExecutorPendingSegments.size();
-        _controllerMetrics.setOrUpdateGauge(
-            
ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), 
queueSize);
-        continue;
+      if (!forceUpload) {
+        // Skip checks only if forceUpload is false
+        if (shouldSkipSegmentForDeepStoreUpload(tableConfig, 
segmentZKMetadata, retentionStrategy)) {
+          continue;
+        }
+        // Skip the fix if an upload is already queued for this segment
+        if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
+          int queueSize = _deepStoreUploadExecutorPendingSegments.size();
+          _controllerMetrics.setOrUpdateGauge(
+              
ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), 
queueSize);
+          continue;
+        }
+      } else {
+        // For force upload, still add to pending set to track
+        _deepStoreUploadExecutorPendingSegments.add(segmentName);
       }
 
       // create Runnable to perform the upload
@@ -2053,6 +2060,13 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  /**
+   * Overloaded method for backward compatibility. Defaults to forceUpload = 
false.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig, 
List<SegmentZKMetadata> segmentsZKMetadata) {
+    uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata, false);
+  }
+
   private void uploadToDeepStoreWithFallback(URI uri, String segmentName, 
String rawTableName,
       SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS) {
     String serverUploadRequestUrl = getUploadUrl(uri, 
"uploadCommittedSegment");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to