mcvsubbu commented on a change in pull request #6778: URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r618529894
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); Review comment: This should not be precondition. If segment manager is stopping, just return with an info log. Include table name in the log ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. 
+ * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. + for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { Review comment: We should think of a different way to do this. Getting all the segment metadata for all tables every time this task runs can overload zookeeper. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. 
Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); Review comment: Bump a metric here, so we can monitor it. Better, throw an exception so that it can be caught below and the metric bumped ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. 
+ * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); + continue; + } + + // Randomly ask one server to upload + URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + LOGGER.info("Ask server to upload llc segment to segment store by this path: {}", serverUploadRequestUrl); + String segmentDownloadUrl = uploadLLCSegmentByServer(serverUploadRequestUrl); + + // Update the segment ZK metadata to include segment download url + if (segmentDownloadUrl.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store: no segment download url is returned from server.", segmentName); + continue; + } + segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); + persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1); + LOGGER.info("Successfully uploaded llc segment {} to segment store", segmentName); Review comment: Add the download url to this log ########## File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); + continue; + } + + // Randomly ask one server to upload + URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + LOGGER.info("Ask server to upload llc segment to segment store by this path: {}", serverUploadRequestUrl); + String segmentDownloadUrl = uploadLLCSegmentByServer(serverUploadRequestUrl); + + // Update the segment ZK metadata to include segment download url + if (segmentDownloadUrl.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store: no segment download url is returned from server.", segmentName); + continue; Review comment: bump a metric. 
better, throw an exception ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); + continue; + } + + // Randomly ask one server to upload + URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + LOGGER.info("Ask server to upload llc segment to segment store by this path: {}", serverUploadRequestUrl); + String segmentDownloadUrl = uploadLLCSegmentByServer(serverUploadRequestUrl); + + // Update the segment ZK metadata to include segment download url + if (segmentDownloadUrl.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store: no segment download url is returned from server.", segmentName); + continue; + } + segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); + persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1); + LOGGER.info("Successfully uploaded llc segment {} to segment store", segmentName); + } catch (Exception e) { + LOGGER.error("Failed to upload segment {} to segment store", segmentName, e); Review comment: Bump a metric 
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1243,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { + continue; + } + LOGGER.info("Fixing llc segment {} whose segment store copy is unavailable", segmentName); + + // Find servers which have online replica + List<URI> peerSegmentURIs = PeerServerSegmentFinder + .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + if (peerSegmentURIs.isEmpty()) { + LOGGER.error("Failed to upload segment {} to segment store because no online replica is found", segmentName); + continue; + } + + // Randomly ask one server to upload + URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + LOGGER.info("Ask server to upload llc segment to segment store by this path: {}", serverUploadRequestUrl); Review comment: Include segment name in the log ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java ########## @@ -100,6 +100,7 @@ protected void processTable(String tableNameWithType, Context context) { IngestionConfigUtils.getStreamConfigMap(tableConfig)); if (streamConfig.hasLowLevelConsumerType()) { _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig); + _llcRealtimeSegmentManager.uploadToSegmentStoreIfMissing(tableConfig); Review comment: +1 ########## File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1239,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. 
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { Review comment: The convention is that if a segment is in DONE state, then the value is either METADATA_URI_FOR_PEER_DOWNLOAD or the real one. If this is not followed, other things will break. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -1214,4 +1239,86 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup; } } + + /** + * Validate the committed low level consumer segments to see if its segment store copy is available. Fix the missing segment store copy by asking servers to upload to segment store. + * Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers. 
+ * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + // Get all the LLC segment ZK metadata for this table + List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, realtimeTableName); + + // Iterate through llc segments and upload missing segment store copy by following steps: + // 1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading. + // 2. Update the llc segment ZK metadata by adding segment store download url. + for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + String segmentName = segmentZKMetadata.getSegmentName(); + // Only fix the committed llc segment without segment store copy + if (segmentZKMetadata.getStatus() == Status.DONE && (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty() || segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))) { + try { + if (!isExceededMaxSegmentCompletionTime(realtimeTableName, segmentName, getCurrentTimeMs())) { Review comment: +1 not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org