AndrewJSchofield commented on code in PR #17009:
URL: https://github.com/apache/kafka/pull/17009#discussion_r1736687367


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -0,0 +1,1651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import kafka.api.BaseConsumerTest;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(1200)
+@Tag("integration")
+@SuppressWarnings("deprecation")
+public class ShareConsumerTest {
+    private KafkaClusterTestKit cluster;
+    private final TopicPartition tp = new TopicPartition("topic", 0);
+    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
+
+    @BeforeEach
+    public void createCluster() throws Exception {
+        cluster = new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder()
+                        .setNumBrokerNodes(1)
+                        .setNumControllerNodes(1)
+                        .build())
+                .setConfigProp("auto.create.topics.enable", "false")
+                .setConfigProp("group.coordinator.rebalance.protocols", 
"classic,consumer,share")
+                .setConfigProp("group.share.enable", "true")
+                .setConfigProp("group.share.partition.max.record.locks", 
"10000")
+                .setConfigProp("group.share.persister.class.name", 
"org.apache.kafka.server.group.share.NoOpShareStatePersister")
+                .setConfigProp("group.share.record.lock.duration.ms", "15000")
+                .setConfigProp("offsets.topic.replication.factor", "1")
+                .setConfigProp("share.coordinator.state.topic.min.isr", "1")
+                
.setConfigProp("share.coordinator.state.topic.replication.factor", "1")
+                .setConfigProp("transaction.state.log.min.isr", "1")
+                .setConfigProp("transaction.state.log.replication.factor", "1")
+                .setConfigProp("unstable.api.versions.enable", "true")
+                .build();
+        cluster.format();
+        cluster.startup();
+        cluster.waitForActiveController();
+        cluster.waitForReadyBrokers();
+        createTopic("topic");
+        warmup();
+    }
+
+    @AfterEach
+    public void destroyCluster() throws Exception {
+        cluster.close();
+    }
+
+    @Test
+    public void testPollNoSubscribeFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+    }
+
+    @Test
+    public void testSubscribeAndPollNoRecords() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollUnsubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollSubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        records = shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeUnsubscribePollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeSubscribeEmptyPollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.subscribe(Collections.emptySet());
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscriptionAndPoll() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testSubscriptionAndPollMultiple() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgment error code after the second poll.
+        // When we start the 3rd poll, the acknowledgment commit callback is 
invoked.
+        shareConsumer.poll(Duration.ofMillis(5000));

Review Comment:
   This is going to wait approximately 10 seconds for no records to arrive. I wonder if it can be tightened up — 10 unnecessary seconds per test soon adds up across the suite.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -0,0 +1,1651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import kafka.api.BaseConsumerTest;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(1200)
+@Tag("integration")
+@SuppressWarnings("deprecation")
+public class ShareConsumerTest {
+    private KafkaClusterTestKit cluster;
+    private final TopicPartition tp = new TopicPartition("topic", 0);
+    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
+
+    @BeforeEach
+    public void createCluster() throws Exception {
+        cluster = new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder()

Review Comment:
   nit: Please use a smaller continuation indent for this builder chain (fewer leading spaces).



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -0,0 +1,1651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import kafka.api.BaseConsumerTest;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(1200)
+@Tag("integration")
+@SuppressWarnings("deprecation")
+public class ShareConsumerTest {
+    private KafkaClusterTestKit cluster;
+    private final TopicPartition tp = new TopicPartition("topic", 0);
+    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
+
+    @BeforeEach
+    public void createCluster() throws Exception {
+        cluster = new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder()
+                        .setNumBrokerNodes(1)
+                        .setNumControllerNodes(1)
+                        .build())
+                .setConfigProp("auto.create.topics.enable", "false")
+                .setConfigProp("group.coordinator.rebalance.protocols", 
"classic,consumer,share")
+                .setConfigProp("group.share.enable", "true")
+                .setConfigProp("group.share.partition.max.record.locks", 
"10000")
+                .setConfigProp("group.share.persister.class.name", 
"org.apache.kafka.server.group.share.NoOpShareStatePersister")
+                .setConfigProp("group.share.record.lock.duration.ms", "15000")
+                .setConfigProp("offsets.topic.replication.factor", "1")
+                .setConfigProp("share.coordinator.state.topic.min.isr", "1")
+                
.setConfigProp("share.coordinator.state.topic.replication.factor", "1")
+                .setConfigProp("transaction.state.log.min.isr", "1")
+                .setConfigProp("transaction.state.log.replication.factor", "1")
+                .setConfigProp("unstable.api.versions.enable", "true")
+                .build();
+        cluster.format();
+        cluster.startup();
+        cluster.waitForActiveController();
+        cluster.waitForReadyBrokers();
+        createTopic("topic");
+        warmup();
+    }
+
+    @AfterEach
+    public void destroyCluster() throws Exception {
+        cluster.close();
+    }
+
+    @Test
+    public void testPollNoSubscribeFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+    }
+
+    @Test
+    public void testSubscribeAndPollNoRecords() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollUnsubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollSubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        records = shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeUnsubscribePollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeSubscribeEmptyPollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.subscribe(Collections.emptySet());
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscriptionAndPoll() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testSubscriptionAndPollMultiple() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgment error code after the second poll.
+        // When we start the 3rd poll, the acknowledgment commit callback is 
invoked.
+        shareConsumer.poll(Duration.ofMillis(5000));
+        shareConsumer.poll(Duration.ofMillis(5000));
+
+        // We expect null exception as the acknowledgment error code is null.
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackOnClose() {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgement error code asynchronously after the 
second poll.
+        // The acknowledgement commit callback is invoked in close.
+        shareConsumer.poll(Duration.ofMillis(5000));
+        shareConsumer.close();
+
+        // We expect null exception as the acknowledgment error code is null.
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+        producer.close();
+    }
+
    /**
     * Verifies that implicitly acknowledging a record whose acquisition lock has already
     * expired surfaces an {@link InvalidRecordStateException} through the acknowledgement
     * commit callback.
     */
    @Test
    public void testAcknowledgementCommitCallbackInvalidRecordStateException() throws Exception {
        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
        producer.send(record);
        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
        shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
        shareConsumer.subscribe(Collections.singleton(tp.topic()));

        ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
        assertEquals(1, records.count());

        // Waiting until the acquisition lock expires.
        // group.share.record.lock.duration.ms is set to 15000 in createCluster, so a
        // 15s sleep lets the lock lapse (NOTE(review): a fixed sleep is inherently racy).
        Thread.sleep(15000);

        // Now in the second poll, we implicitly acknowledge the record received in the first poll.
        // The lock has expired, so the same record is re-delivered here.
        // We get back the acknowledgment error code after the second poll.
        // When we start the 3rd poll, the acknowledgment commit callback is invoked.
        records = shareConsumer.poll(Duration.ofMillis(200));
        assertEquals(1, records.count());

        records = shareConsumer.poll(Duration.ofMillis(200));
        assertEquals(0, records.count());

        // As we tried to acknowledge a record after the acquisition lock expired,
        // we will get an InvalidRecordStateException.
        assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp));
        shareConsumer.close();
        producer.close();
    }
+
    /**
     * Test callback that records what the consumer reports on acknowledgement commit:
     * the union of acknowledged offsets per topic-partition, and the FIRST exception
     * (possibly null, meaning success) reported for each topic-partition.
     */
    private static class TestableAcknowledgeCommitCallback implements AcknowledgementCommitCallback {
        // Accumulated acknowledged offsets, merged across all onComplete invocations.
        private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
        // First-reported exception per partition; a null value records a success.
        private final Map<TopicPartition, Exception> partitionExceptionMap;

        public TestableAcknowledgeCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap,
                                                 Map<TopicPartition, Exception> partitionExceptionMap) {
            this.partitionOffsetsMap = partitionOffsetsMap;
            this.partitionExceptionMap = partitionExceptionMap;
        }

        @Override
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
            offsetsMap.forEach((partition, offsets) -> {
                // Union the newly acknowledged offsets with any recorded earlier.
                partitionOffsetsMap.merge(partition.topicPartition(), offsets, (oldOffsets, newOffsets) -> {
                    Set<Long> mergedOffsets = new HashSet<>();
                    mergedOffsets.addAll(oldOffsets);
                    mergedOffsets.addAll(newOffsets);
                    return mergedOffsets;
                });
                // Keep only the first result per partition. Deliberately containsKey+put,
                // not putIfAbsent: a stored null (success) must not be overwritten by a
                // later exception, and putIfAbsent would replace a null mapping.
                if (!partitionExceptionMap.containsKey(partition.topicPartition())) {
                    partitionExceptionMap.put(partition.topicPartition(), exception);
                }
            });
        }
    }
+
+    @Test
+    public void testHeaders() {
+        int numRecords = 1;
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        record.headers().add("headerKey", "headerValue".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        assertEquals(numRecords, records.size());
+
+        for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+            Header header = consumerRecord.headers().lastHeader("headerKey");
+            if (header != null)
+                assertEquals("headerValue", new String(header.value()));
+        }
+        shareConsumer.close();
+        producer.close();
+    }
+
+    private void testHeadersSerializeDeserialize(Serializer<byte[]> 
serializer, Deserializer<byte[]> deserializer) {
+        int numRecords = 1;
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), serializer);
+        producer.send(record);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        assertEquals(numRecords, records.size());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testHeadersSerializerDeserializer() {
+        testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), 
new BaseConsumerTest.DeserializerImpl());
+    }
+
+    @Test
+    public void testMaxPollRecords() {
+        int maxPollRecords = 2;
+        int numRecords = 10000;
+
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        long startingTimestamp = System.currentTimeMillis();
+        produceMessagesWithTimestamp(numRecords, startingTimestamp);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
+                "group1", 
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        long i = 0L;
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            assertEquals(startingTimestamp + i, record.timestamp());
+            assertEquals("key " + i, new String(record.key()));
+            assertEquals("value " + i, new String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals(("key " + i).length(), record.serializedKeySize());
+            assertEquals(("value " + i).length(), 
record.serializedValueSize());
+
+            i++;
+        }
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testControlRecordsSkipped() throws Exception {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+
+        KafkaProducer<byte[], byte[]> transactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
+        transactionalProducer.initTransactions();
+        transactionalProducer.beginTransaction();
+        RecordMetadata transactional1 = 
transactionalProducer.send(record).get();
+
+        KafkaProducer<byte[], byte[]> nonTransactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+        RecordMetadata nonTransactional1 = 
nonTransactionalProducer.send(record).get();
+
+        transactionalProducer.commitTransaction();
+
+        transactionalProducer.beginTransaction();
+        RecordMetadata transactional2 = 
transactionalProducer.send(record).get();
+        transactionalProducer.abortTransaction();
+
+        RecordMetadata nonTransactional2 = 
nonTransactionalProducer.send(record).get();
+
+        transactionalProducer.close();
+        nonTransactionalProducer.close();
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(4, records.count());
+        assertEquals(transactional1.offset(), 
records.records(tp).get(0).offset());
+        assertEquals(nonTransactional1.offset(), 
records.records(tp).get(1).offset());
+        assertEquals(transactional2.offset(), 
records.records(tp).get(2).offset());
+        assertEquals(nonTransactional2.offset(), 
records.records(tp).get(3).offset());
+
+        // There will be control records on the topic-partition, so the 
offsets of the non-control records
+        // are not 0, 1, 2, 3. Just assert that the offset of the final one is 
not 3.
+        assertNotEquals(3, nonTransactional2.offset());
+
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(0, records.count());
+        shareConsumer.close();
+        transactionalProducer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeSuccess() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(shareConsumer::acknowledge);
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeCommitSuccess() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(shareConsumer::acknowledge);
+        producer.send(record);
+        Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+        assertEquals(1, result.size());
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
    /**
     * Two consumers in the same share group: the first explicitly acknowledges two of
     * three records via commitAsync; after the acquisition lock on the third expires,
     * the second consumer must receive only that third record.
     */
    @Test
    public void testExplicitAcknowledgementCommitAsync() throws InterruptedException {
        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
        producer.send(record1);
        producer.send(record2);
        producer.send(record3);

        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
        KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
        shareConsumer1.subscribe(Collections.singleton(tp.topic()));
        shareConsumer2.subscribe(Collections.singleton(tp.topic()));

        Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
        Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
        shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));

        ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
        assertEquals(3, records.count());
        Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

        // Acknowledging 2 out of the 3 records received via commitAsync.
        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
        assertEquals(0L, firstRecord.offset());
        assertEquals(1L, secondRecord.offset());

        shareConsumer1.acknowledge(firstRecord);
        shareConsumer1.acknowledge(secondRecord);
        shareConsumer1.commitAsync();

        // Allowing acquisition lock timeout to expire.
        // (group.share.record.lock.duration.ms is 15000 in createCluster.)
        Thread.sleep(15000);

        // The 3rd record should be reassigned to 2nd consumer when it polls.
        ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(5000));
        assertEquals(1, records2.count());
        assertEquals(2L, records2.iterator().next().offset());

        // The async commit has not been surfaced to the callback yet at this point.
        assertFalse(partitionExceptionMap1.containsKey(tp));
        // The callback will receive the acknowledgement responses asynchronously after the next poll.
        shareConsumer1.poll(Duration.ofMillis(500));

        // close() waits for the commit response, so after these the callback has fired.
        shareConsumer1.close();
        shareConsumer2.close();
        producer.close();

        // Null exception for the partition means the acknowledgement succeeded.
        assertTrue(partitionExceptionMap1.containsKey(tp));
        assertNull(partitionExceptionMap1.get(tp));
    }
+
    /**
     * Acknowledges only part of a polled batch via commitAsync and verifies that the
     * unacknowledged record is re-presented to the same consumer on subsequent polls,
     * and that the acknowledgement commit callback eventually reports success.
     */
    @Test
    public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
        producer.send(record1);
        producer.send(record2);
        producer.send(record3);

        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
        shareConsumer1.subscribe(Collections.singleton(tp.topic()));

        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
        shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));

        ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
        assertEquals(3, records.count());
        Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

        // Acknowledging 2 out of the 3 records received via commitAsync.
        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
        assertEquals(0L, firstRecord.offset());
        assertEquals(1L, secondRecord.offset());

        shareConsumer1.acknowledge(firstRecord);
        shareConsumer1.acknowledge(secondRecord);
        shareConsumer1.commitAsync();

        // The 3rd record should be re-presented to the consumer when it polls again.
        records = shareConsumer1.poll(Duration.ofMillis(5000));
        assertEquals(1, records.count());
        iterator = records.iterator();
        firstRecord = iterator.next();
        assertEquals(2L, firstRecord.offset());

        // And poll again without acknowledging - the callback will receive the acknowledgement responses too
        records = shareConsumer1.poll(Duration.ofMillis(5000));
        assertEquals(1, records.count());
        iterator = records.iterator();
        firstRecord = iterator.next();
        assertEquals(2L, firstRecord.offset());

        // This time the 3rd record is acknowledged explicitly (implicitly committed later).
        shareConsumer1.acknowledge(firstRecord);

        // The callback will receive the acknowledgement responses after polling. The callback is
        // called on entry to the poll method or during close. The commit is being performed asynchronously, so
        // we can only rely on the completion once the consumer has closed because that waits for the response.
        shareConsumer1.poll(Duration.ofMillis(5000));

        shareConsumer1.close();
        producer.close();

        // Null exception for the partition means all acknowledgements succeeded.
        assertTrue(partitionExceptionMap.containsKey(tp));
        assertNull(partitionExceptionMap.get(tp));
    }
+
+    @Test
+    public void testExplicitAcknowledgeReleasePollAccept() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(0, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeReleaseAccept() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+        records = shareConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeReleaseClose() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+        shareConsumer.close();
+        producer.close();
+    }
+
+
+    @Test
+    public void testExplicitAcknowledgeThrowsNotInBatch() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
+        shareConsumer.acknowledge(consumedRecord);
+        records = shareConsumer.poll(Duration.ofMillis(5000));

Review Comment:
   And the poll timeout could be shorter here too.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -0,0 +1,1651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import kafka.api.BaseConsumerTest;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(1200)
+@Tag("integration")
+@SuppressWarnings("deprecation")
+public class ShareConsumerTest {
+    private KafkaClusterTestKit cluster;
+    private final TopicPartition tp = new TopicPartition("topic", 0);
+    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
+
    /**
     * Brings up a fresh single-broker, single-controller KRaft cluster with share
     * groups enabled before every test, creates the "topic" used by the tests, and
     * runs a warmup cycle.
     */
    @BeforeEach
    public void createCluster() throws Exception {
        cluster = new KafkaClusterTestKit.Builder(
                new TestKitNodes.Builder()
                        .setNumBrokerNodes(1)
                        .setNumControllerNodes(1)
                        .build())
                .setConfigProp("auto.create.topics.enable", "false")
                // Enable the share rebalance protocol alongside classic and consumer.
                .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share")
                .setConfigProp("group.share.enable", "true")
                .setConfigProp("group.share.partition.max.record.locks", "10000")
                .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.NoOpShareStatePersister")
                // Acquisition lock duration: several tests sleep 15s to let it expire.
                .setConfigProp("group.share.record.lock.duration.ms", "15000")
                // Replication factors and min ISR sized for the single-broker cluster.
                .setConfigProp("offsets.topic.replication.factor", "1")
                .setConfigProp("share.coordinator.state.topic.min.isr", "1")
                .setConfigProp("share.coordinator.state.topic.replication.factor", "1")
                .setConfigProp("transaction.state.log.min.isr", "1")
                .setConfigProp("transaction.state.log.replication.factor", "1")
                // NOTE(review): presumably required because share-group APIs are still
                // behind unstable API versions at this point - confirm.
                .setConfigProp("unstable.api.versions.enable", "true")
                .build();
        cluster.format();
        cluster.startup();
        cluster.waitForActiveController();
        cluster.waitForReadyBrokers();
        createTopic("topic");
        warmup();
    }
+
    /** Shuts the embedded KRaft cluster down after each test. */
    @AfterEach
    public void destroyCluster() throws Exception {
        cluster.close();
    }
+
+    @Test
+    public void testPollNoSubscribeFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+    }
+
+    @Test
+    public void testSubscribeAndPollNoRecords() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollUnsubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribePollSubscribe() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        records = shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeUnsubscribePollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.unsubscribe();
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscribeSubscribeEmptyPollFails() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        Set<String> subscription = Collections.singleton(tp.topic());
+        shareConsumer.subscribe(subscription);
+        assertEquals(subscription, shareConsumer.subscription());
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
+        shareConsumer.subscribe(Collections.emptySet());
+        assertEquals(Collections.emptySet(), shareConsumer.subscription());
+        // "Consumer is not subscribed to any topics."
+        assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
+        shareConsumer.close();
+        assertEquals(0, records.count());
+    }
+
+    @Test
+    public void testSubscriptionAndPoll() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testSubscriptionAndPollMultiple() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgment error code after the second poll.
+        // When we start the 3rd poll, the acknowledgment commit callback is 
invoked.
+        shareConsumer.poll(Duration.ofMillis(5000));
+        shareConsumer.poll(Duration.ofMillis(5000));
+
+        // We expect null exception as the acknowledgment error code is null.
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackOnClose() {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgement error code asynchronously after the 
second poll.
+        // The acknowledgement commit callback is invoked in close.
+        shareConsumer.poll(Duration.ofMillis(5000));
+        shareConsumer.close();
+
+        // We expect null exception as the acknowledgment error code is null.
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+        producer.close();
+    }
+
+    @Test
+    public void testAcknowledgementCommitCallbackInvalidRecordStateException() 
throws Exception {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+
+        // Waiting until the acquisition lock expires.
+        Thread.sleep(15000);
+
+        // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
+        // We get back the acknowledgment error code after the second poll.
+        // When we start the 3rd poll, the acknowledgment commit callback is 
invoked.
+        records = shareConsumer.poll(Duration.ofMillis(200));
+        assertEquals(1, records.count());
+
+        records = shareConsumer.poll(Duration.ofMillis(200));
+        assertEquals(0, records.count());
+
+        // As we tried to acknowledge a record after the acquisition lock 
expired,
+        // we wil get an InvalidRecordStateException.
+        assertInstanceOf(InvalidRecordStateException.class, 
partitionExceptionMap.get(tp));
+        shareConsumer.close();
+        producer.close();
+    }
+
+    private static class TestableAcknowledgeCommitCallback implements 
AcknowledgementCommitCallback {
+        private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
+        private final Map<TopicPartition, Exception> partitionExceptionMap;
+
+        public TestableAcknowledgeCommitCallback(Map<TopicPartition, 
Set<Long>> partitionOffsetsMap,
+                                                 Map<TopicPartition, 
Exception> partitionExceptionMap) {
+            this.partitionOffsetsMap = partitionOffsetsMap;
+            this.partitionExceptionMap = partitionExceptionMap;
+        }
+
+        @Override
+        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
+            offsetsMap.forEach((partition, offsets) -> {
+                partitionOffsetsMap.merge(partition.topicPartition(), offsets, 
(oldOffsets, newOffsets) -> {
+                    Set<Long> mergedOffsets = new HashSet<>();
+                    mergedOffsets.addAll(oldOffsets);
+                    mergedOffsets.addAll(newOffsets);
+                    return mergedOffsets;
+                });
+                if 
(!partitionExceptionMap.containsKey(partition.topicPartition())) {
+                    partitionExceptionMap.put(partition.topicPartition(), 
exception);
+                }
+            });
+        }
+    }
+
+    @Test
+    public void testHeaders() {
+        int numRecords = 1;
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        record.headers().add("headerKey", "headerValue".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        assertEquals(numRecords, records.size());
+
+        for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+            Header header = consumerRecord.headers().lastHeader("headerKey");
+            if (header != null)
+                assertEquals("headerValue", new String(header.value()));
+        }
+        shareConsumer.close();
+        producer.close();
+    }
+
+    private void testHeadersSerializeDeserialize(Serializer<byte[]> 
serializer, Deserializer<byte[]> deserializer) {
+        int numRecords = 1;
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), serializer);
+        producer.send(record);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        assertEquals(numRecords, records.size());
+        shareConsumer.close();
+        producer.close();
+    }
+
    /**
     * Round-trips a record through BaseConsumerTest's serializer/deserializer
     * implementations via {@link #testHeadersSerializeDeserialize}.
     */
    @Test
    public void testHeadersSerializerDeserializer() {
        testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl());
    }
+
+    @Test
+    public void testMaxPollRecords() {
+        int maxPollRecords = 2;
+        int numRecords = 10000;
+
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        long startingTimestamp = System.currentTimeMillis();
+        produceMessagesWithTimestamp(numRecords, startingTimestamp);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
+                "group1", 
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)));
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
+        long i = 0L;
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            assertEquals(startingTimestamp + i, record.timestamp());
+            assertEquals("key " + i, new String(record.key()));
+            assertEquals("value " + i, new String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals(("key " + i).length(), record.serializedKeySize());
+            assertEquals(("value " + i).length(), 
record.serializedValueSize());
+
+            i++;
+        }
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testControlRecordsSkipped() throws Exception {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+
+        KafkaProducer<byte[], byte[]> transactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
+        transactionalProducer.initTransactions();
+        transactionalProducer.beginTransaction();
+        RecordMetadata transactional1 = 
transactionalProducer.send(record).get();
+
+        KafkaProducer<byte[], byte[]> nonTransactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+        RecordMetadata nonTransactional1 = 
nonTransactionalProducer.send(record).get();
+
+        transactionalProducer.commitTransaction();
+
+        transactionalProducer.beginTransaction();
+        RecordMetadata transactional2 = 
transactionalProducer.send(record).get();
+        transactionalProducer.abortTransaction();
+
+        RecordMetadata nonTransactional2 = 
nonTransactionalProducer.send(record).get();
+
+        transactionalProducer.close();
+        nonTransactionalProducer.close();
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(4, records.count());
+        assertEquals(transactional1.offset(), 
records.records(tp).get(0).offset());
+        assertEquals(nonTransactional1.offset(), 
records.records(tp).get(1).offset());
+        assertEquals(transactional2.offset(), 
records.records(tp).get(2).offset());
+        assertEquals(nonTransactional2.offset(), 
records.records(tp).get(3).offset());
+
+        // There will be control records on the topic-partition, so the 
offsets of the non-control records
+        // are not 0, 1, 2, 3. Just assert that the offset of the final one is 
not 3.
+        assertNotEquals(3, nonTransactional2.offset());
+
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(0, records.count());
+        shareConsumer.close();
+        transactionalProducer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeSuccess() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(shareConsumer::acknowledge);
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgeCommitSuccess() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(shareConsumer::acknowledge);
+        producer.send(record);
+        Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+        assertEquals(1, result.size());
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitAcknowledgementCommitAsync() throws 
InterruptedException {
+        ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        ProducerRecord<byte[], byte[]> record3 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record1);
+        producer.send(record2);
+        producer.send(record3);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+        shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
+        shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer1.poll(Duration.ofMillis(5000));
+        assertEquals(3, records.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+        // Acknowledging 2 out of the 3 records received via commitAsync.
+        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+        assertEquals(0L, firstRecord.offset());
+        assertEquals(1L, secondRecord.offset());
+
+        shareConsumer1.acknowledge(firstRecord);
+        shareConsumer1.acknowledge(secondRecord);
+        shareConsumer1.commitAsync();
+
+        // Allowing acquisition lock timeout to expire.
+        Thread.sleep(15000);
+
+        // The 3rd record should be reassigned to 2nd consumer when it polls.
+        ConsumerRecords<byte[], byte[]> records2 = 
shareConsumer2.poll(Duration.ofMillis(5000));
+        assertEquals(1, records2.count());
+        assertEquals(2L, records2.iterator().next().offset());
+
+        assertFalse(partitionExceptionMap1.containsKey(tp));
+        // The callback will receive the acknowledgement responses 
asynchronously after the next poll.
+        shareConsumer1.poll(Duration.ofMillis(500));
+
+        shareConsumer1.close();
+        shareConsumer2.close();
+        producer.close();
+
+        assertTrue(partitionExceptionMap1.containsKey(tp));
+        assertNull(partitionExceptionMap1.get(tp));
+    }
+
+    @Test
+    public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
+        ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        ProducerRecord<byte[], byte[]> record3 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record1);
+        producer.send(record2);
+        producer.send(record3);
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+        shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer1.poll(Duration.ofMillis(5000));
+        assertEquals(3, records.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+        // Acknowledging 2 out of the 3 records received via commitAsync.
+        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+        assertEquals(0L, firstRecord.offset());
+        assertEquals(1L, secondRecord.offset());
+
+        shareConsumer1.acknowledge(firstRecord);
+        shareConsumer1.acknowledge(secondRecord);
+        shareConsumer1.commitAsync();
+
+        // The 3rd record should be re-presented to the consumer when it polls 
again.
+        records = shareConsumer1.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        iterator = records.iterator();
+        firstRecord = iterator.next();
+        assertEquals(2L, firstRecord.offset());
+
+        // And poll again without acknowledging - the callback will receive 
the acknowledgement responses too
+        records = shareConsumer1.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        iterator = records.iterator();
+        firstRecord = iterator.next();
+        assertEquals(2L, firstRecord.offset());
+
+        shareConsumer1.acknowledge(firstRecord);
+
+        // The callback will receive the acknowledgement responses after 
polling. The callback is
+        // called on entry to the poll method or during close. The commit is 
being performed asynchronously, so
+        // we can only rely on the completion once the consumer has closed 
because that waits for the response.
+        shareConsumer1.poll(Duration.ofMillis(5000));
+
+        shareConsumer1.close();
+        producer.close();
+
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+    }
+
+    @Test
+    public void testExplicitAcknowledgeReleasePollAccept() {
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+        records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+        records = shareConsumer.poll(Duration.ofMillis(5000));

Review Comment:
   This wait can definitely be shorter; 500 ms should suffice, since no further records are expected at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to