liuchang0520 commented on a change in pull request #6778: URL: https://github.com/apache/pinot/pull/6778#discussion_r698067226
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1228,4 +1301,146 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType != TableType.REALTIME) {
+      return;
+    }
+
+    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+      return;
+    }
+
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    if (!streamConfig.hasLowLevelConsumerType()) {
+      return;
+    }
+
+    long currentTimeMs = getCurrentTimeMs();
+    List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+    for (String segmentName : segmentNames) {
+      try {
+        // Only fetch recently created LLC segment to alleviate ZK access. Validate segment creation time from segment name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _validationRangeForLLCSegmentsDeepStoreCopyMs) {
+          continue;
+        }
+
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+        // Cache the committed LLC segments without segment store download url
+        if (segmentZKMetadata.getStatus() == Status.DONE &&
+            CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);

Review comment:
Yes, it waits for the server response, which contains the download URL, and then updates the ZK metadata. This is based on the logic in `ControllerPeriodicTask.processTables`. In `RealtimeSegmentValidationManager.processTable`, each task is then performed synchronously and sequentially.

For the deep store upload fix, which do you think is better: **async** or **sync**? In practice, a missing deep store copy is rare.
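
To make the sync-versus-async question in the review comment concrete, here is a minimal Java sketch of the two shapes the fix could take. It is not Pinot code: the class `DeepStoreFixSketch`, the methods `fixSync` and `fixAsync`, the placeholder `uploadAndGetUrl`, and the `deepstore://` URL scheme are all hypothetical names chosen for illustration, and the ZK metadata update is reduced to putting the returned URL into a map.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class DeepStoreFixSketch {
  // Hypothetical stand-in for "ask a server to upload the segment and reply with its download URL".
  static String uploadAndGetUrl(String segmentName) {
    return "deepstore://" + segmentName;
  }

  // Sync: segments are fixed one after another; the caller blocks until every upload has returned.
  static Map<String, String> fixSync(List<String> segments, Function<String, String> uploader) {
    Map<String, String> urls = new ConcurrentHashMap<>();
    for (String segment : segments) {
      urls.put(segment, uploader.apply(segment));
    }
    return urls;
  }

  // Async: each upload is handed to the executor; the caller gets a future and decides when to wait.
  static CompletableFuture<Map<String, String>> fixAsync(List<String> segments,
      Function<String, String> uploader, ExecutorService executor) {
    Map<String, String> urls = new ConcurrentHashMap<>();
    CompletableFuture<?>[] uploads = segments.stream()
        .map(segment -> CompletableFuture.runAsync(() -> urls.put(segment, uploader.apply(segment)), executor))
        .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(uploads).thenApply(ignored -> urls);
  }

  public static void main(String[] args) {
    List<String> segments = List.of("segmentA", "segmentB"); // hypothetical segment names
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      System.out.println("sync:  " + fixSync(segments, DeepStoreFixSketch::uploadAndGetUrl).size());
      System.out.println("async: " + fixAsync(segments, DeepStoreFixSketch::uploadAndGetUrl, executor).join().size());
    } finally {
      executor.shutdown();
    }
  }
}
```

The sync variant keeps the periodic task simple and naturally throttles upload requests, at the cost of holding up the rest of the validation run while a server responds. The async variant frees the task thread but needs its own executor, error handling, and a bound on in-flight uploads. If missing copies are as rare as the comment suggests, the extra machinery may not be worth it.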