jugomezv commented on code in PR #8753:
URL: https://github.com/apache/pinot/pull/8753#discussion_r934994303


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -409,11 +421,35 @@ private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
     FileUtils.forceMkdir(tempRootDir);
+    if (_isSegmentDownloadUntarStreamed && zkMetadata.getCrypterName() == null) {

Review Comment:
   So it seems possible to configure both streaming and encryption, which is not a valid config, but we do not log anything and just default to non-streaming. Should we log that in the else branch below?
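   A minimal sketch of what that warning could look like, assuming the branch can be pulled into a small helper. `SegmentDownloadModeSelector`, `DownloadMode`, and `select` are hypothetical names; `crypterName` plays the role of `zkMetadata.getCrypterName()`, and `java.util.logging` stands in for the class's own `_logger`.

```java
import java.util.logging.Logger;

// Hypothetical helper; not part of the PR.
final class SegmentDownloadModeSelector {
  private static final Logger LOGGER = Logger.getLogger(SegmentDownloadModeSelector.class.getName());

  enum DownloadMode {
    STREAMED_UNTAR, DOWNLOAD_THEN_UNTAR
  }

  private SegmentDownloadModeSelector() {
  }

  /**
   * Picks the download path for a segment. crypterName mirrors zkMetadata.getCrypterName(): null means unencrypted.
   * The diff only takes the streamed path for unencrypted segments, so an encrypted one falls back with a warning.
   */
  static DownloadMode select(boolean isSegmentDownloadUntarStreamed, String crypterName, String segmentName) {
    if (isSegmentDownloadUntarStreamed && crypterName == null) {
      return DownloadMode.STREAMED_UNTAR;
    }
    if (isSegmentDownloadUntarStreamed) {
      // The "else below" case: streaming was requested but is not honored for encrypted segments.
      LOGGER.warning(String.format(
          "Streamed untar is enabled but segment %s uses crypter %s; falling back to non-streamed download",
          segmentName, crypterName));
    }
    return DownloadMode.DOWNLOAD_THEN_UNTAR;
  }
}
```

   Logging per segment rather than once at startup keeps the segment name in the message, at the cost of one warning per encrypted segment download.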



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +221,29 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyLarge(InputStream inputStream, FileOutputStream outputStream, long rateLimit)

Review Comment:
   Should we assert that rateLimit is not NO_RATE_LIMIT?
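   Since Java `assert` statements are disabled unless the JVM runs with `-ea`, an explicit argument check may be the more reliable way to enforce this. The sketch below assumes the limit is in bytes per second and that `NO_RATE_LIMIT` is a negative sentinel; the diff shows neither. `RateLimitedCopy` is a hypothetical stand-in for the PR's `copyLarge`, and it takes a plain `OutputStream` instead of a `FileOutputStream`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;

// Hypothetical sketch; not the PR's implementation.
final class RateLimitedCopy {
  // Assumed sentinel standing in for NO_RATE_LIMIT; its real value is not shown in the diff.
  static final long NO_RATE_LIMIT = -1L;
  private static final int BUFFER_SIZE = 8192;

  private RateLimitedCopy() {
  }

  /** Copies in to out, sleeping as needed so the average rate stays at or below rateLimitBytesPerSec. */
  static long copyLarge(InputStream in, OutputStream out, long rateLimitBytesPerSec)
      throws IOException {
    // Fail fast: the unthrottled path should not reach this method at all.
    if (rateLimitBytesPerSec == NO_RATE_LIMIT || rateLimitBytesPerSec <= 0) {
      throw new IllegalArgumentException("copyLarge expects a positive rate limit, got: " + rateLimitBytesPerSec);
    }
    byte[] buffer = new byte[BUFFER_SIZE];
    long totalBytes = 0;
    long startNanos = System.nanoTime();
    int bytesRead;
    while ((bytesRead = in.read(buffer)) != -1) {
      out.write(buffer, 0, bytesRead);
      totalBytes += bytesRead;
      // Earliest elapsed time at which totalBytes is allowed; double math avoids long overflow on large segments.
      long targetNanos = (long) (totalBytes * 1e9 / rateLimitBytesPerSec);
      long aheadNanos = targetNanos - (System.nanoTime() - startNanos);
      if (aheadNanos > 0) {
        try {
          Thread.sleep(aheadNanos / 1_000_000L, (int) (aheadNanos % 1_000_000L));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("Interrupted while throttling copy");
        }
      }
    }
    return totalBytes;
  }
}
```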



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -123,6 +132,16 @@ public static List<File> untar(File inputFile, File outputDir)
    */
   public static List<File> untar(InputStream inputStream, File outputDir)
       throws IOException {
+    return untarRateLimit(inputStream, outputDir, NO_RATE_LIMIT);
+  }
+
+  /**
+   * Un-tars an input stream of a tar.gz file into a directory, returns all the untarred files/directories.
+   * rateLimit limits the untar rate.
+   * <p>For security reasons, the untarred files must reside in the output directory.
+   */
+  public static List<File> untarRateLimit(InputStream inputStream, File outputDir, long rateLimit)

Review Comment:
   For the rateLimit argument, can we include the units in the argument name to make the code more readable?
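   If the limit is meant in bytes per second (the diff does not say), the new overload could read `untarRateLimit(InputStream inputStream, File outputDir, long rateLimitBytesPerSec)`, with any config value in another unit converted once at the boundary. A hypothetical conversion helper, assuming a MiB/s config value and a `-1` sentinel for `NO_RATE_LIMIT`:

```java
// Hypothetical helper; the config unit (MiB/s) and sentinel value are assumptions.
final class RateLimitUnits {
  static final long NO_RATE_LIMIT = -1L;
  private static final long BYTES_PER_MIB = 1024L * 1024L;

  private RateLimitUnits() {
  }

  /** Converts a limit in MiB/s into the bytes/s value a rateLimitBytesPerSec parameter expects. */
  static long mibPerSecToBytesPerSec(long rateLimitMibPerSec) {
    if (rateLimitMibPerSec == NO_RATE_LIMIT) {
      return NO_RATE_LIMIT; // "unlimited" stays unlimited across the unit change
    }
    if (rateLimitMibPerSec <= 0) {
      throw new IllegalArgumentException("Rate limit must be positive or NO_RATE_LIMIT, got: " + rateLimitMibPerSec);
    }
    // multiplyExact throws on overflow instead of silently wrapping to a negative limit.
    return Math.multiplyExact(rateLimitMibPerSec, BYTES_PER_MIB);
  }
}
```

   With the unit in both the parameter name and the helper name, a call such as `untarRateLimit(in, dir, RateLimitUnits.mibPerSecToBytesPerSec(configuredMibPerSec))` makes a unit mismatch visible at the call site.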



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java:
##########
@@ -90,6 +91,54 @@ public void fetchSegmentToLocal(URI downloadURI, File dest)
     });
   }
 
+  @Override
+  public File fetchUntarSegmentToLocalStreamed(URI downloadURI, File dest, long rateLimit)
+      throws Exception {
+    // Create a RoundRobinURIProvider to round robin IP addresses when retrying the download. Otherwise we may always try
+    // to download from the same broken host because: 1) DNS may not round robin the IP addresses; 2) the OS caches the
+    // DNS resolution result.
+    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+    int retryCount = Math.max(_retryCount, uriProvider.numAddresses());
+    AtomicReference<File> ret = new AtomicReference<>(); // return the untarred segment directory
+    _logger.info("Retry downloading for {} times. retryCount from pinot server config: {}, number of IP addresses for "
+        + "download URI: {}", retryCount, _retryCount, uriProvider.numAddresses());
+    RetryPolicies.exponentialBackoffRetryPolicy(retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+      URI uri = uriProvider.next();
+      try {
+        String hostName = downloadURI.getHost();
+        int port = downloadURI.getPort();
+        // If the original download address is specified as a host name, we need to add a "HOST" HTTP header to the HTTP
+        // request. Otherwise, if the download address is an LB address and the LB is configured to "disallow direct
+        // access by IP address", downloading will fail.
+        List<Header> httpHeaders = new LinkedList<>();
+        if (!InetAddresses.isInetAddress(hostName)) {
+          httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port));
+        }
+        ret.set(_httpClient.downloadUntarFileStreamed(uri, dest, _authProvider, httpHeaders, rateLimit));
+
+        return true;
+      } catch (HttpErrorStatusException e) {
+        int statusCode = e.getStatusCode();
+        if (statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
+          // Temporary exception
+          // 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts;
+          // if a single host is down, we can retry with another host.
+          _logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,

Review Comment:
   Is it useful to log the retry attempt here?
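   One way to surface the attempt number is to own the loop and log it on every failed try. The sketch below is a hypothetical, self-contained stand-in for `RetryPolicies.exponentialBackoffRetryPolicy(...).attempt(...)`; it does not use Pinot's retry classes, and `java.util.logging` replaces `_logger`.

```java
import java.util.concurrent.Callable;
import java.util.logging.Logger;

// Hypothetical retry helper; not Pinot's RetryPolicy.
final class LoggedRetry {
  private static final Logger LOGGER = Logger.getLogger(LoggedRetry.class.getName());

  private LoggedRetry() {
  }

  /**
   * Calls operation until it returns true or maxAttempts is reached, backing off exponentially and
   * logging the attempt number after each failed try. Returns the attempt number that succeeded.
   */
  static int attempt(int maxAttempts, long initialWaitMs, double delayScaleFactor, Callable<Boolean> operation)
      throws Exception {
    long waitMs = initialWaitMs;
    for (int attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber++) {
      if (operation.call()) {
        return attemptNumber;
      }
      if (attemptNumber == maxAttempts) {
        break; // no sleep after the final failure
      }
      LOGGER.warning(String.format("Attempt %d/%d failed, retrying in %d ms", attemptNumber, maxAttempts, waitMs));
      Thread.sleep(waitMs);
      waitMs = (long) (waitMs * delayScaleFactor);
    }
    throw new IllegalStateException("Operation failed after " + maxAttempts + " attempts");
  }
}
```

   Within the existing lambda, a similar effect could be had by capturing an `AtomicInteger` that is incremented at the top of each invocation and included in the `_logger.warn` call.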



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to