Jackie-Jiang commented on code in PR #15001:
URL: https://github.com/apache/pinot/pull/15001#discussion_r1955194993


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File 
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
           segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
     }
     try {
-      File untarredSegmentDir;
-      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
-        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        AtomicInteger failedAttempts = new AtomicInteger(0);
-        try {
-          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
-              _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
-          _logger.info("Downloaded and untarred segment: {} from: {}, failed 
attempts: {}", segmentName, downloadUrl,
-              failedAttempts.get());
-        } finally {
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
-              failedAttempts.get());
+      if (_segmentOperationsThrottler != null) {
+        _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
+      }
+      try {
+        File untarredSegmentDir;
+        if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
+          _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}",
+              segmentName,
+              _streamSegmentDownloadUntarRateLimitBytesPerSec);

Review Comment:
   (nit) reformat



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File 
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
           segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
     }
     try {
-      File untarredSegmentDir;
-      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
-        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        AtomicInteger failedAttempts = new AtomicInteger(0);
-        try {
-          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
-              _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
-          _logger.info("Downloaded and untarred segment: {} from: {}, failed 
attempts: {}", segmentName, downloadUrl,
-              failedAttempts.get());
-        } finally {
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
-              failedAttempts.get());
+      if (_segmentOperationsThrottler != null) {
+        _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();

Review Comment:
   You might want to add some logging here, similar to what is done for the 
table-level semaphore, for debugging purposes.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -858,6 +868,9 @@ protected File downloadSegmentFromPeers(SegmentZKMetadata 
zkMetadata)
     _logger.info("Downloading segment: {} from peers", segmentName);
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
     File segmentTarFile = new File(tempRootDir, segmentName + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
+    if (_segmentOperationsThrottler != null) {
+      _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();

Review Comment:
   Same here: add some logging for debugging purposes.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File 
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
           segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
     }
     try {
-      File untarredSegmentDir;
-      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
-        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        AtomicInteger failedAttempts = new AtomicInteger(0);
-        try {
-          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
-              _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
-          _logger.info("Downloaded and untarred segment: {} from: {}, failed 
attempts: {}", segmentName, downloadUrl,
-              failedAttempts.get());
-        } finally {
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
-              failedAttempts.get());
+      if (_segmentOperationsThrottler != null) {
+        _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
+      }
+      try {
+        File untarredSegmentDir;
+        if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
+          _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}",
+              segmentName,
+              _streamSegmentDownloadUntarRateLimitBytesPerSec);
+          AtomicInteger failedAttempts = new AtomicInteger(0);
+          try {
+            untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+                _streamSegmentDownloadUntarRateLimitBytesPerSec, 
failedAttempts);
+            _logger.info("Downloaded and untarred segment: {} from: {}, failed 
attempts: {}", segmentName, downloadUrl,
+                failedAttempts.get());
+          } finally {
+            _serverMetrics.addMeteredTableValue(_tableNameWithType,
+                ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+                failedAttempts.get());

Review Comment:
   (nit) reformat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to