This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e58df7713 [bugfix] Do not move real-time segments to working dir on restart (#11226)
1e58df7713 is described below

commit 1e58df771379e009e51518231462da8a8540129e
Author: Alexey Pavlenko <131970931+sylph...@users.noreply.github.com>
AuthorDate: Wed Aug 2 07:43:47 2023 +0300

    [bugfix] Do not move real-time segments to working dir on restart (#11226)
---
 .../core/data/manager/BaseTableDataManager.java    |  1 +
 .../manager/realtime/RealtimeTableDataManager.java |  5 ++
 .../realtime/RealtimeTableDataManagerTest.java     | 53 ++++++++++++++++++++++
 .../starter/helix/HelixInstanceDataManager.java    | 10 +++-
 4 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 04059f94c1..0187fb6836 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -233,6 +233,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
       throws Exception {
     indexLoadingConfig.setTableDataDir(_tableDataDir);
+    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
     addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
indexLoadingConfig.getSchema()));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 41c338d209..f431b63eb9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -390,6 +390,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       return;
     }
 
+    // Assign table directory and tier info to not let the segment be moved 
during loading/preprocessing
+    indexLoadingConfig.setTableDataDir(_tableDataDir);
+    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+    indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());
+
     File segmentDir = new File(_indexDir, segmentName);
     // Restart during segment reload might leave segment in inconsistent state 
(index directory might not exist but
     // segment backup directory existed), need to first try to recover from 
reload failure before checking the existence
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 45196e2730..e865e44c5e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -157,6 +157,59 @@ public class RealtimeTableDataManagerTest {
     assertEquals(llmd.getTotalDocs(), 5);
   }
 
+  @Test
+  public void testAddSegmentDefaultTierByTierBasedDirLoader()
+      throws Exception {
+    RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
+    TableDataManagerConfig tableDataManagerConfig = 
createTableDataManagerConfig();
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    TableConfig tableConfig = setupTableConfig(propertyStore);
+    Schema schema = setupSchema(propertyStore);
+    tmgr1.init(tableDataManagerConfig, "server01", propertyStore,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
+        new TableDataManagerParams(0, false, -1));
+
+    // Create a raw segment and put it in deep store backed by local fs.
+    String segName = "seg_tiered_01";
+    SegmentZKMetadata segmentZKMetadata =
+        TableDataManagerTestUtils.makeRawSegment(segName, 
createSegment(tableConfig, schema, segName),
+            new File(TEMP_DIR, segName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
+    segmentZKMetadata.setStatus(Status.DONE);
+
+    // Local segment dir doesn't exist, thus downloading from deep store.
+    File localSegDir = new File(TABLE_DATA_DIR, segName);
+    assertFalse(localSegDir.exists());
+
+    // Add segment
+    IndexLoadingConfig indexLoadingConfig =
+        TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", 
tableConfig, schema);
+    tmgr1.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
+    assertTrue(localSegDir.exists());
+    SegmentMetadataImpl llmd = new SegmentMetadataImpl(new 
File(TABLE_DATA_DIR, segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+
+    // Now, repeat initialization of the table data manager
+    tmgr1.shutDown();
+    RealtimeTableDataManager tmgr2 = new RealtimeTableDataManager(null);
+    tableDataManagerConfig = createTableDataManagerConfig();
+    propertyStore = mock(ZkHelixPropertyStore.class);
+    tableConfig = setupTableConfig(propertyStore);
+    schema = setupSchema(propertyStore);
+    tmgr2.init(tableDataManagerConfig, "server01", propertyStore,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
+        new TableDataManagerParams(0, false, -1));
+
+    // Reinitialize index loading config and try adding the segment
+    indexLoadingConfig =
+        TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", 
tableConfig, schema);
+    tmgr2.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
+
+    // Make sure that the segment hasn't been moved
+    assertTrue(localSegDir.exists());
+    llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
   @Test
   public void testAllowDownload() {
     RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index f01d13ea94..bdc4baf58d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -220,8 +220,16 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", offlineTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableConfig);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
offlineTableName, segmentName);
+    Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for offline segment: %s, table: %s",
+        segmentName, offlineTableName);
+
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
+    indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
+
     _tableDataManagerMap.computeIfAbsent(offlineTableName, k -> 
createTableDataManager(k, tableConfig))
-        .addSegment(indexDir, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema));
+        .addSegment(indexDir, indexLoadingConfig);
     LOGGER.info("Added segment: {} to table: {}", segmentName, 
offlineTableName);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to