This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 63facae3163 KAFKA-17687 Migrate DeleteSegmentsDueToLogStartOffsetBreachTest to new test infra (#22395)
63facae3163 is described below

commit 63facae316327ebe964132591e4d10fb36a7d367
Author: TengYao Chi <[email protected]>
AuthorDate: Thu May 28 15:16:42 2026 +0100

    KAFKA-17687 Migrate DeleteSegmentsDueToLogStartOffsetBreachTest to new test infra (#22395)
    
    This PR rewrites `DeleteSegmentsDueToLogStartOffsetBreachTest` to use
    the new test infra (`@ClusterTemplate` with `ClusterInstance`) instead
    of extending `TieredStorageTestHarness`.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 ...eleteSegmentsDueToLogStartOffsetBreachTest.java | 58 +++++++++++++++++++---
 1 file changed, 51 insertions(+), 7 deletions(-)

diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
index 8a189700bfb..5db13d74760 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
@@ -16,26 +16,55 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
-public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends 
TieredStorageTestHarness {
+public final class DeleteSegmentsDueToLogStartOffsetBreachTest {
 
-    @Override
-    public int brokerCount() {
-        return 2;
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
DeleteSegmentsDueToLogStartOffsetBreachTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsDueToLogStartOffsetBreachWithClassicGroupProtocol(ClusterInstance
 clusterInstance) throws Exception {
+        executeDeleteSegmentsDueToLogStartOffsetBreachTest(clusterInstance, 
GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsDueToLogStartOffsetBreachWithConsumerGroupProtocol(ClusterInstance
 clusterInstance) throws Exception {
+        executeDeleteSegmentsDueToLogStartOffsetBreachTest(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private void 
executeDeleteSegmentsDueToLogStartOffsetBreachTest(ClusterInstance 
clusterInstance,
+                                                                    
GroupProtocol groupProtocol) throws Exception {
         final int broker0 = 0;
         final int broker1 = 1;
         final String topicA = "topicA";
@@ -50,6 +79,8 @@ public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends TieredSto
         final long beforeOffset = 3L;
         final long beforeOffset1 = 7L;
 
+        TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+
         // Create topicA with 1 partition and 2 RF
         builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
@@ -87,5 +118,18 @@ public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends TieredSto
                 // consume from the topic with fetch-offset 7 to read data 
from local and remote storage
                 .expectFetchFromTieredStorage(broker1, topicA, p0, 1)
                 .consume(topicA, p0, 7L, 3, 1);
+
+        Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }

Reply via email to