liuchang0520 commented on a change in pull request #6778: URL: https://github.com/apache/pinot/pull/6778#discussion_r701504812
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. + public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK 
metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. + */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); + return; + } + + Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName); + if (segmentQueue == null || segmentQueue.isEmpty()) { + return; + } + + // Store the segments to be fixed again in the case of fix failure, or skip in this round + Queue<String> segmentsNotFixed = new LinkedList<>(); + RetentionStrategy retentionStrategy = new TimeRetentionStrategy( + TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()), + 
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); + + // Iterate through LLC segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the LLC segment ZK metadata by adding segment store download url. + while (!segmentQueue.isEmpty()) { + String segmentName = segmentQueue.poll(); + // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty. + if (segmentName == null) { + break; + } + + try { + Stat stat = new Stat(); + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat); + // if the download url is already fixed, skip the fix for this segment. + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + LOGGER.info("Skipped fixing LLC segment {} whose deep store download url is already available", segmentName); + continue; + } + // skip the fix for the segment if it is already out of retention. + if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) { Review comment: Thanks @mcvsubbu. I added a value to avoid this issue: `MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org