This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f95a1f995d9 MINOR: increase test visibility to public in client
integration tests. (#22069)
f95a1f995d9 is described below
commit f95a1f995d9ed7c4b647ffcd3747129ac0fd658f
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Apr 15 11:49:41 2026 -0700
MINOR: increase test visibility to public in client integration tests.
(#22069)
To increase the extensibility of the tests
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/admin/AdminFenceProducersTest.java | 6 +++---
.../admin/DescribeProducersWithBrokerIdTest.java | 8 ++++----
.../clients/consumer/ConsumerTopicCreationTest.java | 8 ++++----
.../clients/consumer/ShareConsumerRackAwareTest.java | 4 ++--
.../clients/producer/ProducerCompressionTest.java | 4 ++--
.../producer/ProducerFailureHandlingTest.java | 20 ++++++++++----------
.../clients/producer/ProducerIdExpirationTest.java | 6 +++---
.../clients/producer/ProducerIntegrationTest.java | 10 +++++-----
8 files changed, 33 insertions(+), 33 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminFenceProducersTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminFenceProducersTest.java
index ddc79479746..2b6a4eb11de 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminFenceProducersTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminFenceProducersTest.java
@@ -65,7 +65,7 @@ public class AdminFenceProducersTest {
}
@ClusterTest
- void testFenceAfterProducerCommit() throws Exception {
+ public void testFenceAfterProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (Producer<byte[], byte[]> producer = createProducer();
@@ -94,7 +94,7 @@ public class AdminFenceProducersTest {
}
@ClusterTest
- void testFenceProducerTimeoutMs() {
+ public void testFenceProducerTimeoutMs() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
INCORRECT_BROKER_PORT);
@@ -107,7 +107,7 @@ public class AdminFenceProducersTest {
}
@ClusterTest
- void testFenceBeforeProducerCommit() throws Exception {
+ public void testFenceBeforeProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (Producer<byte[], byte[]> producer = createProducer();
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeProducersWithBrokerIdTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeProducersWithBrokerIdTest.java
index 979989af1ca..3ac09d4a6ab 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeProducersWithBrokerIdTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeProducersWithBrokerIdTest.java
@@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
@ClusterTestDefaults(
brokers = 3
)
-class DescribeProducersWithBrokerIdTest {
+public class DescribeProducersWithBrokerIdTest {
private static final String TOPIC_NAME = "test-topic";
private static final int NUM_PARTITIONS = 1;
private static final short REPLICATION_FACTOR = 3;
@@ -86,7 +86,7 @@ class DescribeProducersWithBrokerIdTest {
}
@ClusterTest
- void testDescribeProducersDefaultRoutesToLeader() throws Exception {
+ public void testDescribeProducersDefaultRoutesToLeader() throws Exception {
try (Producer<byte[], byte[]> producer = clusterInstance.producer();
var admin = clusterInstance.admin()) {
sendTestRecords(producer);
@@ -107,7 +107,7 @@ class DescribeProducersWithBrokerIdTest {
}
@ClusterTest
- void testDescribeProducersFromFollower() throws Exception {
+ public void testDescribeProducersFromFollower() throws Exception {
try (Producer<byte[], byte[]> producer = clusterInstance.producer();
var admin = clusterInstance.admin()) {
sendTestRecords(producer);
@@ -128,7 +128,7 @@ class DescribeProducersWithBrokerIdTest {
}
@ClusterTest(brokers = 4)
- void testDescribeProducersWithInvalidBrokerId() throws Exception {
+ public void testDescribeProducersWithInvalidBrokerId() throws Exception {
try (Producer<byte[], byte[]> producer = clusterInstance.producer();
var admin = clusterInstance.admin()) {
sendTestRecords(producer);
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java
index 0397e7ea779..b20608749df 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java
@@ -40,7 +40,7 @@ public class ConsumerTopicCreationTest {
private static final long POLL_TIMEOUT = 1000;
@ClusterTemplate("autoCreateTopicsConfigs")
- void
testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance
cluster) throws Exception {
+ public void
testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance
cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster,
GroupProtocol.CONSUMER, true)) {
subscribeAndPoll(consumer);
assertTopicCreateBasedOnPermission(cluster);
@@ -48,7 +48,7 @@ public class ConsumerTopicCreationTest {
}
@ClusterTemplate("autoCreateTopicsConfigs")
- void
testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance
cluster) throws Exception {
+ public void
testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance
cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster,
GroupProtocol.CONSUMER, false)) {
subscribeAndPoll(consumer);
assertTopicNotCreate(cluster);
@@ -56,7 +56,7 @@ public class ConsumerTopicCreationTest {
}
@ClusterTemplate("autoCreateTopicsConfigs")
- void
testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance
cluster) throws Exception {
+ public void
testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance
cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster,
GroupProtocol.CLASSIC, true)) {
subscribeAndPoll(consumer);
assertTopicCreateBasedOnPermission(cluster);
@@ -64,7 +64,7 @@ public class ConsumerTopicCreationTest {
}
@ClusterTemplate("autoCreateTopicsConfigs")
- void
testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance
cluster) throws Exception {
+ public void
testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance
cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster,
GroupProtocol.CLASSIC, false)) {
subscribeAndPoll(consumer);
assertTopicNotCreate(cluster);
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 98dec6a6357..668c84a0a06 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -63,7 +63,7 @@ public class ShareConsumerRackAwareTest {
@ClusterConfigProperty(key =
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
})
})
- void testShareConsumerWithRackAwareAssignor(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ public void testShareConsumerWithRackAwareAssignor(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
String groupId = "group0";
String topic = "test-topic";
try (Admin admin = clusterInstance.admin();
@@ -195,4 +195,4 @@ public class ShareConsumerRackAwareTest {
}
);
}
-}
\ No newline at end of file
+}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
index 007dac91075..d8d3ebe3d86 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
@@ -50,7 +50,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@ClusterTestDefaults(types = {Type.KRAFT})
-class ProducerCompressionTest {
+public class ProducerCompressionTest {
private final String topicName = "topic";
private final int numRecords = 2000;
@@ -61,7 +61,7 @@ class ProducerCompressionTest {
* Compressed messages should be able to sent and consumed correctly
*/
@ClusterTest
- void testCompression(ClusterInstance cluster) throws ExecutionException,
InterruptedException {
+ public void testCompression(ClusterInstance cluster) throws
ExecutionException, InterruptedException {
for (CompressionType compression : CompressionType.values()) {
processCompressionTest(cluster, compression);
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
index b69e982820d..e816e00b2a5 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
@@ -88,7 +88,7 @@ public class ProducerFailureHandlingTest {
* With ack == 0 the future metadata will have no exceptions with offset -1
*/
@ClusterTest
- void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws
InterruptedException,
+ public void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance)
throws InterruptedException,
ExecutionException {
clusterInstance.createTopic(topic1, 1, (short)
clusterInstance.brokers().size());
try (Producer<byte[], byte[]> producer =
clusterInstance.producer(producerConfig(0))) {
@@ -107,7 +107,7 @@ public class ProducerFailureHandlingTest {
* With ack == 1 the future metadata will throw ExecutionException caused
by RecordTooLargeException
*/
@ClusterTest
- void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws
InterruptedException {
+ public void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance)
throws InterruptedException {
clusterInstance.createTopic(topic1, 1, (short)
clusterInstance.brokers().size());
try (Producer<byte[], byte[]> producer =
clusterInstance.producer(producerConfig(1))) {
@@ -123,7 +123,7 @@ public class ProducerFailureHandlingTest {
* This should succeed as the replica fetcher thread can handle oversized
messages since KIP-74
*/
@ClusterTest
- void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance
clusterInstance) throws InterruptedException,
+ public void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance
clusterInstance) throws InterruptedException,
ExecutionException {
checkTooLargeRecordForReplicationWithAckAll(clusterInstance,
replicaFetchMaxPartitionBytes);
}
@@ -132,7 +132,7 @@ public class ProducerFailureHandlingTest {
* This should succeed as the replica fetcher thread can handle oversized
messages since KIP-74
*/
@ClusterTest
- void testResponseTooLargeForReplicationWithAckAll(ClusterInstance
clusterInstance) throws InterruptedException,
+ public void testResponseTooLargeForReplicationWithAckAll(ClusterInstance
clusterInstance) throws InterruptedException,
ExecutionException {
checkTooLargeRecordForReplicationWithAckAll(clusterInstance,
replicaFetchMaxResponseBytes);
}
@@ -142,7 +142,7 @@ public class ProducerFailureHandlingTest {
* With non-exist-topic the future metadata should return
ExecutionException caused by TimeoutException
*/
@ClusterTest
- void testNonExistentTopic(ClusterInstance clusterInstance) {
+ public void testNonExistentTopic(ClusterInstance clusterInstance) {
// send a record with non-exist topic
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic2, null, "key".getBytes(),
"value".getBytes());
@@ -156,7 +156,7 @@ public class ProducerFailureHandlingTest {
* With incorrect broker-list the future metadata should return
ExecutionException caused by TimeoutException
*/
@ClusterTest
- void testWrongBrokerList(ClusterInstance clusterInstance) throws
InterruptedException {
+ public void testWrongBrokerList(ClusterInstance clusterInstance) throws
InterruptedException {
clusterInstance.createTopic(topic1, 1, (short) 1);
// producer with incorrect broker list
Map<String, Object> producerConfig = new HashMap<>(producerConfig(1));
@@ -174,7 +174,7 @@ public class ProducerFailureHandlingTest {
* when partition is higher than the upper bound of partitions.
*/
@ClusterTest
- void testInvalidPartition(ClusterInstance clusterInstance) throws
InterruptedException {
+ public void testInvalidPartition(ClusterInstance clusterInstance) throws
InterruptedException {
// create topic with a single partition
clusterInstance.createTopic(topic1, 1, (short)
clusterInstance.brokers().size());
@@ -192,7 +192,7 @@ public class ProducerFailureHandlingTest {
* The send call after producer closed should throw IllegalStateException
*/
@ClusterTest
- void testSendAfterClosed(ClusterInstance clusterInstance) throws
InterruptedException, ExecutionException {
+ public void testSendAfterClosed(ClusterInstance clusterInstance) throws
InterruptedException, ExecutionException {
// create topic
clusterInstance.createTopic(topic1, 1, (short)
clusterInstance.brokers().size());
@@ -216,7 +216,7 @@ public class ProducerFailureHandlingTest {
}
@ClusterTest
- void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws
InterruptedException {
+ public void testCannotSendToInternalTopic(ClusterInstance clusterInstance)
throws InterruptedException {
try (Admin admin = clusterInstance.admin()) {
Map<String, String> topicConfig = new HashMap<>();
clusterInstance.brokers().get(0)
@@ -237,7 +237,7 @@ public class ProducerFailureHandlingTest {
}
@ClusterTest
- void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance
clusterInstance) throws InterruptedException,
+ public void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance
clusterInstance) throws InterruptedException,
ExecutionException {
String topicName = "minisrtest2";
int brokerNum = clusterInstance.brokers().size();
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
index a9489c88327..94bc50b4955 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
@@ -102,7 +102,7 @@ public class ProducerIdExpirationTest {
private final ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "");
@ClusterTest
- void testProducerIdExpirationWithNoTransactions(ClusterInstance cluster)
throws InterruptedException, ExecutionException {
+ public void testProducerIdExpirationWithNoTransactions(ClusterInstance
cluster) throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer =
cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true));
// Send records to populate producer state cache.
@@ -123,7 +123,7 @@ public class ProducerIdExpirationTest {
}
@ClusterTest
- void
testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance
cluster) throws InterruptedException, ExecutionException {
+ public void
testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance
cluster) throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer =
cluster.producer(transactionalProducerConfig());
producer.initTransactions();
@@ -188,7 +188,7 @@ public class ProducerIdExpirationTest {
}
@ClusterTest
- void testDynamicProducerIdExpirationMs(ClusterInstance cluster) throws
InterruptedException, ExecutionException {
+ public void testDynamicProducerIdExpirationMs(ClusterInstance cluster)
throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer =
cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true));
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
index db6a5b3e51a..d9bdd3347a3 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
@@ -66,10 +66,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
})
-class ProducerIntegrationTest {
+public class ProducerIntegrationTest {
@ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV3)
- void testUniqueProducerIds(ClusterInstance clusterInstance) {
+ public void testUniqueProducerIds(ClusterInstance clusterInstance) {
// Request enough PIDs from each broker to ensure each broker
generates two blocks
var ids = clusterInstance.brokers().values().stream().flatMap(broker
-> {
int port = broker.boundPort(clusterInstance.clientListener());
@@ -97,7 +97,7 @@ class ProducerIntegrationTest {
@ClusterTest(features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2)}),
})
- void testTransactionWithAndWithoutSend(ClusterInstance cluster) {
+ public void testTransactionWithAndWithoutSend(ClusterInstance cluster) {
Map<String, Object> properties = Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar",
ProducerConfig.CLIENT_ID_CONFIG, "test",
@@ -121,7 +121,7 @@ class ProducerIntegrationTest {
@ClusterTest(features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2)}),
})
- void testTransactionWithInvalidSendAndEndTxnRequestSent(ClusterInstance
cluster) {
+ public void
testTransactionWithInvalidSendAndEndTxnRequestSent(ClusterInstance cluster) {
var topic = new NewTopic("foobar", 1, (short) 1)
.configs(Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "100"));
String txnId = "test-txn";
@@ -153,7 +153,7 @@ class ProducerIntegrationTest {
@ClusterTest(features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2)}),
})
- void testTransactionWithSendOffset(ClusterInstance cluster) throws
ExecutionException, InterruptedException {
+ public void testTransactionWithSendOffset(ClusterInstance cluster) throws
ExecutionException, InterruptedException {
String inputTopic = "my-input-topic";
try (var producer = cluster.producer()) {
for (int i = 0; i < 5; i++) {