This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8293facfa0 Keep last completed segment for retention (#10754) 8293facfa0 is described below commit 8293facfa065ae7b317403a1bbeb969881a84ec1 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Thu May 18 18:13:43 2023 -0700 Keep last completed segment for retention (#10754) --- .../helix/core/PinotHelixResourceManager.java | 21 ++++++ .../helix/core/retention/RetentionManager.java | 8 +++ .../helix/core/retention/RetentionManagerTest.java | 77 ++++++++++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 9142b93792..111ea6a24b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -34,6 +34,7 @@ import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -118,6 +119,8 @@ import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.tier.TierSegmentSelector; import org.apache.pinot.common.utils.BcryptUtils; import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -872,6 +875,24 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType); } + public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) { + Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>(); + for (SegmentZKMetadata segMetadata : getSegmentsZKMetadata(tableNameWithType)) { + if (SegmentName.isLowLevelConsumerSegmentName(segMetadata.getSegmentName()) + && segMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) { + LLCSegmentName llcName = LLCSegmentName.of(segMetadata.getSegmentName()); + int partitionGroupId = llcName.getPartitionGroupId(); + int sequenceNumber = llcName.getSequenceNumber(); + String lastCompletedSegName = partitionIdToLastLLCCompletedSegmentMap.get(partitionGroupId); + if (lastCompletedSegName == null + || LLCSegmentName.of(lastCompletedSegName).getSequenceNumber() < sequenceNumber) { + partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, segMetadata.getSegmentName()); + } + } + } + return partitionIdToLastLLCCompletedSegmentMap.values(); + } + public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) { return deleteSegments(tableNameWithType, segmentNames, null); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index 36443146e0..93c7d8573b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -159,6 +159,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } } } + + // Remove last sealed segments such that the table can still create new consuming segments if it's paused + segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName)); + if (!segmentsToDelete.isEmpty()) { LOGGER.info("Deleting {} segments from table: {}", segmentsToDelete.size(), realtimeTableName); _pinotHelixResourceManager.deleteSegments(realtimeTableName, segmentsToDelete); @@ -214,6 +218,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { // Write back to the lineage entry if (SegmentLineageAccessHelper.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) { + // Remove last sealed segments such that the table can still create new consuming segments if it's paused + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(tableNameWithType)); + } // Delete segments based on the segment lineage if (!segmentsToDelete.isEmpty()) { _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index 6da50f5c5b..2f16f830b9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -233,6 +233,39 @@ public class RetentionManagerTest { verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList()); } + // This test makes sure that we do not clean up last llc completed segments + @Test + public void testRealtimeLastLLCCleanup() + throws Exception { + final long now = System.currentTimeMillis(); + final int replicaCount = 1; + + TableConfig tableConfig = createRealtimeTableConfig1(replicaCount); + List<String> removedSegments = new ArrayList<>(); + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + PinotHelixResourceManager pinotHelixResourceManager = + setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments); + setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + + ControllerConf conf = new ControllerConf(); + ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + conf.setRetentionControllerFrequencyInSeconds(0); + conf.setDeletedSegmentsRetentionInDays(0); + RetentionManager retentionManager = + new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics); + retentionManager.start(); + retentionManager.run(); + + SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager(); + + // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called. + verify(deletionManager, times(1)).removeAgedDeletedSegments(leadControllerManager); + + // Verify that the deleteSegments method is actually called. + verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList()); + } + private PinotHelixResourceManager setupSegmentMetadata(TableConfig tableConfig, final long now, final int nSegments, List<String> segmentsToBeDeleted) { final int replicaCount = tableConfig.getReplication(); @@ -297,6 +330,50 @@ public class RetentionManagerTest { return pinotHelixResourceManager; } + private PinotHelixResourceManager setupSegmentMetadataForPausedTable(TableConfig tableConfig, final long now, + List<String> segmentsToBeDeleted) { + final int replicaCount = tableConfig.getReplication(); + + List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); + + IdealState idealState = + PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + + final int kafkaPartition = 5; + final long millisInDays = TimeUnit.DAYS.toMillis(1); + final String serverName = "Server_localhost_0"; + LLCSegmentName llcSegmentName0 = new LLCSegmentName(TEST_TABLE_NAME, kafkaPartition, 0, now); + SegmentZKMetadata segmentZKMetadata0 = createSegmentZKMetadata(llcSegmentName0.getSegmentName(), replicaCount, now); + segmentZKMetadata0.setTimeUnit(TimeUnit.MILLISECONDS); + segmentZKMetadata0.setStartTime(now - 30 * millisInDays); + segmentZKMetadata0.setEndTime(now - 20 * millisInDays); + segmentZKMetadata0.setStatus(CommonConstants.Segment.Realtime.Status.DONE); + segmentsZKMetadata.add(segmentZKMetadata0); + idealState.setPartitionState(llcSegmentName0.getSegmentName(), serverName, "ONLINE"); + segmentsToBeDeleted.add(llcSegmentName0.getSegmentName()); + + LLCSegmentName llcSegmentName1 = new LLCSegmentName(TEST_TABLE_NAME, kafkaPartition, 1, now); + SegmentZKMetadata segmentZKMetadata1 = createSegmentZKMetadata(llcSegmentName1.getSegmentName(), replicaCount, now); + segmentZKMetadata1.setTimeUnit(TimeUnit.MILLISECONDS); + segmentZKMetadata1.setStartTime(now - 20 * millisInDays); + segmentZKMetadata1.setEndTime(now - 10 * millisInDays); + segmentZKMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE); + segmentsZKMetadata.add(segmentZKMetadata1); + idealState.setPartitionState(llcSegmentName1.getSegmentName(), serverName, "ONLINE"); + + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(pinotHelixResourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segmentsZKMetadata); + when(pinotHelixResourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME); + when(pinotHelixResourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod(); + + HelixAdmin helixAdmin = mock(HelixAdmin.class); + when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(idealState); + when(pinotHelixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); + + return pinotHelixResourceManager; + } + private SegmentZKMetadata createSegmentZKMetadata(String segmentName, int replicaCount, long segmentCreationTime) { SegmentZKMetadata segmentMetadata = new SegmentZKMetadata(segmentName); segmentMetadata.setCreationTime(segmentCreationTime); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org