mcvsubbu commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r436849376



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {

Review comment:
       "PEER" is an unparseable URI, so it is not a good idea to put it into the downloadUrl in the metadata. It is best to leave it unset (or explicitly set it to null).
   I also suggest floating metadata changes in an email, as is done for table config changes. I suggest that we change things as follows:
   (1) Change the segment completion protocol to mention "PEER" in the URI string (or maybe use a specific URI like "peer://uri/of/server/segmentName").
   (2) On the controller, if it receives a peer scheme from the server, it knows that the segment needs to be committed without a URI in the segment metadata.
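A minimal sketch of suggestions (1) and (2), assuming a peer scheme of the form peer://<server host:port>/<segmentName>. PeerDownloadUri and its methods are hypothetical names that do not exist in Pinot. The sketch only shows that such a URI parses cleanly with java.net.URI (unlike the bare "PEER" string, which has no scheme) and how the controller could map it to a null download URL in the segment metadata.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper illustrating the suggested "peer://" scheme; not a Pinot class.
final class PeerDownloadUri {
  // Hypothetical scheme name for segments that must be fetched from a peer server.
  static final String PEER_SCHEME = "peer";

  private PeerDownloadUri() {
  }

  // Server side: build a parseable URI such as peer://server1:8098/mySegment instead of
  // sending the bare string "PEER" in the segment completion protocol.
  static String forSegment(String serverHostPort, String segmentName) {
    try {
      // The multi-argument constructor quotes any illegal characters in the path.
      return new URI(PEER_SCHEME, serverHostPort, "/" + segmentName, null, null).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Cannot build peer URI for " + segmentName, e);
    }
  }

  // True only if the string parses as a URI whose scheme is "peer" (case-insensitive).
  static boolean isPeerUri(String uri) {
    if (uri == null) {
      return false;
    }
    try {
      return PEER_SCHEME.equalsIgnoreCase(new URI(uri).getScheme());
    } catch (URISyntaxException e) {
      return false;
    }
  }

  // Controller side: a peer URI means the segment is committed with no download URL
  // (null) in its metadata, instead of storing an unparseable marker string.
  static String downloadUrlToPersist(String uriFromServer) {
    return isPeerUri(uriFromServer) ? null : uriFromServer;
  }
}
```

With this shape, the server-side check in downloadAndReplaceSegment would become a null check on the download URL rather than a string comparison against "PEER".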
   

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       fetchSegmentToLocal already does retries, so wrapping it in another retry policy here is redundant.
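To illustrate why the extra retry is a concern, here is a sketch that assumes the fetcher's internal retries behave like a plain exponential-backoff loop. retryWithBackoff is a hypothetical helper, not Pinot's RetryPolicies API. Wrapping a call that already retries in a second retry loop multiplies the attempts: with 3 outer attempts around 3 inner attempts, an always-failing download is invoked 9 times, and with non-zero delays both sets of backoff waits add up as well.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not Pinot's RetryPolicies or SegmentFetcher API.
final class NestedRetryDemo {
  private NestedRetryDemo() {
  }

  // Minimal exponential-backoff retry: waits initialDelayMs, then initialDelayMs * scale, ...
  static <T> T retryWithBackoff(int maxAttempts, long initialDelayMs, double scale, Callable<T> op)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long delayMs = initialDelayMs;
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return op.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts && delayMs > 0) {
          Thread.sleep(delayMs);
        }
        delayMs = (long) (delayMs * scale);
      }
    }
    throw last;
  }

  // Counts how often an always-failing "download" runs when an inner retry (standing in
  // for the fetcher's own retries) is wrapped by an outer retry. Delays are 0 for speed.
  static int countAttempts(int outerAttempts, int innerAttempts) {
    AtomicInteger calls = new AtomicInteger();
    Callable<Void> failingDownload = () -> {
      calls.incrementAndGet();
      throw new IOException("simulated download failure");
    };
    try {
      NestedRetryDemo.<Void>retryWithBackoff(outerAttempts, 0L, 2.0,
          () -> retryWithBackoff(innerAttempts, 0L, 2.0, failingDownload));
    } catch (Exception expected) {
      // Every attempt fails by construction, so the outer retry eventually rethrows.
    }
    return calls.get();
  }
}
```

Calling the fetcher once and relying on its own retry settings keeps the worst-case download time predictable.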

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -35,11 +35,11 @@ private SegmentFetcherFactory() {
 
   public static final String PROTOCOLS_KEY = "protocols";
   public static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
+  public static final String HTTP_PROTOCOL = "http";

Review comment:
       I think there are a few definitions of these floating around in the code. It may be useful to take this opportunity to move them to a Constants class.
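A sketch of what a single shared definition could look like. ProtocolConstants and isHttpScheme are hypothetical names; which existing Pinot constants class should hold these is not specified in this review. A shared helper like isHttpScheme would also let isPeerSegmentDownloadEnabled() avoid repeating the two equalsIgnoreCase checks.

```java
// Hypothetical shared constants holder; the name and location are assumptions.
final class ProtocolConstants {
  static final String HTTP_PROTOCOL = "http";
  static final String HTTPS_PROTOCOL = "https";

  private ProtocolConstants() {
    // Constants holder; not instantiable.
  }

  // Case-insensitive check for an HTTP(S) scheme; returns false for null.
  static boolean isHttpScheme(String scheme) {
    return HTTP_PROTOCOL.equalsIgnoreCase(scheme) || HTTPS_PROTOCOL.equalsIgnoreCase(scheme);
  }
}
```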




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


