artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1526939166
##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
any())
}
+ @Test
+ def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+ mockCache()
+
+ val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+ when(metadataCache.getPartitionLeaderEndpoint(
+ ArgumentMatchers.eq(partition1.topic),
+ ArgumentMatchers.eq(partition1.partition),
+ any())
+ ).thenReturn(Some(broker1))
+
+ // Build a successful client response.
+ val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+ Collections.singletonMap(producerId2: java.lang.Long,
Collections.singletonMap(partition1, Errors.NONE)))
+ val successfulClientResponse = new ClientResponse(header, null, null,
+ time.milliseconds(), time.milliseconds(), false, null, null,
+ successfulResponse)
+
+ // Build a disconnected client response.
+ val disconnectedClientResponse = new ClientResponse(header, null, null,
+ time.milliseconds(), time.milliseconds(), true, null, null,
+ null)
+
+ // Test matrix to cover various scenarios:
+ val clientResponses = Seq(successfulClientResponse,
+ disconnectedClientResponse)
+ val getTransactionStateResponses = Seq(
+ // NOT_COORDINATOR error case
+ Left(Errors.NOT_COORDINATOR),
+ // COORDINATOR_LOAD_IN_PROGRESS
+ Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+ // "Newly loaded" transaction state with the new epoch.
+ Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2,
+ txnMetadata2)))
+ )
+
+ clientResponses.foreach { clientResponse =>
+ getTransactionStateResponses.foreach { getTransactionStateResponse =>
+ // Reset data from previous iteration.
+ txnMetadata2.topicPartitions.add(partition1)
+ clearInvocations(txnStateManager)
+ // Send out markers for a transaction before load.
+ channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+ txnMetadata2, expectedTransition)
+
+ // Drain the marker to make it "in-flight".
+ val requests1 = channelManager.generateRequests().asScala
+ assertEquals(1, requests1.size)
+
+ // Simulate a partition load:
Review Comment:
It covers the cases in the transaction marker request completion handler where
we call txnMarkerChannelManager.removeMarkersForTxn (I literally put a
breakpoint on every line that calls txnMarkerChannelManager.removeMarkersForTxn,
ran the test, and removed each breakpoint as it got hit; by the end of the test
no breakpoints remained).
The idea is that previously there were cases (some more likely, some very
unlikely) where a "zombie" reply issued before a load could unconditionally
delete state created by newly sent markers, which would leave the transaction
"stuck". Now all of those cases leave the state alone, so the new transaction
completes.
> Is there ever a case of getTransactionStateResponse where we have the old
epoch?
In that case we won't unconditionally call
txnMarkerChannelManager.removeMarkersForTxn, so we won't hit this particular
anomaly. The "zombie" reply will complete the transaction, and the reply for
the send that happened after the load would be a no-op.
We may hit a different anomaly, but assuming that we prevent loading a
transaction partition with the same coordinator epoch, we should never
encounter that case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]