This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9be8ca1d9b8 KAFKA-17689 Migrate ReassignReplicaMoveTest and 
ReassignReplicaExpandTest to new test infra (#22343)
9be8ca1d9b8 is described below

commit 9be8ca1d9b86fb55d04bae2a2884af5bc65dc134
Author: majialong <[email protected]>
AuthorDate: Sat May 23 10:22:58 2026 +0800

    KAFKA-17689 Migrate ReassignReplicaMoveTest and ReassignReplicaExpandTest 
to new test infra (#22343)
    
    Migrate `ReassignReplicaMoveTest` and `ReassignReplicaExpandTest` to the new
    test infra, and integrate them into a single test file.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/BaseReassignReplicaTest.java       | 101 ----------------
 .../integration/ReassignReplicaExpandTest.java     |  31 -----
 .../ReassignReplicaMoveAndExpandTest.java          | 133 +++++++++++++++++++++
 .../integration/ReassignReplicaMoveTest.java       |  31 -----
 4 files changed, 133 insertions(+), 163 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
deleted file mode 100644
index ff31f0dd248..00000000000
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
-import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-
-public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness 
{
-    protected final Integer broker0 = 0;
-    protected final Integer broker1 = 1;
-
-    /**
-     * Cluster of two brokers
-     * @return number of brokers in the cluster
-     */
-    @Override
-    public int brokerCount() {
-        return 2;
-    }
-
-    /**
-     * Number of partitions in the '__remote_log_metadata' topic
-     * @return number of partitions in the '__remote_log_metadata' topic
-     */
-    @Override
-    public int numRemoteLogMetadataPartitions() {
-        return 2;
-    }
-
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
-        final String topicA = "topicA";
-        final String topicB = "topicB";
-        final int p0 = 0;
-        final int partitionCount = 1;
-        final int replicationFactor = 1;
-        final int maxBatchCountPerSegment = 1;
-        final Map<Integer, List<Integer>> replicaAssignment = null;
-        final boolean enableRemoteLogStorage = true;
-        final List<Integer> metadataPartitions = new ArrayList<>();
-        for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
-            metadataPartitions.add(i);
-        }
-
-        builder
-                // create topicA with 50 partitions and 2 RF. Using 50 
partitions to ensure that the user-partitions
-                // are mapped to all the __remote_log_metadata partitions. 
This is required to ensure that
-                // TBRLMM able to handle the assignment of the newly created 
replica to one of the already assigned
-                // metadata partition
-                .createTopic(topicA, 50, 2, maxBatchCountPerSegment,
-                        replicaAssignment, enableRemoteLogStorage)
-                .expectUserTopicMappedToMetadataPartitions(topicA, 
metadataPartitions)
-                // create topicB with 1 partition and 1 RF
-                .createTopic(topicB, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
-                        mkMap(mkEntry(p0, List.of(broker0))), 
enableRemoteLogStorage)
-                // send records to partition 0
-                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new 
KeyValueSpec("k0", "v0"))
-                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new 
KeyValueSpec("k1", "v1"))
-                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
-                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"))
-                // The newly created replica gets mapped to one of the 
metadata partition which is being actively
-                // consumed by both the brokers
-                .reassignReplica(topicB, p0, replicaIds())
-                .expectLeader(topicB, p0, broker1, true)
-                // produce some more events and verify the earliest local 
offset
-                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
-                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
-                // consume from the beginning of the topic to read data from 
local and remote storage
-                .expectFetchFromTieredStorage(broker1, topicB, p0, 3)
-                .consume(topicB, p0, 0L, 4, 3);
-    }
-
-    /**
-     * Replicas of the topic
-     * @return the replica-ids of the topic
-     */
-    protected abstract List<Integer> replicaIds();
-}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
deleted file mode 100644
index ddcf84109af..00000000000
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import java.util.List;
-
-public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
-
-    /**
-     * Expand the replication factor of the topic by changing the replica list 
from 0 to 0, 1
-     * @return the replica-ids of the topic
-     */
-    @Override
-    protected List<Integer> replicaIds() {
-        return List.of(broker0, broker1);
-    }
-}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
new file mode 100644
index 00000000000..3c48f3cdc09
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class ReassignReplicaMoveAndExpandTest {
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
+
+    private static final Integer BROKER_0 = 0;
+    private static final Integer BROKER_1 = 1;
+
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+            .setTypes(Set.of(Type.KRAFT))
+            .setBrokers(BROKER_COUNT)
+            .setServerProperties(createServerPropsForRemoteStorage(
+                
ReassignReplicaMoveAndExpandTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                BROKER_COUNT,
+                NUM_REMOTE_LOG_METADATA_PARTITIONS))
+            .build());
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaExpandWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC, 
List.of(BROKER_0, BROKER_1));
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaExpandWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER, 
List.of(BROKER_0, BROKER_1));
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaMoveWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC, 
List.of(BROKER_1));
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testReassignReplicaMoveWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER, 
List.of(BROKER_1));
+    }
+
+    private void executeReassignReplicaTest(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol, List<Integer> replicaIds) throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final List<Integer> metadataPartitions = new ArrayList<>();
+        for (int i = 0; i < NUM_REMOTE_LOG_METADATA_PARTITIONS; i++) {
+            metadataPartitions.add(i);
+        }
+
+        TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+        builder
+                // create topicA with 50 partitions and 2 RF. Using 50 
partitions to ensure that the user-partitions
+                // are mapped to all the __remote_log_metadata partitions. 
This is required to ensure that
+                // TBRLMM able to handle the assignment of the newly created 
replica to one of the already assigned
+                // metadata partition
+                .createTopic(topicA, 50, 2, maxBatchCountPerSegment,
+                        null, enableRemoteLogStorage)
+                .expectUserTopicMappedToMetadataPartitions(topicA, 
metadataPartitions)
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, List.of(BROKER_0))), 
enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // The newly created replica gets mapped to one of the 
metadata partition which is being actively
+                // consumed by both the brokers
+                .reassignReplica(topicB, p0, replicaIds)
+                .expectLeader(topicB, p0, BROKER_1, true)
+                // produce some more events and verify the earliest local 
offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from 
local and remote storage
+                .expectFetchFromTieredStorage(BROKER_1, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+
+        Map<String, Object> extraConsumerProps = Map.of(
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
deleted file mode 100644
index e81a0405b69..00000000000
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import java.util.List;
-
-public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
-
-    /**
-     * Move the replica of the topic from broker0 to broker1
-     * @return the replica-ids of the topic
-     */
-    @Override
-    protected List<Integer> replicaIds() {
-        return List.of(broker1);
-    }
-}

Reply via email to