chia7712 commented on code in PR #20534:
URL: https://github.com/apache/kafka/pull/20534#discussion_r2376310312
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1385,10 +1385,19 @@ private AnalyzeAndValidateProducerStateResult analyzeAndValidateProducerState(Lo
                 // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
                 // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
                 // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
-                if (batch.isTransactional()
-                        && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())
-                        && batchMissingRequiredVerification(batch, requestVerificationGuard)) {
-                    throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
+                if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
+                    // Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
+                    ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId());
Review Comment:
Should we access `producerStateManager.activeProducers()` under the lock?
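
To illustrate the concern, here is a minimal sketch, assuming a single lock guards the producer state. `EpochCheckSketch`, `updateEpoch`, and `isStaleEpoch` are hypothetical names for illustration only and are not part of Kafka's API; the map of epochs is a simplified stand-in for the producer state entries in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the log and its producer state; not Kafka's actual classes.
class EpochCheckSketch {
    // Assumed lock guarding the map below, playing the role of the log's lock.
    private final Object lock = new Object();
    // producerId -> last seen producer epoch (simplified stand-in for producer state entries).
    private final Map<Long, Short> producerEpochs = new HashMap<>();

    // Writer path: record the latest epoch while holding the lock.
    void updateEpoch(long producerId, short epoch) {
        synchronized (lock) {
            producerEpochs.put(producerId, epoch);
        }
    }

    // Reader path: look up the entry and compare epochs inside the same critical
    // section, so the lookup cannot interleave with a concurrent update.
    boolean isStaleEpoch(long producerId, short batchEpoch) {
        synchronized (lock) {
            Short lastSeen = producerEpochs.get(producerId);
            return lastSeen != null && batchEpoch < lastSeen;
        }
    }

    public static void main(String[] args) {
        EpochCheckSketch log = new EpochCheckSketch();
        log.updateEpoch(42L, (short) 5);
        System.out.println("epoch 4 stale: " + log.isStaleEpoch(42L, (short) 4));
        System.out.println("epoch 5 stale: " + log.isStaleEpoch(42L, (short) 5));
    }
}
```

Whether the lookup in this PR already runs under the log's lock depends on the calling code, which this hunk does not show.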
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1385,10 +1385,19 @@ private AnalyzeAndValidateProducerStateResult analyzeAndValidateProducerState(Lo
                 // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
                 // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
                 // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
-                if (batch.isTransactional()
-                        && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())
-                        && batchMissingRequiredVerification(batch, requestVerificationGuard)) {
-                    throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
+                if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
+                    // Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
+                    ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId());
+                    if (entry != null && batch.producerEpoch() < entry.producerEpoch()) {
+                        String message = "Epoch of producer " + batch.producerId() + " is " + batch.producerEpoch() +
+                                ", which is smaller than the last seen epoch " + entry.producerEpoch();
Review Comment:
Does this change render the above comment outdated?