This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ce017c00d4 Minor improvement to upsert preload (#11694) ce017c00d4 is described below commit ce017c00d406cdb56c712d85e4010babd90772a0 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Sep 27 12:11:53 2023 -0700 Minor improvement to upsert preload (#11694) --- .../upsert/BaseTableUpsertMetadataManager.java | 21 +++++--- ...oncurrentMapTableUpsertMetadataManagerTest.java | 61 +++++++++------------- 2 files changed, 39 insertions(+), 43 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 2fecc4fbe5..9de2349007 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -161,13 +161,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad throws Exception { LOGGER.info("Preload segments from table: {} for fast upsert metadata recovery", _tableNameWithType); onPreloadStart(); - IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType); ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore(); String instanceId = getInstanceId(); IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig(); + Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment(); List<Future<?>> futures = new ArrayList<>(); - for (String segmentName : idealState.getPartitionSet()) { - Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName); + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + 
Map<String, String> instanceStateMap = entry.getValue(); String state = instanceStateMap.get(instanceId); if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) { LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state); @@ -208,18 +209,26 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad protected void onPreloadFinish() { } - private String getInstanceId() { + @VisibleForTesting + String getInstanceId() { InstanceDataManagerConfig instanceDataManagerConfig = _tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(); return instanceDataManagerConfig.getInstanceId(); } @VisibleForTesting - protected IndexLoadingConfig createIndexLoadingConfig() { + IndexLoadingConfig createIndexLoadingConfig() { return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(), _tableConfig, _schema); } + @VisibleForTesting + Map<String, Map<String, String>> getSegmentAssignment() { + IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", _tableNameWithType); + return idealState.getRecord().getMapFields(); + } + private void preloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) { LOGGER.info("Preload segment: {} from table: {}", segmentName, _tableNameWithType); @@ -237,7 +246,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad } @VisibleForTesting - protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig, + void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) { // This method might modify the file on disk. 
Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java index ee1d6ffbd2..526e0920c3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java @@ -29,20 +29,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.data.manager.TableDataManager; -import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.V1Constants; -import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; @@ -53,7 +48,6 @@ import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; -import static 
org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -117,21 +111,35 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { @Test public void testPreloadOnlineSegments() throws Exception { + String instanceId = "server01"; + Map<String, Map<String, String>> segmentAssignment = new HashMap<>(); Set<String> preloadedSegments = new HashSet<>(); AtomicBoolean wasPreloading = new AtomicBoolean(false); ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager() { + + @Override + String getInstanceId() { + return instanceId; + } + @Override - protected IndexLoadingConfig createIndexLoadingConfig() { + IndexLoadingConfig createIndexLoadingConfig() { return mock(IndexLoadingConfig.class); } @Override - protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig, + Map<String, Map<String, String>> getSegmentAssignment() { + return segmentAssignment; + } + + @Override + void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) { wasPreloading.set(isPreloading()); preloadedSegments.add(segmentName); } }; + // Setup mocks for TableConfig and Schema. String tableNameWithType = "myTable_REALTIME"; TableConfig tableConfig = mock(TableConfig.class); @@ -146,38 +154,16 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { // Setup mocks for HelixManager. 
HelixManager helixManager = mock(HelixManager.class); - IdealState idealState = mock(IdealState.class); - HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); - PropertyKey.Builder keyBuilder = mock(PropertyKey.Builder.class); ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - PropertyKey propKey = mock(PropertyKey.class); - when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor); when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); - when(dataAccessor.keyBuilder()).thenReturn(keyBuilder); - when(keyBuilder.idealStates(anyString())).thenReturn(propKey); - when(dataAccessor.getProperty(propKey)).thenReturn(idealState); - // Setup mocks to return the instanceId. - String instanceId = "server01"; - TableDataManager tableDataManager = mock(TableDataManager.class); - TableDataManagerConfig tdmc = mock(TableDataManagerConfig.class); - InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class); - when(tableDataManager.getTableDataManagerConfig()).thenReturn(tdmc); - when(tdmc.getInstanceDataManagerConfig()).thenReturn(idmc); - when(idmc.getInstanceId()).thenReturn(instanceId); - - // Only ONLINE segments are preloaded. - Map<String, Map<String, String>> segStates = new HashMap<>(); - segStates.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING")); - segStates.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING")); - segStates.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE")); - segStates.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE")); - segStates.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE")); - segStates.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE")); - when(idealState.getPartitionSet()).thenReturn(segStates.keySet()); - for (String segName : segStates.keySet()) { - when(idealState.getInstanceStateMap(segName)).thenReturn(segStates.get(segName)); - } + // Setup segment assignment. Only ONLINE segments are preloaded. 
+ segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING")); + segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING")); + segmentAssignment.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE")); + segmentAssignment.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE")); + segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE")); + segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE")); // Setup mocks to get file path to validDocIds snapshot. SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01"); @@ -192,6 +178,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord()); // No snapshot file for online_seg01, so it's skipped. + TableDataManager tableDataManager = mock(TableDataManager.class); File seg01IdxDir = new File(TEMP_DIR, "online_seg01"); FileUtils.forceMkdir(seg01IdxDir); when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org