This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 80f6bddcef3 KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (#22361)
80f6bddcef3 is described below

commit 80f6bddcef36990bba4a8d89928d79df034f9a1d
Author: TaiJuWu <[email protected]>
AuthorDate: Sun May 24 17:31:36 2026 +0800

    KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (#22361)
    
    Migrate DeleteSegmentsTest to new test infra
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/BaseDeleteSegmentsTest.java        |  74 -----------
 .../DeleteSegmentsByRetentionSizeTest.java         |  29 -----
 .../DeleteSegmentsByRetentionTimeTest.java         |  29 -----
 .../storage/integration/DeleteSegmentsTest.java    | 137 +++++++++++++++++++++
 4 files changed, 137 insertions(+), 132 deletions(-)

diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
deleted file mode 100644
index 3c6886b8871..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
-import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
-
-public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
-
-    @Override
-    public int brokerCount() {
-        return 1;
-    }
-
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
-        final int broker0 = 0;
-        final String topicA = "topicA";
-        final int p0 = 0;
-        final int partitionCount = 1;
-        final int replicationFactor = 1;
-        final int maxBatchCountPerSegment = 1;
-        final Map<Integer, List<Integer>> replicaAssignment = null;
-        final boolean enableRemoteLogStorage = true;
-        final int beginEpoch = 0;
-        final long startOffset = 3;
-
-        // Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
-        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
-                        enableRemoteLogStorage)
-                // produce events to partition 0 and expect 3 segments to be 
offloaded
-                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
-                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
-                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
-                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
-                .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", 
"v0"), new KeyValueSpec("k1", "v1"),
-                        // DeleteSegmentsByRetentionTimeTest uses a tiny 
retention time, which could cause the active 
-                        // segment to be rolled and deleted. We use a future 
timestamp to prevent that from happening.
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
-                // update the topic config such that it triggers the deletion 
of segments
-                .updateTopicConfig(topicA, configsToBeAdded(), List.of())
-                // expect that the three offloaded remote log segments are 
deleted
-                .expectDeletionInRemoteStorage(broker0, topicA, p0, 
DELETE_SEGMENT, 3)
-                .waitForRemoteLogSegmentDeletion(topicA)
-                // expect that the leader epoch checkpoint is updated
-                .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, 
startOffset)
-                // consume from the beginning of the topic to read data from 
local and remote storage
-                .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
-                .consume(topicA, p0, 0L, 1, 0);
-    }
-
-    protected abstract Map<String, String> configsToBeAdded();
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
deleted file mode 100644
index dd4dc59e75e..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.common.config.TopicConfig;
-
-import java.util.Map;
-
-public final class DeleteSegmentsByRetentionSizeTest extends 
BaseDeleteSegmentsTest {
-
-    @Override
-    protected Map<String, String> configsToBeAdded() {
-        return Map.of(TopicConfig.RETENTION_BYTES_CONFIG, "1");
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
deleted file mode 100644
index 69fdac88b72..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.common.config.TopicConfig;
-
-import java.util.Map;
-
-public final class DeleteSegmentsByRetentionTimeTest extends 
BaseDeleteSegmentsTest {
-
-    @Override
-    protected Map<String, String> configsToBeAdded() {
-        return Map.of(TopicConfig.RETENTION_MS_CONFIG, "1");
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java
new file mode 100644
index 00000000000..e416e6be458
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class DeleteSegmentsTest {
+
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+    private static final Map<String, String> RETENTION_SIZE_CONFIG =
+            Map.of(TopicConfig.RETENTION_BYTES_CONFIG, "1");
+    private static final Map<String, String> RETENTION_TIME_CONFIG =
+            Map.of(TopicConfig.RETENTION_MS_CONFIG, "1");
+
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
DeleteSegmentsTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsByRetentionSizeWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CLASSIC, 
RETENTION_SIZE_CONFIG);
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsByRetentionSizeWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CONSUMER, 
RETENTION_SIZE_CONFIG);
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsByRetentionTimeWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CLASSIC, 
RETENTION_TIME_CONFIG);
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testDeleteSegmentsByRetentionTimeWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CONSUMER, 
RETENTION_TIME_CONFIG);
+    }
+
+    private static void executeDeleteSegmentsTest(ClusterInstance 
clusterInstance,
+                                                  GroupProtocol groupProtocol,
+                                                  Map<String, String> 
configsToBeAdded) throws Exception {
+        final int broker0 = 0;
+        final String topicA = "topicA";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        // Pin the partition to broker 0 so the broker0-based expectations are 
deterministic
+        // regardless of how many brokers the cluster has.
+        final Map<Integer, List<Integer>> replicaAssignment = 
mkMap(mkEntry(p0, List.of(broker0)));
+        final boolean enableRemoteLogStorage = true;
+        final int beginEpoch = 0;
+        final long startOffset = 3;
+
+        TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+
+        // Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
+        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                // produce events to partition 0 and expect 3 segments to be 
offloaded
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+                .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", 
"v0"), new KeyValueSpec("k1", "v1"),
+                        // testDeleteSegmentsByRetentionTime* uses a tiny 
retention time, which could cause the active
+                        // segment to be rolled and deleted. We use a future 
timestamp to prevent that from happening.
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
+                // update the topic config such that it triggers the deletion 
of segments
+                .updateTopicConfig(topicA, configsToBeAdded, List.of())
+                // expect that the three offloaded remote log segments are 
deleted
+                .expectDeletionInRemoteStorage(broker0, topicA, p0, 
DELETE_SEGMENT, 3)
+                .waitForRemoteLogSegmentDeletion(topicA)
+                // expect that the leader epoch checkpoint is updated
+                .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, 
startOffset)
+                // consume from the beginning of the topic to read data from 
local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
+                .consume(topicA, p0, 0L, 1, 0);
+
+        Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
+    }
+}

Reply via email to