rreddy-22 commented on code in PR #20039:
URL: https://github.com/apache/kafka/pull/20039#discussion_r2181020011
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1500,8 +1500,20 @@ public void handleResponse(AbstractResponse response) {
 ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
         initProducerIdResponse.data().producerEpoch());
 setProducerIdAndEpoch(producerIdAndEpoch);
- // TO_DO Add code to handle transition to prepared_txn when keepPrepared = true
- transitionTo(State.READY);
+ // If this is a transaction with keepPreparedTxn=true, transition directly
+ // to PREPARED_TRANSACTION state IFF there is an ongoing transaction.
+ if (builder.data.keepPreparedTxn() &&
+     initProducerIdResponse.data().ongoingTxnProducerId() != RecordBatch.NO_PRODUCER_ID
+ ) {
+     transitionTo(State.PREPARED_TRANSACTION);
+     // Update the preparedTxnState with the ongoing pid and epoch from the response.
+     // This will be used to complete the transaction later.
+     String serializedState = initProducerIdResponse.data().ongoingTxnProducerId() +
+         ":" + initProducerIdResponse.data().ongoingTxnProducerEpoch();
+     TransactionManager.this.preparedTxnState = new PreparedTxnState(serializedState);
Review Comment:
I just moved PreparedTxnState into the internals package so that I can use
its other, package-private constructor here.
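
For readers following the hunk, here is a minimal standalone sketch of the
"producerId:epoch" encoding and of the transition decision it makes. This is
not the Kafka implementation: the class PreparedTxnStateSketch, its methods,
the State enum, and the NO_PRODUCER_ID value of -1 are all illustrative
assumptions, and falling back to READY when the condition is false is also
assumed, since the hunk is cut off before any else branch.

```java
public class PreparedTxnStateSketch {
    // Assumed sentinel for "no producer id"; stands in for RecordBatch.NO_PRODUCER_ID.
    static final long NO_PRODUCER_ID = -1L;

    // Hypothetical subset of the producer states touched by the hunk.
    enum State { READY, PREPARED_TRANSACTION }

    final long producerId;
    final short epoch;

    // Direct constructor: takes the numeric values without a string round trip.
    PreparedTxnStateSketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Parses the "producerId:epoch" form that the hunk builds by concatenation.
    static PreparedTxnStateSketch parse(String serialized) {
        String[] parts = serialized.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected producerId:epoch, got: " + serialized);
        }
        return new PreparedTxnStateSketch(Long.parseLong(parts[0]), Short.parseShort(parts[1]));
    }

    // Produces the same "producerId:epoch" form.
    String serialize() {
        return producerId + ":" + epoch;
    }

    // Mirrors the condition in the hunk: go to PREPARED_TRANSACTION only when
    // keepPreparedTxn is set and the response reports an ongoing transaction.
    static State nextState(boolean keepPreparedTxn, long ongoingTxnProducerId) {
        if (keepPreparedTxn && ongoingTxnProducerId != NO_PRODUCER_ID) {
            return State.PREPARED_TRANSACTION;
        }
        return State.READY; // assumed fallback, not shown in the hunk
    }

    public static void main(String[] args) {
        PreparedTxnStateSketch state = parse(42L + ":" + (short) 7);
        System.out.println(state.serialize());
        System.out.println(nextState(true, state.producerId));
        System.out.println(nextState(true, NO_PRODUCER_ID));
    }
}
```

With a constructor that takes the two numbers directly, the caller could skip
building and re-parsing the string; whether that matches the constructor
referred to above is an assumption.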