liuchang0520 commented on a change in pull request #6778: URL: https://github.com/apache/pinot/pull/6778#discussion_r702036777
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1243,4 +1327,167 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. + public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + // Only fetch recently created LLC segment to alleviate ZK access. + // Validate segment creation time from segment name. + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) { + continue; + } + + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without deep store download url + if (segmentZKMetadata.getStatus() == Status.DONE + && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(tableNameWithType, + ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK. + * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload), + * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers. + * + * @see <a href=" + * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion + * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToDeepStoreIfMissing(TableConfig tableConfig) { Review comment: From `RealtimeSegmentValidationManagerprocessTable` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org