This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ce017c00d4 Minor improvement to upsert preload (#11694)
ce017c00d4 is described below

commit ce017c00d406cdb56c712d85e4010babd90772a0
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Sep 27 12:11:53 2023 -0700

    Minor improvement to upsert preload (#11694)
---
 .../upsert/BaseTableUpsertMetadataManager.java     | 21 +++++---
 ...oncurrentMapTableUpsertMetadataManagerTest.java | 61 +++++++++-------------
 2 files changed, 39 insertions(+), 43 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2fecc4fbe5..9de2349007 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -161,13 +161,14 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
       throws Exception {
     LOGGER.info("Preload segments from table: {} for fast upsert metadata 
recovery", _tableNameWithType);
     onPreloadStart();
-    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
_tableNameWithType);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
     String instanceId = getInstanceId();
     IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    Map<String, Map<String, String>> segmentAssignment = 
getSegmentAssignment();
     List<Future<?>> futures = new ArrayList<>();
-    for (String segmentName : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+    for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
       String state = instanceStateMap.get(instanceId);
       if 
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
         LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", 
segmentName, state);
@@ -208,18 +209,26 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   protected void onPreloadFinish() {
   }
 
-  private String getInstanceId() {
+  @VisibleForTesting
+  String getInstanceId() {
     InstanceDataManagerConfig instanceDataManagerConfig =
         
_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
     return instanceDataManagerConfig.getInstanceId();
   }
 
   @VisibleForTesting
-  protected IndexLoadingConfig createIndexLoadingConfig() {
+  IndexLoadingConfig createIndexLoadingConfig() {
     return new 
IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
         _tableConfig, _schema);
   }
 
+  @VisibleForTesting
+  Map<String, Map<String, String>> getSegmentAssignment() {
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
_tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", _tableNameWithType);
+    return idealState.getRecord().getMapFields();
+  }
+
   private void preloadSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore) {
     LOGGER.info("Preload segment: {} from table: {}", segmentName, 
_tableNameWithType);
@@ -237,7 +246,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   }
 
   @VisibleForTesting
-  protected void preloadSegmentWithSnapshot(String segmentName, 
IndexLoadingConfig indexLoadingConfig,
+  void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
     // This method might modify the file on disk. Use segment lock to prevent 
race condition
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, 
segmentName);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
index ee1d6ffbd2..526e0920c3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -29,20 +29,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -53,7 +48,6 @@ import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -117,21 +111,35 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
   @Test
   public void testPreloadOnlineSegments()
       throws Exception {
+    String instanceId = "server01";
+    Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
     Set<String> preloadedSegments = new HashSet<>();
     AtomicBoolean wasPreloading = new AtomicBoolean(false);
     ConcurrentMapTableUpsertMetadataManager mgr = new 
ConcurrentMapTableUpsertMetadataManager() {
+
+      @Override
+      String getInstanceId() {
+        return instanceId;
+      }
+
       @Override
-      protected IndexLoadingConfig createIndexLoadingConfig() {
+      IndexLoadingConfig createIndexLoadingConfig() {
         return mock(IndexLoadingConfig.class);
       }
 
       @Override
-      protected void preloadSegmentWithSnapshot(String segmentName, 
IndexLoadingConfig indexLoadingConfig,
+      Map<String, Map<String, String>> getSegmentAssignment() {
+        return segmentAssignment;
+      }
+
+      @Override
+      void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
           SegmentZKMetadata zkMetadata) {
         wasPreloading.set(isPreloading());
         preloadedSegments.add(segmentName);
       }
     };
+
     // Setup mocks for TableConfig and Schema.
     String tableNameWithType = "myTable_REALTIME";
     TableConfig tableConfig = mock(TableConfig.class);
@@ -146,38 +154,16 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
 
     // Setup mocks for HelixManager.
     HelixManager helixManager = mock(HelixManager.class);
-    IdealState idealState = mock(IdealState.class);
-    HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
-    PropertyKey.Builder keyBuilder = mock(PropertyKey.Builder.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    PropertyKey propKey = mock(PropertyKey.class);
-    when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
     when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-    when(dataAccessor.keyBuilder()).thenReturn(keyBuilder);
-    when(keyBuilder.idealStates(anyString())).thenReturn(propKey);
-    when(dataAccessor.getProperty(propKey)).thenReturn(idealState);
 
-    // Setup mocks to return the instanceId.
-    String instanceId = "server01";
-    TableDataManager tableDataManager = mock(TableDataManager.class);
-    TableDataManagerConfig tdmc = mock(TableDataManagerConfig.class);
-    InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
-    when(tableDataManager.getTableDataManagerConfig()).thenReturn(tdmc);
-    when(tdmc.getInstanceDataManagerConfig()).thenReturn(idmc);
-    when(idmc.getInstanceId()).thenReturn(instanceId);
-
-    // Only ONLINE segments are preloaded.
-    Map<String, Map<String, String>> segStates = new HashMap<>();
-    segStates.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING"));
-    segStates.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING"));
-    segStates.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE"));
-    segStates.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE"));
-    segStates.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE"));
-    segStates.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE"));
-    when(idealState.getPartitionSet()).thenReturn(segStates.keySet());
-    for (String segName : segStates.keySet()) {
-      
when(idealState.getInstanceStateMap(segName)).thenReturn(segStates.get(segName));
-    }
+    // Setup segment assignment. Only ONLINE segments are preloaded.
+    segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId, 
"CONSUMING"));
+    segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId, 
"CONSUMING"));
+    segmentAssignment.put("online_seg01", ImmutableMap.of(instanceId, 
"ONLINE"));
+    segmentAssignment.put("online_seg02", ImmutableMap.of(instanceId, 
"ONLINE"));
+    segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId, 
"OFFLINE"));
+    segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId, 
"OFFLINE"));
 
     // Setup mocks to get file path to validDocIds snapshot.
     SegmentZKMetadata realtimeSegmentZKMetadata = new 
SegmentZKMetadata("online_seg01");
@@ -192,6 +178,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
         anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
 
     // No snapshot file for online_seg01, so it's skipped.
+    TableDataManager tableDataManager = mock(TableDataManager.class);
     File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
     FileUtils.forceMkdir(seg01IdxDir);
     when(tableDataManager.getSegmentDataDir("online_seg01", null, 
tableConfig)).thenReturn(seg01IdxDir);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to