jackjlli commented on a change in pull request #7969:
URL: https://github.com/apache/pinot/pull/7969#discussion_r777714621



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -333,92 +315,131 @@ public void reloadSegment(String segmentName, 
IndexLoadingConfig indexLoadingCon
 
   @Override
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
-      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata)
       throws Exception {
-    if (!isNewSegment(zkMetadata, localMetadata)) {
-      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, 
already loaded, do nothing", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
+    // Non-null segment metadata means the segment has already been loaded.
+    if (segmentMetadata != null) {
+      if (hasSameCRC(zkMetadata, segmentMetadata)) {
+        // Simply returns if the CRC hasn't changed. The table config may have 
changed
+        // since segment is loaded, but that is handled by reloadSegment() 
method.
+        LOGGER.info("Segment: {} of table: {} has crc: {} same as before, 
already loaded, do nothing", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc());
+      } else {
+        // Download the raw segment, reprocess and load it if the CRC has 
changed.
+        LOGGER.info("Segment: {} of table: {} already loaded but its crc: {} 
differs from new crc: {}", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc());
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, 
zkMetadata,
+            ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType));
+      }
       return;
     }
 
-    // Try to recover if no local metadata is provided.
-    if (localMetadata == null) {
-      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", 
segmentName, _tableNameWithType);
-      localMetadata = recoverSegmentQuietly(segmentName);
-      if (!isNewSegment(zkMetadata, localMetadata)) {
-        LOGGER.info("Segment: {} of table {} has crc: {} same as before, 
loading", segmentName, _tableNameWithType,
-            localMetadata.getCrc());
-        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
-          return;
-        }
-        // Set local metadata to null to indicate that the local segment fails 
to load,
-        // although it exists and has same crc with the remote one.
-        localMetadata = null;
-      }
+    // For local tier backend, try to recover the segment from potential
+    // reload failure. Continue upon any failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata that
+    // may be from local tier backend or remote tier backend.
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
+    try {
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), 
segmentLoaderContext);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table: 
{} due to error: {}", segmentName,
+          _tableNameWithType, e.getMessage());
     }
 
-    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: 
%s of table: %s does not allow download",
-        segmentName, _tableNameWithType);
-
-    // Download segment and replace the local one, either due to failure to 
recover local segment,
-    // or the segment data is updated and has new CRC now.
-    if (localMetadata == null) {
-      LOGGER.info("Download segment: {} of table: {} as no good one exists 
locally", segmentName, _tableNameWithType);
-    } else {
-      LOGGER.info("Download segment: {} of table: {} as local crc: {} 
mismatches remote crc: {}.", segmentName,
-          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    // Download the raw segment, reprocess and load it if it is never loaded 
or its CRC has changed.
+    if (segmentDirectory == null || !hasSameCRC(zkMetadata, 
segmentDirectory.getSegmentMetadata())) {
+      LOGGER.info("Segment: {} of table: {} not exist or its crc: {} differs 
from new crc: {}", segmentName,
+          _tableNameWithType, (segmentDirectory == null) ? "none" : 
segmentDirectory.getSegmentMetadata().getCrc(),
+          zkMetadata.getCrc());
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType));
+      return;
     }
-    File indexDir = downloadSegment(segmentName, zkMetadata);
-    addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", 
segmentName, _tableNameWithType,
-        zkMetadata.getCrc());
-  }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
+    try {
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
indexLoadingConfig, schema)) {
+        // The loaded segment is still consistent with current table config or 
schema.
+        LOGGER.info("Segment: {} of table: {} is consistent with table config 
and schema", segmentName,
+            _tableNameWithType);
+        addSegment(ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig, schema));
+        return;
+      }
+      // If any discrepancy is found, get the segment from tier backend, 
reprocess and load it.
+      // Please note that the segment is from tier backed, not deep store, for 
incremental processing.

Review comment:
       Typo: `s/backed/backend/` — "tier backed" should read "tier backend".

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -275,53 +278,32 @@ public void addSegmentError(String segmentName, 
SegmentErrorInfo segmentErrorInf
 
   @Override
   public void reloadSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata,
-      SegmentMetadata localMetadata, @Nullable Schema schema, boolean 
forceDownload)
+      SegmentMetadata segmentMetadata, @Nullable Schema schema, boolean 
forceDownload)
       throws Exception {
-    File indexDir = localMetadata.getIndexDir();
-    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is 
not a directory", indexDir);
-
-    File parentFile = indexDir.getParentFile();
-    File segmentBackupDir =
-        new File(parentFile, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+    File indexDir = getSegmentDataDir(segmentName);
+    // Create backup dir to make segment reloading atomic for local tier 
backend.
+    LoaderUtils.createBackup(indexDir);
     try {
-      // First rename index directory to segment backup directory so that 
original segment have all file descriptors
-      // point to the segment backup directory to ensure original segment 
serves queries properly
-
-      // Rename index directory to segment backup directory (atomic)
-      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
-          "Failed to rename index directory: %s to segment backup directory: 
%s", indexDir, segmentBackupDir);
-
-      // Download from remote or copy from local backup directory into index 
directory,
-      // and then continue to load the segment from index directory.
-      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, 
localMetadata);
-      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
-        if (forceDownload) {
-          LOGGER.info("Segment: {} of table: {} is forced to download", 
segmentName, _tableNameWithType);
-        } else {
-          LOGGER.info("Download segment:{} of table: {} as local crc: {} 
mismatches remote crc: {}", segmentName,
-              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
-        }
-        indexDir = downloadSegment(segmentName, zkMetadata);
+      boolean shouldDownloadRawSegment = forceDownload || 
!hasSameCRC(zkMetadata, segmentMetadata);
+      if (shouldDownloadRawSegment && allowDownloadRawSegment(segmentName, 
zkMetadata)) {
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, 
zkMetadata, schema);
       } else {
-        LOGGER.info("Reload the local copy of segment: {} of table: {}", 
segmentName, _tableNameWithType);
-        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+        SegmentDirectoryLoaderContext segmentLoaderContext =
+            new 
SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
+                segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+        SegmentDirectoryLoader segmentLoader =
+            
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+        // Get the segment from tier backend, reprocess and load it with 
current table config and schema.
+        // Please note that the segment is from tier backed, not deep store, 
for incremental processing.

Review comment:
       Typo: `s/backed/backend/` — "tier backed" should read "tier backend".

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -333,92 +315,131 @@ public void reloadSegment(String segmentName, 
IndexLoadingConfig indexLoadingCon
 
   @Override
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
-      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata)
       throws Exception {
-    if (!isNewSegment(zkMetadata, localMetadata)) {
-      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, 
already loaded, do nothing", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
+    // Non-null segment metadata means the segment has already been loaded.
+    if (segmentMetadata != null) {
+      if (hasSameCRC(zkMetadata, segmentMetadata)) {
+        // Simply returns if the CRC hasn't changed. The table config may have 
changed
+        // since segment is loaded, but that is handled by reloadSegment() 
method.
+        LOGGER.info("Segment: {} of table: {} has crc: {} same as before, 
already loaded, do nothing", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc());
+      } else {
+        // Download the raw segment, reprocess and load it if the CRC has 
changed.
+        LOGGER.info("Segment: {} of table: {} already loaded but its crc: {} 
differs from new crc: {}", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc());
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, 
zkMetadata,
+            ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType));
+      }
       return;
     }
 
-    // Try to recover if no local metadata is provided.
-    if (localMetadata == null) {
-      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", 
segmentName, _tableNameWithType);
-      localMetadata = recoverSegmentQuietly(segmentName);
-      if (!isNewSegment(zkMetadata, localMetadata)) {
-        LOGGER.info("Segment: {} of table {} has crc: {} same as before, 
loading", segmentName, _tableNameWithType,
-            localMetadata.getCrc());
-        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
-          return;
-        }
-        // Set local metadata to null to indicate that the local segment fails 
to load,
-        // although it exists and has same crc with the remote one.
-        localMetadata = null;
-      }
+    // For local tier backend, try to recover the segment from potential
+    // reload failure. Continue upon any failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata that
+    // may be from local tier backend or remote tier backend.
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
+    try {
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), 
segmentLoaderContext);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table: 
{} due to error: {}", segmentName,
+          _tableNameWithType, e.getMessage());
     }
 
-    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: 
%s of table: %s does not allow download",
-        segmentName, _tableNameWithType);
-
-    // Download segment and replace the local one, either due to failure to 
recover local segment,
-    // or the segment data is updated and has new CRC now.
-    if (localMetadata == null) {
-      LOGGER.info("Download segment: {} of table: {} as no good one exists 
locally", segmentName, _tableNameWithType);
-    } else {
-      LOGGER.info("Download segment: {} of table: {} as local crc: {} 
mismatches remote crc: {}.", segmentName,
-          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    // Download the raw segment, reprocess and load it if it is never loaded 
or its CRC has changed.
+    if (segmentDirectory == null || !hasSameCRC(zkMetadata, 
segmentDirectory.getSegmentMetadata())) {
+      LOGGER.info("Segment: {} of table: {} not exist or its crc: {} differs 
from new crc: {}", segmentName,
+          _tableNameWithType, (segmentDirectory == null) ? "none" : 
segmentDirectory.getSegmentMetadata().getCrc(),
+          zkMetadata.getCrc());
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType));
+      return;
     }
-    File indexDir = downloadSegment(segmentName, zkMetadata);
-    addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", 
segmentName, _tableNameWithType,
-        zkMetadata.getCrc());
-  }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
+    try {
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
indexLoadingConfig, schema)) {
+        // The loaded segment is still consistent with current table config or 
schema.
+        LOGGER.info("Segment: {} of table: {} is consistent with table config 
and schema", segmentName,
+            _tableNameWithType);
+        addSegment(ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig, schema));
+        return;
+      }
+      // If any discrepancy is found, get the segment from tier backend, 
reprocess and load it.
+      // Please note that the segment is from tier backed, not deep store, for 
incremental processing.
+      LOGGER.info("Segment: {} of table: {} needs reprocess with table config 
and schema", segmentName,
+          _tableNameWithType);
+      // Close the stale SegmentDirectory object before loading the newly 
processed segment.
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      segmentLoader.download(indexDir, segmentLoaderContext);
+      processAndLoadSegment(segmentName, indexDir, indexLoadingConfig, schema);
+      LOGGER.info("Segment: {} of table: {} is reprocessed and loaded", 
segmentName, _tableNameWithType);
+    } catch (Exception e) {
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      LOGGER.error("Failed to reprocess and load segment: {} of table: {}", 
segmentName, _tableNameWithType, e);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType));
+    }
   }
 
-  protected File downloadSegment(String segmentName, SegmentZKMetadata 
zkMetadata)
+  /**
+   * This method downloads the raw segment from the deep store and process it, 
mainly
+   * for cases where segment CRC has changed or the existing segment fails to 
load.
+   */
+  private void downloadRawSegmentAndProcess(String segmentName, 
IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable Schema schema)
       throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+    Preconditions.checkState(allowDownloadRawSegment(segmentName, zkMetadata),
+        "Segment: %s of table: %s does not allow download raw segment", 
segmentName, _tableNameWithType);
+
+    File indexDir = downloadRawSegment(segmentName, zkMetadata);
+    processAndLoadSegment(segmentName, indexDir, indexLoadingConfig, schema);
+    LOGGER.info("Downloaded raw segment: {} of table: {} with crc: {} and 
loaded it", segmentName,
+        _tableNameWithType, zkMetadata.getCrc());
   }
 
-  /**
-   * Server restart during segment reload might leave segment directory in 
inconsistent state, like the index
-   * directory might not exist but segment backup directory existed. This 
method tries to recover from reload
-   * failure before checking the existence of the index directory and loading 
segment metadata from it.
-   */
-  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
-    File indexDir = getSegmentDataDir(segmentName);
+  private void processAndLoadSegment(String segmentName, File indexDir, 
IndexLoadingConfig indexLoadingConfig,
+      @Nullable Schema schema)
+      throws Exception {
+    // Preprocess the segment locally with current table config and schema.
+    ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, schema);
+
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
     try {
-      LoaderUtils.reloadFailureRecovery(indexDir);
-      if (!indexDir.exists()) {
-        LOGGER.info("Segment: {} of table: {} is not found on disk", 
segmentName, _tableNameWithType);
-        return null;
-      }
-      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
-      LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready 
for loading", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
-      return localMetadata;
+      // Upload the processed segment to server tier backend, which can be 
local or remote.
+      segmentLoader.upload(indexDir, segmentLoaderContext);
+      // Create the SegmentDirectory object with the newly processed segment 
from tier backend.
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), 
segmentLoaderContext);
+      addSegment(ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig, schema));
     } catch (Exception e) {
-      LOGGER.error("Failed to recover segment: {} of table: {} from disk", 
segmentName, _tableNameWithType, e);
-      FileUtils.deleteQuietly(indexDir);
-      return null;
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      LOGGER.warn("Failed to load newly processed segment: {} of table: {}", 
segmentName, _tableNameWithType, e);

Review comment:
       Why not log this at `error` level, since an exception gets thrown here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to