This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 80f6bddcef3 KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (#22361)
80f6bddcef3 is described below
commit 80f6bddcef36990bba4a8d89928d79df034f9a1d
Author: TaiJuWu <[email protected]>
AuthorDate: Sun May 24 17:31:36 2026 +0800
KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (#22361)
Migrate DeleteSegmentsTest to new test infra
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/BaseDeleteSegmentsTest.java | 74 -----------
.../DeleteSegmentsByRetentionSizeTest.java | 29 -----
.../DeleteSegmentsByRetentionTimeTest.java | 29 -----
.../storage/integration/DeleteSegmentsTest.java | 137 +++++++++++++++++++++
4 files changed, 137 insertions(+), 132 deletions(-)
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
deleted file mode 100644
index 3c6886b8871..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
-import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
-
-public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
-
- @Override
- public int brokerCount() {
- return 1;
- }
-
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
- final int broker0 = 0;
- final String topicA = "topicA";
- final int p0 = 0;
- final int partitionCount = 1;
- final int replicationFactor = 1;
- final int maxBatchCountPerSegment = 1;
- final Map<Integer, List<Integer>> replicaAssignment = null;
- final boolean enableRemoteLogStorage = true;
- final int beginEpoch = 0;
- final long startOffset = 3;
-
- // Create topicA with 1 partition, 1 RF and enabled with remote storage.
- builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
- enableRemoteLogStorage)
- // produce events to partition 0 and expect 3 segments to be offloaded
- .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
- .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
- .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
- .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
- // DeleteSegmentsByRetentionTimeTest uses a tiny retention time, which could cause the active
- // segment to be rolled and deleted. We use a future timestamp to prevent that from happening.
- new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
- // update the topic config such that it triggers the deletion of segments
- .updateTopicConfig(topicA, configsToBeAdded(), List.of())
- // expect that the three offloaded remote log segments are deleted
- .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 3)
- .waitForRemoteLogSegmentDeletion(topicA)
- // expect that the leader epoch checkpoint is updated
- .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, startOffset)
- // consume from the beginning of the topic to read data from local and remote storage
- .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
- .consume(topicA, p0, 0L, 1, 0);
- }
-
- protected abstract Map<String, String> configsToBeAdded();
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
deleted file mode 100644
index dd4dc59e75e..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.common.config.TopicConfig;
-
-import java.util.Map;
-
-public final class DeleteSegmentsByRetentionSizeTest extends BaseDeleteSegmentsTest {
-
- @Override
- protected Map<String, String> configsToBeAdded() {
- return Map.of(TopicConfig.RETENTION_BYTES_CONFIG, "1");
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
deleted file mode 100644
index 69fdac88b72..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.common.config.TopicConfig;
-
-import java.util.Map;
-
-public final class DeleteSegmentsByRetentionTimeTest extends BaseDeleteSegmentsTest {
-
- @Override
- protected Map<String, String> configsToBeAdded() {
- return Map.of(TopicConfig.RETENTION_MS_CONFIG, "1");
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java
new file mode 100644
index 00000000000..e416e6be458
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class DeleteSegmentsTest {
+
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+ private static final Map<String, String> RETENTION_SIZE_CONFIG =
+ Map.of(TopicConfig.RETENTION_BYTES_CONFIG, "1");
+ private static final Map<String, String> RETENTION_TIME_CONFIG =
+ Map.of(TopicConfig.RETENTION_MS_CONFIG, "1");
+
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+ DeleteSegmentsTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testDeleteSegmentsByRetentionSizeWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception {
+ executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CLASSIC, RETENTION_SIZE_CONFIG);
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testDeleteSegmentsByRetentionSizeWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception {
+ executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CONSUMER, RETENTION_SIZE_CONFIG);
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testDeleteSegmentsByRetentionTimeWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception {
+ executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CLASSIC, RETENTION_TIME_CONFIG);
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testDeleteSegmentsByRetentionTimeWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception {
+ executeDeleteSegmentsTest(clusterInstance, GroupProtocol.CONSUMER, RETENTION_TIME_CONFIG);
+ }
+
+ private static void executeDeleteSegmentsTest(ClusterInstance clusterInstance,
+ GroupProtocol groupProtocol,
+ Map<String, String> configsToBeAdded) throws Exception {
+ final int broker0 = 0;
+ final String topicA = "topicA";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ // Pin the partition to broker 0 so the broker0-based expectations are deterministic
+ // regardless of how many brokers the cluster has.
+ final Map<Integer, List<Integer>> replicaAssignment = mkMap(mkEntry(p0, List.of(broker0)));
+ final boolean enableRemoteLogStorage = true;
+ final int beginEpoch = 0;
+ final long startOffset = 3;
+
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+
+ // Create topicA with 1 partition, 1 RF and enabled with remote storage.
+ builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
+ enableRemoteLogStorage)
+ // produce events to partition 0 and expect 3 segments to be offloaded
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+ .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ // testDeleteSegmentsByRetentionTime* uses a tiny retention time, which could cause the active
+ // segment to be rolled and deleted. We use a future timestamp to prevent that from happening.
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
+ // update the topic config such that it triggers the deletion of segments
+ .updateTopicConfig(topicA, configsToBeAdded, List.of())
+ // expect that the three offloaded remote log segments are deleted
+ .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 3)
+ .waitForRemoteLogSegmentDeletion(topicA)
+ // expect that the leader epoch checkpoint is updated
+ .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, startOffset)
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
+ .consume(topicA, p0, 0L, 1, 0);
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
+ }
+}