This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7eeb5c83443 MINOR: Removing incorrect multi threaded state transition
tests (#20436)
7eeb5c83443 is described below
commit 7eeb5c834438395e50ea8fe18448ae092c03d8f5
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Aug 29 07:45:07 2025 +0100
MINOR: Removing incorrect multi threaded state transition tests (#20436)
These tests were written while finalizing approach for making inflight
state class thread safe but later approach changed and the lock is now
always required by SharePartition to change inflight state. Hence these
tests are incorrect and do not add any value.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/server/share/SharePartitionTest.java | 64 ----------------------
1 file changed, 64 deletions(-)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c787fbea1d2..d1c6f9977b0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,7 +56,6 @@ import
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
-import org.apache.kafka.server.share.fetch.DeliveryCountOps;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -91,7 +90,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -8535,68 +8533,6 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(7L).batchState());
}
- @Test
- public void inFlightStateRollbackAndArchiveStateTransition() throws
InterruptedException {
- InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED,
1, MEMBER_ID);
-
- inFlightState.startStateTransition(RecordState.ACKNOWLEDGED,
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
- assertTrue(inFlightState.hasOngoingStateTransition());
-
- // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED
which is not committed yet. At the same
- // time when we have a call to completeStateTransition with false
commit value, we get a call to ARCHIVE the record.
- // No matter the order of the 2 calls, we should always be getting the
final state as ARCHIVED.
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- try {
- List<Callable<Void>> callables = List.of(
- () -> {
- inFlightState.archive();
- return null;
- },
- () -> {
- inFlightState.completeStateTransition(false);
- return null;
- }
- );
- executorService.invokeAll(callables);
- } finally {
- if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
- executorService.shutdown();
- }
- assertEquals(RecordState.ARCHIVED, inFlightState.state());
- assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
- }
-
- @Test
- public void inFlightStateCommitSuccessAndArchiveStateTransition() throws
InterruptedException {
- InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED,
1, MEMBER_ID);
-
- inFlightState.startStateTransition(RecordState.ACKNOWLEDGED,
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
- assertTrue(inFlightState.hasOngoingStateTransition());
-
- // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED
which is not committed yet. At the same
- // time when we have a call to completeStateTransition with true
commit value, we get a call to ARCHIVE the record.
- // No matter the order of the 2 calls, we should always be getting the
final state as ARCHIVED.
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- try {
- List<Callable<Void>> callables = List.of(
- () -> {
- inFlightState.archive();
- return null;
- },
- () -> {
- inFlightState.completeStateTransition(true);
- return null;
- }
- );
- executorService.invokeAll(callables);
- } finally {
- if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
- executorService.shutdown();
- }
- assertEquals(RecordState.ARCHIVED, inFlightState.state());
- assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
- }
-
@Test
public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);