This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1e58df7713 [bugfix] Do not move real-time segments to working dir on restart (#11226) 1e58df7713 is described below commit 1e58df771379e009e51518231462da8a8540129e Author: Alexey Pavlenko <131970931+sylph...@users.noreply.github.com> AuthorDate: Wed Aug 2 07:43:47 2023 +0300 [bugfix] Do not move real-time segments to working dir on restart (#11226) --- .../core/data/manager/BaseTableDataManager.java | 1 + .../manager/realtime/RealtimeTableDataManager.java | 5 ++ .../realtime/RealtimeTableDataManagerTest.java | 53 ++++++++++++++++++++++ .../starter/helix/HelixInstanceDataManager.java | 10 +++- 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 04059f94c1..0187fb6836 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -233,6 +233,7 @@ public abstract class BaseTableDataManager implements TableDataManager { public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) throws Exception { indexLoadingConfig.setTableDataDir(_tableDataDir); + indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs()); addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema())); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 41c338d209..f431b63eb9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -390,6 +390,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return; } + // Assign table directory and tier info to not let the segment be moved during loading/preprocessing + indexLoadingConfig.setTableDataDir(_tableDataDir); + indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs()); + indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier()); + File segmentDir = new File(_indexDir, segmentName); // Restart during segment reload might leave segment in inconsistent state (index directory might not exist but // segment backup directory existed), need to first try to recover from reload failure before checking the existence diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java index 45196e2730..e865e44c5e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java @@ -157,6 +157,59 @@ public class RealtimeTableDataManagerTest { assertEquals(llmd.getTotalDocs(), 5); } + @Test + public void testAddSegmentDefaultTierByTierBasedDirLoader() + throws Exception { + RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null); + TableDataManagerConfig tableDataManagerConfig = createTableDataManagerConfig(); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + TableConfig tableConfig = setupTableConfig(propertyStore); + Schema schema = setupSchema(propertyStore); + tmgr1.init(tableDataManagerConfig, "server01", propertyStore, + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null, + new 
TableDataManagerParams(0, false, -1)); + + // Create a raw segment and put it in deep store backed by local fs. + String segName = "seg_tiered_01"; + SegmentZKMetadata segmentZKMetadata = + TableDataManagerTestUtils.makeRawSegment(segName, createSegment(tableConfig, schema, segName), + new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true); + segmentZKMetadata.setStatus(Status.DONE); + + // Local segment dir doesn't exist, thus downloading from deep store. + File localSegDir = new File(TABLE_DATA_DIR, segName); + assertFalse(localSegDir.exists()); + + // Add segment + IndexLoadingConfig indexLoadingConfig = + TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, schema); + tmgr1.addSegment(segName, indexLoadingConfig, segmentZKMetadata); + assertTrue(localSegDir.exists()); + SegmentMetadataImpl llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName)); + assertEquals(llmd.getTotalDocs(), 5); + + // Now, repeat initialization of the table data manager + tmgr1.shutDown(); + RealtimeTableDataManager tmgr2 = new RealtimeTableDataManager(null); + tableDataManagerConfig = createTableDataManagerConfig(); + propertyStore = mock(ZkHelixPropertyStore.class); + tableConfig = setupTableConfig(propertyStore); + schema = setupSchema(propertyStore); + tmgr2.init(tableDataManagerConfig, "server01", propertyStore, + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null, + new TableDataManagerParams(0, false, -1)); + + // Reinitialize index loading config and try adding the segment + indexLoadingConfig = + TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, schema); + tmgr2.addSegment(segName, indexLoadingConfig, segmentZKMetadata); + + // Make sure that the segment hasn't been moved + assertTrue(localSegDir.exists()); + llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName)); + assertEquals(llmd.getTotalDocs(), 5); + } + @Test public void 
testAllowDownload() { RealtimeTableDataManager mgr = new RealtimeTableDataManager(null); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index f01d13ea94..bdc4baf58d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -220,8 +220,16 @@ public class HelixInstanceDataManager implements InstanceDataManager { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", offlineTableName); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, offlineTableName, segmentName); + Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for offline segment: %s, table: %s", + segmentName, offlineTableName); + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); + indexLoadingConfig.setSegmentTier(zkMetadata.getTier()); + _tableDataManagerMap.computeIfAbsent(offlineTableName, k -> createTableDataManager(k, tableConfig)) - .addSegment(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema)); + .addSegment(indexDir, indexLoadingConfig); LOGGER.info("Added segment: {} to table: {}", segmentName, offlineTableName); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org