This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0041e9c9e9 KAFKA-17689 Migrate ReassignReplicaShrinkTest and 
RollAndOffloadActiveSegmentTest to new test infra (#22376)
c0041e9c9e9 is described below

commit c0041e9c9e9386ad7e3f70b7c09c7a0b9c858827
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed May 27 23:49:21 2026 +0800

    KAFKA-17689 Migrate ReassignReplicaShrinkTest and 
RollAndOffloadActiveSegmentTest to new test infra (#22376)
    
    Migrate `ReassignReplicaShrinkTest` and
    `RollAndOffloadActiveSegmentTest` to the new test infrastructure.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../integration/ReassignReplicaShrinkTest.java     | 66 +++++++++++++++------
 .../RollAndOffloadActiveSegmentTest.java           | 67 ++++++++++++++++++----
 2 files changed, 103 insertions(+), 30 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
index 0dee8b0bda6..10604a92b46 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
@@ -16,38 +16,53 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
-public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
+public final class ReassignReplicaShrinkTest {
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
 
-    /**
-     * Cluster of two brokers
-     * @return number of brokers in the cluster
-     */
-    @Override
-    public int brokerCount() {
-        return 2;
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
ReassignReplicaShrinkTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
     }
 
-    /**
-     * Number of partitions in the '__remote_log_metadata' topic
-     * @return number of partitions in the '__remote_log_metadata' topic
-     */
-    @Override
-    public int numRemoteLogMetadataPartitions() {
-        return 2;
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaShrinkWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaShrinkTest(clusterInstance, 
GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaShrinkWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaShrinkTest(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private void executeReassignReplicaShrinkTest(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws Exception {
         final int broker0 = 0;
         final int broker1 = 1;
         final String topicA = "topicA";
@@ -62,7 +77,7 @@ public final class ReassignReplicaShrinkTest extends 
TieredStorageTestHarness {
                 mkEntry(p1, List.of(broker1, broker0))
         );
 
-        builder
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
                 // create topicA with 2 partitions and 2 RF
                 .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
                         replicaAssignment, enableRemoteLogStorage)
@@ -100,5 +115,18 @@ public final class ReassignReplicaShrinkTest extends 
TieredStorageTestHarness {
                 .consume(topicA, p0, 0L, 4, 3)
                 .expectFetchFromTieredStorage(broker0, topicA, p1, 3)
                 .consume(topicA, p1, 0L, 4, 3);
+
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
index 5009793b94b..e960c234760 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
@@ -16,39 +16,71 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
 /**
  * Test to verify that the active segment is rolled and uploaded to remote 
storage when the segment breaches the
  * local log retention policy.
  */
-public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
+public final class RollAndOffloadActiveSegmentTest {
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
RollAndOffloadActiveSegmentTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
+    }
 
-    @Override
-    public int brokerCount() {
-        return 1;
+    @ClusterTemplate("clusterConfig")
+    public void 
testRollAndOffloadActiveSegmentWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeRollAndOffloadActiveSegmentTest(clusterInstance, 
GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void 
testRollAndOffloadActiveSegmentWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeRollAndOffloadActiveSegmentTest(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private void executeRollAndOffloadActiveSegmentTest(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws Exception {
         final int broker0 = 0;
         final String topicA = "topicA";
         final int p0 = 0;
         final int partitionCount = 1;
         final int replicationFactor = 1;
         final int maxBatchCountPerSegment = 1;
-        final Map<Integer, List<Integer>> replicaAssignment = null;
+        // Pin the partition to broker 0 so the broker0-based expectations are 
deterministic
+        // regardless of how many brokers the cluster has.
+        final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0, 
List.of(broker0));
         final boolean enableRemoteLogStorage = true;
 
-        // Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
-        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
+                // Create topicA with 1 partition, 1 RF and enabled with 
remote storage.
+                .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
                 // update the topic config such that it triggers the rolling 
of the active segment
                 .updateTopicConfig(topicA, configsToBeAdded(), List.of())
@@ -63,9 +95,22 @@ public class RollAndOffloadActiveSegmentTest extends 
TieredStorageTestHarness {
                 // consume from the beginning of the topic to read data from 
local and remote storage
                 .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
                 .consume(topicA, p0, 0L, 4, 4);
+
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 
-    private Map<String, String> configsToBeAdded() {
+    private static Map<String, String> configsToBeAdded() {
         // Update localLog retentionMs to 1 ms and segment roll-time to 10 ms
         Map<String, String> topicConfigs = new HashMap<>();
         topicConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1");

Reply via email to