This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 76ab0cf4804 KAFKA-17689 Migrate OffloadAndConsumeFromLeaderTest and
OffloadAndTxnConsumeFromLeaderTest to new test infra (#22396)
76ab0cf4804 is described below
commit 76ab0cf480400db865f7d82fb9dee1f4171a39d2
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Thu May 28 22:08:09 2026 +0800
KAFKA-17689 Migrate OffloadAndConsumeFromLeaderTest and
OffloadAndTxnConsumeFromLeaderTest to new test infra (#22396)
Migrate `OffloadAndConsumeFromLeaderTest` and
`OffloadAndTxnConsumeFromLeaderTest` to the new test infrastructure.
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../OffloadAndConsumeFromLeaderTest.java | 83 +++++++++++++++-----
.../OffloadAndTxnConsumeFromLeaderTest.java | 91 ++++++++++++++--------
2 files changed, 122 insertions(+), 52 deletions(-)
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
index fad98604f72..539cd008168 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
@@ -16,31 +16,57 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
/**
* Test Cases:
* Elementary offloads and fetches from tiered storage.
*/
-public final class OffloadAndConsumeFromLeaderTest extends
TieredStorageTestHarness {
+public final class OffloadAndConsumeFromLeaderTest {
+
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
OffloadAndConsumeFromLeaderTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
- /**
- * Cluster of one broker
- * @return number of brokers in the cluster
- */
- @Override
- public int brokerCount() {
- return 1;
+ @ClusterTemplate("clusterConfig")
+ public void
testOffloadAndConsumeFromLeaderWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeOffloadAndConsumeFromLeaderTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
- final int broker = 0;
+ @ClusterTemplate("clusterConfig")
+ public void
testOffloadAndConsumeFromLeaderWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeOffloadAndConsumeFromLeaderTest(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void executeOffloadAndConsumeFromLeaderTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
+ final int broker0 = 0;
final String topicA = "topicA";
final String topicB = "topicB";
final int p0 = 0;
@@ -48,10 +74,12 @@ public final class OffloadAndConsumeFromLeaderTest extends
TieredStorageTestHarn
final int replicationFactor = 1;
final int oneBatchPerSegment = 1;
final int twoBatchPerSegment = 2;
- final Map<Integer, List<Integer>> replicaAssignment = null;
+ // Pin the partition to broker 0 so that the broker0-based expectations are deterministic
+ // regardless of how many brokers the cluster has.
+ final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0,
List.of(broker0));
final boolean enableRemoteLogStorage = true;
- builder
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
/*
* (1) Create a topic which segments contain only one batch
and produce three records
* with a batch size of 1.
@@ -77,8 +105,8 @@ public final class OffloadAndConsumeFromLeaderTest extends
TieredStorageTestHarn
*/
.createTopic(topicA, partitionCount, replicationFactor,
oneBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
- .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
@@ -107,9 +135,9 @@ public final class OffloadAndConsumeFromLeaderTest extends
TieredStorageTestHarn
.createTopic(topicB, partitionCount, replicationFactor,
twoBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L)
- .expectSegmentToBeOffloaded(broker, topicB, p0, 0,
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0,
new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1",
"v1"))
- .expectSegmentToBeOffloaded(broker, topicB, p0, 2,
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 2,
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"))
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"), new KeyValueSpec("k4", "v4"))
@@ -126,10 +154,23 @@ public final class OffloadAndConsumeFromLeaderTest
extends TieredStorageTestHarn
* - For topic B, two segments are present in the tiered
storage, as asserted by the
* previous sub-test-case.
*/
- .bounce(broker)
- .expectFetchFromTieredStorage(broker, topicA, p0, 1)
+ .bounce(broker0)
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 1)
.consume(topicA, p0, 1L, 2, 1)
- .expectFetchFromTieredStorage(broker, topicB, p0, 2)
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 2)
.consume(topicB, p0, 1L, 4, 3);
+
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index 3f2a9ce1581..b5f99785e0f 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -17,68 +17,83 @@
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
/**
* Test Cases:
* Elementary offloads and fetches from tiered storage using consumer with
read_committed isolation level.
*/
-public final class OffloadAndTxnConsumeFromLeaderTest extends
TieredStorageTestHarness {
+public final class OffloadAndTxnConsumeFromLeaderTest {
- /**
- * Cluster of one broker
- * @return number of brokers in the cluster
- */
- @Override
- public int brokerCount() {
- return 1;
- }
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
- @Override
- public Properties overridingProps() {
- Properties props = super.overridingProps();
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ Map<String, String> serverProps = createServerPropsForRemoteStorage(
+
OffloadAndTxnConsumeFromLeaderTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS);
// Configure the remote-log index cache size to hold one entry to
simulate eviction of cached index entries.
-
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
"1");
- return props;
+
serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
"1");
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(serverProps)
+ .build());
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testOffloadAndTxnConsumeFromLeaderWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeOffloadAndTxnConsumeFromLeaderTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @Override
- protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
- consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString());
+ @ClusterTemplate("clusterConfig")
+ public void
testOffloadAndTxnConsumeFromLeaderWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeOffloadAndTxnConsumeFromLeaderTest(clusterInstance,
GroupProtocol.CONSUMER);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
- final int broker = 0;
+ private void executeOffloadAndTxnConsumeFromLeaderTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
+ final int broker0 = 0;
final String topicA = "topicA";
final int p0 = 0;
final int partitionCount = 1;
final int replicationFactor = 1;
final int oneBatchPerSegment = 1;
- final Map<Integer, List<Integer>> replicaAssignment = null;
+ // Pin the partition to broker 0 so that the broker0-based expectations are deterministic
+ // regardless of how many brokers the cluster has.
+ final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0,
List.of(broker0));
final boolean enableRemoteLogStorage = true;
- builder
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
.createTopic(topicA, partitionCount, replicationFactor,
oneBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
- .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 5, new
KeyValueSpec("k5", "v5"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 5, new
KeyValueSpec("k5", "v5"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"), new KeyValueSpec("k4", "v4"),
@@ -89,8 +104,22 @@ public final class OffloadAndTxnConsumeFromLeaderTest
extends TieredStorageTestH
// Total number of uploaded remote segments = 6. Total number
of index fetches = (6 * (6 + 1)) / 2 = 21
// Note that we skip the index fetch when the txn-index is
empty, so the effective index fetch count
// should be same as the segment count.
- .expectFetchFromTieredStorage(broker, topicA, p0,
getRemoteFetchCount())
+ .expectFetchFromTieredStorage(broker0, topicA, p0,
getRemoteFetchCount())
.consume(topicA, p0, 0L, 7, 6);
+
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT),
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString()
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
private static RemoteFetchCount getRemoteFetchCount() {