This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c079f405ef KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (AlterLogDirTest) (#22306)
8c079f405ef is described below

commit 8c079f405ef6db002cfd785e5bd6f58974813c2a
Author: Ken Huang <[email protected]>
AuthorDate: Thu May 21 15:58:31 2026 +0800

    KAFKA-17689 Migrate TieredStorageTestHarness to new test infra (AlterLogDirTest) (#22306)
    
    Migrate `AlterLogDirTest` to the new test infrastructure
    
    Reviewers: Chia-Ping Tsai <[email protected]>, PoAn Yang
     <[email protected]>, TaiJuWu <[email protected]>
---
 .../storage/integration/AlterLogDirTest.java       | 59 +++++++++++++++++++---
 .../TransactionsWithTieredStoreTest.java           | 18 +------
 .../storage/utils/TieredStorageTestUtils.java      | 19 +++++++
 3 files changed, 72 insertions(+), 24 deletions(-)

diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
index b9230b20812..d406d8e2eba 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
@@ -16,24 +16,53 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
-public final class AlterLogDirTest extends TieredStorageTestHarness {
+public final class AlterLogDirTest {
 
-    @Override
-    public int brokerCount() {
-        return 2;
+    private static final int BROKER_COUNT = 3;
+
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setDisksPerBroker(2)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
AlterLogDirTest.class.getSimpleName().toLowerCase(Locale.ROOT), 
+                        BROKER_COUNT, 
+                        5))
+                .build());
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void testAlterLogDirWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeAlterLogDirTest(clusterInstance, GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+    @ClusterTemplate("clusterConfig")
+    public void testAlterLogDirWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeAlterLogDirTest(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
+    private void executeAlterLogDirTest(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws Exception {
         final String topicB = "topicB";
         final int p0 = 0;
         final int partitionCount = 1;
@@ -43,8 +72,9 @@ public final class AlterLogDirTest extends 
TieredStorageTestHarness {
         final int broker0 = 0;
         final int broker1 = 1;
 
+        TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
         builder
-                // create topicB with 1 partition and 1 RF
+                // create topicB with 1 partition and 2 RF
                 .createTopic(topicB, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
                         mkMap(mkEntry(p0, List.of(broker1, broker0))), 
enableRemoteLogStorage)
                 // send records to partition 0
@@ -63,5 +93,18 @@ public final class AlterLogDirTest extends 
TieredStorageTestHarness {
                 // consume from the beginning of the topic to read data from 
local and remote storage
                 .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
                 .consume(topicB, p0, 0L, 4, 3);
+
+        Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index 0eab6c6f685..dff3347248a 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -23,11 +23,9 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfig;
 import org.apache.kafka.common.test.api.ClusterTemplate;
-import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.config.ServerLogConfigs;
-import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.test.TestUtils;
 import 
org.apache.kafka.tiered.storage.integration.TransactionsTestHelper.TransactionHooks;
 import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -35,12 +33,11 @@ import 
org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
-import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
 
 /**
@@ -56,10 +53,7 @@ public class TransactionsWithTieredStoreTest {
     private static final int BROKER_COUNT = 3;
     
     private static Map<String, String> baseServerProperties() {
-        String storageDirPath = TestUtils.tempDirectory(
-                "kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath();
-
-        Map<String, String> serverProps = new HashMap<>();
+        Map<String, String> serverProps = 
createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 3);
         serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
"false");
         serverProps.put("offsets.topic.num.partitions", "1");
         serverProps.put("transaction.state.log.num.partitions", "3");
@@ -70,14 +64,6 @@ public class TransactionsWithTieredStoreTest {
         serverProps.put("auto.leader.rebalance.enable", "false");
         serverProps.put("group.initial.rebalance.delay.ms", "0");
         
serverProps.put("transaction.abort.timed.out.transaction.cleanup.interval.ms", 
"200");
-
-        Properties tieredProps = createPropsForRemoteStorage(
-                TEST_CLASS_NAME, storageDirPath, BROKER_COUNT, 3, new 
Properties());
-        tieredProps.forEach((k, v) -> serverProps.put(k.toString(), 
v.toString()));
-
-        
serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
-                TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME);
-
         return serverProps;
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index bc4ffdf238f..cd0d243da04 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
@@ -164,6 +165,24 @@ public class TieredStorageTestUtils {
         return overridingProps;
     }
 
+    public static Map<String, String> createServerPropsForRemoteStorage(
+            String testClassName, 
+            int brokerCount, 
+            int numRemoteLogMetadataPartitions
+    ) {
+        String storageDirPath = org.apache.kafka.test.TestUtils
+                .tempDirectory("kafka-remote-tier-" + 
testClassName).getAbsolutePath();
+        Properties tieredProps = createPropsForRemoteStorage(
+                testClassName, storageDirPath, brokerCount, 
numRemoteLogMetadataPartitions, new Properties());
+        Map<String, String> serverProps = new HashMap<>();
+        tieredProps.forEach((k, v) -> serverProps.put(k.toString(), 
v.toString()));
+        serverProps.put(
+                REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+                TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME
+        );
+        return serverProps;
+    }
+
     public static Map<String, String> 
createTopicConfigForRemoteStorage(boolean enableRemoteStorage,
             int maxRecordBatchPerSegment) {
         Map<String, String> topicProps = new HashMap<>();

Reply via email to