This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 8bc7a2d1aca KAFKA-20322: Add API version discovery true for
transaction manager (#21782)
8bc7a2d1aca is described below
commit 8bc7a2d1acaadd6a9f4271ba432c3ace309b4c2e
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Mar 18 08:03:28 2026 -0700
KAFKA-20322: Add API version discovery true for transaction manager (#21782)
[KIP-1228](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1228%3A+Add+Transaction+Version+to+WriteTxnMarkersRequest)
added a WriteTxnMarkersRequest v2 with a TransactionVersion field.
However, TransactionMarkerChannelManager creates its NetworkClient with
discoverBrokerVersions=false, which disables API version negotiation
with peer brokers. Without version discovery, the ApiVersions cache is
never populated — apiVersions.get(nodeId) returns null in
NetworkClient.doSend(), causing it to fall through to
builder.latestAllowedVersion(), which uses the highest version the
sending broker knows about rather than a version that both brokers
support. In a mixed-version cluster (e.g. during a rolling upgrade), a
broker can therefore send a WriteTxnMarkersRequest version that its peer
does not support. This fix enables version discovery and also fixes the
mixed-version system test, which should have caught this issue earlier.
I ran the system test locally and verified that it fails without the
fix and passes with it.
Reviewers: Artem Livshits <[email protected]>, David Jacot
<[email protected]>
---
.../TransactionMarkerChannelManager.scala | 2 +-
.../TransactionMarkerChannelManagerTest.scala | 34 ++++++++++++++++--
.../tests/core/transactions_mixed_versions_test.py | 40 ++++++++++++++++------
3 files changed, 62 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 21d2876d824..91daeedf815 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -95,7 +95,7 @@ object TransactionMarkerChannelManager {
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
- false,
+ true,
new ApiVersions,
logContext,
MetadataRecoveryStrategy.NONE
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 56da07509eb..1c29de0f466 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -25,13 +25,14 @@ import org.apache.kafka.clients.{ClientResponse,
NetworkClient}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult,
WriteTxnMarkersRequest, WriteTxnMarkersResponse}
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.utils.{LogContext, MockTime}
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionMetadata,
TransactionState}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
-import org.apache.kafka.server.util.RequestAndCompletionHandler
+import org.apache.kafka.server.util.{InterBrokerSendThread,
RequestAndCompletionHandler}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -627,6 +628,35 @@ class TransactionMarkerChannelManagerTest {
})
}
+ @Test
+ def shouldEnableApiVersionDiscoveryInFactoryMethod(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
+ val metrics = new Metrics()
+ val logContext = new LogContext()
+ try {
+ val channelManager = TransactionMarkerChannelManager(
+ config,
+ metrics,
+ metadataCache,
+ txnStateManager,
+ time,
+ logContext
+ )
+ try {
+ val field =
classOf[InterBrokerSendThread].getDeclaredField("networkClient")
+ field.setAccessible(true)
+ val client = field.get(channelManager).asInstanceOf[NetworkClient]
+ assertTrue(client.discoverBrokerVersions(),
+ "TransactionMarkerChannelManager should enable API version discovery
to " +
+ "ensure compatibility during rolling upgrades")
+ } finally {
+ channelManager.shutdown()
+ }
+ } finally {
+ metrics.close()
+ }
+ }
+
/**
* Adjusts the transaction metadata based on the transaction version.
* When transaction V2 is enabled, the producer epoch is incremented
diff --git a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
index 15144682b33..ed98dffd169 100644
--- a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
+++ b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
@@ -22,7 +22,7 @@ from kafkatest.utils import is_int
from kafkatest.utils.transactions_utils import create_and_start_copiers
from kafkatest.version import LATEST_3_3, LATEST_3_4, LATEST_3_5, \
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, LATEST_4_0, \
- LATEST_4_1, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
+ LATEST_4_1, LATEST_4_2, DEV_BRANCH, KafkaVersion,
LATEST_STABLE_METADATA_VERSION
from ducktape.tests.test import Test
from ducktape.mark import matrix
@@ -147,14 +147,25 @@ class TransactionsMixedVersionsTest(Test):
return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
- def setup_topics(self):
- assignment = ":".join(map(str, [self.kafka.idx(node) for node in
self.kafka.nodes]))
- transaction_assignment = ",".join(map(str, [assignment[::-1]] * 50))
+ def setup_topics(self, txn_state_leader_node, data_leader_node):
+ all_node_ids = [self.kafka.idx(node) for node in self.kafka.nodes]
+
+ # Data topics: data_leader_node first
+ data_leader_id = self.kafka.idx(data_leader_node)
+ data_replicas = [data_leader_id] + [nid for nid in all_node_ids if nid
!= data_leader_id]
+ data_assignment = ":".join(map(str, data_replicas))
+
+ # Internal topics: txn_state_leader_node first
+ txn_leader_id = self.kafka.idx(txn_state_leader_node)
+ txn_replicas = [txn_leader_id] + [nid for nid in all_node_ids if nid
!= txn_leader_id]
+ txn_assignment = ":".join(map(str, txn_replicas))
+ internal_topic_assignment = ",".join([txn_assignment] * 50)
+
self.kafka.topics = {
self.input_topic: {
"partitions": self.num_input_partitions,
"replication-factor": self.replication_factor,
- "replica-assignment": assignment,
+ "replica-assignment": data_assignment,
"configs": {
"min.insync.replicas": 2
}
@@ -162,7 +173,7 @@ class TransactionsMixedVersionsTest(Test):
self.output_topic: {
"partitions": self.num_output_partitions,
"replication-factor": self.replication_factor,
- "replica-assignment": assignment,
+ "replica-assignment": data_assignment,
"configs": {
"min.insync.replicas": 2
}
@@ -170,7 +181,7 @@ class TransactionsMixedVersionsTest(Test):
"__transaction_state": {
"partitions": 50,
"replication-factor": self.replication_factor,
- "replica-assignment": transaction_assignment,
+ "replica-assignment": internal_topic_assignment,
"configs": {
"min.insync.replicas": 2
}
@@ -179,11 +190,12 @@ class TransactionsMixedVersionsTest(Test):
@cluster(num_nodes=8)
@matrix(
- old_kafka_version=[str(LATEST_4_1), str(LATEST_4_0), str(LATEST_3_9),
str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5),
str(LATEST_3_4), str(LATEST_3_3)],
+ old_kafka_version=[str(LATEST_4_2), str(LATEST_4_1), str(LATEST_4_0),
str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6),
str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
metadata_quorum=[isolated_kraft],
- group_protocol=[None]
+ group_protocol=[None],
+ coordinator_on_new_broker=[True, False]
)
- def test_transactions_mixed_versions(self, old_kafka_version,
metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+ def test_transactions_mixed_versions(self, old_kafka_version,
metadata_quorum=quorum.isolated_kraft, group_protocol=None,
coordinator_on_new_broker=True):
oldKafkaVersion = KafkaVersion(old_kafka_version)
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
@@ -193,6 +205,9 @@ class TransactionsMixedVersionsTest(Test):
self.kafka.nodes[0].version = DEV_BRANCH
+ new_node = self.kafka.nodes[0] # DEV_BRANCH
+ old_node = self.kafka.nodes[1] # old version
+
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
@@ -202,7 +217,10 @@ class TransactionsMixedVersionsTest(Test):
self.kafka.log_level = "DEBUG"
- self.setup_topics()
+ if coordinator_on_new_broker:
+ self.setup_topics(txn_state_leader_node=new_node,
data_leader_node=old_node)
+ else:
+ self.setup_topics(txn_state_leader_node=old_node,
data_leader_node=new_node)
self.kafka.start()
input_messages = self.seed_messages(self.input_topic,
self.num_seed_messages)