artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1524022035
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
}
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
// keep track of the requests per txn topic partition so we can easily clear the queue
// during partition emigration
- private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+ private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
- def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+ def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
  markersPerTxnTopicPartition.remove(partition)
}
- def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-   val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-     new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-   queue.add(txnIdAndMarker)
+ def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+   val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+     info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
+     new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+   })
+   queue.add(pendingCompleteTxnAndMarker)
+
+ if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+ // This could happen if the queue got removed concurrently.
Review Comment:
As far as I can see, it shouldn't affect the user-visible behavior. It does
create an interesting state when the queue is removed in
removeMarkersForTxnTopicPartition -- we could have:
1. [addMarkers] Retrieve the queue.
2. [removeMarkersForTxnTopicPartition] Remove the queue.
3. [removeMarkersForTxnTopicPartition] Iterate over the queue, but skip
removeMarkersForTxn because the queue is empty.
4. [addMarkers] Add markers to the queue.
Now we've effectively removed the markers while
transactionsWithPendingMarkers still has an entry.
This state could last for a while if the removal happened on unload (and
technically the txn id could expire, etc., so the state may persist until
broker restart), but as soon as real workflow happens on this txn id and
sends out markers, the proper entry will be created and the actual
functionality will work as expected.
In other words, this race can lead to an orphaned entry in
transactionsWithPendingMarkers, but it doesn't affect anything (other than
leaking a small amount of memory) until markers are sent again, and sending
markers will fix it.
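The interleaving above can be reenacted deterministically with a small sketch (simplified, hypothetical stand-ins: a bare ConcurrentHashMap keyed by partition and String markers instead of the real PendingCompleteTxnAndMarkerEntry type):

```scala
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}

object RaceSketch {
  // Simplified stand-in for markersPerTxnTopicPartition.
  val markers = new ConcurrentHashMap[Int, BlockingQueue[String]]()

  def reenact(): Unit = {
    // 1. [addMarkers] Retrieve (or create) the queue for partition 0.
    val queue = markers.computeIfAbsent(0, _ => new LinkedBlockingQueue[String]())
    // 2. [removeMarkersForTxnTopicPartition] Remove the queue from the map.
    val removed = markers.remove(0)
    // 3. Iterating over `removed` finds it empty, so no per-txn cleanup runs.
    assert(removed.isEmpty)
    // 4. [addMarkers] Add a marker to the now-orphaned queue.
    queue.add("marker-for-txn-1")
    // The marker is reachable only through the orphaned queue: the map has no
    // entry, while the pending-markers bookkeeping would still hold one.
    assert(markers.get(0) == null && removed.size == 1)
  }
}

RaceSketch.reenact()
```

The double-check in the patch (comparing the queue obtained at step 1 against the map's current value after the add) detects exactly this case.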
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##########
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
      val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
      val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-     for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
Review Comment:
I agree this code could benefit from some refactoring. We should probably
structure it so that, instead of branching on wasDisconnected at the top, it
just iterates over the pending entries and checks wasDisconnected in the
specific cases. But I think that should be done separately; this change is
fairly mechanical, which simplifies the review of what it does.
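A rough sketch of the shape I mean (hypothetical names and stand-in types, not the actual handler code):

```scala
// Stand-ins for the real pending-entry and per-entry outcome types.
case class PendingEntry(producerId: Long)

sealed trait Outcome
case object RetryLater extends Outcome           // disconnected: no response to inspect
case class Failed(error: String) extends Outcome
case object Completed extends Outcome

// One loop over the pending entries; wasDisconnected is consulted per entry
// instead of branching the whole handler at the top.
def handle(entries: List[PendingEntry],
           wasDisconnected: Boolean,
           responseErrors: Map[Long, String]): List[(PendingEntry, Outcome)] =
  entries.map { entry =>
    val outcome: Outcome =
      if (wasDisconnected) RetryLater
      else responseErrors.get(entry.producerId) match {
        case Some(err) => Failed(err)
        case None      => Completed
      }
    entry -> outcome
  }
```
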
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
}
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
// keep track of the requests per txn topic partition so we can easily clear the queue
// during partition emigration
- private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+ private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
- def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+ def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
  markersPerTxnTopicPartition.remove(partition)
}
- def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-   val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-     new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-   queue.add(txnIdAndMarker)
+ def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+   val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+     info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
Review Comment:
That's correct, this is logged when the value is created. More precise logic
would be to make atomicGetOrUpdate return an indication of whether the value
was actually created (then we'd log it exactly once), but I didn't think it
would be worth the complexity -- at worst we'd get a couple of log lines at
the same time and only one of them would have actually created the queue.
I'll add a comment.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
    markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
-     for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-       removeMarkersForTxnId(entry.txnId)
+     for (entry <- queue.asScala) {
Review Comment:
In the cases I investigated it was a couple dozen or so, but I don't have
precise stats from a large selection of cases. We log a message on every
retry of a failed marker send; I think this message would be much less spammy
than that (these would happen only when partitions change).
The disadvantage of having a single log line is that it will eventually get
truncated, but these logs can help show interesting transitions, which could
help investigate race conditions related to load / unload.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]