9aman commented on code in PR #15316:
URL: https://github.com/apache/pinot/pull/15316#discussion_r2007034972


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName)
   @Override
   public File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
-        "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName());
+    String segmentName = zkMetadata.getSegmentName();
+    Status status = zkMetadata.getStatus();
+    Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is still IN_PROGRESS and cannot be downloaded",
+        segmentName);
 
-    // Case: The commit protocol has completed, and the segment is ready to be downloaded either
-    // from deep storage or from a peer (if peer-to-peer download is enabled).
-    if (zkMetadata.getStatus() == Status.DONE) {
+    // The commit protocol has completed, and the segment is ready to be downloaded either from deep storage or from a
+    // peer (if peer-to-peer download is enabled).
+    if (status.isCompleted()) {
       return super.downloadSegment(zkMetadata);
     }
 
     // The segment status is COMMITTING, indicating that the segment commit process is incomplete.
     // Attempting a waited download within the configured time limit.
-    long downloadTimeoutMilliseconds =
-        getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType));
-    final long startTime = System.currentTimeMillis();
-    List<URI> onlineServerURIs;
-    while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) {
+    Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s", status, segmentName);
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
+    long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+    long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
+    while (System.currentTimeMillis() < deadlineMs) {
       // ZK Metadata may change during segment download process; fetch it on every retry.
-      zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
-
-      if (zkMetadata.getDownloadUrl() != null) {
-        // The downloadSegment() will throw an exception in case there are some genuine issues.
-        // We don't want to retry in those scenarios and will throw an exception
-        return downloadSegmentFromDeepStore(zkMetadata);
-      }
-
-      if (_peerDownloadScheme != null) {
-        _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
-        try {
-          onlineServerURIs = new ArrayList<>();
-          PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
-              _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme,
-              onlineServerURIs);
-          if (!onlineServerURIs.isEmpty()) {
-            return downloadSegmentFromPeers(zkMetadata);
-          }
-        } catch (Exception e) {
-          _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e);
-        }
+      zkMetadata = fetchZKMetadata(segmentName);
+      if (zkMetadata.getStatus().isCompleted()) {

Review Comment:
   What happens if the `commitEndMetadata` call that updates the ZK metadata (download URL, etc.) fails?
   
   The segment will be stuck in the `COMMITTING` state, even though it might be available on the lead server for peer download. We will never enter this branch, because `zkMetadata.getStatus().isCompleted()` will always be `false`.
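
   To make the concern concrete, here is a minimal, self-contained sketch. Every name in it (`WaitedDownloadSketch`, `Outcome`, `waitForCompletedOnly`, `waitWithPeerFallback`, `stuckInCommitting`) is hypothetical and only models the shape of the two loops; it is not the actual Pinot code. It assumes the status fetched from ZK never leaves `COMMITTING` and that a peer already holds the segment.

   ```java
   import java.util.function.Supplier;

   // Hypothetical, simplified model of the waited download; not Pinot's actual classes.
   class WaitedDownloadSketch {
     enum Status { IN_PROGRESS, COMMITTING, DONE }

     enum Outcome { DOWNLOADED_VIA_PARENT, DOWNLOADED_FROM_PEER, TIMED_OUT }

     // Loop shaped like the new code in the diff: only a completed status ends the wait.
     static Outcome waitForCompletedOnly(Supplier<Status> fetchStatus, long timeoutMs, long sleepMs) {
       long deadlineMs = System.currentTimeMillis() + timeoutMs;
       while (System.currentTimeMillis() < deadlineMs) {
         if (fetchStatus.get() == Status.DONE) {
           return Outcome.DOWNLOADED_VIA_PARENT;
         }
         if (!sleep(sleepMs)) {
           break;
         }
       }
       return Outcome.TIMED_OUT;
     }

     // Loop shaped like the removed code: a peer that holds the segment also ends the wait.
     static Outcome waitWithPeerFallback(Supplier<Status> fetchStatus, boolean peerHasSegment, long timeoutMs,
         long sleepMs) {
       long deadlineMs = System.currentTimeMillis() + timeoutMs;
       while (System.currentTimeMillis() < deadlineMs) {
         if (fetchStatus.get() == Status.DONE) {
           return Outcome.DOWNLOADED_VIA_PARENT;
         }
         if (peerHasSegment) {
           return Outcome.DOWNLOADED_FROM_PEER;
         }
         if (!sleep(sleepMs)) {
           break;
         }
       }
       return Outcome.TIMED_OUT;
     }

     // Models a segment whose commit-end metadata update failed: the status never changes.
     static Supplier<Status> stuckInCommitting() {
       return () -> Status.COMMITTING;
     }

     // Sleeps between retries; returns false if the thread was interrupted.
     private static boolean sleep(long ms) {
       try {
         Thread.sleep(ms);
         return true;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       }
     }

     public static void main(String[] args) {
       System.out.println(waitForCompletedOnly(stuckInCommitting(), 50, 5));
       System.out.println(waitWithPeerFallback(stuckInCommitting(), true, 50, 5));
     }
   }
   ```

   With these inputs the first loop runs until the deadline and times out, while the second returns from the peer on its first iteration.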
   
   This situation (a missing download URL) is fixed by the `segment level validations` done in `RealtimeSegmentValidationManager`. These runs are kept infrequent because they scan the ZK metadata of all segments.
   
   If we remove this check, `downloadSegment` will fail on the following precondition:
   
   ```
       Preconditions.checkState(downloadUrl != null,
           "Failed to find download URL in ZK metadata for segment: %s of table: %s", segmentName, _tableNameWithType);
   ```
   
   Normal ingestion ensures that the download URL is either a proper URL or, when the upload fails and peer download is enabled, an empty string.
   
   This is why I wrote a separate `downloadSegment` instead of using the parent class's. Otherwise, I felt I would end up changing a lot of things for the OFFLINE tables as well, since they use the base class download function.
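   
   The three download URL cases mentioned above can be sketched as follows. This is only an illustration under assumptions: `DownloadSourceSketch`, `Source`, and `chooseSource` are hypothetical names, and the branch for an empty URL reflects my reading of the comment (empty string means "fetch from a peer"), not a verified copy of the base class logic.

   ```java
   // Hypothetical helper; models how the download URL value could select a source.
   class DownloadSourceSketch {
     enum Source { DEEP_STORE, PEER }

     static Source chooseSource(String downloadUrl, boolean peerDownloadEnabled) {
       // Case 1: URL was never written (e.g. the commit-end metadata update failed).
       if (downloadUrl == null) {
         throw new IllegalStateException("Failed to find download URL in ZK metadata");
       }
       // Case 2: empty URL, assumed to mean the upload failed and a peer must serve the segment.
       if (downloadUrl.isEmpty()) {
         if (!peerDownloadEnabled) {
           throw new IllegalStateException("Empty download URL but peer download is not enabled");
         }
         return Source.PEER;
       }
       // Case 3: a proper URL pointing at deep storage.
       return Source.DEEP_STORE;
     }

     public static void main(String[] args) {
       System.out.println(chooseSource("s3://bucket/segment", false));
       System.out.println(chooseSource("", true));
     }
   }
   ```

   A segment stuck in `COMMITTING` with a `null` URL lands in the first case, which is the failure the precondition above reports.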



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName)
   @Override
   public File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
-        "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName());
+    String segmentName = zkMetadata.getSegmentName();
+    Status status = zkMetadata.getStatus();
+    Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is still IN_PROGRESS and cannot be downloaded",
+        segmentName);
 
-    // Case: The commit protocol has completed, and the segment is ready to be downloaded either
-    // from deep storage or from a peer (if peer-to-peer download is enabled).
-    if (zkMetadata.getStatus() == Status.DONE) {
+    // The commit protocol has completed, and the segment is ready to be downloaded either from deep storage or from a
+    // peer (if peer-to-peer download is enabled).
+    if (status.isCompleted()) {
       return super.downloadSegment(zkMetadata);
     }
 
     // The segment status is COMMITTING, indicating that the segment commit process is incomplete.
     // Attempting a waited download within the configured time limit.
-    long downloadTimeoutMilliseconds =
-        getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType));
-    final long startTime = System.currentTimeMillis();
-    List<URI> onlineServerURIs;
-    while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) {
+    Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s", status, segmentName);
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
+    long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+    long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
+    while (System.currentTimeMillis() < deadlineMs) {
       // ZK Metadata may change during segment download process; fetch it on every retry.
-      zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
-
-      if (zkMetadata.getDownloadUrl() != null) {
-        // The downloadSegment() will throw an exception in case there are some genuine issues.
-        // We don't want to retry in those scenarios and will throw an exception
-        return downloadSegmentFromDeepStore(zkMetadata);
-      }
-
-      if (_peerDownloadScheme != null) {
-        _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
-        try {
-          onlineServerURIs = new ArrayList<>();
-          PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
-              _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme,
-              onlineServerURIs);
-          if (!onlineServerURIs.isEmpty()) {
-            return downloadSegmentFromPeers(zkMetadata);
-          }
-        } catch (Exception e) {
-          _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e);
-        }
+      zkMetadata = fetchZKMetadata(segmentName);
+      if (zkMetadata.getStatus().isCompleted()) {

Review Comment:
   cc: @KKcorps 



