adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2032529508
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception
{
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 8);
+ // 5th and 10th message transaction was aborted, hence they won't
be included in the fetched records.
+ assertEquals(8, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ if (messageCounter % 5 == 0)
+ messageCounter++;
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testReadUncommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ // Even though 5th and 10th message transaction was aborted, they
will be included in the fetched records since IsolationLevel is
READ_UNCOMMITTED.
+ assertEquals(10, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ record = records.iterator().next();
+ assertEquals("Message 2", new String(record.value()));
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Third transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
3");
+ // Fourth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 4");
+
+ records = waitedPoll(shareConsumer, 2500L, 2);
+ // Message 3 and Message 4 would be returned by this poll.
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> recordIterator =
records.iterator();
+ record = recordIterator.next();
+ assertEquals("Message 3", new String(record.value()));
+ record = recordIterator.next();
+ assertEquals("Message 4", new String(record.value()));
+ // We will make Message 3 and Message 4 available for
re-consumption.
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // We are altering IsolationLevel to READ_COMMITTED now. We
will only read committed transactions now.
+ alterShareIsolationLevel("group1", "read_committed");
+
+ // Fifth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
5");
+ // Sixth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 6");
+ // Seventh transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 7");
+ // Eighth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
8");
+
+ // Since isolation level is READ_COMMITTED, we can consume
Message 3 (committed transaction that was released), Message 5 and Message 8.
+ // We cannot consume Message 4 (aborted transaction that was
released), Message 6 and Message 7 since they were aborted.
+ List<String> messages = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(5000));
+ if (pollRecords.count() > 0) {
+ for (ConsumerRecord<byte[], byte[]> pollRecord :
pollRecords)
+ messages.add(new String(pollRecord.value()));
+ pollRecords.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ }
+ return messages.size() == 3;
+ }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all
records post altering share isolation level");
+
+ assertEquals("Message 3", messages.get(0));
+ assertEquals("Message 5", messages.get(1));
+ assertEquals("Message 8", messages.get(2));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ transactionalProducer.close();
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ // We will not receive any records since the transaction was
aborted.
+ records = waitedPoll(shareConsumer, 2500L, 0);
Review Comment:
ys, in this implementation it did. I have changed the testing implementation
by setting an acknowledgement commit callback to verify that aborted marker
offset for Message 2 (3L) is fetched and acknowledged by the consumer. I have
used `TestUtils.waitForCondition()` and done a poll for `500ms` for the same.
This will verify that transaction corresponding to message 4 is fetched by the
consumer and also takes much less time now.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception
{
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 8);
+ // 5th and 10th message transaction was aborted, hence they won't
be included in the fetched records.
+ assertEquals(8, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ if (messageCounter % 5 == 0)
+ messageCounter++;
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testReadUncommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ // Even though 5th and 10th message transaction was aborted, they
will be included in the fetched records since IsolationLevel is
READ_UNCOMMITTED.
+ assertEquals(10, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ record = records.iterator().next();
+ assertEquals("Message 2", new String(record.value()));
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Third transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
3");
+ // Fourth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 4");
+
+ records = waitedPoll(shareConsumer, 2500L, 2);
+ // Message 3 and Message 4 would be returned by this poll.
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> recordIterator =
records.iterator();
+ record = recordIterator.next();
+ assertEquals("Message 3", new String(record.value()));
+ record = recordIterator.next();
+ assertEquals("Message 4", new String(record.value()));
+ // We will make Message 3 and Message 4 available for
re-consumption.
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // We are altering IsolationLevel to READ_COMMITTED now. We
will only read committed transactions now.
+ alterShareIsolationLevel("group1", "read_committed");
+
+ // Fifth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
5");
+ // Sixth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 6");
+ // Seventh transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 7");
+ // Eighth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
8");
+
+ // Since isolation level is READ_COMMITTED, we can consume
Message 3 (committed transaction that was released), Message 5 and Message 8.
+ // We cannot consume Message 4 (aborted transaction that was
released), Message 6 and Message 7 since they were aborted.
+ List<String> messages = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(5000));
+ if (pollRecords.count() > 0) {
+ for (ConsumerRecord<byte[], byte[]> pollRecord :
pollRecords)
+ messages.add(new String(pollRecord.value()));
+ pollRecords.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ }
+ return messages.size() == 3;
+ }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all
records post altering share isolation level");
+
+ assertEquals("Message 3", messages.get(0));
+ assertEquals("Message 5", messages.get(1));
+ assertEquals("Message 8", messages.get(2));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ transactionalProducer.close();
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ // We will not receive any records since the transaction was
aborted.
+ records = waitedPoll(shareConsumer, 2500L, 0);
Review Comment:
yes, in this implementation it did. I have changed the testing
implementation by setting an acknowledgement commit callback to verify that
aborted marker offset for Message 2 (3L) is fetched and acknowledged by the
consumer. I have used `TestUtils.waitForCondition()` and done a poll for
`500ms` for the same. This will verify that transaction corresponding to
message 4 is fetched by the consumer and also takes much less time now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]