This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a81878fc0b KAFKA-18209 Clean up transaction state topic config logic
(#22456)
8a81878fc0b is described below
commit 8a81878fc0b32025277086833de155227520c6c6
Author: majialong <[email protected]>
AuthorDate: Sat Jun 6 23:54:17 2026 +0800
KAFKA-18209 Clean up transaction state topic config logic (#22456)
This PR cleans up the `__transaction_state` topic config logic by
removing scattered configuration-related constants from `TransactionLog`
and keeping it focused on serialization/deserialization.
It also moves transaction state topic config construction to
`TransactionCoordinator` and renames `transactionTopicConfigs` to
`transactionStateTopicConfigs` for clarity, aligning with the current
group/share coordinator pattern where coordinators expose internal topic
configs to the broker.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../transaction/TransactionCoordinator.scala | 18 ++++++++++++++-
.../transaction/TransactionStateManager.scala | 25 +++++----------------
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../transaction/TransactionStateManagerTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 ++--
.../coordinator/transaction/TransactionLog.java | 26 ----------------------
7 files changed, 27 insertions(+), 52 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 43455a83e41..70db45b1eba 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -19,6 +19,7 @@ package kafka.coordinator.transaction
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
import org.apache.kafka.common.message.{DescribeTransactionsResponseData,
ListTransactionsResponseData}
@@ -31,6 +32,7 @@ import org.apache.kafka.common.utils.internals.LogContext
import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionConfig, TransactionLogConfig, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch,
TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
+import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import java.util
@@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.OptionConverters._
object TransactionCoordinator {
+ val EnforcedRequiredAcks: Short = -1.toShort
def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
@@ -1004,7 +1007,20 @@ class TransactionCoordinator(txnConfig:
TransactionConfig,
}
}
- def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
+ /**
+ * Return the configuration properties of the transaction state topic.
+ *
+ * @return Properties of the transaction state topic.
+ */
+ def transactionStateTopicConfigs: Properties = {
+ val props = new Properties
+ props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
+ props.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.UNCOMPRESSED.name)
+ props.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
+ props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
txnConfig.transactionLogMinInsyncReplicas.toString)
+ props.put(TopicConfig.SEGMENT_BYTES_CONFIG,
txnConfig.transactionLogSegmentBytes.toString)
+ props
+ }
def partitionFor(transactionalId: String): Int =
txnManager.partitionFor(transactionalId)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 425789a8263..99894ce759a 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -17,13 +17,12 @@
package kafka.coordinator.transaction
import java.nio.ByteBuffer
-import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.ReplicaManager
import kafka.utils.Logging
-import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ListTransactionsResponseData
import org.apache.kafka.common.metrics.Metrics
@@ -38,7 +37,6 @@ import org.apache.kafka.common.{KafkaException,
TopicIdPartition, TopicPartition
import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
TransactionConfig, TransactionLog, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TransactionPartitionAndLeaderEpoch,
TransactionalIdAndProducerIdEpoch, TransactionalIdCoordinatorEpochAndMetadata,
TransactionalIdCoordinatorEpochAndTransitMetadata, TxnMetadataCacheEntry,
TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
@@ -183,7 +181,7 @@ class TransactionStateManager(brokerId: Int,
if (recordsBuilder == null) {
recordsBuilder = MemoryRecords.builder(
ByteBuffer.allocate(math.min(16384, maxBatchSize)),
- TransactionLog.ENFORCED_COMPRESSION,
+ Compression.NONE,
TimestampType.CREATE_TIME,
0L,
maxBatchSize
@@ -290,7 +288,7 @@ class TransactionStateManager(brokerId: Int,
inReadLock[Exception](stateLock, () => {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs,
- requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
+ requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition =
Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
@@ -438,19 +436,6 @@ class TransactionStateManager(brokerId: Int,
enableTwoPC || (txnTimeoutMs <= config.transactionMaxTimeoutMs &&
txnTimeoutMs > 0)
}
- def transactionTopicConfigs: Properties = {
- val props = new Properties
-
- // enforce disabled unclean leader election, no compression types, and
compact cleanup policy
- props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
- props.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.UNCOMPRESSED.name)
- props.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
- props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
config.transactionLogMinInsyncReplicas.toString)
- props.put(TopicConfig.SEGMENT_BYTES_CONFIG,
config.transactionLogSegmentBytes.toString)
-
- props
- }
-
def partitionFor(transactionalId: String): Int =
Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
private def loadTransactionMetadata(topicPartition: TopicPartition,
coordinatorEpoch: Int): ConcurrentMap[String, TransactionMetadata] = {
@@ -672,7 +657,7 @@ class TransactionStateManager(brokerId: Int,
val valueBytes = TransactionLog.valueToBytes(newMetadata,
transactionVersionLevel())
val timestamp = time.milliseconds()
- val records =
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new
SimpleRecord(timestamp, keyBytes, valueBytes))
+ val records = MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord(timestamp, keyBytes, valueBytes))
val transactionStateTopicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME,
partitionFor(transactionalId))
val transactionStateTopicIdPartition =
replicaManager.topicIdPartition(transactionStateTopicPartition)
val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
@@ -815,7 +800,7 @@ class TransactionStateManager(brokerId: Int,
if (append) {
replicaManager.appendRecords(
timeout = newMetadata.txnTimeoutMs.toLong,
- requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
+ requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = recordsPerPartition,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c339a1d595a..8467b285f33 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -419,7 +419,7 @@ class BrokerServer(
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
() => groupCoordinator.groupMetadataTopicConfigs,
- () => transactionCoordinator.transactionTopicConfigs,
+ () => transactionCoordinator.transactionStateTopicConfigs,
() => shareCoordinator.shareGroupStateTopicConfigs,
new KRaftTopicCreator(clientToControllerChannelManager),
time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a530454cb91..9d82174ab85 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1836,7 +1836,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (controlRecords.nonEmpty) {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
- requiredAcks = -1,
+ requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 4343987f1a0..11714a66ab6 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -1182,7 +1182,7 @@ class TransactionStateManagerTest {
val partitionId = transactionManager.partitionFor(transactionalId1)
val topicPartition = new TopicIdPartition(transactionTopicId,
partitionId, TRANSACTION_STATE_TOPIC_NAME)
val expectedTombstone = new SimpleRecord(time.milliseconds(),
TransactionLog.keyToBytes(transactionalId1), null)
- val expectedRecords =
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION,
expectedTombstone)
+ val expectedRecords = MemoryRecords.withRecords(Compression.NONE,
expectedTombstone)
assertEquals(Set(topicPartition), appendedRecords.keySet)
assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
} else {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4239b49a962..4c36833ab73 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -766,7 +766,7 @@ class KafkaApisTest extends Logging {
case CoordinatorType.TRANSACTION =>
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
- when(txnCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new
Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE,
ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -938,7 +938,7 @@ class KafkaApisTest extends Logging {
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
- when(txnCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new
Properties)
true
case _ =>
topicConfigOverride.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
index b63141ca5dc..fbf3a21304c 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.record.internal.RecordBatch;
@@ -42,14 +41,6 @@ import java.util.stream.Collectors;
*/
public class TransactionLog {
- // enforce always using
- // 1. cleanup policy = compact
- // 2. compression = none
- // 3. unclean leader election = disabled
- // 4. required acks = -1 when writing
- public static final Compression ENFORCED_COMPRESSION = Compression.NONE;
- public static final short ENFORCED_REQUIRED_ACKS = (short) -1;
-
/**
* Generates the bytes for transaction log message key
*
@@ -103,23 +94,6 @@ public class TransactionLog {
return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
}
- /**
- * Decodes the transaction log messages' key
- *
- * @return the transactional id
- * @throws IllegalStateException if the version is not a valid transaction
log key version
- */
- public static String readTxnRecordKey(ByteBuffer buffer) {
- short version = buffer.getShort();
- if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) {
- return new TransactionLogKey(new ByteBufferAccessor(buffer),
(short) 0).transactionalId();
- } else {
- throw new IllegalStateException("Unknown version " + version + "
from the transaction log message key");
- }
- }
-
-
-
public sealed interface ReadResult permits TxnRecord, TxnTombstone,
UnknownKeyVersion, UnknownValueVersion { }
public record TxnRecord(String transactionId, TransactionMetadata
metadata) implements ReadResult { }