This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dda1211fa2 Fix fsm creation when segment is in committing state (#16847)
9dda1211fa2 is described below

commit 9dda1211fa2422e8cc89cbc735db002a1f735140
Author: NOOB <[email protected]>
AuthorDate: Tue Sep 23 02:13:17 2025 +0530

    Fix fsm creation when segment is in committing state (#16847)
---
 .../helix/core/realtime/BlockingSegmentCompletionFSM.java     |  4 +++-
 .../helix/core/realtime/SegmentCompletionFSMFactoryTest.java  | 11 +++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
index f828059aae0..fa6fb5a0035 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
@@ -143,7 +143,9 @@ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
     _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
     _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
 
-    if (segmentMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+    // NOTE: If segment ZK status is COMMITTING, The current behaviour expects the segment protocol calls to fail and
+    // abort leaving it to realtime segment validation job to fix it.
+    if (segmentMetadata.getStatus().isCompleted()) {
       _state = BlockingSegmentCompletionFSMState.COMMITTED;
       StreamPartitionMsgOffsetFactory factory =
           _segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
index 1eda2de31d6..c64dc6af0ff 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -69,6 +70,11 @@ public class SegmentCompletionFSMFactoryTest {
     SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
     when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
     when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+    if (Math.random() < 0.5) {
+      when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    } else {
+      when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
+    }
 
     PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager = mock(PinotLLCRealtimeSegmentManager.class);
     when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());
@@ -108,6 +114,11 @@ public class SegmentCompletionFSMFactoryTest {
     SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
     when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
     when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+    if (Math.random() < 0.5) {
+      when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
+    } else {
+      when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING);
+    }
 
     PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager = mock(PinotLLCRealtimeSegmentManager.class);
     when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to