This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e87c055 Fix segment status checker to skip push in-progress segments (#8323) e87c055 is described below commit e87c05594f5d48143c5df4f9c498bd9ba74f7e8b Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Thu Mar 10 10:15:47 2022 -0800 Fix segment status checker to skip push in-progress segments (#8323) --- .../pinot/controller/helix/SegmentStatusChecker.java | 18 +++++++----------- .../controller/helix/SegmentStatusCheckerTest.java | 19 +++++++++++++++++++ .../tests/ControllerPeriodicTasksIntegrationTest.java | 3 +++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 93350fe..1eea570 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -178,21 +178,17 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh // No online segments in ideal state continue; } + SegmentZKMetadata segmentZKMetadata = + _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, partitionName); + if (segmentZKMetadata != null + && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) { + // Push is not finished yet, skip the segment + continue; + } nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? 
idealState .getInstanceStateMap(partitionName).size() : nReplicasIdealMax; if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) { // No replicas for this segment - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) { - SegmentZKMetadata segmentZKMetadata = - _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, partitionName); - - if (segmentZKMetadata != null - && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) { - // push not yet finished, skip - continue; - } - } nOffline++; if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) { LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 6244265..5462839 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -340,12 +340,17 @@ public class SegmentStatusCheckerTest { idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot2", "ONLINE"); + idealState.setPartitionState("myTable_2", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_2", "pinot2", "ONLINE"); idealState.setReplicas("2"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); ExternalView externalView = new ExternalView(offlineTableName); externalView.setState("myTable_1", "pinot1", "ONLINE"); externalView.setState("myTable_1", "pinot2", "ONLINE"); + // myTable_2 is push in-progress and only one replica has been downloaded by servers. 
It will be skipped for + // the segment status check. + externalView.setState("myTable_2", "pinot1", "ONLINE"); ZNRecord znrecord = new ZNRecord("myTable_0"); znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); @@ -359,6 +364,18 @@ public class SegmentStatusCheckerTest { znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); + ZNRecord znrecord2 = new ZNRecord("myTable_2"); + znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); + znrecord2.setLongField(CommonConstants.Segment.START_TIME, 1000); + znrecord2.setLongField(CommonConstants.Segment.END_TIME, 2000); + znrecord2.setSimpleField(CommonConstants.Segment.TIME_UNIT, TimeUnit.HOURS.toString()); + znrecord2.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000); + znrecord2.setLongField(CommonConstants.Segment.CRC, 1235); + znrecord2.setLongField(CommonConstants.Segment.CREATION_TIME, 3000); + znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_2"); + znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); + znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); + { _helixResourceManager = mock(PinotHelixResourceManager.class); when(_helixResourceManager.getAllTables()).thenReturn(allTableNames); @@ -366,6 +383,8 @@ public class SegmentStatusCheckerTest { when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")) .thenReturn(new SegmentZKMetadata(znrecord)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")) + .thenReturn(new SegmentZKMetadata(znrecord2)); } { _config = mock(ControllerConf.class); diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 45bf638..64a894b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -62,6 +62,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30; private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5; private static final String PERIODIC_TASK_FREQUENCY = "5s"; + private static final String PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD = "5s"; private static final int NUM_REPLICAS = 2; private static final String TENANT_NAME = "TestTenant"; @@ -124,6 +125,8 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati PERIODIC_TASK_INITIAL_DELAY_SECONDS); properties.put(ControllerPeriodicTasksConf.DEPRECATED_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS, PERIODIC_TASK_FREQUENCY_SECONDS); + properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD, + PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD); startController(properties); startBrokers(NUM_BROKERS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org