swaminathanmanish commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1705843939


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -359,6 +368,150 @@ public static void 
sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, 
List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for 
table: {}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz 
file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + 
UUID.randomUUID()
+                + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    File allSegmentsMetadataTarFile = 
createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);
+    Map<String, File> allSegmentsMetadataMap = new HashMap<>();
+    // the key is unused in batch upload mode and hence 'noopKey'
+    allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+
+    // perform metadata push in batch mode for every cluster
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri: " + 
pinotClusterSpec.getControllerURI());
+        }
+        LOGGER.info("Pushing segments: {} to location: {} for table: {}",
+            segmentMetadataFileMap.keySet(), controllerURI, tableName);
+        int attempts = 1;
+        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+          attempts = spec.getPushJobSpec().getPushAttempts();
+        }
+        long retryWaitMs = 1000L;
+        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+        }
+        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+          List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          try {
+            addHeaders(spec, reqHttpHeaders);
+            URI segmentUploadURI = getBatchSegmentUploadURI(controllerURI);
+            SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+                allSegmentsMetadataMap, reqHttpHeaders, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            LOGGER.info("Response for pushing table {} segments {} to location 
{} - {}: {}", tableName,
+                segmentMetadataFileMap.keySet(), controllerURI, 
response.getStatusCode(), response.getResponse());
+            return true;
+          } catch (HttpErrorStatusException e) {
+            int statusCode = e.getStatusCode();
+            if (statusCode >= 500) {
+              // Temporary exception
+              LOGGER.warn("Caught temporary exception while pushing table: {} 
segments: {} to {}, will retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, 
e);
+              return false;
+            } else {
+              // Permanent exception
+              LOGGER.error("Caught permanent exception while pushing table: {} 
segments: {} to {}, won't retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, 
e);
+              throw e;
+            }
+          }
+        });
+      }
+    } finally {
+      for (Map.Entry<String, File> metadataFileEntry: 
segmentMetadataFileMap.entrySet()) {
+        FileUtils.deleteQuietly(metadataFileEntry.getValue());
+      }
+      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+    }
+  }
+
+  private static URI getBatchSegmentUploadURI(URI controllerURI)
+      throws URISyntaxException {
+    return FileUploadDownloadClient.getBatchSegmentUploadURI(controllerURI);
+  }
+
+  private static void addHeaders(SegmentGenerationJobSpec jobSpec, 
List<Header> headers) {
+    headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+    if (jobSpec.getPushJobSpec() != null) {
+      headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+          
String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+    }
+  }
+
+  // Method helps create an uber tar file which contains the metadata files 
for all segments that are to be uploaded.
+  // Additionally, it contains a segmentName to segmentDownloadURI mapping 
file.
+  @VisibleForTesting
+  static File createSegmentsMetadataTarFile(List<String> segmentURIs, 
Map<String, File> segmentMetadataFileMap)

Review Comment:
   I'm assuming we don't need a limit here. Is this capable of supporting a
   batch push of hundreds of segments?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -359,6 +368,150 @@ public static void 
sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, 
List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for 
table: {}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz 
file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + 
UUID.randomUUID()
+                + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    File allSegmentsMetadataTarFile = 
createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);

Review Comment:
   In non-batch mode, do we fall back to sending one segment at a time?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -403,6 +421,156 @@ private SuccessResponse uploadSegment(@Nullable String 
tableName, TableType tabl
     }
   }
 
+  // Method used to update a list of segments in batch mode with the METADATA 
upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType 
tableType, FormDataMultiPart multiPart,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders 
headers, Request request) {
+    long segmentsUploadStartTimeMs = System.currentTimeMillis();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE ? 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to fetch table 
config for table: " + tableNameWithType,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String clientAddress;
+    try {
+      clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
+    } catch (UnknownHostException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to resolve 
hostname from input request",
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    String uploadTypeStr = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    FileUploadType uploadType = getUploadType(uploadTypeStr);
+    if (!FileUploadType.METADATA.equals(uploadType)) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported upload 
type: " + uploadTypeStr,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String crypterClassNameInHeader = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, 
CommonConstants.Controller.INGESTION_DESCRIPTOR);
+    ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
+    List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+    List<File> tempFiles = new ArrayList<>();
+    List<String> segmentNames = new ArrayList<>();
+    Map<String, SegmentMetadataInfo> segmentsMetadataInfoMap = 
createSegmentsMetadataInfoMap(multiPart);
+    LOGGER.info("Uploading segments in batch mode of size: {}", 
segmentsMetadataInfoMap.size());
+
+    try {
+      int entryCount = 0;
+      for (Map.Entry<String, SegmentMetadataInfo> entry: 
segmentsMetadataInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
+        segmentNames.add(segmentName);
+        File tempEncryptedFile;
+        File tempDecryptedFile;
+        File tempSegmentDir;
+        String sourceDownloadURIStr = 
segmentMetadataInfo.getSegmentDownloadURI();
+        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+          throw new ControllerApplicationException(LOGGER,
+              "'DOWNLOAD_URI' is required as a field within the multipart 
object for METADATA batch upload mode.",
+              Response.Status.BAD_REQUEST);
+        }
+        // The downloadUri for putting into segment zk metadata
+        String segmentDownloadURIStr = sourceDownloadURIStr;
+
+        String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+        tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName + ENCRYPTED_SUFFIX);
+        tempFiles.add(tempEncryptedFile);
+        tempDecryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
+        tempFiles.add(tempDecryptedFile);
+        tempSegmentDir = new File(provider.getUntarredFileTempDir(), 
tempFileName);
+        tempFiles.add(tempSegmentDir);
+
+        boolean encryptSegment = 
StringUtils.isNotEmpty(crypterClassNameInHeader);
+        File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // override copySegmentToFinalLocation if override provided in 
headers:COPY_SEGMENT_TO_DEEP_STORE
+        // else set to false for backward compatibility
+        String copySegmentToDeepStore = extractHttpHeader(headers,
+            FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+        boolean copySegmentToFinalLocation = 
Boolean.parseBoolean(copySegmentToDeepStore);
+        createSegmentFileFromSegmentMetadataInfo(segmentMetadataInfo, 
destFile);
+        if (encryptSegment) {
+          decryptFile(crypterClassNameInHeader, tempEncryptedFile, 
tempDecryptedFile);
+        }
+
+        String metadataProviderClass = 
DefaultMetadataExtractor.class.getName();
+        SegmentMetadata segmentMetadata = 
getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+        LOGGER.info("Processing upload request for segment: {} of table: {} 
with upload type: {} from client: {}, "
+                + "ingestion descriptor: {}", segmentName, tableNameWithType, 
uploadType, clientAddress,
+            ingestionDescriptor);
+
+        // Validate segment
+        if (tableConfig.getIngestionConfig() == null || 
tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+          SegmentValidationUtils.validateTimeInterval(segmentMetadata, 
tableConfig);
+        }
+        // TODO: Include the un-tarred segment size when using the METADATA 
push rest API. Currently we can only use the
+        //  tarred segment size as an approximation.
+        long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+        long untarredSegmentSizeInBytes = 0L;
+        if (segmentSizeInBytes > 0) {
+          untarredSegmentSizeInBytes = segmentSizeInBytes;
+        }
+        SegmentValidationUtils.checkStorageQuota(segmentName, 
untarredSegmentSizeInBytes, tableConfig, _controllerConf,

Review Comment:
   I guess this only checks the quota, but when we have a batch of 100
   segments, the quota does not get updated as we check each segment in this
   loop? A segment actually has to be added to the table for it to be reflected
   in the quota. In this case we will let in the whole batch of segments.
   
   cc @shounakmk219 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to