This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new c0983f7 Update time boundary only when segment is available on server (#6925) c0983f7 is described below commit c0983f78f8aac1253f5fdbbcc38742fb855b0c0b Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon May 17 12:40:50 2021 -0700 Update time boundary only when segment is available on server (#6925) In TimeBoundaryManager, only update the segment end time when there are ONLINE/CONSUMING instances in the external view to prevent moving the time boundary before the new segment is picked up by the servers. This can solve the flakiness in BasicAuthRealtimeIntegrationTest --- .../routing/timeboundary/TimeBoundaryManager.java | 13 ++++++---- .../timeboundary/TimeBoundaryManagerTest.java | 28 ++++++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java index c31e311..e62cdb3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,15 +160,19 @@ public class TimeBoundaryManager { * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). * <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed * ones. The refreshed segment ZK metadata change won't be picked up. - * <p>NOTE: {@code externalView} and {@code idealState} are unused, but intentionally passed in in case they are - * needed in the future. + * <p>NOTE: {@code idealState} is unused, but intentionally passed in in case it is needed in the future. */ @SuppressWarnings("unused") public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) { for (String segment : onlineSegments) { - _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))); + // NOTE: Only update the segment end time when there are ONLINE instances in the external view to prevent moving + // the time boundary before the new segment is picked up by the servers + Map<String, String> instanceStateMap = externalView.getStateMap(segment); + if (instanceStateMap != null && instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { + _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, + _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))); + } } _endTimeMsMap.keySet().retainAll(onlineSegments); updateTimeBoundaryInfo(getMaxEndTimeMs()); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java index b1a5c18..8ae20b9 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.broker.routing.timeboundary; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.helix.ZNRecord; @@ -43,6 +45,9 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -101,33 +106,44 @@ public class TimeBoundaryManagerTest { } private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) { - // NOTE: External view and ideal state are not used in the current implementation. - ExternalView externalView = Mockito.mock(ExternalView.class); - IdealState idealState = Mockito.mock(IdealState.class); + ExternalView externalView = new ExternalView(tableConfig.getTableName()); + Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields(); + Map<String, String> onlineInstanceStateMap = Collections.singletonMap("server", ONLINE); + Map<String, String> offlineInstanceStateMap = Collections.singletonMap("server", OFFLINE); + Set<String> onlineSegments = new HashSet<>(); + // NOTE: Ideal state is not used in the current implementation. + IdealState idealState = mock(IdealState.class); // Start with no segment TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore); - Set<String> onlineSegments = new HashSet<>(); timeBoundaryManager.init(externalView, idealState, onlineSegments); assertNull(timeBoundaryManager.getTimeBoundaryInfo()); // Add the first segment should update the time boundary String segment0 = "segment0"; onlineSegments.add(segment0); + segmentAssignment.put(segment0, onlineInstanceStateMap); setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit); timeBoundaryManager.init(externalView, idealState, onlineSegments); verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS)); - // Add a new segment with larger end time should update the time boundary + // Add a new segment with larger end time but no ONLINE instance should not update the time boundary String segment1 = "segment1"; onlineSegments.add(segment1); + segmentAssignment.put(segment1, offlineInstanceStateMap); setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit); timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments); + verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS)); + + // Turn the new segment ONLINE should update the time boundary + segmentAssignment.put(segment1, onlineInstanceStateMap); + timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments); verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); // Add new segment with larger end time but 0 total docs, should not update time boundary String segmentEmpty = "segmentEmpty"; onlineSegments.add(segmentEmpty); + segmentAssignment.put(segmentEmpty, onlineInstanceStateMap); setSegmentZKMetadataWithTotalDocs(rawTableName, segmentEmpty, 6, timeUnit, 0); timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments); verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); @@ -135,12 +151,14 @@ public class TimeBoundaryManagerTest { // Add a new segment with smaller end time should not change the time boundary String segment2 = "segment2"; onlineSegments.add(segment2); + segmentAssignment.put(segment2, onlineInstanceStateMap); setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit); timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments); verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS)); // Remove the segment with largest end time should update the time boundary onlineSegments.remove(segment1); + segmentAssignment.remove(segment1); timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments); verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org