This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1f80b5ea9da Add new /uploadFromServerToDeepstore endpoint (#17197)
1f80b5ea9da is described below
commit 1f80b5ea9dab2ca4cbb21dc7e5cd2375a33a177d
Author: tarun11Mavani <[email protected]>
AuthorDate: Fri Nov 21 23:52:24 2025 +0530
Add new /uploadFromServerToDeepstore endpoint (#17197)
---
.../api/resources/PinotSegmentRestletResource.java | 83 ++++++++++++++++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 38 ++++++----
2 files changed, 109 insertions(+), 12 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 0aca6417f04..6355371273c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -466,6 +466,89 @@ public class PinotSegmentRestletResource {
}
}
+ @POST
+ @Path("segments/{tableNameWithType}/uploadFromServerToDeepstore")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action = Actions.Table.UPLOAD_SEGMENT)
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Upload realtime segments from server to deep store",
+ notes = "Uploads realtime segments from servers (with online replicas)
to deep store. "
+ + "When forceMode=false (default), only uploads segments missing
from deep store. "
+ + "When forceMode=true, bypasses all checks and reuploads segments
even if they already exist.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 400, message = "Bad request - table must be a
realtime table"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse uploadFromServerToDeepstore(
+ @ApiParam(value = "Name of the realtime table with type", required =
true, example = "myTable_REALTIME")
+ @PathParam("tableNameWithType") String tableNameWithType,
+ @ApiParam(value = "List of segment names to upload. If not provided,
uploads all segments.",
+ allowMultiple = true) @QueryParam("segmentNames") @Nullable
List<String> segmentNames,
+ @ApiParam(value = "Force mode: when true, bypasses checks and reuploads
even if segments exist in deep store",
+ defaultValue = "false") @QueryParam("forceMode")
@DefaultValue("false") boolean forceMode,
+ @Context HttpHeaders headers) {
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+
+ // Validate this is a realtime table
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ throw new ControllerApplicationException(LOGGER,
+ "Upload from server to deep store is only supported for REALTIME
tables. Provided: " + tableNameWithType,
+ Status.BAD_REQUEST);
+ }
+
+ // Get table config
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Table config not found for table: " + tableNameWithType,
Status.NOT_FOUND);
+ }
+
+ // Get segments to upload
+ List<SegmentZKMetadata> segmentsToUpload;
+ if (segmentNames == null || segmentNames.isEmpty()) {
+ LOGGER.info("Empty/null segment list provided for table: {}. Not
uploading anything.", tableNameWithType);
+ return new SuccessResponse("No segments to upload for table: "
+ + tableNameWithType);
+ } else {
+ // Upload specific segments
+ segmentsToUpload = new ArrayList<>();
+ for (String segmentName : segmentNames) {
+ SegmentZKMetadata segmentZKMetadata =
+ _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType,
segmentName);
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("Segment not found: {} in table: {} while reuploading to
deepstore",
+ segmentName, tableNameWithType);
+ continue;
+ }
+ segmentsToUpload.add(segmentZKMetadata);
+ }
+ }
+
+ if (segmentsToUpload.isEmpty()) {
+ return new SuccessResponse("No segments found to upload for table: " +
tableNameWithType);
+ }
+
+ LOGGER.info("Uploading {} segments from server to deep store for table: {}
(forceMode: {})",
+ segmentsToUpload.size(), tableNameWithType, forceMode);
+
+ // Trigger upload (with or without force mode)
+ try {
+ _pinotHelixResourceManager.getRealtimeSegmentManager()
+ .uploadToDeepStoreIfMissing(tableConfig, segmentsToUpload,
forceMode);
+ String mode = forceMode ? "force upload" : "upload";
+ return new SuccessResponse(
+ String.format("Successfully queued %d segment(s) for %s to deep
store for table: %s",
+ segmentsToUpload.size(), mode, tableNameWithType));
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to upload segments to deep store for table:
%s. Error: %s",
+ tableNameWithType, e.getMessage()),
Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableName}/{segmentName}")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index ebbd8c9e5c2..a9ce6b0263c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1961,13 +1961,14 @@ public class PinotLLCRealtimeSegmentManager {
* Since uploading to deep store involves expensive compression step (first
tar up the segment and then upload),
* we don't want to retry the uploading. Segment without deep store copy can
still be downloaded from peer servers.
*
+ * @param forceUpload if true, bypasses checks and forces upload even if
deep store copy exists
+ *
* @see <a href="
*
https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
* "> By-passing deep-store requirement for Realtime segment
completion:Failure cases and handling</a>
- *
- * TODO: Add an on-demand way to upload LLC segment to deep store for a
specific table.
*/
- public void uploadToDeepStoreIfMissing(TableConfig tableConfig,
List<SegmentZKMetadata> segmentsZKMetadata) {
+ public void uploadToDeepStoreIfMissing(TableConfig tableConfig,
List<SegmentZKMetadata> segmentsZKMetadata,
+ boolean forceUpload) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -1993,15 +1994,21 @@ public class PinotLLCRealtimeSegmentManager {
// 2. Update the LLC segment ZK metadata by adding deep store download
url.
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
- if (shouldSkipSegmentForDeepStoreUpload(tableConfig, segmentZKMetadata,
retentionStrategy)) {
- continue;
- }
- // Skip the fix if an upload is already queued for this segment
- if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
- int queueSize = _deepStoreUploadExecutorPendingSegments.size();
- _controllerMetrics.setOrUpdateGauge(
-
ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(),
queueSize);
- continue;
+ if (!forceUpload) {
+ // Skip checks only if forceUpload is false
+ if (shouldSkipSegmentForDeepStoreUpload(tableConfig,
segmentZKMetadata, retentionStrategy)) {
+ continue;
+ }
+ // Skip the fix if an upload is already queued for this segment
+ if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
+ int queueSize = _deepStoreUploadExecutorPendingSegments.size();
+ _controllerMetrics.setOrUpdateGauge(
+
ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(),
queueSize);
+ continue;
+ }
+ } else {
+ // For force upload, still add to pending set to track
+ _deepStoreUploadExecutorPendingSegments.add(segmentName);
}
// create Runnable to perform the upload
@@ -2053,6 +2060,13 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Overloaded method for backward compatibility. Defaults to forceUpload =
false.
+ */
+ public void uploadToDeepStoreIfMissing(TableConfig tableConfig,
List<SegmentZKMetadata> segmentsZKMetadata) {
+ uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata, false);
+ }
+
private void uploadToDeepStoreWithFallback(URI uri, String segmentName,
String rawTableName,
SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS) {
String serverUploadRequestUrl = getUploadUrl(uri,
"uploadCommittedSegment");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]