klsince commented on a change in pull request #7319:
URL: https://github.com/apache/pinot/pull/7319#discussion_r708500264



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,

Review comment:
       Ack. Changing the log message to `Segment... is ready for loading`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to