swaminathanmanish commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1726827037
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -359,6 +368,150 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table: {}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID()
+                + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    File allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);
+    Map<String, File> allSegmentsMetadataMap = new HashMap<>();
+    // the key is unused in batch upload mode and hence 'noopKey'
+    allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+
+    // perform metadata push in batch mode for every cluster
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri: " + pinotClusterSpec.getControllerURI());
+        }
+        LOGGER.info("Pushing segments: {} to location: {} for table: {}",
+            segmentMetadataFileMap.keySet(), controllerURI, tableName);
+        int attempts = 1;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+          attempts = spec.getPushJobSpec().getPushAttempts();
+        }
+        long retryWaitMs = 1000L;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+        }
+        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+          List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          try {
+            addHeaders(spec, reqHttpHeaders);
+            URI segmentUploadURI = getBatchSegmentUploadURI(controllerURI);
+            SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+                allSegmentsMetadataMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName,
+                segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse());
+            return true;
+          } catch (HttpErrorStatusException e) {
+            int statusCode = e.getStatusCode();
+            if (statusCode >= 500) {
+              // Temporary exception
+              LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, e);
+              return false;
+            } else {
+              // Permanent exception
+              LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, e);
+              throw e;
+            }
+          }
+        });
+      }
+    } finally {
+      for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) {
+        FileUtils.deleteQuietly(metadataFileEntry.getValue());
+      }
+      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+    }
+  }
+
+  private static URI getBatchSegmentUploadURI(URI controllerURI)
+      throws URISyntaxException {
+    return FileUploadDownloadClient.getBatchSegmentUploadURI(controllerURI);
+  }
+
+  private static void addHeaders(SegmentGenerationJobSpec jobSpec, List<Header> headers) {
+    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+    if (jobSpec.getPushJobSpec() != null) {
+      headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+          String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+    }
+  }
+
+  // Method helps create an uber tar file which contains the metadata files for all segments that are to be uploaded.

Review Comment:
   Could you also explain in the comment why we need to do this?
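   For instance, the comment could note that bundling all per-segment metadata files into one uber tar means the push job issues a single multipart upload per controller instead of one HTTP round trip per segment. Roughly this shape, as a hedged sketch using plain commons-compress; the entry names and the manifest layout here are my assumptions, not necessarily what createSegmentsMetadataTarFile actually produces:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;

public final class UberMetadataTarSketch {
  private UberMetadataTarSketch() {
  }

  // Packs every segment's metadata tar plus a name/URI manifest into one tar.gz, so a
  // batch push needs only a single upload call instead of one per segment.
  public static File pack(List<String> segmentURIs, Map<String, File> segmentMetadataFileMap, File out)
      throws IOException {
    try (TarArchiveOutputStream tarOut =
        new TarArchiveOutputStream(new GzipCompressorOutputStream(new FileOutputStream(out)))) {
      for (Map.Entry<String, File> entry : segmentMetadataFileMap.entrySet()) {
        // One entry per segment, named after the segment (entry naming is an assumption here)
        TarArchiveEntry tarEntry = new TarArchiveEntry(entry.getValue(), entry.getKey() + ".metadata.tar.gz");
        tarOut.putArchiveEntry(tarEntry);
        try (FileInputStream in = new FileInputStream(entry.getValue())) {
          IOUtils.copy(in, tarOut);
        }
        tarOut.closeArchiveEntry();
      }
      // Manifest of alternating segmentName / download URI lines, mirroring how segmentURIs
      // is built in sendSegmentsUriAndMetadata above
      byte[] manifest = String.join("\n", segmentURIs).getBytes(StandardCharsets.UTF_8);
      TarArchiveEntry manifestEntry = new TarArchiveEntry("segment-uris.txt");
      manifestEntry.setSize(manifest.length);
      tarOut.putArchiveEntry(manifestEntry);
      tarOut.write(manifest);
      tarOut.closeArchiveEntry();
    }
    return out;
  }
}
```

   A single request also gives the controller one consistent view of the whole batch, which is what enables the batch ZK-metadata and IdealState handling elsewhere in this PR.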
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -276,6 +336,144 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
     }
   }
 
+  // process a batch of existing segments

Review Comment:
   Are we not able to leverage the existing processExistingSegment method, which handles a single segment, to avoid duplicating code? Basically, reuse that method and iterate over the list of segments. Same for processNewSegments and the other methods in ZKOperator.
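   Something like this is what I have in mind, as a sketch only; processExistingSegment's actual parameter list is longer than shown here, so the signature and argument order are assumptions, not the actual patch:

```java
// Sketch: let the batch entry point delegate to the existing single-segment path instead of
// duplicating its body. Lives inside ZKOperator; names below are illustrative.
private void processExistingSegments(String tableNameWithType, FileUploadType uploadType,
    boolean enableParallelPushProtection, HttpHeaders headers,
    List<SegmentUploadMetadata> segmentUploadMetadataList)
    throws Exception {
  for (SegmentUploadMetadata uploadMetadata : segmentUploadMetadataList) {
    // Reuse the per-segment logic so each segment is validated and refreshed exactly as in
    // the non-batch upload path (assumed call shape).
    processExistingSegment(tableNameWithType, uploadMetadata.getSegmentMetadata(), uploadType,
        enableParallelPushProtection, headers, uploadMetadata);
  }
}
```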
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -110,6 +115,61 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
     }
   }
 
+  // Complete segment operations for a list of segments in batch mode
+  public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers,
+      List<SegmentUploadMetadata> segmentUploadMetadataList)
+      throws Exception {
+    boolean refreshOnly =
+        Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+    List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>();
+    List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>();
+    for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) {
+      SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata();
+      String segmentName = segmentMetadata.getName();
+
+      ZNRecord existingSegmentMetadataZNRecord =
+          _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+      if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName,
+          existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+        LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}",
+            tableNameWithType, segmentName);
+        Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName),
+            "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName);
+        existingSegmentMetadataZNRecord = null;
+      }
+
+      if (existingSegmentMetadataZNRecord == null) {
+        // Add a new segment
+        if (refreshOnly) {
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType),
+              Response.Status.GONE);
+        }
+        LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
+        newSegmentsList.add(segmentUploadMetadata);
+      } else {
+        // Refresh an existing segment
+        if (!allowRefresh) {
+          // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the
+          // check done up-front but ends up getting created before the check here, we could incorrectly refresh an
+          // existing segment.
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName,
+                  tableNameWithType), Response.Status.CONFLICT);
+        }
+        LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
+        segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord);
+        existingSegmentsList.add(segmentUploadMetadata);
+      }
+    }
+    // process new segments
+    processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList);
+
+    // process existing segments
+    processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList);

Review Comment:
   Just checking whether processExistingSegments is idempotent: when a batch refresh call fails partway through, the client should be able to safely call batch upload with refresh again, and it should be a no-op for the segments in the batch that already completed.
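   Concretely, the retry-safety I'm after could look like a guard in the refresh path along these lines. This is purely a sketch with invented accessors; the real SegmentUploadMetadata methods and the ZK field key ("segment.crc" here) are assumptions:

```java
// Hypothetical idempotency guard: skip a refresh when the CRC already recorded in ZK matches
// the CRC of the upload being retried, so a re-sent batch is a no-op for completed segments.
for (SegmentUploadMetadata uploadMetadata : existingSegmentsList) {
  SegmentMetadata segmentMetadata = uploadMetadata.getSegmentMetadata();
  // CRC written by the earlier, partially-failed batch call (ZK field key assumed)
  long crcInZk = uploadMetadata.getSegmentMetadataZNRecord().getLongField("segment.crc", -1L);
  long crcBeingPushed = Long.parseLong(segmentMetadata.getCrc()); // accessor assumed
  if (crcInZk == crcBeingPushed) {
    LOGGER.info("Segment: {} already refreshed to CRC: {}, skipping (idempotent retry)",
        segmentMetadata.getName(), crcInZk);
    continue; // no-op for segments completed in the previous attempt
  }
  // ... otherwise fall through to the normal refresh handling
}
```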
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2358,6 +2358,84 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
 
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName: segmentNames) {
+      String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+          segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName: segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
+        long segmentAssignmentStartTs = System.currentTimeMillis();
+        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
+          assert idealState != null;
+          for (String segmentName: segmentNames) {
+            Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+            if (currentAssignment.containsKey(segmentName)) {
+              LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+                  tableNameWithType);
+            } else {
+              List<String> assignedInstances =
+                  segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
+              LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+                  tableNameWithType);
+              currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+                  SegmentStateModel.ONLINE));
+            }
+          }
+          return idealState;
+        });
+        LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames,
+            tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartTs);
+      }
+    } catch (Exception e) {
+      LOGGER.error(

Review Comment:
   Curious if there are existing tests for this failure path?
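   If not, a test along these lines might work. This is a hedged sketch, assuming the catch block rethrows after logging (as the single-segment assignTableSegment does); the Mockito partial-mock setup is illustrative, not an existing Pinot test:

```java
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertThrows;

import java.util.Collections;
import org.testng.annotations.Test;

public class AssignTableSegmentsFailureTest {

  @Test
  public void testAssignTableSegmentsFailsWithoutTableConfig() {
    PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
    doCallRealMethod().when(resourceManager).assignTableSegments(anyString(), anyList());
    // A missing table config trips the Preconditions check inside assignTableSegments,
    // driving execution into the catch block under review.
    when(resourceManager.getTableConfig("myTable_OFFLINE")).thenReturn(null);

    assertThrows(IllegalStateException.class,
        () -> resourceManager.assignTableSegments("myTable_OFFLINE",
            Collections.singletonList("myTable_OFFLINE_0")));
  }
}
```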
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java:
##########
@@ -80,4 +101,40 @@ public void testGenerateSegmentMetadataURI()
         SegmentPushUtils.generateSegmentMetadataURI("hdfs://a/b/c/my-segment.tar.gz", "my-segment"),
         URI.create("hdfs://a/b/c/my-segment.metadata.tar.gz"));
   }
+
+  @Test
+  public void testCreateSegmentsMetadataTarFile() throws IOException {

Review Comment:
   Thanks for writing tests for createSegmentsMetadataTarFile :)
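   Since the test body is truncated above, here is a sketch of the shape such a test can take. It assumes createSegmentsMetadataTarFile(List<String>, Map<String, File>) is accessible from the test class and returns the created tar file, consistent with how sendSegmentsUriAndMetadata calls it; both are assumptions, not the actual test in the PR:

```java
import static org.testng.Assert.assertTrue;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.testng.annotations.Test;

public class SegmentsMetadataTarFileSketchTest {

  @Test
  public void testCreateSegmentsMetadataTarFileSketch() throws Exception {
    // A stand-in metadata file for one segment; a real test would use a genuine metadata tar.gz
    File metadataFile = File.createTempFile("seg_0.metadata", ".tar.gz");
    FileUtils.writeStringToFile(metadataFile, "stub", "UTF-8");
    File uberTar = null;
    try {
      // segmentURIs alternates segmentName / download URI, mirroring sendSegmentsUriAndMetadata
      uberTar = SegmentPushUtils.createSegmentsMetadataTarFile(
          Arrays.asList("seg_0", "s3://bucket/path/seg_0.tar.gz"),
          Collections.singletonMap("seg_0", metadataFile));
      assertTrue(uberTar.exists());
      assertTrue(uberTar.length() > 0);
    } finally {
      // deleteQuietly tolerates null, so cleanup is safe even if creation failed
      FileUtils.deleteQuietly(metadataFile);
      FileUtils.deleteQuietly(uberTar);
    }
  }
}
```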