liuchang0520 commented on a change in pull request #6778: URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r655897035
########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java ########## @@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS); } + /** + * Controller periodic task uses this endpoint to ask servers to upload committed llc segment to segment store if missing. + * @param uri The uri to ask servers to upload segment to segment store + * @return the uploaded segment download url from segment store + * @throws URISyntaxException + * @throws IOException + * @throws HttpErrorStatusException + */ + public String uploadToSegmentStore(String uri) Review comment: This function was moved back to this class after we realized we need to utilize `SSLContext` in `FileUploadDownloadClient`. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -135,9 +158,20 @@ private final Lock[] _idealStateUpdateLocks; private final TableConfigCache _tableConfigCache; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; + private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private FileUploadDownloadClient _fileUploadDownloadClient; + /** + * Map caching the LLC segment names without deep store download uri. + * Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. + * This helps to alleviates excessive ZK access when fetching LLC segment list. + * Key: table name; Value: LLC segment names to be uploaded to segment store. + */ + private Map<String, Queue<String>> _llcSegmentMapForUpload; Review comment: There is no notion of ordering here. 
But I want to use `java.util.concurrent` to build a concurrent list: during the committing phase, there may be concurrent modifications when adding segments to the same list (the segments of the same table). I think `ConcurrentLinkedQueue` is a good fit for this use case. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,92 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); + return; + } + + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. 
Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. + for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L); + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); + continue; + } + + // Randomly ask one server to upload + URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + LOGGER.info("Ask server to upload llc segment {} to segment store by this path: {}", segmentName, serverUploadRequestUrl); + String segmentDownloadUrl = uploadLLCSegmentByServer(serverUploadRequestUrl); + + // Update the segment ZK metadata to include segment download url + if (segmentDownloadUrl.isEmpty()) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L); + LOGGER.error("Failed to upload segment {} to segment store: no segment download 
url is returned from server.", segmentName); + continue; + } + segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); + persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1); + LOGGER.info("Successfully uploaded llc segment {} to segment store with download url: {}", segmentName, segmentDownloadUrl); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L); + LOGGER.error("Failed to upload segment {} to segment store", segmentName, e); + } + } + } + } + + /** + * Helper function to ask server to upload llc segment to segment store + */ + private String uploadLLCSegmentByServer(String serverUploadRequestUrl) + throws URISyntaxException, IOException, HttpErrorStatusException { + HttpUriRequest request = buildServerUploadRequest(serverUploadRequestUrl); + try (CloseableHttpResponse response = _httpClient.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + String responseStr = response.getEntity() == null ? "" : EntityUtils.toString(response.getEntity()); + if (statusCode >= 300) { + StringBuilder errorMsgBuilder = new StringBuilder(String.format("Failed to ask server to upload LLC segment to segment store by url: %s. ", serverUploadRequestUrl)); + if (!responseStr.isEmpty()) { + errorMsgBuilder.append(responseStr); + } + throw new HttpErrorStatusException(errorMsgBuilder.toString(), statusCode); + } + + return responseStr; + } + } + + @VisibleForTesting + HttpUriRequest buildServerUploadRequest(String serverUploadRequestUrl) Review comment: @mcvsubbu FYI. 
########## File path: pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java ########## @@ -67,6 +67,12 @@ // Percentage of segments we failed to get size for TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false), + // Number of errors during segment store upload retry of LLC segment + NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY("LLCSegmentDeepStoreUploadRetryError", false), Review comment: This change was based on your comment: https://github.com/apache/incubator-pinot/pull/6778#discussion_r644171605 . Did I misunderstand this comment? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java ########## @@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourc Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); } + @Override + protected void setUpTask() { + // Prefetch the LLC segment without segment store copy from ZK, which helps to alleviate ZK access. Review comment: TODO is above this function: [line 71](https://github.com/apache/incubator-pinot/pull/6778/files#diff-0a9ffdab99ed314e7a8e58bf86c125f020b7d6f86493d1f6d719beab62b7f106R71). ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1287,147 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + // Pre-fetch the LLC segments without deep store copy. 
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != TableType.REALTIME) { + return; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}", tableNameWithType); + return; + } + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + if (!streamConfig.hasLowLevelConsumerType()) { + return; + } + + long currentTimeMs = getCurrentTimeMs(); + List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType); + for (String segmentName : segmentNames) { + try { + if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) { + continue; + } + + LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat()); + // Cache the committed LLC segments without segment store download url + if (segmentZKMetadata.getStatus() == Status.DONE && + CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) { + cacheLLCSegmentNameForUpload(tableNameWithType, segmentName); + } + } catch (Exception e) { + _controllerMetrics.addValueToTableGauge(tableNameWithType, + ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L); + LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName); + } + } + } + + /** + * Only validate recently created LLC segment for missing deep store download url. + * The time range check is based on segment name. This step helps to alleviate ZK access. 
+ */ + private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + long creationTimeMs = llcSegmentName.getCreationTimeMs(); + return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs; + } + + /** + * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. + * Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + if (_isStopping) { + LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName); Review comment: Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org