This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da83631684f KAFKA-17689 Migrate
FetchFromLeaderWithCorruptedCheckpointTest and PartitionsExpandTest (#22400)
da83631684f is described below
commit da83631684ffd3b6b9a36accf089796c5b22585b
Author: Murali Basani <[email protected]>
AuthorDate: Fri May 29 11:36:01 2026 +0200
KAFKA-17689 Migrate FetchFromLeaderWithCorruptedCheckpointTest and
PartitionsExpandTest (#22400)
Ref: https://issues.apache.org/jira/browse/KAFKA-17689
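As part of the mass migration, the following tests are moved from
TieredStorageTestHarness to the ClusterInstance-based test framework: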
- FetchFromLeaderWithCorruptedCheckpointTest
- PartitionsExpandTest
Note that executeFetchFromLeaderWithCorruptedCheckpointTest stops and
restarts the broker during the test.
---
...FetchFromLeaderWithCorruptedCheckpointTest.java | 57 +++++++++++++++++++---
.../storage/integration/PartitionsExpandTest.java | 57 +++++++++++++++++++---
2 files changed, 98 insertions(+), 16 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
index 7f385fdb2a9..07af185b2e3 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
@@ -18,27 +18,55 @@ package org.apache.kafka.tiered.storage.integration;
import kafka.server.ReplicaManager;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.apache.kafka.storage.internals.log.LogManager;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
-public class FetchFromLeaderWithCorruptedCheckpointTest extends
TieredStorageTestHarness {
+public final class FetchFromLeaderWithCorruptedCheckpointTest {
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
- @Override
- public int brokerCount() {
- return 2;
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
FetchFromLeaderWithCorruptedCheckpointTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void
testFetchFromLeaderWithCorruptedCheckpointWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeFetchFromLeaderWithCorruptedCheckpointTest(clusterInstance,
GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testFetchFromLeaderWithCorruptedCheckpointWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeFetchFromLeaderWithCorruptedCheckpointTest(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void
executeFetchFromLeaderWithCorruptedCheckpointTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -53,7 +81,8 @@ public class FetchFromLeaderWithCorruptedCheckpointTest
extends TieredStorageTes
LogManager.RECOVERY_POINT_CHECKPOINT_FILE,
CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
- builder.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
@@ -80,5 +109,17 @@ public class FetchFromLeaderWithCorruptedCheckpointTest
extends TieredStorageTes
.expectFetchFromTieredStorage(broker0, topicA, p0, 4)
.consume(topicA, p0, 0L, 5, 4);
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
index fea0bc6b1b5..f7e1ae2839d 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
@@ -16,25 +16,53 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
-public final class PartitionsExpandTest extends TieredStorageTestHarness {
+public final class PartitionsExpandTest {
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
- @Override
- public int brokerCount() {
- return 2;
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
PartitionsExpandTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void testPartitionsExpandWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executePartitionsExpandTest(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testPartitionsExpandWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executePartitionsExpandTest(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ private void executePartitionsExpandTest(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -49,7 +77,7 @@ public final class PartitionsExpandTest extends
TieredStorageTestHarness {
final List<Integer> p1Assignment = List.of(broker0, broker1);
final List<Integer> p2Assignment = List.of(broker1, broker0);
- builder
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment,
Map.of(p0, p0Assignment), enableRemoteLogStorage)
// produce events to partition 0
@@ -102,5 +130,18 @@ public final class PartitionsExpandTest extends
TieredStorageTestHarness {
// consume from the middle of the topic for partition 2
.expectFetchFromTieredStorage(broker1, topicA, p2, 1)
.consume(topicA, p2, 1L, 2, 1);
+
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}