This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1d313cc0f1 MINOR: Move TransactionStateManager leaf case classes to
Java records in transaction-coordinator (#22334)
f1d313cc0f1 is described below
commit f1d313cc0f1c69c4536e09e31743f7134fd45326
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Mon Jun 1 19:03:40 2026 +0530
MINOR: Move TransactionStateManager leaf case classes to Java records in
transaction-coordinator (#22334)
Reviewers: Mickael Maison <[email protected]>
---
.../import-control-transaction-coordinator.xml | 1 +
.../transaction/TransactionCoordinator.scala | 4 +-
.../transaction/TransactionStateManager.scala | 54 ++------
.../TransactionCoordinatorConcurrencyTest.scala | 4 +-
.../transaction/TransactionCoordinatorTest.scala | 154 ++++++++++-----------
.../TransactionMarkerChannelManagerTest.scala | 10 +-
...sactionMarkerRequestCompletionHandlerTest.scala | 6 +-
.../transaction/TransactionStateManagerTest.scala | 34 ++---
.../CoordinatorEpochAndTxnMetadata.java | 20 +++
.../coordinator/transaction/TransactionConfig.java | 47 +++++++
.../TransactionPartitionAndLeaderEpoch.java | 20 +++
.../TransactionalIdAndProducerIdEpoch.java | 27 ++++
...TransactionalIdCoordinatorEpochAndMetadata.java | 23 +++
...tionalIdCoordinatorEpochAndTransitMetadata.java | 27 ++++
.../transaction/TxnMetadataCacheEntry.java | 29 ++++
15 files changed, 309 insertions(+), 151 deletions(-)
diff --git a/checkstyle/import-control-transaction-coordinator.xml
b/checkstyle/import-control-transaction-coordinator.xml
index a5d29579b45..85f05a41d40 100644
--- a/checkstyle/import-control-transaction-coordinator.xml
+++ b/checkstyle/import-control-transaction-coordinator.xml
@@ -40,6 +40,7 @@
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.slf4j" />
<allow pkg="kafka.coordinator.transaction" />
+ <allow class="org.apache.kafka.server.config.ServerConfigs" />
<subpackage name="generated">
<allow pkg="com.fasterxml.jackson" />
</subpackage>
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 96dc7ad7f8d..43455a83e41 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.internal.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse,
TransactionResult}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.utils.internals.LogContext
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionLogConfig, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TxnTransitMetadata}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionConfig, TransactionLogConfig, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch,
TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler
@@ -50,7 +50,7 @@ object TransactionCoordinator {
val transactionLogConfig = new TransactionLogConfig(config)
val transactionStateManagerConfig = new
TransactionStateManagerConfig(config)
- val txnConfig =
TransactionConfig(transactionStateManagerConfig.transactionalIdExpirationMs,
+ val txnConfig = new
TransactionConfig(transactionStateManagerConfig.transactionalIdExpirationMs,
transactionStateManagerConfig.transactionMaxTimeoutMs,
transactionLogConfig.transactionTopicPartitions,
transactionLogConfig.transactionTopicReplicationFactor,
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e5b94d11197..425789a8263 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -35,10 +35,9 @@ import
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicIdPartition,
TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionLog,
TransactionLogConfig, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TxnTransitMetadata}
+import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
TransactionConfig, TransactionLog, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TransactionPartitionAndLeaderEpoch,
TransactionalIdAndProducerIdEpoch, TransactionalIdCoordinatorEpochAndMetadata,
TransactionalIdCoordinatorEpochAndTransitMetadata, TxnMetadataCacheEntry,
TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
@@ -118,7 +117,7 @@ class TransactionStateManager(brokerId: Int,
// visible for testing only
private[transaction] def addLoadingPartition(partitionId: Int,
coordinatorEpoch: Int): Unit = {
- val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+ val partitionAndLeaderEpoch = new
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inWriteLock(stateLock, () => {
loadingPartitions.add(partitionAndLeaderEpoch)
})
@@ -145,7 +144,7 @@ class TransactionStateManager(brokerId: Int,
}
}
}.map { case (txnId, txnMetadata) =>
- TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId,
txnMetadata.producerEpoch)
+ new TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId,
txnMetadata.producerEpoch)
}
}
})
@@ -193,7 +192,7 @@ class TransactionStateManager(brokerId: Int,
if (maybeAppendExpiration(txnMetadata, recordsBuilder,
currentTimeMs)) {
val transitMetadata = txnMetadata.prepareDead()
- expired += TransactionalIdCoordinatorEpochAndMetadata(
+ expired += new TransactionalIdCoordinatorEpochAndMetadata(
transactionalId,
txnMetadataCacheEntry.coordinatorEpoch,
transitMetadata
@@ -416,7 +415,7 @@ class TransactionStateManager(brokerId: Int,
.getOrElse(createdTxnMetadata)
}
}
-
Right(txnMetadata.map(CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch,
_)))
+ Right(txnMetadata.map(new
CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, _)))
case None =>
Left(Errors.NOT_COORDINATOR)
@@ -537,7 +536,7 @@ class TransactionStateManager(brokerId: Int,
private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int,
coordinatorEpoch: Int,
loadedTransactions:
ConcurrentMap[String, TransactionMetadata]): Unit = {
- val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch,
loadedTransactions)
+ val txnMetadataCacheEntry = new TxnMetadataCacheEntry(coordinatorEpoch,
loadedTransactions)
val previousTxnMetadataCacheEntryOpt =
transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
previousTxnMetadataCacheEntryOpt.foreach { previousTxnMetadataCacheEntry =>
@@ -553,7 +552,7 @@ class TransactionStateManager(brokerId: Int,
*/
def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch:
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
- val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+ val partitionAndLeaderEpoch = new
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inWriteLock(stateLock, () => {
loadingPartitions.add(partitionAndLeaderEpoch)
@@ -582,10 +581,10 @@ class TransactionStateManager(brokerId: Int,
txnMetadata.state match {
case TransactionState.PREPARE_ABORT =>
transactionsPendingForCompletion +=
-
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId,
coordinatorEpoch, TransactionResult.ABORT, txnMetadata,
txnMetadata.prepareComplete(time.milliseconds()))
+ new
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId,
coordinatorEpoch, TransactionResult.ABORT, txnMetadata,
txnMetadata.prepareComplete(time.milliseconds()))
case TransactionState.PREPARE_COMMIT =>
transactionsPendingForCompletion +=
-
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId,
coordinatorEpoch, TransactionResult.COMMIT, txnMetadata,
txnMetadata.prepareComplete(time.milliseconds()))
+ new
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId,
coordinatorEpoch, TransactionResult.COMMIT, txnMetadata,
txnMetadata.prepareComplete(time.milliseconds()))
case _ =>
// nothing needs to be done
}
@@ -844,38 +843,3 @@ class TransactionStateManager(brokerId: Int,
info("Shutdown complete")
}
}
-
-
-private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
-
metadataPerTransactionalId: ConcurrentMap[String, TransactionMetadata]) {
- override def toString: String = {
- s"TxnMetadataCacheEntry(coordinatorEpoch=$coordinatorEpoch,
numTransactionalEntries=${metadataPerTransactionalId.size})"
- }
-}
-
-private[transaction] case class
CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int,
-
transactionMetadata: TransactionMetadata)
-
-private[transaction] case class TransactionConfig(transactionalIdExpirationMs:
Int = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
- transactionMaxTimeoutMs: Int
= TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
- transactionLogNumPartitions:
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
-
transactionLogReplicationFactor: Short =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
- transactionLogSegmentBytes:
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
-
transactionLogLoadBufferSize: Int =
TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
-
transactionLogMinInsyncReplicas: Int =
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
-
abortTimedOutTransactionsIntervalMs: Int =
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
-
removeExpiredTransactionalIdsIntervalMs: Int =
TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
- transaction2PCEnable:
Boolean = TransactionStateManagerConfig.TRANSACTIONS_2PC_ENABLED_DEFAULT,
- requestTimeoutMs: Int =
ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)
-
-case class TransactionalIdAndProducerIdEpoch(transactionalId: String,
producerId: Long, producerEpoch: Short) {
- override def toString: String = {
- s"(transactionalId=$transactionalId, producerId=$producerId,
producerEpoch=$producerEpoch)"
- }
-}
-
-case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int,
coordinatorEpoch: Int)
-
-case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String,
coordinatorEpoch: Int, transitMetadata: TxnTransitMetadata)
-
-case class TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId:
String, coordinatorEpoch: Int, result: TransactionResult, txnMetadata:
TransactionMetadata, transitMetadata: TxnTransitMetadata)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 0be4317134a..7cbf7ddfaee 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.utils.internals.LogContext
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionLog, TransactionMetadata, TransactionState}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionConfig, TransactionLog, TransactionMetadata, TransactionState}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion,
RequestLocal, TransactionVersion}
import org.apache.kafka.server.storage.log.FetchIsolation
@@ -57,7 +57,7 @@ class TransactionCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurren
private val coordinatorEpoch = 10
private val numPartitions = nThreads * 5
- private val txnConfig = TransactionConfig()
+ private val txnConfig = new TransactionConfig()
private var transactionCoordinator: TransactionCoordinator = _
private var txnStateManager: TransactionStateManager = _
private var txnMarkerChannelManager: TransactionMarkerChannelManager = _
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 7abfcea5592..c8094146b75 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.record.internal.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse,
TransactionResult}
import org.apache.kafka.common.utils.{MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.utils.internals.LogContext
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionMetadata, TransactionState, TransactionStateManagerConfig,
TxnTransitMetadata}
+import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
ProducerIdManager, TransactionConfig, TransactionMetadata, TransactionState,
TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch,
TxnTransitMetadata}
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
import org.apache.kafka.server.util.MockScheduler
@@ -62,7 +62,7 @@ class TransactionCoordinatorTest {
private val scheduler = new MockScheduler(time)
val coordinator = new TransactionCoordinator(
- TransactionConfig(),
+ new TransactionConfig(),
scheduler,
() => pidGenerator,
transactionManager,
@@ -139,7 +139,7 @@ class TransactionCoordinatorTest {
when(transactionManager.putTransactionStateIfNotExists(capturedTxn.capture()))
.thenAnswer(_ => {
- Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
capturedTxn.getValue))
+ Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
capturedTxn.getValue))
})
when(transactionManager.appendTransactionToLog(
@@ -171,7 +171,7 @@ class TransactionCoordinatorTest {
when(transactionManager.putTransactionStateIfNotExists(capturedTxn.capture()))
.thenAnswer(_ => {
- Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
capturedTxn.getValue))
+ Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
capturedTxn.getValue))
})
when(transactionManager.appendTransactionToLog(
@@ -202,7 +202,7 @@ class TransactionCoordinatorTest {
(Short.MaxValue - 2).toShort, txnTimeoutMs, TransactionState.EMPTY,
util.Set.of, time.milliseconds(), time.milliseconds(), TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
@@ -242,8 +242,8 @@ class TransactionCoordinatorTest {
assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
@@ -401,7 +401,7 @@ class TransactionCoordinatorTest {
def validateConcurrentTransactions(state: TransactionState): Unit = {
// Since the clientTransactionVersion doesn't matter, use 2 since the
states are TransactionState.PREPARE_COMMIT and TransactionState.PREPARE_ABORT.
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, state, util.Set.of, 0, 0,
TV_2)))))
@@ -413,7 +413,7 @@ class TransactionCoordinatorTest {
def shouldRespondWithProducerFencedOnAddPartitionsWhenEpochsAreDifferent():
Unit = {
// Since the clientTransactionVersion doesn't matter, use 2 since the
state is TransactionState.PREPARE_COMMIT.
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_ID,
10, 9, 0, TransactionState.PREPARE_COMMIT, util.Set.of, 0, 0,
TV_2)))))
@@ -449,7 +449,7 @@ class TransactionCoordinatorTest {
producerEpoch, (producerEpoch - 1).toShort, txnTimeoutMs, previousState,
util.Set.of, time.milliseconds(), time.milliseconds(), clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.handleAddPartitionsToTransaction(transactionalId, producerId,
producerEpoch, partitions, errorsCallback, clientTransactionVersion)
@@ -467,7 +467,7 @@ class TransactionCoordinatorTest {
@Test
def
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame():
Unit = {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, TransactionState.EMPTY,
partitions, 0, 0, TV_0)))))
@@ -484,7 +484,7 @@ class TransactionCoordinatorTest {
}
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, TransactionState.ONGOING,
partitions, 0, 0, TV_0)))))
@@ -503,7 +503,7 @@ class TransactionCoordinatorTest {
}
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, TransactionState.EMPTY,
partitions, 0, 0, TV_0)))))
@@ -533,7 +533,7 @@ class TransactionCoordinatorTest {
def
shouldReplyWithInvalidPidMappingOnEndTxnWhenPidDoesntMatchMapped(transactionVersion:
Short): Unit = {
val clientTransactionVersion =
TransactionVersion.fromFeatureLevel(transactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 10, 10,
RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, TransactionState.ONGOING,
util.Set.of, 0, time.milliseconds(), TV_0)))))
@@ -547,7 +547,7 @@ class TransactionCoordinatorTest {
def
shouldReplyWithProducerFencedOnEndTxnWhenEpochIsNotSameAsTransaction(transactionVersion:
Short): Unit = {
val clientTransactionVersion =
TransactionVersion.fromFeatureLevel(transactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
(producerEpoch - 1).toShort, 1, TransactionState.ONGOING,
util.Set.of, 0, time.milliseconds(), TV_0)))))
@@ -561,7 +561,7 @@ class TransactionCoordinatorTest {
def testEndTxnWhenStatusIsCompleteCommitAndResultIsCommitInV1(isRetry:
Boolean): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
(producerEpoch - 1).toShort, 1, TransactionState.COMPLETE_COMMIT,
util.Set.of, 0, time.milliseconds(), clientTransactionVersion)))))
@@ -588,7 +588,7 @@ class TransactionCoordinatorTest {
def testEndTxnWhenStatusIsCompleteCommitAndResultIsCommitInV2(isRetry:
Boolean): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(2)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
(producerEpoch - 1).toShort, 1, TransactionState.COMPLETE_COMMIT,
util.Set.of, 0, time.milliseconds(), clientTransactionVersion)))))
@@ -609,7 +609,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort, 1,
TransactionState.COMPLETE_ABORT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
@@ -628,7 +628,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort, 1,
TransactionState.COMPLETE_ABORT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
@@ -665,7 +665,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort, 1,
TransactionState.COMPLETE_ABORT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.handleEndTransaction(transactionalId, producerId,
requestEpoch(clientTransactionVersion), TransactionResult.COMMIT,
clientTransactionVersion, endTxnCallback)
assertEquals(Errors.INVALID_TXN_STATE, error)
@@ -678,7 +678,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort,1,
TransactionState.COMPLETE_COMMIT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.handleEndTransaction(transactionalId, producerId,
producerEpoch, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
assertEquals(Errors.INVALID_TXN_STATE, error)
@@ -692,7 +692,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort, 1,
TransactionState.COMPLETE_COMMIT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val epoch = if (isRetry) producerEpoch - 1 else producerEpoch
coordinator.handleEndTransaction(transactionalId, producerId,
epoch.toShort, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
@@ -711,7 +711,7 @@ class TransactionCoordinatorTest {
val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, (producerEpoch - 1).toShort, 1,
TransactionState.COMPLETE_COMMIT, util.Set.of, 0, time.milliseconds(),
clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val epoch = if (isRetry) producerEpoch - 1 else producerEpoch
coordinator.handleEndTransaction(transactionalId, producerId,
epoch.toShort, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
@@ -739,7 +739,7 @@ class TransactionCoordinatorTest {
def
shouldReturnConcurrentTransactionsOnEndTxnRequestWhenStatusIsPrepareCommit(transactionVersion:
Short): Unit = {
val clientTransactionVersion =
TransactionVersion.fromFeatureLevel(transactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch, (producerEpoch -
1).toShort, 1, TransactionState.PREPARE_COMMIT, util.Set.of, 0,
time.milliseconds(), clientTransactionVersion)))))
coordinator.handleEndTransaction(transactionalId, producerId,
requestEpoch(clientTransactionVersion), TransactionResult.COMMIT,
clientTransactionVersion, endTxnCallback)
@@ -752,7 +752,7 @@ class TransactionCoordinatorTest {
def
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsPrepareAbort(transactionVersion:
Short): Unit = {
val clientTransactionVersion =
TransactionVersion.fromFeatureLevel(transactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.PREPARE_ABORT, util.Set.of,
0, time.milliseconds(), clientTransactionVersion)))))
coordinator.handleEndTransaction(transactionalId, producerId,
requestEpoch(clientTransactionVersion), TransactionResult.COMMIT,
clientTransactionVersion, endTxnCallback)
@@ -764,7 +764,7 @@ class TransactionCoordinatorTest {
def TestEndTxnRequestWhenEmptyTransactionStateForAbortInV1(): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.EMPTY, util.Set.of, 0,
time.milliseconds(), clientTransactionVersion)))))
coordinator.handleEndTransaction(transactionalId, producerId,
producerEpoch, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
@@ -777,7 +777,7 @@ class TransactionCoordinatorTest {
def TestEndTxnRequestWhenEmptyTransactionStateForAbortInV2(isRetry:
Boolean): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(2)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.EMPTY, util.Set.of, 0,
time.milliseconds(), clientTransactionVersion)))))
val epoch = if (isRetry) producerEpoch - 1 else producerEpoch
@@ -806,7 +806,7 @@ class TransactionCoordinatorTest {
def TestEndTxnRequestWhenEmptyTransactionStateForCommitInV2(isRetry:
Boolean): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(2)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.EMPTY, util.Set.of, 0,
time.milliseconds(), clientTransactionVersion)))))
val epoch = if (isRetry) producerEpoch - 1 else producerEpoch
@@ -822,7 +822,7 @@ class TransactionCoordinatorTest {
@Test
def shouldReturnInvalidTxnRequestOnEndTxnV2IfNotEndTxnV2Retry(): Unit = {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.PREPARE_COMMIT, util.Set.of,
0, time.milliseconds(), TV_2)))))
// If producerEpoch is the same, this is not a retry of the EndTxnRequest,
but the next EndTxnRequest. Return PRODUCER_FENCED.
@@ -831,7 +831,7 @@ class TransactionCoordinatorTest {
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, 1, TransactionState.COMPLETE_COMMIT,
util.Set.of, 0, time.milliseconds(), TV_2)))))
// If producerEpoch is the same, this is not a retry of the EndTxnRequest,
but the next EndTxnRequest. Return INVALID_TXN_STATE.
@@ -843,7 +843,7 @@ class TransactionCoordinatorTest {
@Test
def shouldReturnOkOnEndTxnV2IfEndTxnV2RetryEpochOverflow(): Unit = {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId, producerId,
producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1,
TransactionState.PREPARE_COMMIT, util.Set.of, 0, time.milliseconds(), TV_2)))))
// Return CONCURRENT_TRANSACTIONS while transaction is still completing
@@ -852,7 +852,7 @@ class TransactionCoordinatorTest {
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId2, producerId,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, new
TransactionMetadata(transactionalId, producerId2, producerId,
RecordBatch.NO_PRODUCER_ID, 0, RecordBatch.NO_PRODUCER_EPOCH, 1,
TransactionState.COMPLETE_COMMIT, util.Set.of, 0, time.milliseconds(), TV_2)))))
coordinator.handleEndTransaction(transactionalId, producerId,
(Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
@@ -870,7 +870,7 @@ class TransactionCoordinatorTest {
val txnTransitMetadata =
prepareWithPending.prepareComplete(time.milliseconds())
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
// Return CONCURRENT_TRANSACTIONS while transaction is still completing
coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2,
0, partitions, errorsCallback, TV_2)
@@ -880,7 +880,7 @@ class TransactionCoordinatorTest {
prepareWithPending.completeTransitionTo(txnTransitMetadata)
assertEquals(TransactionState.COMPLETE_COMMIT, prepareWithPending.state)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@@ -990,7 +990,7 @@ class TransactionCoordinatorTest {
private def verifyEndTxnEpoch(metadataEpoch: Short, requestEpoch: Short,
clientTransactionVersion: TransactionVersion): Unit = {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_ID, metadataEpoch, 1,
1, TransactionState.COMPLETE_COMMIT, util.Set.of, 0,
time.milliseconds(), clientTransactionVersion)))))
@@ -1039,7 +1039,7 @@ class TransactionCoordinatorTest {
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_0)
@@ -1087,8 +1087,8 @@ class TransactionCoordinatorTest {
(producerEpoch + 2).toShort, (producerEpoch - 1).toShort, txnTimeoutMs,
TransactionState.ONGOING, partitions, time.milliseconds(), time.milliseconds(),
TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
bumpedTxnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, bumpedTxnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_0)
coordinator.handleInitProducerId(
@@ -1115,10 +1115,10 @@ class TransactionCoordinatorTest {
.thenReturn(true)
when(transactionManager.putTransactionStateIfNotExists(any[TransactionMetadata]()))
- .thenReturn(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata)))
+ .thenReturn(Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenAnswer(_ =>
Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenAnswer(_ => Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_0)
@@ -1210,9 +1210,9 @@ class TransactionCoordinatorTest {
val postFenceTxnMetadata = new TransactionMetadata(transactionalId,
producerId, producerId, RecordBatch.NO_PRODUCER_ID,
Short.MaxValue, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
TransactionState.PREPARE_ABORT, partitions, time.milliseconds(),
time.milliseconds(), TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
postFenceTxnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, postFenceTxnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_0)
@@ -1281,7 +1281,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
// Capture the transition metadata to verify epoch increments
@@ -1358,9 +1358,9 @@ class TransactionCoordinatorTest {
// Mock the transaction manager to return our test transaction as timed out
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
// Mock the append operation to simulate successful write and update the
metadata
@@ -1447,9 +1447,9 @@ class TransactionCoordinatorTest {
// Mock the transaction manager to return our test transaction as timed out
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
// Mock the append operation to simulate successful write and update the
metadata
@@ -1618,7 +1618,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
when(transactionManager.appendTransactionToLog(
@@ -1761,9 +1761,9 @@ class TransactionCoordinatorTest {
// Mock the transaction manager to return our test transaction as timed out
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
@@ -1887,7 +1887,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
// Simulate producer trying to continue after new producer has already
been initialized
coordinator.handleInitProducerId(
@@ -1910,7 +1910,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
// Simulate producer trying to continue after new producer has already
been initialized
coordinator.handleInitProducerId(
@@ -1934,7 +1934,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
@@ -1981,7 +1981,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val capturedTxnTransitMetadata : ArgumentCaptor[TxnTransitMetadata] =
ArgumentCaptor.forClass(classOf[TxnTransitMetadata])
when(transactionManager.appendTransactionToLog(
@@ -2033,7 +2033,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
@@ -2086,7 +2086,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
@@ -2141,9 +2141,9 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, now, now, TV_0)
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
// Transaction timeouts use FenceProducerEpoch so clientTransactionVersion
is 0.
val expectedTransition = new TxnTransitMetadata(producerId, producerId,
RecordBatch.NO_PRODUCER_EPOCH, (producerEpoch + 1).toShort,
@@ -2180,16 +2180,16 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, now, now, TV_0)
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_0)
val bumpedTxnMetadata = new TransactionMetadata(transactionalId,
producerId, producerId,
RecordBatch.NO_PRODUCER_EPOCH, (producerEpoch + 2).toShort,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, now, now, TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
bumpedTxnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, bumpedTxnMetadata))))
def checkOnEndTransactionComplete(txnIdAndPidEpoch:
TransactionalIdAndProducerIdEpoch)(error: Errors, producerId: Long,
producerEpoch: Short): Unit = {
assertEquals(Errors.PRODUCER_FENCED, error)
@@ -2207,9 +2207,9 @@ class TransactionCoordinatorTest {
metadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0,
RecordBatch.NO_PRODUCER_ID, time.milliseconds(), false)
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
metadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, metadata))))
coordinator.startup(() => transactionStatePartitionCount, false)
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
@@ -2225,14 +2225,14 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, now, now, TV_0)
when(transactionManager.timedOutTransactions())
- .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
+ .thenReturn(List(new TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, producerEpoch)))
val txnMetadataAfterAppendFailure = new
TransactionMetadata(transactionalId, producerId, producerId,
RecordBatch.NO_PRODUCER_EPOCH, (producerEpoch + 1).toShort,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, now, now, TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadataAfterAppendFailure))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadataAfterAppendFailure))))
// Transaction timeouts use FenceProducerEpoch so clientTransactionVersion
is 0.
val bumpedEpoch = (producerEpoch + 1).toShort
@@ -2276,7 +2276,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.handleInitProducerId(
transactionalId,
@@ -2308,7 +2308,7 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.DEAD,
util.Set.of, time.milliseconds(),
time.milliseconds(), TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val result = coordinator.handleDescribeTransactions(transactionalId)
assertEquals(transactionalId, result.transactionalId)
@@ -2334,7 +2334,7 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING,
partitions, time.milliseconds(), time.milliseconds(), TV_0)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.startup(() => transactionStatePartitionCount,
enableTransactionalIdExpiration = false)
val result = coordinator.handleDescribeTransactions(transactionalId)
@@ -2363,7 +2363,7 @@ class TransactionCoordinatorTest {
val metadata = new TransactionMetadata(transactionalId, 0, 0,
RecordBatch.NO_PRODUCER_EPOCH,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, state,
util.Set.of[TopicPartition](new TopicPartition("topic", 1)), 0, 0, TV_2)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
metadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, metadata))))
coordinator.handleInitProducerId(transactionalId, 10, enableTwoPCFlag =
false,
keepPreparedTxn = false, None, initProducerIdMockCallback)
@@ -2382,7 +2382,7 @@ class TransactionCoordinatorTest {
val metadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_EPOCH,
producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, state,
util.Set.of, time.milliseconds(), time.milliseconds(), clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
metadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, metadata))))
val capturedNewMetadata: ArgumentCaptor[TxnTransitMetadata] =
ArgumentCaptor.forClass(classOf[TxnTransitMetadata])
when(transactionManager.appendTransactionToLog(
@@ -2417,7 +2417,7 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, transactionState,
partitions, now, now, clientTransactionVersion)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
originalMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, originalMetadata))))
when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@@ -2466,7 +2466,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
// First attempt fails with COORDINATOR_NOT_AVAILABLE
@@ -2511,7 +2511,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
// Mock the appendTransactionToLog to succeed for the endTransaction call
@@ -2568,7 +2568,7 @@ class TransactionCoordinatorTest {
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(),
anyInt()))
.thenReturn(true)
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
when(transactionManager.appendTransactionToLog(
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index af48a4673b2..011441a37c7 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.utils.internals.LogContext
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata,
TransactionState}
+import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
TransactionMetadata, TransactionState}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -88,9 +88,9 @@ class TransactionMarkerChannelManagerTest {
when(txnStateManager.partitionFor(transactionalId2))
.thenReturn(txnTopicPartition2)
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId1)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
when(metadataCache.metadataVersion())
.thenReturn(MetadataVersion.latestProduction())
}
@@ -222,7 +222,7 @@ class TransactionMarkerChannelManagerTest {
// COORDINATOR_LOAD_IN_PROGRESS
Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
// "Newly loaded" transaction state with the new epoch.
- Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2,
txnMetadata2)))
+ Right(Some(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch2,
txnMetadata2)))
)
clientResponses.foreach { clientResponse =>
@@ -253,7 +253,7 @@ class TransactionMarkerChannelManagerTest {
// Now drain and complete the marker from the new epoch.
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2)))
-
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2,
txnMetadata2))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, txnMetadata2))))
val requests2 = channelManager.generateRequests().asScala
assertEquals(1, requests2.size)
requests2.head.handler.onComplete(successfulClientResponse)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 29e50409fbd..1678bc830ea 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.internal.RecordBatch
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult,
WriteTxnMarkersRequest, WriteTxnMarkersResponse}
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata,
TransactionState}
+import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
TransactionMetadata, TransactionState}
import org.apache.kafka.server.common.TransactionVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -59,7 +59,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
when(txnStateManager.partitionFor(transactionalId))
.thenReturn(txnTopicPartition)
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
- .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
}
@Test
@@ -110,7 +110,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
@Test
def shouldCompleteDelayedOperationWhenCoordinatorEpochChanged(): Unit = {
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
-
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch+1,
txnMetadata))))
+ .thenReturn(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch+1, txnMetadata))))
verifyRemoveDelayedOperationOnError(Errors.NONE)
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index ea7e44b4149..4343987f1a0 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.coordinator.transaction.{TransactionLog,
TransactionMetadata, TransactionState, TxnTransitMetadata}
+import
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata,
TransactionConfig, TransactionLog, TransactionMetadata,
TransactionPartitionAndLeaderEpoch, TransactionState,
TransactionalIdAndProducerIdEpoch, TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion,
RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
@@ -79,7 +79,7 @@ class TransactionStateManagerTest {
val metrics = new Metrics()
- val txnConfig = TransactionConfig()
+ val txnConfig = new TransactionConfig()
val transactionManager: TransactionStateManager = new
TransactionStateManager(0, scheduler,
replicaManager, metadataCache, txnConfig, time, metrics)
@@ -127,11 +127,11 @@ class TransactionStateManagerTest {
transactionManager.addLoadedTransactionsToCache(partitionId,
coordinatorEpoch, new ConcurrentHashMap[String, TransactionMetadata]())
assertEquals(Right(None),
transactionManager.getTransactionState(transactionalId1))
- assertEquals(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1)),
+ assertEquals(Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1)),
transactionManager.putTransactionStateIfNotExists(txnMetadata1))
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))),
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
- assertEquals(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2)),
+ assertEquals(Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2)),
transactionManager.putTransactionStateIfNotExists(txnMetadata2))
}
@@ -203,7 +203,7 @@ class TransactionStateManagerTest {
})
val coordinatorEpoch = 0
- val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+ val partitionAndLeaderEpoch = new
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
val loadingThread = new Thread(() => {
transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch, (_, _, _, _) => ())
@@ -262,7 +262,7 @@ class TransactionStateManagerTest {
})
val coordinatorEpoch = 0
- val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+ val partitionAndLeaderEpoch = new
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
val loadingThread = new Thread(() => {
transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch, (_, _, _, _) => ())
@@ -398,7 +398,7 @@ class TransactionStateManagerTest {
// append the new metadata into log
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch, newMetadata, assertCallback, requestLocal =
RequestLocal.withThreadConfinedCaching)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
}
@@ -413,25 +413,25 @@ class TransactionStateManagerTest {
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
val requestLocal = RequestLocal.withThreadConfinedCaching
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(util.Set.of(new
TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(util.Set.of(new
TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(util.Set.of(new
TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
}
@@ -446,7 +446,7 @@ class TransactionStateManagerTest {
prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
val requestLocal = RequestLocal.withThreadConfinedCaching
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(util.Set.of(new
TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
@@ -491,13 +491,13 @@ class TransactionStateManagerTest {
prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
val requestLocal = RequestLocal.withThreadConfinedCaching
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(util.Set.of(new
TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal =
requestLocal)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
}
@@ -511,7 +511,7 @@ class TransactionStateManagerTest {
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
transactionManager.appendTransactionToLog(transactionalId1,
coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true,
RequestLocal.withThreadConfinedCaching)
- assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+ assertEquals(Right(Some(new
CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertEquals(util.Optional.of(TransactionState.ONGOING),
txnMetadata1.pendingState)
}
@@ -667,7 +667,7 @@ class TransactionStateManagerTest {
time.sleep(2000)
val expiring = transactionManager.timedOutTransactions()
- assertEquals(List(TransactionalIdAndProducerIdEpoch("ongoing", 0, 0)),
expiring)
+ assertEquals(List(new TransactionalIdAndProducerIdEpoch("ongoing", 0, 0)),
expiring)
}
@Test
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/CoordinatorEpochAndTxnMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/CoordinatorEpochAndTxnMetadata.java
new file mode 100644
index 00000000000..8918785dc11
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/CoordinatorEpochAndTxnMetadata.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public record CoordinatorEpochAndTxnMetadata(int coordinatorEpoch, TransactionMetadata transactionMetadata) {
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionConfig.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionConfig.java
new file mode 100644
index 00000000000..edf9be1e3d1
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.server.config.ServerConfigs;
+
+public record TransactionConfig(
+ int transactionalIdExpirationMs,
+ int transactionMaxTimeoutMs,
+ int transactionLogNumPartitions,
+ short transactionLogReplicationFactor,
+ int transactionLogSegmentBytes,
+ int transactionLogLoadBufferSize,
+ int transactionLogMinInsyncReplicas,
+ int abortTimedOutTransactionsIntervalMs,
+ int removeExpiredTransactionalIdsIntervalMs,
+ boolean transaction2PCEnable,
+ int requestTimeoutMs) {
+
+ public TransactionConfig() {
+ this(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
+ TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
+ TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
+ TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+ TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
+ TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
+ TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
+ TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
+ TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
+ TransactionStateManagerConfig.TRANSACTIONS_2PC_ENABLED_DEFAULT,
+ ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT);
+ }
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionPartitionAndLeaderEpoch.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionPartitionAndLeaderEpoch.java
new file mode 100644
index 00000000000..7e241ea8c8a
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionPartitionAndLeaderEpoch.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public record TransactionPartitionAndLeaderEpoch(int txnPartitionId, int coordinatorEpoch) {
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdAndProducerIdEpoch.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdAndProducerIdEpoch.java
new file mode 100644
index 00000000000..307530e3bfc
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdAndProducerIdEpoch.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public record TransactionalIdAndProducerIdEpoch(String transactionalId, long producerId, short producerEpoch) {
+
+ @Override
+ public String toString() {
+ return "(transactionalId=" + transactionalId +
+ ", producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch + ")";
+ }
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndMetadata.java
new file mode 100644
index 00000000000..35b4d8cec79
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndMetadata.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public record TransactionalIdCoordinatorEpochAndMetadata(
+ String transactionalId,
+ int coordinatorEpoch,
+ TxnTransitMetadata transitMetadata) {
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndTransitMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndTransitMetadata.java
new file mode 100644
index 00000000000..e669518d2fa
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionalIdCoordinatorEpochAndTransitMetadata.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.requests.TransactionResult;
+
+public record TransactionalIdCoordinatorEpochAndTransitMetadata(
+ String transactionalId,
+ int coordinatorEpoch,
+ TransactionResult result,
+ TransactionMetadata txnMetadata,
+ TxnTransitMetadata transitMetadata) {
+}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnMetadataCacheEntry.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnMetadataCacheEntry.java
new file mode 100644
index 00000000000..6be3bdfc8c1
--- /dev/null
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnMetadataCacheEntry.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import java.util.concurrent.ConcurrentMap;
+
+public record TxnMetadataCacheEntry(int coordinatorEpoch,
+ ConcurrentMap<String, TransactionMetadata> metadataPerTransactionalId) {
+
+ @Override
+ public String toString() {
+ return "TxnMetadataCacheEntry(coordinatorEpoch=" + coordinatorEpoch +
+ ", numTransactionalEntries=" + metadataPerTransactionalId.size() + ")";
+ }
+}