This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7eeb5c83443 MINOR: Removing incorrect multi threaded state transition 
tests (#20436)
7eeb5c83443 is described below

commit 7eeb5c834438395e50ea8fe18448ae092c03d8f5
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Aug 29 07:45:07 2025 +0100

    MINOR: Removing incorrect multi threaded state transition tests (#20436)
    
    These tests were written while finalizing the approach for making the
    in-flight state class thread-safe, but the approach later changed and the
    lock is now always required by SharePartition to change the in-flight
    state. Hence these tests are incorrect and do not add any value.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/server/share/SharePartitionTest.java     | 64 ----------------------
 1 file changed, 64 deletions(-)

diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c787fbea1d2..d1c6f9977b0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,7 +56,6 @@ import 
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
-import org.apache.kafka.server.share.fetch.DeliveryCountOps;
 import org.apache.kafka.server.share.fetch.InFlightState;
 import org.apache.kafka.server.share.fetch.RecordState;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -91,7 +90,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -8535,68 +8533,6 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
     }
 
-    @Test
-    public void inFlightStateRollbackAndArchiveStateTransition() throws 
InterruptedException {
-        InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 
1, MEMBER_ID);
-
-        inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, 
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
-        assertTrue(inFlightState.hasOngoingStateTransition());
-
-        // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED 
which is not committed yet. At the same
-        // time when we have a call to completeStateTransition with false 
commit value, we get a call to ARCHIVE the record.
-        // No matter the order of the 2 calls, we should always be getting the 
final state as ARCHIVED.
-        ExecutorService executorService = Executors.newFixedThreadPool(2);
-        try {
-            List<Callable<Void>> callables = List.of(
-                () -> {
-                    inFlightState.archive();
-                    return null;
-                },
-                () -> {
-                    inFlightState.completeStateTransition(false);
-                    return null;
-                }
-            );
-            executorService.invokeAll(callables);
-        } finally {
-            if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
-                executorService.shutdown();
-        }
-        assertEquals(RecordState.ARCHIVED, inFlightState.state());
-        assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
-    }
-
-    @Test
-    public void inFlightStateCommitSuccessAndArchiveStateTransition() throws 
InterruptedException {
-        InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 
1, MEMBER_ID);
-
-        inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, 
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
-        assertTrue(inFlightState.hasOngoingStateTransition());
-
-        // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED 
which is not committed yet. At the same
-        // time when we have a call to completeStateTransition with true 
commit value, we get a call to ARCHIVE the record.
-        // No matter the order of the 2 calls, we should always be getting the 
final state as ARCHIVED.
-        ExecutorService executorService = Executors.newFixedThreadPool(2);
-        try {
-            List<Callable<Void>> callables = List.of(
-                () -> {
-                    inFlightState.archive();
-                    return null;
-                },
-                () -> {
-                    inFlightState.completeStateTransition(true);
-                    return null;
-                }
-            );
-            executorService.invokeAll(callables);
-        } finally {
-            if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
-                executorService.shutdown();
-        }
-        assertEquals(RecordState.ARCHIVED, inFlightState.state());
-        assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
-    }
-
     @Test
     public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);

Reply via email to