rajagopr commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1706300126


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -359,6 +368,150 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table: {}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID()
+                + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    File allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);

Review Comment:
   Yes, that is correct.
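   For readers following along: a minimal sketch of the naming convention the check above relies on, assuming `generateSegmentMetadataURI` simply rewrites the segment tar path to a sibling `<segmentName>.metadata.tar.gz` (the helper's actual implementation is not shown in this hunk):

   ```java
   import java.net.URI;

   // Sketch only: derive the sibling metadata tar URI from the segment tar path,
   // e.g. s3://bucket/table/seg_0.tar.gz -> s3://bucket/table/seg_0.metadata.tar.gz
   static URI generateSegmentMetadataUriSketch(String tarFilePath, String segmentName) {
     URI segmentUri = URI.create(tarFilePath);
     String path = segmentUri.getPath();
     // keep everything up to and including the last '/' of the segment tar path
     String parentPath = path.substring(0, path.lastIndexOf('/') + 1);
     return segmentUri.resolve(parentPath + segmentName + ".metadata.tar.gz");
   }
   ```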



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -555,6 +723,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/v3/segments")

Review Comment:
   Sure, will make this change.
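   For context, a hedged sketch of how a push job would resolve this endpoint via the `getBatchSegmentUploadURI` helper added elsewhere in this PR; the printed path reflects the `@Path` annotation as currently written and may change with this review:

   ```java
   import java.net.URI;
   import org.apache.pinot.common.utils.FileUploadDownloadClient;

   public class BatchUploadUriExample {
     public static void main(String[] args) throws Exception {
       // Assumed behavior: append the new batch segment upload path to the controller base URI.
       URI controllerUri = URI.create("http://localhost:9000");
       URI segmentUploadUri = FileUploadDownloadClient.getBatchSegmentUploadURI(controllerUri);
       System.out.println(segmentUploadUri); // expected: http://localhost:9000/v3/segments
     }
   }
   ```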



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -359,6 +368,150 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table: {}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID()
+                + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    File allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);
+    Map<String, File> allSegmentsMetadataMap = new HashMap<>();
+    // the key is unused in batch upload mode and hence 'noopKey'
+    allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+
+    // perform metadata push in batch mode for every cluster
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri: " + pinotClusterSpec.getControllerURI());
+        }
+        LOGGER.info("Pushing segments: {} to location: {} for table: {}",
+            segmentMetadataFileMap.keySet(), controllerURI, tableName);
+        int attempts = 1;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+          attempts = spec.getPushJobSpec().getPushAttempts();
+        }
+        long retryWaitMs = 1000L;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+        }
+        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+          List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          try {
+            addHeaders(spec, reqHttpHeaders);
+            URI segmentUploadURI = getBatchSegmentUploadURI(controllerURI);
+            SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+                allSegmentsMetadataMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName,
+                segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse());
+            return true;
+          } catch (HttpErrorStatusException e) {
+            int statusCode = e.getStatusCode();
+            if (statusCode >= 500) {
+              // Temporary exception
+              LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, e);
+              return false;
+            } else {
+              // Permanent exception
+              LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, e);
+              throw e;
+            }
+          }
+        });
+      }
+    } finally {
+      for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) {
+        FileUtils.deleteQuietly(metadataFileEntry.getValue());
+      }
+      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+    }
+  }
+
+  private static URI getBatchSegmentUploadURI(URI controllerURI)
+      throws URISyntaxException {
+    return FileUploadDownloadClient.getBatchSegmentUploadURI(controllerURI);
+  }
+
+  private static void addHeaders(SegmentGenerationJobSpec jobSpec, List<Header> headers) {
+    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+    if (jobSpec.getPushJobSpec() != null) {
+      headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+          String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+    }
+  }
+
+  // Method helps create an uber tar file which contains the metadata files for all segments that are to be uploaded.
+  // Additionally, it contains a segmentName to segmentDownloadURI mapping file.
+  @VisibleForTesting
+  static File createSegmentsMetadataTarFile(List<String> segmentURIs, Map<String, File> segmentMetadataFileMap)

Review Comment:
   Yes. While we have not tested the upper bound, it should comfortably handle a few hundred segments, since we are only sending the metadata files.
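   To make the size argument concrete, here is a hedged sketch of what the uber-tar assembly could look like: per-segment metadata tars plus one name-to-URI mapping file are staged in a temp directory and tarred once. The mapping file name, its format, and the exact `TarGzCompressionUtils.createTarGzFile` call are illustrative assumptions, not necessarily the PR's code (`TarGzCompressionUtils` is referenced as in the surrounding file):

   ```java
   import java.io.File;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.util.List;
   import java.util.Map;
   import java.util.UUID;
   import org.apache.commons.io.FileUtils;

   // Sketch only: stage each segment's metadata tar plus a mapping file, then
   // create a single uber tar. The payload grows with metadata size, not with
   // segment data size, which is why a few hundred segments stay manageable.
   static File createSegmentsMetadataTarFileSketch(List<String> segmentURIs,
       Map<String, File> segmentMetadataFileMap)
       throws Exception {
     File stagingDir = new File(FileUtils.getTempDirectory(), "segmentsMetadataDir-" + UUID.randomUUID());
     FileUtils.forceMkdir(stagingDir);
     // One small metadata tar per segment, keyed by segment name.
     for (Map.Entry<String, File> entry : segmentMetadataFileMap.entrySet()) {
       FileUtils.copyFile(entry.getValue(), new File(stagingDir, entry.getKey() + ".metadata.tar.gz"));
     }
     // segmentURIs alternates segmentName and downloadURI (see the loop in
     // sendSegmentsUriAndMetadata above); write one mapping line per pair.
     StringBuilder mapping = new StringBuilder();
     for (int i = 0; i < segmentURIs.size(); i += 2) {
       mapping.append(segmentURIs.get(i)).append('\t').append(segmentURIs.get(i + 1)).append('\n');
     }
     // "allSegmentsMetadata" is an assumed file name, not necessarily the PR's.
     Files.write(new File(stagingDir, "allSegmentsMetadata").toPath(),
         mapping.toString().getBytes(StandardCharsets.UTF_8));
     File uberTarFile = new File(FileUtils.getTempDirectory(),
         "allSegmentsMetadataTar-" + UUID.randomUUID() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
     // Assumes TarGzCompressionUtils.createTarGzFile(File input, File output) can tar a directory.
     TarGzCompressionUtils.createTarGzFile(stagingDir, uberTarFile);
     FileUtils.deleteQuietly(stagingDir);
     return uberTarFile;
   }
   ```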



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -403,6 +421,156 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
     }
   }
 
+  // Method used to update a list of segments in batch mode with the METADATA upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType tableType, FormDataMultiPart multiPart,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) {
+    long segmentsUploadStartTimeMs = System.currentTimeMillis();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String clientAddress;
+    try {
+      clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
+    } catch (UnknownHostException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request",
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    FileUploadType uploadType = getUploadType(uploadTypeStr);
+    if (!FileUploadType.METADATA.equals(uploadType)) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);
+    ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
+    List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+    List<File> tempFiles = new ArrayList<>();
+    List<String> segmentNames = new ArrayList<>();
+    Map<String, SegmentMetadataInfo> segmentsMetadataInfoMap = createSegmentsMetadataInfoMap(multiPart);
+    LOGGER.info("Uploading segments in batch mode of size: {}", segmentsMetadataInfoMap.size());
+
+    try {
+      int entryCount = 0;
+      for (Map.Entry<String, SegmentMetadataInfo> entry: segmentsMetadataInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
+        segmentNames.add(segmentName);
+        File tempEncryptedFile;
+        File tempDecryptedFile;
+        File tempSegmentDir;
+        String sourceDownloadURIStr = segmentMetadataInfo.getSegmentDownloadURI();
+        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+          throw new ControllerApplicationException(LOGGER,
+              "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.",
+              Response.Status.BAD_REQUEST);
+        }
+        // The downloadUri for putting into segment zk metadata
+        String segmentDownloadURIStr = sourceDownloadURIStr;
+
+        String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+        tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX);
+        tempFiles.add(tempEncryptedFile);
+        tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName);
+        tempFiles.add(tempDecryptedFile);
+        tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);
+        tempFiles.add(tempSegmentDir);
+
+        boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader);
+        File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
+        // else set to false for backward compatibility
+        String copySegmentToDeepStore = extractHttpHeader(headers,
+            FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+        boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
+        createSegmentFileFromSegmentMetadataInfo(segmentMetadataInfo, destFile);
+        if (encryptSegment) {
+          decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile);
+        }
+
+        String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+        SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+        LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, "
+                + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress,
+            ingestionDescriptor);
+
+        // Validate segment
+        if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+          SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
+        }
+        // TODO: Include the un-tarred segment size when using the METADATA push rest API. Currently we can only use the
+        //  tarred segment size as an approximation.
+        long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+        long untarredSegmentSizeInBytes = 0L;
+        if (segmentSizeInBytes > 0) {
+          untarredSegmentSizeInBytes = segmentSizeInBytes;
+        }
+        SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig, _controllerConf,

Review Comment:
   This observation is correct: the quota check is not evaluated correctly in batch mode. There are a couple of issues here:
   1) The quota check can be moved inside the if block, prior to calling `zkOperator.completeSegmentsOperations` (see the sketch below).
   2) The segment size used here is the metadata file size, as opposed to the actual segment size. This is the case for the existing single-segment upload as well as for batch mode.
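   A hedged sketch of fix (1), assuming the batch flow ends with `zkOperator.completeSegmentsOperations` as described above; `fetchActualSegmentSize`, the `SegmentUploadMetadata` getters, and the trailing `checkStorageQuota`/`completeSegmentsOperations` arguments are illustrative placeholders, not code from this PR:

   ```java
   // Sketch only: run the quota check for every segment in the batch right
   // before the ZK commit, sized by the actual segment rather than its metadata.
   for (SegmentUploadMetadata uploadMetadata : segmentUploadMetadataList) {
     // Hypothetical helper: stat the segment tar at its source download URI so
     // the check sees the real segment size, not the metadata file size.
     long segmentSizeInBytes = fetchActualSegmentSize(uploadMetadata.getSegmentDownloadURIStr());
     SegmentValidationUtils.checkStorageQuota(uploadMetadata.getSegmentName(), segmentSizeInBytes,
         tableConfig, _controllerConf, _storageQuotaChecker); // trailing arg assumed
   }
   // Commit the batch in ZK only after every segment passes the quota check.
   zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection,
       allowRefresh, headers, segmentUploadMetadataList); // argument list illustrative
   ```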


