Jackie-Jiang commented on a change in pull request #7319: URL: https://github.com/apache/pinot/pull/7319#discussion_r699715552
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java ########## @@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue)); } } + + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) Review comment: annotate local metadata as nullable ########## File path: pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java ########## @@ -240,7 +239,7 @@ public void testReplace() runStorageServer(numQueryThreads, runTimeSec, tableDataManager); // replaces segments while online -// System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size()); + // System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size()); Review comment: Remove it ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -248,19 +249,27 @@ public boolean isPartialUpsertEnabled() { return _upsertMode == UpsertConfig.Mode.PARTIAL; } + /** + * Add or replace an immutable segment created by the REALTIME table. + * Skipping segment still in consuming state. + */ + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) + throws Exception { + throw new UnsupportedOperationException(); Review comment: Why do we throw exception here? 
########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java ########## @@ -146,4 +156,12 @@ void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig * @return List of {@link SegmentErrorInfo} */ Map<String, SegmentErrorInfo> getSegmentErrors(); + + /** + * Get the dir for the given table segment, when TableDataManager object is not + * yet initialized for the given table. + */ + static File getSegmentDataDir(String instanceDataDir, String tableNameWithType, String segmentName) { Review comment: This is an instance-level method and seems to be used only in `InstanceDataManager`. Can we inline it instead of putting it as an interface method? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -248,19 +249,27 @@ public boolean isPartialUpsertEnabled() { return _upsertMode == UpsertConfig.Mode.PARTIAL; } + /** + * Add or replace an immutable segment created by the REALTIME table. + * Skipping segment still in consuming state. + */ + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) Review comment: Annotate as nullable ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java ########## @@ -86,11 +86,28 @@ void reloadSegment(String tableNameWithType, String segmentName) throws Exception; /** - * Reloads all segment in a table. + * Reloads all segments in a table. */ void reloadAllSegments(String tableNameWithType) throws Exception; + /** + * Adds or replaces a segment in a table. Different from segment reloading, + * 1. this method doesn't assume the existence of TableDataManager object and it + * can actually initialize the TableDataManager for the segment; + * 2. 
this method can download a new segment to replace the local one before loading. + * Download is conducted when local segment's CRC is different from the one of the + * remote segment, but can also be forced to do regardless of CRC difference. + */ + void addOrReplaceSegment(String tableNameWithType, String segmentName, boolean forceDownload) Review comment: The `forceDownload` should not be part of this API. We can add separate APIs for `redownloadSegment` and `redownloadAllSegments`, or add the flag to the `reload` APIs. ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java ########## @@ -311,6 +310,47 @@ private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMeta } } + @Override + public void addOrReplaceSegment(String tableNameWithType, String segmentName, boolean forceDownload) + throws Exception { + LOGGER.info("Adding or replacing segment: {} for table: {} downloadIsForced: {}", segmentName, tableNameWithType, + forceDownload); + + // Get updated table config and segment metadata from Zookeeper. + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkNotNull(tableConfig); + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); + Preconditions.checkNotNull(zkMetadata); + + // This method might modify the file on disk. Use segment lock to prevent race condition + Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); + try { + // Lock the segment to get its metadata, so that no other threads are modifying + // the disk files of this segment. 
+ segmentLock.lock(); + SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType, segmentName); + + _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> createTableDataManager(k, tableConfig)) + .addOrReplaceSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), + localMetadata, zkMetadata, forceDownload); + LOGGER.info("Added or replaced segment: {} of table: {}", segmentName, tableNameWithType); + } finally { + segmentLock.unlock(); + } + } + + @Override + public void addOrReplaceAllSegments(String tableNameWithType, boolean forceDownload) + throws Exception { + LOGGER.info("Adding or replacing all segments in table: {} downloadIsForced: {}", tableNameWithType, forceDownload); + List<SegmentMetadata> segMds = getAllSegmentsMetadata(tableNameWithType); + for (SegmentMetadata segMd : segMds) { + addOrReplaceSegment(tableNameWithType, segMd.getName(), forceDownload); Review comment: Add a helper method with `TableConfig` and `SegmentZKMetadata` as the arguments, and make `addOrReplaceSegment()` and `addOrReplaceAllSegments()` call the helper method. 
We should avoid reading them for each segment for bulk replace ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java ########## @@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue)); } } + + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) + throws Exception { + if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, + _tableNameWithType, localMetadata.getCrc()); + return; + } + + // If not forced to download, then try to recover if no local metadata is provided. + if (!forceDownload && localMetadata == null) { + LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType); + localMetadata = recoverSegmentQuietly(segmentName); + if (!isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType, + localMetadata.getCrc()); + if (loadSegmentQuietly(segmentName, indexLoadingConfig)) { + return; + } + localMetadata = null; + } + } + + // Download segment and replace the local one, either due to being forced to download, or the + // local segment is not able to get loaded, or the segment data is updated and has new CRC now. 
+ if (forceDownload) { + LOGGER.info("Force to download segment: {} of table: {}", segmentName, _tableNameWithType); + } else if (localMetadata == null) { + LOGGER.info("Download segment: {} of table: {} as no one exists locally", segmentName, _tableNameWithType); + } else { + LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName, + _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc()); + } + File indexDir = downloadSegmentFromDeepStore(segmentName, zkMetadata); + SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir); + addSegment(indexDir, indexLoadingConfig); + LOGGER.info("Downloaded and replaced segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType, + segmentMetadata.getCrc()); + } + + /** + * Server restart during segment reload might leave segment directory in inconsistent state, like the index + * directory might not exist but segment backup directory existed. This method tries to recover from reload + * failure before checking the existence of the index directory and loading segment metadata from it. + */ + private SegmentMetadata recoverSegmentQuietly(String segmentName) { Review comment: (nit) I don't think we need to name it as `quietly`, same for `loadSegmentQuietly` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java ########## @@ -78,6 +80,14 @@ void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) throws Exception; + /** + * Add or replace an immutable segment for current table, which can be either + * OFFLINE or REALTIME table. 
+ */ + void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + @Nullable SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) Review comment: (nit) for slightly better readability: ```suggestion SegmentZKMetadata remoteZKMetadata, @Nullable SegmentMetadata localMetadata, boolean forceDownload) ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java ########## @@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue)); } } + + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) + throws Exception { + if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, + _tableNameWithType, localMetadata.getCrc()); + return; + } + + // If not forced to download, then try to recover if no local metadata is provided. + if (!forceDownload && localMetadata == null) { + LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType); + localMetadata = recoverSegmentQuietly(segmentName); + if (!isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType, + localMetadata.getCrc()); + if (loadSegmentQuietly(segmentName, indexLoadingConfig)) { Review comment: What does quietly stand for? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java ########## @@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue)); } } + + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload) + throws Exception { + if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, + _tableNameWithType, localMetadata.getCrc()); + return; Review comment: This is not correct. The segment is on local disk, but we still need to load the segment into the memory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org