AndrewJSchofield commented on code in PR #17009: URL: https://github.com/apache/kafka/pull/17009#discussion_r1736687367
########## core/src/test/java/kafka/test/api/ShareConsumerTest.java: ########## @@ -0,0 +1,1651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import kafka.api.BaseConsumerTest; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Timeout(1200) +@Tag("integration") +@SuppressWarnings("deprecation") +public class ShareConsumerTest { + private KafkaClusterTestKit cluster; + private final TopicPartition tp = new TopicPartition("topic", 0); + private final TopicPartition warmupTp = new TopicPartition("warmup", 0); + + @BeforeEach + public void createCluster() throws Exception { + cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .build()) + .setConfigProp("auto.create.topics.enable", "false") + .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") + .setConfigProp("group.share.enable", "true") + .setConfigProp("group.share.partition.max.record.locks", "10000") + .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.NoOpShareStatePersister") + .setConfigProp("group.share.record.lock.duration.ms", "15000") + .setConfigProp("offsets.topic.replication.factor", "1") + .setConfigProp("share.coordinator.state.topic.min.isr", "1") + .setConfigProp("share.coordinator.state.topic.replication.factor", "1") + .setConfigProp("transaction.state.log.min.isr", "1") + .setConfigProp("transaction.state.log.replication.factor", "1") + .setConfigProp("unstable.api.versions.enable", "true") + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForActiveController(); + cluster.waitForReadyBrokers(); + createTopic("topic"); + warmup(); + } + + @AfterEach + public void destroyCluster() throws Exception { + cluster.close(); + } + + @Test + public void testPollNoSubscribeFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + } + + @Test + public void testSubscribeAndPollNoRecords() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollUnsubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollSubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeUnsubscribePollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeSubscribeEmptyPollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.subscribe(Collections.emptySet()); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscriptionAndPoll() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testSubscriptionAndPollMultiple() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + shareConsumer.poll(Duration.ofMillis(5000)); Review Comment: This is going to wait approximately 10 seconds for no records to arrive. I wonder if it can be tightened up. 10 unnecessary seconds per test soon adds up. ########## core/src/test/java/kafka/test/api/ShareConsumerTest.java: ########## @@ -0,0 +1,1651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import kafka.api.BaseConsumerTest; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Timeout(1200) +@Tag("integration") +@SuppressWarnings("deprecation") +public class ShareConsumerTest { + private KafkaClusterTestKit cluster; + private final TopicPartition tp = new TopicPartition("topic", 0); + private final TopicPartition warmupTp = new TopicPartition("warmup", 0); + + @BeforeEach + public void createCluster() throws Exception { + cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() Review Comment: nit: Please indent fewer spaces. ########## core/src/test/java/kafka/test/api/ShareConsumerTest.java: ########## @@ -0,0 +1,1651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import kafka.api.BaseConsumerTest; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Timeout(1200) +@Tag("integration") +@SuppressWarnings("deprecation") +public class ShareConsumerTest { + private KafkaClusterTestKit cluster; + private final TopicPartition tp = new TopicPartition("topic", 0); + private final TopicPartition warmupTp = new TopicPartition("warmup", 0); + + @BeforeEach + public void createCluster() throws Exception { + cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .build()) + .setConfigProp("auto.create.topics.enable", "false") + .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") + .setConfigProp("group.share.enable", "true") + .setConfigProp("group.share.partition.max.record.locks", "10000") + .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.NoOpShareStatePersister") + .setConfigProp("group.share.record.lock.duration.ms", "15000") + .setConfigProp("offsets.topic.replication.factor", "1") + .setConfigProp("share.coordinator.state.topic.min.isr", "1") + .setConfigProp("share.coordinator.state.topic.replication.factor", "1") + .setConfigProp("transaction.state.log.min.isr", "1") + .setConfigProp("transaction.state.log.replication.factor", "1") + .setConfigProp("unstable.api.versions.enable", "true") + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForActiveController(); + cluster.waitForReadyBrokers(); + createTopic("topic"); + warmup(); + } + + @AfterEach + public void destroyCluster() throws Exception { + cluster.close(); + } + + @Test + public void testPollNoSubscribeFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + } + + @Test + public void testSubscribeAndPollNoRecords() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollUnsubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollSubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeUnsubscribePollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeSubscribeEmptyPollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.subscribe(Collections.emptySet()); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscriptionAndPoll() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testSubscriptionAndPollMultiple() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + shareConsumer.poll(Duration.ofMillis(5000)); + shareConsumer.poll(Duration.ofMillis(5000)); + + // We expect null exception as the acknowledgment error code is null. + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackOnClose() { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgement error code asynchronously after the second poll. + // The acknowledgement commit callback is invoked in close. + shareConsumer.poll(Duration.ofMillis(5000)); + shareConsumer.close(); + + // We expect null exception as the acknowledgment error code is null. + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackInvalidRecordStateException() throws Exception { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // Waiting until the acquisition lock expires. + Thread.sleep(15000); + + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + records = shareConsumer.poll(Duration.ofMillis(200)); + assertEquals(1, records.count()); + + records = shareConsumer.poll(Duration.ofMillis(200)); + assertEquals(0, records.count()); + + // As we tried to acknowledge a record after the acquisition lock expired, + // we wil get an InvalidRecordStateException. + assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp)); + shareConsumer.close(); + producer.close(); + } + + private static class TestableAcknowledgeCommitCallback implements AcknowledgementCommitCallback { + private final Map<TopicPartition, Set<Long>> partitionOffsetsMap; + private final Map<TopicPartition, Exception> partitionExceptionMap; + + public TestableAcknowledgeCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap, + Map<TopicPartition, Exception> partitionExceptionMap) { + this.partitionOffsetsMap = partitionOffsetsMap; + this.partitionExceptionMap = partitionExceptionMap; + } + + @Override + public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) { + offsetsMap.forEach((partition, offsets) -> { + partitionOffsetsMap.merge(partition.topicPartition(), offsets, (oldOffsets, newOffsets) -> { + Set<Long> mergedOffsets = new HashSet<>(); + mergedOffsets.addAll(oldOffsets); + mergedOffsets.addAll(newOffsets); + return mergedOffsets; + }); + if (!partitionExceptionMap.containsKey(partition.topicPartition())) { + partitionExceptionMap.put(partition.topicPartition(), exception); + } + }); + } + } + + @Test + public void testHeaders() { + int numRecords = 1; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); + + for (ConsumerRecord<byte[], byte[]> consumerRecord : records) { + Header header = consumerRecord.headers().lastHeader("headerKey"); + if (header != null) + assertEquals("headerValue", new String(header.value())); + } + shareConsumer.close(); + producer.close(); + } + + private void testHeadersSerializeDeserialize(Serializer<byte[]> serializer, Deserializer<byte[]> deserializer) { + int numRecords = 1; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), serializer); + producer.send(record); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testHeadersSerializerDeserializer() { + testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); + } + + @Test + public void testMaxPollRecords() { + int maxPollRecords = 2; + int numRecords = 10000; + + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + long startingTimestamp = System.currentTimeMillis(); + produceMessagesWithTimestamp(numRecords, startingTimestamp); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords))); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + long i = 0L; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(startingTimestamp + i, record.timestamp()); + assertEquals("key " + i, new String(record.key())); + assertEquals("value " + i, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals(("key " + i).length(), record.serializedKeySize()); + assertEquals(("value " + i).length(), record.serializedValueSize()); + + i++; + } + shareConsumer.close(); + producer.close(); + } + + @Test + public void testControlRecordsSkipped() throws Exception { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + KafkaProducer<byte[], byte[]> transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); + transactionalProducer.initTransactions(); + transactionalProducer.beginTransaction(); + RecordMetadata transactional1 = transactionalProducer.send(record).get(); + + KafkaProducer<byte[], byte[]> nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + RecordMetadata nonTransactional1 = nonTransactionalProducer.send(record).get(); + + transactionalProducer.commitTransaction(); + + transactionalProducer.beginTransaction(); + RecordMetadata transactional2 = transactionalProducer.send(record).get(); + transactionalProducer.abortTransaction(); + + RecordMetadata nonTransactional2 = nonTransactionalProducer.send(record).get(); + + transactionalProducer.close(); + nonTransactionalProducer.close(); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(4, records.count()); + assertEquals(transactional1.offset(), records.records(tp).get(0).offset()); + assertEquals(nonTransactional1.offset(), records.records(tp).get(1).offset()); + assertEquals(transactional2.offset(), records.records(tp).get(2).offset()); + assertEquals(nonTransactional2.offset(), records.records(tp).get(3).offset()); + + // There will be control records on the topic-partition, so the offsets of the non-control records + // are not 0, 1, 2, 3. Just assert that the offset of the final one is not 3. + assertNotEquals(3, nonTransactional2.offset()); + + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(0, records.count()); + shareConsumer.close(); + transactionalProducer.close(); + } + + @Test + public void testExplicitAcknowledgeSuccess() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgeCommitSuccess() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgementCommitAsync() throws InterruptedException { + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + + KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + shareConsumer2.subscribe(Collections.singleton(tp.topic())); + + Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>(); + shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); + + ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(3, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + // Acknowledging 2 out of the 3 records received via commitAsync. + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + shareConsumer1.acknowledge(secondRecord); + shareConsumer1.commitAsync(); + + // Allowing acquisition lock timeout to expire. + Thread.sleep(15000); + + // The 3rd record should be reassigned to 2nd consumer when it polls. + ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(5000)); + assertEquals(1, records2.count()); + assertEquals(2L, records2.iterator().next().offset()); + + assertFalse(partitionExceptionMap1.containsKey(tp)); + // The callback will receive the acknowledgement responses asynchronously after the next poll. + shareConsumer1.poll(Duration.ofMillis(500)); + + shareConsumer1.close(); + shareConsumer2.close(); + producer.close(); + + assertTrue(partitionExceptionMap1.containsKey(tp)); + assertNull(partitionExceptionMap1.get(tp)); + } + + @Test + public void testExplicitAcknowledgementCommitAsyncPartialBatch() { + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + + KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + + ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(3, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + // Acknowledging 2 out of the 3 records received via commitAsync. + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + shareConsumer1.acknowledge(secondRecord); + shareConsumer1.commitAsync(); + + // The 3rd record should be re-presented to the consumer when it polls again. + records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + iterator = records.iterator(); + firstRecord = iterator.next(); + assertEquals(2L, firstRecord.offset()); + + // And poll again without acknowledging - the callback will receive the acknowledgement responses too + records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + iterator = records.iterator(); + firstRecord = iterator.next(); + assertEquals(2L, firstRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + + // The callback will receive the acknowledgement responses after polling. The callback is + // called on entry to the poll method or during close. The commit is being performed asynchronously, so + // we can only rely on the completion once the consumer has closed because that waits for the response. + shareConsumer1.poll(Duration.ofMillis(5000)); + + shareConsumer1.close(); + producer.close(); + + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } + + @Test + public void testExplicitAcknowledgeReleasePollAccept() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(0, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgeReleaseAccept() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgeReleaseClose() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + shareConsumer.close(); + producer.close(); + } + + + @Test + public void testExplicitAcknowledgeThrowsNotInBatch() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0); + shareConsumer.acknowledge(consumedRecord); + records = shareConsumer.poll(Duration.ofMillis(5000)); Review Comment: And shorter here too. ########## core/src/test/java/kafka/test/api/ShareConsumerTest.java: ########## @@ -0,0 +1,1651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import kafka.api.BaseConsumerTest; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Timeout(1200) +@Tag("integration") +@SuppressWarnings("deprecation") +public class ShareConsumerTest { + private KafkaClusterTestKit cluster; + private final TopicPartition tp = new TopicPartition("topic", 0); + private final TopicPartition warmupTp = new TopicPartition("warmup", 0); + + @BeforeEach + public void createCluster() throws Exception { + cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .build()) + .setConfigProp("auto.create.topics.enable", "false") + .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") + .setConfigProp("group.share.enable", "true") + .setConfigProp("group.share.partition.max.record.locks", "10000") + .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.NoOpShareStatePersister") + .setConfigProp("group.share.record.lock.duration.ms", "15000") + .setConfigProp("offsets.topic.replication.factor", "1") + .setConfigProp("share.coordinator.state.topic.min.isr", "1") + .setConfigProp("share.coordinator.state.topic.replication.factor", "1") + .setConfigProp("transaction.state.log.min.isr", "1") + .setConfigProp("transaction.state.log.replication.factor", "1") + .setConfigProp("unstable.api.versions.enable", "true") + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForActiveController(); + cluster.waitForReadyBrokers(); + createTopic("topic"); + warmup(); + } + + @AfterEach + public void destroyCluster() throws Exception { + cluster.close(); + } + + @Test + public void testPollNoSubscribeFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + } + + @Test + public void testSubscribeAndPollNoRecords() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollUnsubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribePollSubscribe() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeUnsubscribePollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscribeSubscribeEmptyPollFails() { + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + Set<String> subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.subscribe(Collections.emptySet()); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + shareConsumer.close(); + assertEquals(0, records.count()); + } + + @Test + public void testSubscriptionAndPoll() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testSubscriptionAndPollMultiple() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + shareConsumer.poll(Duration.ofMillis(5000)); + shareConsumer.poll(Duration.ofMillis(5000)); + + // We expect null exception as the acknowledgment error code is null. + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackOnClose() { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgement error code asynchronously after the second poll. + // The acknowledgement commit callback is invoked in close. + shareConsumer.poll(Duration.ofMillis(5000)); + shareConsumer.close(); + + // We expect null exception as the acknowledgment error code is null. + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + producer.close(); + } + + @Test + public void testAcknowledgementCommitCallbackInvalidRecordStateException() throws Exception { + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // Waiting until the acquisition lock expires. + Thread.sleep(15000); + + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + records = shareConsumer.poll(Duration.ofMillis(200)); + assertEquals(1, records.count()); + + records = shareConsumer.poll(Duration.ofMillis(200)); + assertEquals(0, records.count()); + + // As we tried to acknowledge a record after the acquisition lock expired, + // we wil get an InvalidRecordStateException. + assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp)); + shareConsumer.close(); + producer.close(); + } + + private static class TestableAcknowledgeCommitCallback implements AcknowledgementCommitCallback { + private final Map<TopicPartition, Set<Long>> partitionOffsetsMap; + private final Map<TopicPartition, Exception> partitionExceptionMap; + + public TestableAcknowledgeCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap, + Map<TopicPartition, Exception> partitionExceptionMap) { + this.partitionOffsetsMap = partitionOffsetsMap; + this.partitionExceptionMap = partitionExceptionMap; + } + + @Override + public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) { + offsetsMap.forEach((partition, offsets) -> { + partitionOffsetsMap.merge(partition.topicPartition(), offsets, (oldOffsets, newOffsets) -> { + Set<Long> mergedOffsets = new HashSet<>(); + mergedOffsets.addAll(oldOffsets); + mergedOffsets.addAll(newOffsets); + return mergedOffsets; + }); + if (!partitionExceptionMap.containsKey(partition.topicPartition())) { + partitionExceptionMap.put(partition.topicPartition(), exception); + } + }); + } + } + + @Test + public void testHeaders() { + int numRecords = 1; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); + + for (ConsumerRecord<byte[], byte[]> consumerRecord : records) { + Header header = consumerRecord.headers().lastHeader("headerKey"); + if (header != null) + assertEquals("headerValue", new String(header.value())); + } + shareConsumer.close(); + producer.close(); + } + + private void testHeadersSerializeDeserialize(Serializer<byte[]> serializer, Deserializer<byte[]> deserializer) { + int numRecords = 1; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), serializer); + producer.send(record); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testHeadersSerializerDeserializer() { + testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); + } + + @Test + public void testMaxPollRecords() { + int maxPollRecords = 2; + int numRecords = 10000; + + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + long startingTimestamp = System.currentTimeMillis(); + produceMessagesWithTimestamp(numRecords, startingTimestamp); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords))); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords); + long i = 0L; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(startingTimestamp + i, record.timestamp()); + assertEquals("key " + i, new String(record.key())); + assertEquals("value " + i, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals(("key " + i).length(), record.serializedKeySize()); + assertEquals(("value " + i).length(), record.serializedValueSize()); + + i++; + } + shareConsumer.close(); + producer.close(); + } + + @Test + public void testControlRecordsSkipped() throws Exception { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + KafkaProducer<byte[], byte[]> transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); + transactionalProducer.initTransactions(); + transactionalProducer.beginTransaction(); + RecordMetadata transactional1 = transactionalProducer.send(record).get(); + + KafkaProducer<byte[], byte[]> nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + RecordMetadata nonTransactional1 = nonTransactionalProducer.send(record).get(); + + transactionalProducer.commitTransaction(); + + transactionalProducer.beginTransaction(); + RecordMetadata transactional2 = transactionalProducer.send(record).get(); + transactionalProducer.abortTransaction(); + + RecordMetadata nonTransactional2 = nonTransactionalProducer.send(record).get(); + + transactionalProducer.close(); + nonTransactionalProducer.close(); + + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(4, records.count()); + assertEquals(transactional1.offset(), records.records(tp).get(0).offset()); + assertEquals(nonTransactional1.offset(), records.records(tp).get(1).offset()); + assertEquals(transactional2.offset(), records.records(tp).get(2).offset()); + assertEquals(nonTransactional2.offset(), records.records(tp).get(3).offset()); + + // There will be control records on the topic-partition, so the offsets of the non-control records + // are not 0, 1, 2, 3. Just assert that the offset of the final one is not 3. + assertNotEquals(3, nonTransactional2.offset()); + + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(0, records.count()); + shareConsumer.close(); + transactionalProducer.close(); + } + + @Test + public void testExplicitAcknowledgeSuccess() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgeCommitSuccess() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @Test + public void testExplicitAcknowledgementCommitAsync() throws InterruptedException { + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + + KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + shareConsumer2.subscribe(Collections.singleton(tp.topic())); + + Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>(); + shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); + + ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(3, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + // Acknowledging 2 out of the 3 records received via commitAsync. + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + shareConsumer1.acknowledge(secondRecord); + shareConsumer1.commitAsync(); + + // Allowing acquisition lock timeout to expire. + Thread.sleep(15000); + + // The 3rd record should be reassigned to 2nd consumer when it polls. + ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(5000)); + assertEquals(1, records2.count()); + assertEquals(2L, records2.iterator().next().offset()); + + assertFalse(partitionExceptionMap1.containsKey(tp)); + // The callback will receive the acknowledgement responses asynchronously after the next poll. + shareConsumer1.poll(Duration.ofMillis(500)); + + shareConsumer1.close(); + shareConsumer2.close(); + producer.close(); + + assertTrue(partitionExceptionMap1.containsKey(tp)); + assertNull(partitionExceptionMap1.get(tp)); + } + + @Test + public void testExplicitAcknowledgementCommitAsyncPartialBatch() { + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + + KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + + Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>(); + Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>(); + shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + + ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(3, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + // Acknowledging 2 out of the 3 records received via commitAsync. + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + shareConsumer1.acknowledge(secondRecord); + shareConsumer1.commitAsync(); + + // The 3rd record should be re-presented to the consumer when it polls again. + records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + iterator = records.iterator(); + firstRecord = iterator.next(); + assertEquals(2L, firstRecord.offset()); + + // And poll again without acknowledging - the callback will receive the acknowledgement responses too + records = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + iterator = records.iterator(); + firstRecord = iterator.next(); + assertEquals(2L, firstRecord.offset()); + + shareConsumer1.acknowledge(firstRecord); + + // The callback will receive the acknowledgement responses after polling. The callback is + // called on entry to the poll method or during close. The commit is being performed asynchronously, so + // we can only rely on the completion once the consumer has closed because that waits for the response. + shareConsumer1.poll(Duration.ofMillis(5000)); + + shareConsumer1.close(); + producer.close(); + + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } + + @Test + public void testExplicitAcknowledgeReleasePollAccept() { + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + producer.send(record); + KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); + records = shareConsumer.poll(Duration.ofMillis(5000)); Review Comment: This wait can definitely be shorter. Maybe 500. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
