This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0203dc0fc9 KAFKA-20322: Add API version discovery true for 
transaction manager (#21782)
a0203dc0fc9 is described below

commit a0203dc0fc9be294ef3b4ba9d9b6a1115efe9794
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Mar 18 08:03:28 2026 -0700

    KAFKA-20322: Add API version discovery true for transaction manager (#21782)
    
    
    [KIP-1228](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1228%3A+Add+Transaction+Version+to+WriteTxnMarkersRequest)
    added a WriteTxnMarkersRequest v2 with a TransactionVersion field.
    However, TransactionMarkerChannelManager creates its NetworkClient with
    discoverBrokerVersions=false, which disables API version negotiation
    with peer brokers. Without version discovery, the ApiVersions cache is
    never populated — apiVersions.get(nodeId) returns null in
    NetworkClient.doSend(), causing it to fall through to
    builder.latestAllowedVersion(), which blindly uses the highest version
    the sending broker knows about rather than negotiating a mutually
    supported version. This fix enables version discovery and also fixes
    the system test that should have caught this issue earlier. The system
    test was run locally and verified to fail without the fix and pass
    with the fix.
    
    Reviewers: Artem Livshits <[email protected]>, David Jacot
     <[email protected]>
---
 .../TransactionMarkerChannelManager.scala          |  2 +-
 .../TransactionMarkerChannelManagerTest.scala      | 34 ++++++++++++++++--
 .../tests/core/transactions_mixed_versions_test.py | 40 ++++++++++++++++------
 3 files changed, 62 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 21d2876d824..91daeedf815 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -95,7 +95,7 @@ object TransactionMarkerChannelManager {
       config.connectionSetupTimeoutMs,
       config.connectionSetupTimeoutMaxMs,
       time,
-      false,
+      true,
       new ApiVersions,
       logContext,
       MetadataRecoveryStrategy.NONE
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 3a50faffc0d..926c5d87d5c 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -25,13 +25,14 @@ import org.apache.kafka.clients.{ClientResponse, 
NetworkClient}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.internal.RecordBatch
 import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, 
WriteTxnMarkersRequest, WriteTxnMarkersResponse}
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.utils.{LogContext, MockTime}
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.coordinator.transaction.{TransactionMetadata, 
TransactionState}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
-import org.apache.kafka.server.util.RequestAndCompletionHandler
+import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -627,6 +628,35 @@ class TransactionMarkerChannelManagerTest {
     })
   }
 
+  @Test
+  def shouldEnableApiVersionDiscoveryInFactoryMethod(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
+    val metrics = new Metrics()
+    val logContext = new LogContext()
+    try {
+      val channelManager = TransactionMarkerChannelManager(
+        config,
+        metrics,
+        metadataCache,
+        txnStateManager,
+        time,
+        logContext
+      )
+      try {
+        val field = 
classOf[InterBrokerSendThread].getDeclaredField("networkClient")
+        field.setAccessible(true)
+        val client = field.get(channelManager).asInstanceOf[NetworkClient]
+        assertTrue(client.discoverBrokerVersions(),
+          "TransactionMarkerChannelManager should enable API version discovery 
to " +
+            "ensure compatibility during rolling upgrades")
+      } finally {
+        channelManager.shutdown()
+      }
+    } finally {
+      metrics.close()
+    }
+  }
+
   /**
    * Adjusts the transaction metadata based on the transaction version.
    * When transaction V2 is enabled, the producer epoch is incremented
diff --git a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py 
b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
index 15144682b33..ed98dffd169 100644
--- a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
+++ b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
@@ -22,7 +22,7 @@ from kafkatest.utils import is_int
 from kafkatest.utils.transactions_utils import create_and_start_copiers
 from kafkatest.version import LATEST_3_3, LATEST_3_4, LATEST_3_5, \
     LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, LATEST_4_0, \
-    LATEST_4_1, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
+    LATEST_4_1, LATEST_4_2, DEV_BRANCH, KafkaVersion, 
LATEST_STABLE_METADATA_VERSION
 
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
@@ -147,14 +147,25 @@ class TransactionsMixedVersionsTest(Test):
 
         return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
 
-    def setup_topics(self):
-        assignment = ":".join(map(str, [self.kafka.idx(node) for node in 
self.kafka.nodes]))
-        transaction_assignment = ",".join(map(str, [assignment[::-1]] * 50))
+    def setup_topics(self, txn_state_leader_node, data_leader_node):
+        all_node_ids = [self.kafka.idx(node) for node in self.kafka.nodes]
+
+        # Data topics: data_leader_node first
+        data_leader_id = self.kafka.idx(data_leader_node)
+        data_replicas = [data_leader_id] + [nid for nid in all_node_ids if nid 
!= data_leader_id]
+        data_assignment = ":".join(map(str, data_replicas))
+
+        # Internal topics: txn_state_leader_node first
+        txn_leader_id = self.kafka.idx(txn_state_leader_node)
+        txn_replicas = [txn_leader_id] + [nid for nid in all_node_ids if nid 
!= txn_leader_id]
+        txn_assignment = ":".join(map(str, txn_replicas))
+        internal_topic_assignment = ",".join([txn_assignment] * 50)
+
         self.kafka.topics = {
             self.input_topic: {
                 "partitions": self.num_input_partitions,
                 "replication-factor": self.replication_factor,
-                "replica-assignment": assignment,
+                "replica-assignment": data_assignment,
                 "configs": {
                     "min.insync.replicas": 2
                 }
@@ -162,7 +173,7 @@ class TransactionsMixedVersionsTest(Test):
             self.output_topic: {
                 "partitions": self.num_output_partitions,
                 "replication-factor": self.replication_factor,
-                "replica-assignment": assignment,
+                "replica-assignment": data_assignment,
                 "configs": {
                     "min.insync.replicas": 2
                 }
@@ -170,7 +181,7 @@ class TransactionsMixedVersionsTest(Test):
             "__transaction_state": {
                 "partitions": 50,
                 "replication-factor": self.replication_factor,
-                "replica-assignment": transaction_assignment,
+                "replica-assignment": internal_topic_assignment,
                 "configs": {
                     "min.insync.replicas": 2
                 }
@@ -179,11 +190,12 @@ class TransactionsMixedVersionsTest(Test):
 
     @cluster(num_nodes=8)
     @matrix(
-        old_kafka_version=[str(LATEST_4_1), str(LATEST_4_0), str(LATEST_3_9), 
str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), 
str(LATEST_3_4), str(LATEST_3_3)],
+        old_kafka_version=[str(LATEST_4_2), str(LATEST_4_1), str(LATEST_4_0), 
str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), 
str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
         metadata_quorum=[isolated_kraft],
-        group_protocol=[None]
+        group_protocol=[None],
+        coordinator_on_new_broker=[True, False]
     )
-    def test_transactions_mixed_versions(self, old_kafka_version, 
metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+    def test_transactions_mixed_versions(self, old_kafka_version, 
metadata_quorum=quorum.isolated_kraft, group_protocol=None, 
coordinator_on_new_broker=True):
         oldKafkaVersion = KafkaVersion(old_kafka_version)
         self.kafka = KafkaService(self.test_context,
                                   num_nodes=self.num_brokers,
@@ -193,6 +205,9 @@ class TransactionsMixedVersionsTest(Test):
 
         self.kafka.nodes[0].version = DEV_BRANCH
 
+        new_node = self.kafka.nodes[0]  # DEV_BRANCH
+        old_node = self.kafka.nodes[1]  # old version
+
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
@@ -202,7 +217,10 @@ class TransactionsMixedVersionsTest(Test):
 
         self.kafka.log_level = "DEBUG"
 
-        self.setup_topics()
+        if coordinator_on_new_broker:
+            self.setup_topics(txn_state_leader_node=new_node, 
data_leader_node=old_node)
+        else:
+            self.setup_topics(txn_state_leader_node=old_node, 
data_leader_node=new_node)
         self.kafka.start()
 
         input_messages = self.seed_messages(self.input_topic, 
self.num_seed_messages)

Reply via email to