Jackie-Jiang commented on code in PR #15316:
URL: https://github.com/apache/pinot/pull/15316#discussion_r2005914016


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName)
   @Override
   public File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
-        "Segment: %s is still IN_PROGRESS and cannot be downloaded", 
zkMetadata.getSegmentName());
+    String segmentName = zkMetadata.getSegmentName();
+    Status status = zkMetadata.getStatus();
+    Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is 
still IN_PROGRESS and cannot be downloaded",
+        segmentName);
 
-    // Case: The commit protocol has completed, and the segment is ready to be 
downloaded either
-    // from deep storage or from a peer (if peer-to-peer download is enabled).
-    if (zkMetadata.getStatus() == Status.DONE) {
+    // The commit protocol has completed, and the segment is ready to be 
downloaded either from deep storage or from a
+    // peer (if peer-to-peer download is enabled).
+    if (status.isCompleted()) {
       return super.downloadSegment(zkMetadata);
     }
 
     // The segment status is COMMITTING, indicating that the segment commit 
process is incomplete.
     // Attempting a waited download within the configured time limit.
-    long downloadTimeoutMilliseconds =
-        
getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore,
 _tableNameWithType));
-    final long startTime = System.currentTimeMillis();
-    List<URI> onlineServerURIs;
-    while (System.currentTimeMillis() - startTime < 
downloadTimeoutMilliseconds) {
+    Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s 
for segment: %s", status, segmentName);
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
+    long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+    long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
+    while (System.currentTimeMillis() < deadlineMs) {
       // ZK Metadata may change during segment download process; fetch it on 
every retry.
-      zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
-
-      if (zkMetadata.getDownloadUrl() != null) {
-        // The downloadSegment() will throw an exception in case there are 
some genuine issues.
-        // We don't want to retry in those scenarios and will throw an 
exception
-        return downloadSegmentFromDeepStore(zkMetadata);
-      }
-
-      if (_peerDownloadScheme != null) {
-        _logger.info("Peer download is enabled for the segment: {}", 
zkMetadata.getSegmentName());
-        try {
-          onlineServerURIs = new ArrayList<>();
-          
PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
-              _helixManager.getClusterName(), _tableNameWithType, 
zkMetadata.getSegmentName(), _peerDownloadScheme,
-              onlineServerURIs);
-          if (!onlineServerURIs.isEmpty()) {
-            return downloadSegmentFromPeers(zkMetadata);
-          }
-        } catch (Exception e) {
-          _logger.warn("Could not download segment: {} from peer", 
zkMetadata.getSegmentName(), e);
-        }
+      zkMetadata = fetchZKMetadata(segmentName);
+      if (zkMetadata.getStatus().isCompleted()) {
+        return super.downloadSegment(zkMetadata);
       }
 
-      long timeElapsed = System.currentTimeMillis() - startTime;
-      long timeRemaining = downloadTimeoutMilliseconds - timeElapsed;
-
-      if (timeRemaining <= 0) {
+      long timeRemainingMs = deadlineMs - System.currentTimeMillis();
+      if (timeRemainingMs <= 0) {
         break;
       }
 
-      _logger.info("Sleeping for 30 seconds as the segment url is missing. 
Time remaining: {} minutes",
-          Math.round(timeRemaining / 60000.0));
-
-      // Sleep for the shorter of our normal interval or remaining time
-      Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining));
+      long sleepTimeMs = Math.min(SLEEP_INTERVAL_MS, timeRemainingMs);
+      _logger.info("Sleeping for: {}ms waiting for segment: {} to be 
completed. Time remaining: {}ms", sleepTimeMs,
+          segmentName, timeRemainingMs);
+      //noinspection BusyWait
+      Thread.sleep(sleepTimeMs);
     }
 
     // If we exit the loop without returning, throw an exception
     throw new TimeoutException(
-        "Failed to download segment after " + 
TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds)
-            + " minutes of retrying. Segment: " + zkMetadata.getSegmentName());
-  }
-
-  private long getDownloadTimeOutMilliseconds(@Nullable TableConfig 
tableConfig) {
-    return 
Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig)
-        
.map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps)
-        .filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0)).map(map -> 
map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES))
-        .map(timeoutStr -> {
-          try {
-            return TimeUnit.MINUTES.toMillis(Long.parseLong(timeoutStr));
-          } catch (NumberFormatException e) {
-            return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS;
-          }
-        }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS);
+        "Failed to download segment: " + segmentName + " after: " + 
downloadTimeoutMs + "ms of retrying");
+  }
+
+  private long getDownloadTimeoutMs(TableConfig tableConfig) {
+    Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
+    String timeoutSeconds = 
streamConfigMap.get(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS);

Review Comment:
   It is for backward compatibility, and I've marked the current one as 
deprecated.
   I don't want to express it in minutes because that is not flexible. We don't 
have any timeout in minutes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to