chia7712 commented on code in PR #19319: URL: https://github.com/apache/kafka/pull/19319#discussion_r2032812457
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import 
java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults(types = {Type.KRAFT}) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + * <p> + * Compressed messages should be able to sent and consumed correctly + */ + @ClusterTest + void testCompression(ClusterInstance cluster) { + for (CompressionType compression : CompressionType.values()) { + try { + processCompressionTest(cluster, compression); + } catch (InterruptedException | ExecutionException e) { + fail(e); + } + } + } + + + void processCompressionTest(ClusterInstance cluster, CompressionType compression) throws InterruptedException, + ExecutionException { + String compressionTopic = topicName + "_" + compression.name; + cluster.createTopic(compressionTopic, 1, (short) 1); + Map<String, Object> producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.name); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200"); + Consumer<byte[], byte[]> classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")); + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")); + try (Producer<byte[], byte[]> producer = cluster.producer(producerProps)) { + int partition = 0; + // prepare the messages + List<String> messages = IntStream.range(0, numRecords).mapToObj(this::messageValue).toList(); + Header[] headerArr = new Header[]{new RecordHeader("key", "value".getBytes())}; + RecordHeaders headers = new RecordHeaders(headerArr); + + // make sure the returned messages are correct + long now = 
System.currentTimeMillis(); + List<Future<RecordMetadata>> responses = new ArrayList<>(); + messages.forEach(message -> { + // 1. send message without key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, null, + message.getBytes()))); + // 2. send message with key, without header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes()))); + // 3. send message with key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes(), headers))); + }); + for (int offset = 0; offset < responses.size(); offset++) { + assertEquals(offset, responses.get(offset).get().offset(), compression.name); + } + verifyConsumerRecords(consumer, messages, now, headerArr, partition, compressionTopic, compression.name); + verifyConsumerRecords(classicConsumer, messages, now, headerArr, partition, compressionTopic, compression.name); + } finally { + // This consumer close very slowly, which may cause the entire test to time out, and we can't wait for + // it to auto close + consumer.close(Duration.ofSeconds(1)); + classicConsumer.close(Duration.ofSeconds(1)); Review Comment: please use the new `close(CloseOptions)` method ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults(types = {Type.KRAFT}) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + * <p> + * Compressed messages should be able to sent and consumed correctly + */ + 
@ClusterTest + void testCompression(ClusterInstance cluster) { + for (CompressionType compression : CompressionType.values()) { + try { + processCompressionTest(cluster, compression); + } catch (InterruptedException | ExecutionException e) { Review Comment: It is OK to throw exceptions in tests. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC; +import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; +import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // 15000 is filed serverMessageMaxBytes + @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = "15000"), + // 15200 is filed replicaFetchMaxBytes + @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value = "15200"), + // 15400 is filed replicaFetchMaxResponseBytes + @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, value = "15400"), + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + } +) +public class ProducerFailureHandlingTest { + + private final int producerBufferSize = 30000; + private final int serverMessageMaxBytes = producerBufferSize / 2; + private final int replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200; + private final int replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200; + private final String topic1 = "topic-1"; + private final String topic2 = "topic-2"; + + + /** + * With ack == 0 the future metadata will have no exceptions with offset -1 + */ + @ClusterTest + void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + clusterInstance.createTopic(topic1, 1, 
(short) clusterInstance.brokers().size()); + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) { + // send a too-large record + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + + RecordMetadata recordMetadata = producer.send(record).get(); + assertNotNull(recordMetadata); + assertFalse(recordMetadata.hasOffset()); + assertEquals(-1L, recordMetadata.offset()); + } + } + + /** + * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException + */ + @ClusterTest + void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws InterruptedException { + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(1))) { + // send a too-large record + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxPartitionBytes); + } + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testResponseTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxResponseBytes); + } + + + /** + * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException + */ + 
@ClusterTest + void testNonExistentTopic(ClusterInstance clusterInstance) { + // send a record with non-exist topic + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic2, null, "key".getBytes(), "value".getBytes()); + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) { + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException + * <p> + * TODO: other exceptions that can be thrown in ExecutionException: Review Comment: Do we need those comments? ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; + +import org.opentest4j.AssertionFailedError; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG; +import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. 
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"), + @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + // ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a constant + @ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"), + @ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"), + @ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500") + } +) +public class ProducerIdExpirationTest { + private final String topic1 = "topic1"; + private final int numPartitions = 1; + private final short replicationFactor = 3; + private final TopicPartition tp0 = new TopicPartition(topic1, 0); + private final String transactionalId = "transactionalProducer"; + private final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + @ClusterTest + void testProducerIdExpirationWithNoTransactions(ClusterInstance cluster) throws InterruptedException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer<byte[], byte[]> producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true)); + // Send records to populate producer state cache. 
+ producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + try (Admin admin = cluster.admin(); producer) { + assertEquals(1, producerStates(admin).size()); + + waitProducerIdExpire(admin); + + // Send more records to send producer ID back to brokers. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + // Producer IDs should repopulate. + assertEquals(1, producerStates(admin).size()); + } + } + + @ClusterTest + void testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance cluster) throws InterruptedException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer<byte[], byte[]> producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + // Start and then abort a transaction to allow the producer ID to expire. + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", false)); + producer.flush(); + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ISOLATION_LEVEL_CONFIG, "read_committed")); + + try (Admin admin = cluster.admin()) { + // Ensure producer IDs are added. + TestUtils.waitUntilTrue(() -> producerStates(admin).size() == 1, () -> "Producer IDs were not added.", + DEFAULT_MAX_WAIT_MS, 100); + + producer.abortTransaction(); + + // Wait for the transactional ID to expire. + waitUntilTransactionalStateExpires(admin); + + // Producer IDs should be retained. + assertEquals(1, producerStates(admin).size()); + + // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail + // due to the expired transactional ID, resulting in a fatal error. 
+ producer.beginTransaction(); + Future<RecordMetadata> failedFuture = + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", false)); + TestUtils.waitUntilTrue(failedFuture::isDone, () -> "Producer future never completed.", + DEFAULT_MAX_WAIT_MS, 100); + assertFutureThrows(InvalidPidMappingException.class, failedFuture); + + // Assert that aborting the transaction throws a KafkaException due to the fatal error. + assertThrows(KafkaException.class, producer::abortTransaction); + + // Close the producer and reinitialize to recover from the fatal error. + producer.close(); + producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", true)); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", true)); + + // Producer IDs should be retained. + assertFalse(producerStates(admin).isEmpty()); + + producer.commitTransaction(); + + // Check we can still consume the transaction. + consumer.subscribe(List.of(topic1)); + consumeRecords(consumer, 2, DEFAULT_MAX_WAIT_MS).foreach(TestUtils::assertCommittedAndGetValue); + } finally { + producer.close(); + consumer.close(Duration.ofSeconds(1)); + } + } + + @ClusterTest + void testDynamicProducerIdExpirationMs(ClusterInstance cluster) throws InterruptedException, ExecutionException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer<byte[], byte[]> producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true)); + + // Send records to populate producer state cache. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + try (Admin admin = cluster.admin(); producer) { + assertEquals(1, producerStates(admin).size()); + + waitProducerIdExpire(admin); + + // Update the producer ID expiration ms to a very high value. 
+ admin.incrementalAlterConfigs(producerIdExpirationConfig("100000")); + + cluster.brokers().values().forEach(broker -> { + TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000, + () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100); + }); + // Send more records to send producer ID back to brokers. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + // Producer IDs should repopulate. + assertEquals(1, producerStates(admin).size()); + + // Ensure producer ID does not expire within 4 seconds. + assertThrows(AssertionFailedError.class, () -> waitProducerIdExpire(admin, TimeUnit.SECONDS.toMillis(4))); + + // Update the expiration time to a low value again. + admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get(); + + KafkaBroker kafkaBroker = cluster.brokers().get(0); + kafkaBroker.shutdown(Duration.ofMinutes(5)); + kafkaBroker.awaitShutdown(); + kafkaBroker.startup(); + cluster.waitForReadyBrokers(); + cluster.brokers().values().forEach(broker -> { + TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100, + () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100); + }); + + // Ensure producer ID expires quickly again. 
+ waitProducerIdExpire(admin); + } + } + + + private void waitProducerIdExpire(Admin admin) { + waitProducerIdExpire(admin, DEFAULT_MAX_WAIT_MS); + } + + private void waitProducerIdExpire(Admin admin, long timeout) { + TestUtils.waitUntilTrue(() -> producerStates(admin).isEmpty(), () -> "Producer ID expired.", timeout, 100); + } + + private Map<ConfigResource, Collection<AlterConfigOp>> producerIdExpirationConfig(String configValue) { + ConfigEntry producerIdCfg = new ConfigEntry(PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue); + return Map.of(configResource, List.of(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET))); + } + + + private void waitUntilTransactionalStateExpires(Admin admin) { + TestUtils.waitUntilTrue(() -> { + boolean removedTransactionState = false; + try { + admin.describeTransactions(List.of(transactionalId)) + .description(transactionalId) + .get(); + } catch (Exception e) { + removedTransactionState = e.getCause() instanceof TransactionalIdNotFoundException; + } + return removedTransactionState; + }, () -> "Transaction state never expired.", DEFAULT_MAX_WAIT_MS, 100); + } + + private Map<String, Object> transactionalProducerConfig() { + return Map.of( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true, + ProducerConfig.ACKS_CONFIG, "all"); + } + + private List<ProducerState> producerStates(Admin admin) { + try { + return admin.describeProducers(Collections.singletonList(tp0)) + .partitionResult(tp0) + .get() + .activeProducers(); + } catch (InterruptedException | ExecutionException e) { Review Comment: It is OK to throw exceptions in tests. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; + +import org.opentest4j.AssertionFailedError; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static kafka.utils.TestUtils.consumeRecords; +import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"), + @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + // ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a constant + @ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"), + @ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"), + @ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500") + } +) +public class ProducerIdExpirationTest { + private final String topic1 = "topic1"; + private final int numPartitions = 1; + private final short replicationFactor = 3; + private final TopicPartition tp0 = new TopicPartition(topic1, 0); + private final String transactionalId = 
"transactionalProducer"; + private final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + @ClusterTest + void testProducerIdExpirationWithNoTransactions(ClusterInstance cluster) throws InterruptedException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer<byte[], byte[]> producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true)); + // Send records to populate producer state cache. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + try (Admin admin = cluster.admin(); producer) { + assertEquals(1, producerStates(admin).size()); + + waitProducerIdExpire(admin); + + // Send more records to send producer ID back to brokers. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + // Producer IDs should repopulate. + assertEquals(1, producerStates(admin).size()); + } + } + + @ClusterTest + void testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance cluster) throws InterruptedException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer<byte[], byte[]> producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + // Start and then abort a transaction to allow the producer ID to expire. + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", false)); + producer.flush(); + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ISOLATION_LEVEL_CONFIG, "read_committed")); + + try (Admin admin = cluster.admin()) { + // Ensure producer IDs are added. + TestUtils.waitUntilTrue(() -> producerStates(admin).size() == 1, () -> "Producer IDs were not added.", + DEFAULT_MAX_WAIT_MS, 100); + + producer.abortTransaction(); + + // Wait for the transactional ID to expire. 
+ waitUntilTransactionalStateExpires(admin); + + // Producer IDs should be retained. + assertEquals(1, producerStates(admin).size()); + + // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail + // due to the expired transactional ID, resulting in a fatal error. + producer.beginTransaction(); + Future<RecordMetadata> failedFuture = + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", false)); + TestUtils.waitUntilTrue(failedFuture::isDone, () -> "Producer future never completed.", + DEFAULT_MAX_WAIT_MS, 100); + assertFutureThrows(InvalidPidMappingException.class, failedFuture); + + // Assert that aborting the transaction throws a KafkaException due to the fatal error. + assertThrows(KafkaException.class, producer::abortTransaction); + + // Close the producer and reinitialize to recover from the fatal error. + producer.close(); + producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", true)); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", true)); + + // Producer IDs should be retained. + assertFalse(producerStates(admin).isEmpty()); + + producer.commitTransaction(); + + // Check we can still consume the transaction. + consumer.subscribe(List.of(topic1)); + consumeRecords(consumer, 2, DEFAULT_MAX_WAIT_MS).foreach(TestUtils::assertCommittedAndGetValue); + } finally { + producer.close(); + consumer.close(Duration.ofSeconds(1)); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
