This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 9cfce82385 Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
9cfce82385 is described below

commit 9cfce82385dfcd11eed4467c07f2409b76622d57
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Thu Dec 21 15:16:20 2023 -0800

    Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
---
 .../protocols/SegmentCompletionProtocol.java       |  2 ++
 .../resources/LLCSegmentCompletionHandlers.java    |  3 ++-
 .../segment/CommittingSegmentDescriptor.java       | 10 ++++++++++
 .../segment/SegmentFlushThresholdComputer.java     | 14 +++++++++----
 .../segment/SegmentFlushThresholdComputerTest.java | 23 ++++++++++++++++++++++
 .../realtime/RealtimeSegmentDataManager.java       |  2 +-
 6 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 3b0753919d..1cad2d09f5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -255,6 +255,7 @@ public class SegmentCompletionProtocol {
         _memoryUsedBytes = MEMORY_USED_BYTES_DEFAULT;
         _segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT;
         _streamPartitionMsgOffset = null;
+        _reason = null;
       }

       public Params(Params params) {
@@ -269,6 +270,7 @@ public class SegmentCompletionProtocol {
         _memoryUsedBytes = params.getMemoryUsedBytes();
         _segmentSizeBytes = params.getSegmentSizeBytes();
         _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
+        _reason = params.getReason();
       }

       @Deprecated
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index cef0de67b4..993028585c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -363,6 +363,7 @@ public class LLCSegmentCompletionHandlers {
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+      @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
       FormDataMultiPart metadataFiles) {
     if (instanceId == null || segmentName == null || segmentLocation == null || metadataFiles == null
         || (offset == -1 && streamPartitionMsgOffset == null)) {
@@ -376,7 +377,7 @@ public class LLCSegmentCompletionHandlers {

     SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
     requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
         .withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
-        .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
+        .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
     extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentCommitEndWithMetadata:{}", requestParams.toString());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index c264143024..f26b2d5d39 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -31,6 +31,7 @@ public class CommittingSegmentDescriptor {
   private String _segmentLocation;
   private String _nextOffset;
   private SegmentMetadataImpl _segmentMetadata;
+  private String _stopReason;

   public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
       SegmentCompletionProtocol.Request.Params reqParams) {
@@ -38,6 +39,7 @@ public class CommittingSegmentDescriptor {
         new CommittingSegmentDescriptor(reqParams.getSegmentName(), reqParams.getStreamPartitionMsgOffset(),
             reqParams.getSegmentSizeBytes());
     committingSegmentDescriptor.setSegmentLocation(reqParams.getSegmentLocation());
+    committingSegmentDescriptor.setStopReason(reqParams.getReason());
     return committingSegmentDescriptor;
   }

@@ -95,4 +97,12 @@ public class CommittingSegmentDescriptor {
   public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     _segmentMetadata = segmentMetadata;
   }
+
+  public String getStopReason() {
+    return _stopReason;
+  }
+
+  public void setStopReason(String stopReason) {
+    _stopReason = stopReason;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index 808642a345..2c826dd60e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.TimeUtils;

+
 class SegmentFlushThresholdComputer {
   public static final int MINIMUM_NUM_ROWS_THRESHOLD = 10_000;
   static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
@@ -78,11 +80,15 @@ class SegmentFlushThresholdComputer {
     }

     final long committingSegmentSizeBytes = committingSegmentDescriptor.getSegmentSizeBytes();
-    if (committingSegmentSizeBytes <= 0) { // repair segment case
+    if (committingSegmentSizeBytes <= 0 // repair segment case
+        || SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(
+        committingSegmentDescriptor.getStopReason())) {
+      String reason = committingSegmentSizeBytes <= 0 //
+          ? "Committing segment size is not available" //
+          : "Committing segment is due to force-commit";
       final int targetNumRows = committingSegmentZKMetadata.getSizeThresholdToFlushSegment();
-      SegmentSizeBasedFlushThresholdUpdater.LOGGER.info(
-          "Committing segment size is not available, setting thresholds from previous segment for {} as {}",
-          newSegmentName, targetNumRows);
+      SegmentSizeBasedFlushThresholdUpdater.LOGGER.info("{}, setting thresholds from previous segment for {} as {}",
+          reason, newSegmentName, targetNumRows);
       return targetNumRows;
     }

diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 9bdc1656a0..a9d1c27221 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;

 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -105,6 +106,28 @@ public class SegmentFlushThresholdComputerTest {
     assertEquals(threshold, segmentSizeThreshold);
   }

+  @Test
+  public void testUseLastSegmentsThresholdIfSegmentIsCommittingDueToForceCommit() {
+    long committingSegmentSizeBytes = 500_000L;
+    int committingSegmentSizeThreshold = 25_000;
+    SegmentFlushThresholdComputer computer = new SegmentFlushThresholdComputer();
+
+    CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
+    when(committingSegmentDescriptor.getSegmentSizeBytes()).thenReturn(committingSegmentSizeBytes);
+    when(committingSegmentDescriptor.getStopReason()).thenReturn(REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+
+    SegmentZKMetadata committingSegmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(committingSegmentSizeThreshold);
+
+    StreamConfig streamConfig = mock(StreamConfig.class);
+
+    int newSegmentSizeThreshold =
+        computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
+            "newSegmentName");
+
+    assertEquals(newSegmentSizeThreshold, committingSegmentSizeThreshold);
+  }
+
   @Test
   public void testApplyMultiplierToTotalDocsWhenTimeThresholdNotReached() {
     long currentTime = 1640216032391L;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d68c11f8e6..fa1501e868 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1060,7 +1060,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {

     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
+        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason)
         .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org