jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1525559114
##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
any())
}
+ @Test
+ def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+ mockCache()
+
+ val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+ when(metadataCache.getPartitionLeaderEndpoint(
+ ArgumentMatchers.eq(partition1.topic),
+ ArgumentMatchers.eq(partition1.partition),
+ any())
+ ).thenReturn(Some(broker1))
+
+ // Build a successful client response.
+ val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+ val successfulResponse = new WriteTxnMarkersResponse(
+ Collections.singletonMap(producerId2: java.lang.Long,
Collections.singletonMap(partition1, Errors.NONE)))
+ val successfulClientResponse = new ClientResponse(header, null, null,
+ time.milliseconds(), time.milliseconds(), false, null, null,
+ successfulResponse)
+
+ // Build a disconnected client response.
+ val disconnectedClientResponse = new ClientResponse(header, null, null,
+ time.milliseconds(), time.milliseconds(), true, null, null,
+ null)
+
+ // Test matrix to cover various scenarios:
+ val clientResponses = Seq(successfulClientResponse,
disconnectedClientResponse)
+ val getTransactionStateResponses = Seq(
+ // NOT_COORDINATOR error case
+ Left(Errors.NOT_COORDINATOR),
+ // COORDINATOR_LOAD_IN_PROGRESS
+ Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+ // "Newly loaded" transaction state with the new epoch.
+ Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2,
txnMetadata2)))
+ )
+
+ clientResponses.foreach { clientResponse =>
+ getTransactionStateResponses.foreach { getTransactionStateResponse =>
+ // Reset data from previous iteration.
+ txnMetadata2.topicPartitions.add(partition1)
+ clearInvocations(txnStateManager)
+ // Send out markers for a transaction before load.
+ channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+ txnMetadata2, expectedTransition)
+
+ // Drain the marker to make it "in-flight".
+ val requests1 = channelManager.generateRequests().asScala
+ assertEquals(1, requests1.size)
+
+ // Simulate a partition load:
Review Comment:
To confirm my understanding, this test simulates sending out marker requests
with various responses. We want to send out the requests and then unload and
load the pending markers so that we bump the epoch and send out the requests
again. This test ensures that when we do these operations, we still correctly
complete the transaction when we get a successful response after reloading.
Is there ever a case of `getTransactionStateResponse` where we have the old
epoch? I noticed we have the case with the new epoch.
Also in the case with the new epoch, do we want to confirm the transaction
doesn't complete?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]