mjsax commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2048203504
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -772,6 +779,14 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except
}
}
+ /**
+ * Returns {@code true} if the given {@link ProducerBatch} has the same producer ID but a different epoch than the
+ * {@link #producerIdAndEpoch cached producer ID and epoch}.
+ */
+ synchronized boolean isStaleBatch(ProducerBatch batch) {
Review Comment:
Does this method need to be `synchronized`?
Also, it seems it could be `private`?
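A minimal sketch of what this suggestion could look like, assuming `isStaleBatch` is only ever called from a method that already holds the instance lock (as `handleFailedBatch` does in this diff). The class `StaleBatchSketch`, its fields, and `shouldHandleFailure` are hypothetical stand-ins, not the actual `TransactionManager` code:

```java
// Hypothetical stand-in for TransactionManager; only the parts relevant to the question.
final class StaleBatchSketch {
    // Simplified stand-in for the cached producerIdAndEpoch field.
    private final long producerId;
    private final short epoch;

    StaleBatchSketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Holds the instance lock for the whole call, like handleFailedBatch in the diff.
    synchronized boolean shouldHandleFailure(long batchProducerId, short batchEpoch) {
        return !isStaleBatch(batchProducerId, batchEpoch);
    }

    // Private and unsynchronized: the only caller already holds the lock.
    // Stale means same producer ID but a different epoch than the cached one.
    private boolean isStaleBatch(long batchProducerId, short batchEpoch) {
        return batchProducerId == producerId && batchEpoch != epoch;
    }
}
```

Java's intrinsic locks are reentrant, so keeping `synchronized` on the helper would not deadlock; it would only be redundant if every caller is already synchronized.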
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -737,14 +737,21 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
}
synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
- maybeTransitionToErrorState(exception);
+ if (!isStaleBatch(batch) && !hasFatalError())
Review Comment:
I'm not sure I understand the `!hasFatalError()` condition. Can you
elaborate? I thought we wanted to call
`maybeTransitionToErrorState(exception);` for any non-stale batch, independent
of the current error state.
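To make the question concrete, here is a small sketch that compares the guard as written in the diff with the guard I would have expected. Both methods are hypothetical and model only the boolean condition, not the real state machine:

```java
// Hypothetical helper that compares the two guards purely as boolean conditions.
final class GuardComparison {
    // Guard as written in the diff: skip the transition for stale batches
    // and also when a fatal error is already recorded.
    static boolean guardInDiff(boolean staleBatch, boolean fatalError) {
        return !staleBatch && !fatalError;
    }

    // Guard without the extra check: transition for any non-stale batch.
    static boolean guardWithoutFatalCheck(boolean staleBatch, boolean fatalError) {
        return !staleBatch;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean stale : values) {
            for (boolean fatal : values) {
                System.out.printf("stale=%b fatal=%b inDiff=%b withoutFatalCheck=%b%n",
                        stale, fatal,
                        guardInDiff(stale, fatal),
                        guardWithoutFatalCheck(stale, fatal));
            }
        }
    }
}
```

The two guards differ in exactly one case: a non-stale batch that fails while the manager is already in a fatal error state. That is the case the question is about.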