Jackie-Jiang commented on a change in pull request #5782: URL: https://github.com/apache/incubator-pinot/pull/5782#discussion_r463913518
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -2366,6 +2404,23 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn tableNameWithType, segmentLineageEntryId); } + private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { + ExternalView externalView = getTableExternalView(tableNameWithType); + if (externalView != null) { + Set<String> partitionSet = externalView.getPartitionSet(); + Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(partitionSet.size())); + for (String partition : partitionSet) { + Map<String, String> instanceStateMap = externalView.getStateMap(partition); + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap + .containsValue(SegmentStateModel.CONSUMING)) { + onlineSegments.add(partition); + } + } + return onlineSegments; + } else { + return null; Review comment: Directly throw exception instead of returning `null` here to avoid NPE on the caller side: `Preconditions.checkState(externalView != null, ...)` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -2366,6 +2404,23 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn tableNameWithType, segmentLineageEntryId); } + private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { + ExternalView externalView = getTableExternalView(tableNameWithType); + if (externalView != null) { + Set<String> partitionSet = externalView.getPartitionSet(); + Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(partitionSet.size())); + for (String partition : partitionSet) { + Map<String, String> instanceStateMap = externalView.getStateMap(partition); + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap + .containsValue(SegmentStateModel.CONSUMING)) { + onlineSegments.add(partition); + } + } + return onlineSegments; Review comment: Use `externalView.getMapFields()` to avoid extra map lookups. ```suggestion Map<String, Map<String, String>> segmentAssignment = externalView.getMapFields(); Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap .containsValue(SegmentStateModel.CONSUMING)) { onlineSegments.add(entry.getKey()); } } return onlineSegments; ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -2344,14 +2361,35 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn return true; } + // Check that all the segments from 'segmentsTo' exist in the table + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType)); + Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format( + "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', " + + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable)); + + // Check that all the segments from 'segmentsTo' became ONLINE in the external view + Set<String> onlineSegmentsFromEV = getOnlineSegmentsFromExternalView(tableNameWithType); + Preconditions.checkArgument(onlineSegmentsFromEV.containsAll(lineageEntry.getSegmentsTo()), String.format( Review comment: We should not throw exception here. It takes some time for segment to become online in external view, so we might want to block for some time (put a proper timeout) ########## File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java ########## @@ -639,6 +643,29 @@ public void testBatchUpload() Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED); } + private void waitForEVUpdateForSegments(String tableNameWithType, Set<String> segments) Review comment: The API should handle this wait instead of the client. User doesn't need to understand that they need to wait for external view to change before ending the replacement ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org