This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da83631684f KAFKA-17689 Migrate FetchFromLeaderWithCorruptedCheckpointTest and PartitionsExpandTest (#22400)
da83631684f is described below

commit da83631684ffd3b6b9a36accf089796c5b22585b
Author: Murali Basani <[email protected]>
AuthorDate: Fri May 29 11:36:01 2026 +0200

    KAFKA-17689 Migrate FetchFromLeaderWithCorruptedCheckpointTest and PartitionsExpandTest (#22400)
    
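    Ref: https://issues.apache.org/jira/browse/KAFKA-17689
    As part of the mass migration of tiered storage integration tests from
    TieredStorageTestHarness to the ClusterInstance-based test framework
    (@ClusterTemplate), this commit migrates: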
    - FetchFromLeaderWithCorruptedCheckpointTest
    - PartitionsExpandTest
    
    executeFetchFromLeaderWithCorruptedCheckpointTest goes through a
    stop/start cycle of the broker.
---
 ...FetchFromLeaderWithCorruptedCheckpointTest.java | 57 +++++++++++++++++++---
 .../storage/integration/PartitionsExpandTest.java  | 57 +++++++++++++++++++---
 2 files changed, 98 insertions(+), 16 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
index 7f385fdb2a9..07af185b2e3 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
@@ -18,27 +18,55 @@ package org.apache.kafka.tiered.storage.integration;
 
 import kafka.server.ReplicaManager;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
 import org.apache.kafka.storage.internals.log.LogManager;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
-public class FetchFromLeaderWithCorruptedCheckpointTest extends 
TieredStorageTestHarness {
+public final class FetchFromLeaderWithCorruptedCheckpointTest {
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
 
-    @Override
-    public int brokerCount() {
-        return 2;
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
FetchFromLeaderWithCorruptedCheckpointTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void 
testFetchFromLeaderWithCorruptedCheckpointWithClassicGroupProtocol(ClusterInstance
 clusterInstance) throws Exception {
+        executeFetchFromLeaderWithCorruptedCheckpointTest(clusterInstance, 
GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testFetchFromLeaderWithCorruptedCheckpointWithConsumerGroupProtocol(ClusterInstance
 clusterInstance) throws Exception {
+        executeFetchFromLeaderWithCorruptedCheckpointTest(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private void 
executeFetchFromLeaderWithCorruptedCheckpointTest(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws Exception {
         final int broker0 = 0;
         final int broker1 = 1;
         final String topicA = "topicA";
@@ -53,7 +81,8 @@ public class FetchFromLeaderWithCorruptedCheckpointTest 
extends TieredStorageTes
                 LogManager.RECOVERY_POINT_CHECKPOINT_FILE,
                 CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
 
-        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
+                .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
                         enableRemoteLogStorage)
                 // send records to partition 0
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
@@ -80,5 +109,17 @@ public class FetchFromLeaderWithCorruptedCheckpointTest 
extends TieredStorageTes
                 .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
                 .consume(topicA, p0, 0L, 5, 4);
 
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
index fea0bc6b1b5..f7e1ae2839d 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
@@ -16,25 +16,53 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
-public final class PartitionsExpandTest extends TieredStorageTestHarness {
+public final class PartitionsExpandTest {
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
 
-    @Override
-    public int brokerCount() {
-        return 2;
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
PartitionsExpandTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void testPartitionsExpandWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executePartitionsExpandTest(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void testPartitionsExpandWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executePartitionsExpandTest(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
+    private void executePartitionsExpandTest(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws Exception {
         final int broker0 = 0;
         final int broker1 = 1;
         final String topicA = "topicA";
@@ -49,7 +77,7 @@ public final class PartitionsExpandTest extends 
TieredStorageTestHarness {
         final List<Integer> p1Assignment = List.of(broker0, broker1);
         final List<Integer> p2Assignment = List.of(broker1, broker0);
 
-        builder
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
                 .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
                         Map.of(p0, p0Assignment), enableRemoteLogStorage)
                 // produce events to partition 0
@@ -102,5 +130,18 @@ public final class PartitionsExpandTest extends 
TieredStorageTestHarness {
                 // consume from the middle of the topic for partition 2
                 .expectFetchFromTieredStorage(broker1, topicA, p2, 1)
                 .consume(topicA, p2, 1L, 2, 1);
+
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }

Reply via email to