mcvsubbu commented on a change in pull request #6778: URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r655517240
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java ########## @@ -142,6 +148,8 @@ private static long getRandomInitialDelayInSeconds() { private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60; private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60; + + private static final int DEFAULT_VALIDATION_RANGE_IN_DAYS_TO_CHECK_MISSING_SEGMENT_STORE_COPY = 3; Review comment: Please keep the units at the end of the member name (*_IN_DAYS). Also, try to shorten the name if possible ########## File path: pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java ########## @@ -67,6 +67,12 @@ // Percentage of segments we failed to get size for TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false), + // Number of errors during segment store upload retry of LLC segment + NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY("LLCSegmentDeepStoreUploadRetryError", false), Review comment: Why gauge? These two should be meters. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. 
+ */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); Review comment: If there is nothing to fix, then this is a wrong message. An operator reading this message will think that there is something to be fixed for the table. Move the logic to below line 1358 once you have ascertained that there is something to fix for the table. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. 
+ */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); + return; + } + + Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName); + if (segmentQueue == null || segmentQueue.isEmpty()) { + return; + } + + // Store the segments to be fixed again in the case of fix failure, or skip in this round + Queue<String> segmentsNotFixed = new LinkedList<>(); + RetentionStrategy retentionStrategy = new TimeRetentionStrategy( + TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()), + Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); + + // Iterate through LLC segments and upload missing segment store copy by following steps: + // 1. 
Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the LLC segment ZK metadata by adding segment store download url. + while (!segmentQueue.isEmpty()) { + String segmentName = segmentQueue.poll(); + // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty. + if (segmentName == null) { + break; + } + + try { + Stat stat = new Stat(); + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat); + // if the download url is already fixed, skip the fix for this segment. + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + LOGGER.info("Skipped fixing LLC segment {} whose deep store download url is already available", segmentName); + continue; + } + // skip the fix for the segment if it is already out of retention. + if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) { Review comment: What happens if the retention is a short time away, and the retention manager walks in and starts to remove the segment while we are trying to upload it? I think we have a race condition here that we need to think about. ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java ########## @@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS); } + /** + * Controller periodic task uses this endpoint to ask servers to upload committed llc segment to segment store if missing. 
+ * @param uri The uri to ask servers to upload segment to segment store + * @return the uploaded segment download url from segment store + * @throws URISyntaxException + * @throws IOException + * @throws HttpErrorStatusException + */ + public String uploadToSegmentStore(String uri) Review comment: How was this resolved? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. + public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. + */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { Review comment: Remove the method and fold its logic into the place where it is called. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. + * Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. Review comment: ```suggestion * Controller gets the LLC segment names from this map, and asks servers to upload the segments to deep store.
``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -122,6 +137,14 @@ * The segment will be eligible for repairs by the validation manager, if the time exceeds this value */ private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES + /** + * Controller waits this amount of time before asking servers to upload LLC segments without deep store copy. + * The reason is after step 1 of segment completion is done (segment ZK metadata status changed to be DONE), + * servers may be still in the process of loading segments. Review comment: ```suggestion * servers may be still in the process of transitioning segments from CONSUMING to ONLINE states ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. + * Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. + * This helps to alleviates excessive ZK access when fetching LLC segment list. + * Key: table name; Value: LLC segment names to be uploaded to segment store. Review comment: ```suggestion * Key: table name; Value: LLC segment names to be uploaded to deep store. 
``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java ########## @@ -121,6 +121,12 @@ public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS = "controller.segmentRelocator.initialDelayInSeconds"; + // configs for uploading missing LLC segments copy to segment store + public static final String ENABLE_UPLOAD_MISSING_LLC_SEGMENT_TO_SEGMENT_STORE = Review comment: nit: ENABLE_DEEP_STORE_LLC_SEGMENT_CHECK ? I don't know, I am just thinking of some shorter name here. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. + * Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. + * This helps to alleviates excessive ZK access when fetching LLC segment list. + * Key: table name; Value: LLC segment names to be uploaded to segment store. + */ + private Map<String, Queue<String>> _llcSegmentMapForUpload; Review comment: Why is the value a `Queue`? Is there a notion of ordering here? 
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -122,6 +137,14 @@ * The segment will be eligible for repairs by the validation manager, if the time exceeds this value */ private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES + /** + * Controller waits this amount of time before asking servers to upload LLC segments without deep store copy. Review comment: ```suggestion * Controller waits this amount of time before asking servers to upload LLC segments missing in deep store. ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. 
+ */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); + return; + } + + Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName); + if (segmentQueue == null || segmentQueue.isEmpty()) { + return; + } + + // Store the segments to be fixed again in the case of fix failure, or skip in this round + Queue<String> segmentsNotFixed = new LinkedList<>(); + RetentionStrategy retentionStrategy = new TimeRetentionStrategy( + TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()), + Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); + + // Iterate through LLC segments and upload missing segment store copy by following steps: + // 1. 
Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the LLC segment ZK metadata by adding segment store download url. + while (!segmentQueue.isEmpty()) { + String segmentName = segmentQueue.poll(); + // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty. + if (segmentName == null) { Review comment: I don't understand this null check. We already checked for queue empty above. Unless there are other threads modifying the queue (in which case, we need some precise synchronization rather than an if check everywhere), why check for null again? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. Review comment: ```suggestion * Map caching the LLC segment names that are missing deep store download uri in segment metadata. ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. 
+ */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); + return; + } + + Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName); + if (segmentQueue == null || segmentQueue.isEmpty()) { + return; + } + + // Store the segments to be fixed again in the case of fix failure, or skip in this round + Queue<String> segmentsNotFixed = new LinkedList<>(); + RetentionStrategy retentionStrategy = new TimeRetentionStrategy( + TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()), + Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); + + // Iterate through LLC segments and upload missing segment store copy by following steps: + // 1. 
Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the LLC segment ZK metadata by adding segment store download url. + while (!segmentQueue.isEmpty()) { + String segmentName = segmentQueue.poll(); + // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty. + if (segmentName == null) { + break; + } + + try { + Stat stat = new Stat(); + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat); + // if the download url is already fixed, skip the fix for this segment. Review comment: Can you elaborate on how this can happen? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. + * Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. + * This helps to alleviates excessive ZK access when fetching LLC segment list. Review comment: ```suggestion * A cache helps to alleviate excessive ZK access when fetching LLC segment list. 
``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java ########## @@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourc Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); } + @Override + protected void setUpTask() { + // Prefetch the LLC segment without segment store copy from ZK, which helps to alleviate ZK access. Review comment: Where is the TODO? Please add it here, otherwise someone going through the code will have the same question (worse, may make the same assumption in other places). Make it clear that this may not work all the time and it is a TODO to take action when leadership is established. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { Review comment: This check does not belong here. It belongs in the place where you decide to ask the server to upload to deep store. Please move it. Also, we don't need a method for this; it is only 2 lines long. Fold it in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org