junrao commented on code in PR #16893:
URL: https://github.com/apache/kafka/pull/16893#discussion_r1720079897


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -261,7 +257,9 @@ class TransactionMarkerChannelManager(
     }.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) =>
       val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
       val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
-      val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)
+      val request = new WriteTxnMarkersRequest.Builder(
+        config.interBrokerProtocolVersion.writeTxnMarkersRequestVersion(), markersToSend

Review Comment:
   It seems that we should get the MV from `metadataCache.metadataVersion()`
   instead of `config.interBrokerProtocolVersion`, since the MV can be set
   dynamically. @jolshan: What do you think?
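
To illustrate the concern, here is a minimal, self-contained sketch. Every type in it (`Mv`, `StaticConfig`, `DynamicCache`, `requestVersion`) is a hypothetical stand-in, not a real Kafka class, and the request version numbers are made up. It only shows why a value captured from static config can go stale once the metadata version is changed at runtime.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Stand-in types only; none of these are the real Kafka classes.
final class VersionSourceSketch {

    // Hypothetical, simplified metadata version that maps to a request version.
    enum Mv {
        OLD((short) 0),
        NEW((short) 1);

        private final short writeTxnMarkersVersion;

        Mv(short writeTxnMarkersVersion) {
            this.writeTxnMarkersVersion = writeTxnMarkersVersion;
        }

        short writeTxnMarkersRequestVersion() {
            return writeTxnMarkersVersion;
        }
    }

    // Plays the role of static broker config: fixed when the broker starts.
    static final class StaticConfig {
        private final Mv interBrokerProtocolVersion;

        StaticConfig(Mv mv) {
            this.interBrokerProtocolVersion = mv;
        }

        Mv interBrokerProtocolVersion() {
            return interBrokerProtocolVersion;
        }
    }

    // Plays the role of a metadata cache whose version can change at runtime.
    static final class DynamicCache {
        private final AtomicReference<Mv> current;

        DynamicCache(Mv initial) {
            this.current = new AtomicReference<>(initial);
        }

        Mv metadataVersion() {
            return current.get();
        }

        void update(Mv mv) {
            current.set(mv);
        }
    }

    // Resolves the request version at call time from whichever source is supplied.
    static short requestVersion(Supplier<Mv> source) {
        return source.get().writeTxnMarkersRequestVersion();
    }

    public static void main(String[] args) {
        StaticConfig config = new StaticConfig(Mv.OLD);
        DynamicCache cache = new DynamicCache(Mv.OLD);

        // Simulate a dynamic metadata version change after startup.
        cache.update(Mv.NEW);

        System.out.println("from config: " + requestVersion(config::interBrokerProtocolVersion));
        System.out.println("from cache:  " + requestVersion(cache::metadataVersion));
    }
}
```

In this sketch the config-based lookup keeps returning the startup value after the update, while the cache-based lookup follows the new one.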



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -466,6 +467,15 @@ public void assertLatestProductionIsLessThanLatest() {
             " to be less than the latest of " + MetadataVersion.latestTesting());
     }
 
+    @Test
+    public void testLastMetadataProductionDontReturnUnstableVersion() {

Review Comment:
   Perhaps add a comment like the following:
   
   `The broker picks the version for a few inter-broker RPCs based on the 
   metadata version, instead of the supported version from ApiResponse. We need 
   to make sure that the latest production MV doesn't accidentally depend on an 
   unstable request version.`
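
As a rough illustration of the invariant that comment describes, here is a self-contained sketch. The enum `Mv`, its constants, the `UNSTABLE_REQUEST_VERSIONS` set, and all version numbers are hypothetical and do not mirror the real `MetadataVersion` API. It checks that the newest production-ready version never resolves to a request version that is still flagged unstable.

```java
import java.util.Set;

// Stand-in types only; this does not use the real MetadataVersion class.
final class UnstableVersionGuardSketch {

    // Hypothetical: request versions still marked unstable in the RPC schema.
    static final Set<Short> UNSTABLE_REQUEST_VERSIONS = Set.of((short) 2);

    // Hypothetical metadata versions, ordered oldest to newest.
    enum Mv {
        V1((short) 0, true),
        V2((short) 1, true),
        V3_TESTING((short) 2, false); // not production-ready yet

        private final short requestVersion;
        private final boolean production;

        Mv(short requestVersion, boolean production) {
            this.requestVersion = requestVersion;
            this.production = production;
        }

        short requestVersion() {
            return requestVersion;
        }

        // Returns the newest constant that is flagged as production-ready.
        static Mv latestProduction() {
            Mv latest = null;
            for (Mv mv : values()) {
                if (mv.production) {
                    latest = mv;
                }
            }
            if (latest == null) {
                throw new IllegalStateException("no production version defined");
            }
            return latest;
        }
    }

    // The invariant: the latest production MV must not map to an unstable request version.
    static boolean latestProductionIsStable() {
        return !UNSTABLE_REQUEST_VERSIONS.contains(Mv.latestProduction().requestVersion());
    }

    public static void main(String[] args) {
        if (!latestProductionIsStable()) {
            throw new AssertionError("latest production MV depends on an unstable request version");
        }
        System.out.println("latest production MV: " + Mv.latestProduction());
    }
}
```

If `V3_TESTING` were flipped to production in this sketch without stabilizing request version 2 first, the check would fail, which is the kind of accident the test is meant to catch.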



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
