klsince commented on a change in pull request #7969: URL: https://github.com/apache/pinot/pull/7969#discussion_r779041798
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java ########## @@ -333,94 +306,162 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon @Override public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, - SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) + SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata) throws Exception { - if (!isNewSegment(zkMetadata, localMetadata)) { - LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, - _tableNameWithType, localMetadata.getCrc()); + // Non-null segment metadata means the segment has already been loaded. + if (segmentMetadata != null) { + if (hasSameCRC(zkMetadata, segmentMetadata)) { + // Simply returns if the CRC hasn't changed. The table config may have changed + // since segment is loaded, but that is handled by reloadSegment() method. + LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, + _tableNameWithType, segmentMetadata.getCrc()); + } else { + // Download the raw segment, reprocess and load it if the CRC has changed. + LOGGER.info("Segment: {} of table: {} already loaded but its crc: {} differs from new crc: {}", segmentName, + _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc()); + downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata, + ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType)); + } return; } - // Try to recover if no local metadata is provided. 
- if (localMetadata == null) { - LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType); - localMetadata = recoverSegmentQuietly(segmentName); - if (!isNewSegment(zkMetadata, localMetadata)) { - LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType, - localMetadata.getCrc()); - if (loadSegmentQuietly(segmentName, indexLoadingConfig)) { - return; - } - // Set local metadata to null to indicate that the local segment fails to load, - // although it exists and has same crc with the remote one. - localMetadata = null; - } - } + // For local tier backend, try to recover the segment from potential + // reload failure. Continue upon any failure. + File indexDir = getSegmentDataDir(segmentName); + recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir); - Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download", - segmentName, _tableNameWithType); + // Creates the SegmentDirectory object to access the segment metadata that + // may be from local tier backend or remote tier backend. 
+ SegmentDirectory segmentDirectory = null; + try { + SegmentDirectoryLoaderContext loaderContext = + new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(), + segmentName, indexLoadingConfig.getSegmentDirectoryConfigs()); + SegmentDirectoryLoader segmentDirectoryLoader = + SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader()); + segmentDirectory = segmentDirectoryLoader.load(indexDir.toURI(), loaderContext); + } catch (Exception e) { + LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table: {} due to error: {}", segmentName, + _tableNameWithType, e.getMessage()); + } - // Download segment and replace the local one, either due to failure to recover local segment, - // or the segment data is updated and has new CRC now. - if (localMetadata == null) { - LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType); - } else { - LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName, - _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc()); + // Download the raw segment, reprocess and load it if it is never loaded or its CRC has changed. 
+ if (segmentDirectory == null) { + LOGGER.info("Segment: {} of table: {} does not exist", segmentName, _tableNameWithType); + downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata, + ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType)); + return; + } + if (!hasSameCRC(zkMetadata, segmentDirectory.getSegmentMetadata())) { + LOGGER.info("Segment: {} of table: {} has crc: {} different from new crc: {}", segmentName, + _tableNameWithType, segmentDirectory.getSegmentMetadata().getCrc(), zkMetadata.getCrc()); + closeSegmentDirectoryQuietly(segmentDirectory); + downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata, + ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType)); + return; } - File indexDir = downloadSegment(segmentName, zkMetadata); - addSegment(indexDir, indexLoadingConfig); - LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType, - zkMetadata.getCrc()); - } - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - return true; + try { + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); + if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema)) { + // The loaded segment is still consistent with current table config or schema. + LOGGER.info("Segment: {} of table: {} is consistent with table config and schema", segmentName, + _tableNameWithType); + loadSegment(segmentDirectory, indexLoadingConfig, schema); + return; + } + // If any discrepancy is found, get the segment from tier backend, reprocess and load it. + LOGGER.info("Segment: {} of table: {} needs reprocess to reflect latest table config and schema", segmentName, + _tableNameWithType); + // Close the stale SegmentDirectory object before loading the newly processed segment. 
+ closeSegmentDirectoryQuietly(segmentDirectory); + downloadTierSegmentAndProcess(segmentName, indexLoadingConfig, schema); + } catch (Exception e) { + closeSegmentDirectoryQuietly(segmentDirectory); + LOGGER.error("Failed to reprocess and load segment: {} of table: {}", segmentName, _tableNameWithType, e); + downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata, + ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType)); + } } - protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) + /** + * Get the segment from the configured tier backend, reprocess it with the latest table + * config and schema and then load it. Please note that the segment is from tier backend, + * not deep store, for incremental processing. + */ + private void downloadTierSegmentAndProcess(String segmentName, IndexLoadingConfig indexLoadingConfig, + @Nullable Schema schema) throws Exception { - // TODO: may support download from peer servers for RealTime table. - return downloadSegmentFromDeepStore(segmentName, zkMetadata); + SegmentDirectoryLoaderContext loaderContext = + new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(), + segmentName, indexLoadingConfig.getSegmentDirectoryConfigs()); + SegmentDirectoryLoader segmentDirectoryLoader = + SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader()); + File indexDir = getSegmentDataDir(segmentName); + segmentDirectoryLoader.download(indexDir, loaderContext); Review comment: The download and upload methods were added to the SegmentDirectoryLoader interface because, today, the SegmentPreprocessor and nearly all of the IndexCreators it uses require a `File indexDir` as an input param. If they could work with the SegmentDirectory interface instead of `File indexDir`, the download/upload methods wouldn't be needed in the SegmentDirectoryLoader interface.
(This is briefly explained in the comments on the SegmentDirectoryLoader interface.) I hope this clarifies why downloading the raw segment from deep store is not part of the SegmentDirectoryLoader interface: there is a possible path forward to remove the download/upload methods from SegmentDirectoryLoader entirely, if the segment preprocessing logic could work with the SegmentDirectory interface instead of explicitly requiring a `File indexDir`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org