AndrewJSchofield commented on code in PR #20286:
URL: https://github.com/apache/kafka/pull/20286#discussion_r2247368119
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1799,6 +1793,10 @@ private Optional<Throwable> acknowledgeBatchRecords(
if (throwable.isPresent()) {
return throwable;
}
+
+ if (inFlightBatch.batchHasOngoingStateTransition()) {
+ return Optional.of(new InvalidRecordStateException("The batch has on-going acknowledgement."));
Review Comment:
I agree, but I think this string will go back to the consuming application.
I would use something more generic like "The record state is invalid. The
acknowledgement of delivery could not be completed.".
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1899,7 +1897,15 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
+ " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
topicIdPartition);
return Optional.of(new InvalidRecordStateException(
- "The batch cannot be acknowledged. The offset is not acquired."));
+ "The offset cannot be acknowledged. The offset is not acquired."));
+ }
+
+ if (offsetState.getValue().hasOngoingStateTransition()) {
+ log.debug("The offset has on-going transition, offset: {} batch: {} for the share"
+ + " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId,
+ topicIdPartition);
+ return Optional.of(new InvalidRecordStateException(
+ "The offset cannot be acknowledged. The offset has on-going acknowledgement."));
Review Comment:
Again, I would use something more generic like "The record state is invalid.
The acknowledgement of delivery could not be completed.". I don't think the
distinction in why the state is invalid matters to the consuming application.
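To illustrate the suggestion, here is a minimal sketch, not code from the PR, in which both the batch-level and the per-offset checks would return the same generic message while the specific reason stays in the broker log. `AckErrors`, `INVALID_RECORD_STATE_MESSAGE` and `invalidRecordState` are hypothetical names, the exception class is a local stand-in for Kafka's `InvalidRecordStateException`, and `java.util.logging` at `FINE` replaces the `{}`-style debug logger used in the diff so the example runs without dependencies.

```java
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical local stand-in for Kafka's InvalidRecordStateException,
// so the sketch compiles on its own.
class InvalidRecordStateException extends RuntimeException {
    InvalidRecordStateException(String message) {
        super(message);
    }
}

// Hypothetical helper: one generic message for the consuming application,
// with the specific reason kept in the broker log.
final class AckErrors {
    // The wording suggested in the review.
    static final String INVALID_RECORD_STATE_MESSAGE =
        "The record state is invalid. The acknowledgement of delivery could not be completed.";

    // java.util.logging stands in for the broker's logger (assumption).
    private static final Logger LOG = Logger.getLogger(AckErrors.class.getName());

    private AckErrors() {
    }

    // Logs the detailed reason at FINE (roughly debug) and returns the generic exception.
    static Optional<Throwable> invalidRecordState(String reason, long offset) {
        LOG.log(Level.FINE, "Offset {0} cannot be acknowledged: {1}", new Object[] {offset, reason});
        return Optional.of(new InvalidRecordStateException(INVALID_RECORD_STATE_MESSAGE));
    }
}
```

With this shape the two call sites differ only in the `reason` string they pass, so the consuming application sees one message while the distinction is still available in the broker log at debug level.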
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2389,10 +2405,18 @@ private AcquisitionLockTimerTask acquisitionLockTimerTask(
}
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
- return (memberId, firstOffset, lastOffset) -> {
+ return (memberId, firstOffset, lastOffset, timerTask) -> {
List<PersisterStateBatch> stateBatches;
lock.writeLock().lock();
try {
+ // Check if timer task is already cancelled. This can happen when concurrent requests
+ // happen to acknoweldge in-flight state and timeout handler is waiting for the lock
Review Comment:
nit: typo, "acknoweldge" should be "acknowledge".
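The race that the new comment describes can be modelled roughly as follows. This is a simplified sketch under stated assumptions, not the PR's implementation: `TimerTaskStub`, `AcquisitionLockSketch` and the string states are hypothetical, and the sketch only shows the pattern of re-checking cancellation after the write lock has been acquired.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical minimal timer task that only tracks cancellation.
class TimerTaskStub {
    private volatile boolean cancelled = false;

    void cancel() { cancelled = true; }

    boolean isCancelled() { return cancelled; }
}

// Hypothetical model of an acquired record range guarded by a write lock.
class AcquisitionLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private TimerTaskStub currentTask;
    private String state = "AVAILABLE";

    // Acquisition path: records become ACQUIRED and a timer task is attached.
    void acquire(TimerTaskStub task) {
        lock.writeLock().lock();
        try {
            currentTask = task;
            state = "ACQUIRED";
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Acknowledgement path: updates the state and cancels the timer under the lock.
    void acknowledge() {
        lock.writeLock().lock();
        try {
            state = "ACKNOWLEDGED";
            currentTask.cancel();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Timeout handler: it may have been blocked on the lock while acknowledge() ran,
    // so it re-checks cancellation before releasing the records.
    boolean onTimeout(TimerTaskStub task) {
        lock.writeLock().lock();
        try {
            if (task.isCancelled()) {
                return false; // acknowledgement won the race; leave the state untouched
            }
            state = "AVAILABLE";
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    String state() {
        lock.readLock().lock();
        try {
            return state;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

If `acknowledge()` takes the lock first and cancels the task, a timeout handler that was waiting on the lock sees `isCancelled()` and returns without overwriting the acknowledged state.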
##########
server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java:
##########
@@ -138,6 +157,16 @@ public boolean hasOngoingStateTransition() {
*/
public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
try {
+ // If the state transition is in progress, the state should not be updated.
+ if (hasOngoingStateTransition()) {
+ // A misbehaving client can send multiple requests to update the same records hence
+ // do not proceed if the transition is already in progress. Do not log an error here
+ // as it might not bea an error rather concurrent update of same state due to multiple
+ // requests.
+ log.info("{} has ongoing state transition, cannot update", this);
Review Comment:
I wouldn't use info logging here.
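For reference, a cut-down version of the guard with the message logged below INFO might look like this. It is a hypothetical sketch: `InFlightStateSketch`, `SketchRecordState`, `finishTransition` and the convention of returning `null` when an update is refused are assumptions, setting the in-progress flag inside `tryUpdateState` may not match how the PR starts and completes transitions, and `java.util.logging` at `FINE` stands in for debug level.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical record states, reduced to what the sketch needs.
enum SketchRecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

// Hypothetical cut-down in-flight state with an ongoing-transition flag.
class InFlightStateSketch {
    private static final Logger LOG = Logger.getLogger(InFlightStateSketch.class.getName());

    private SketchRecordState state;
    private boolean ongoingTransition = false;

    InFlightStateSketch(SketchRecordState state) {
        this.state = state;
    }

    boolean hasOngoingStateTransition() { return ongoingTransition; }

    SketchRecordState state() { return state; }

    // Refuses the update (returns null, an assumed convention) if a transition is in progress.
    InFlightStateSketch tryUpdateState(SketchRecordState newState) {
        if (hasOngoingStateTransition()) {
            // Duplicate or concurrent requests can legitimately reach this path,
            // so the message is logged below INFO.
            LOG.log(Level.FINE, "{0} has ongoing state transition, cannot update", this);
            return null;
        }
        ongoingTransition = true;
        state = newState;
        return this;
    }

    // Hypothetical: marks the transition as finished, e.g. after the state is persisted.
    void finishTransition() { ongoingTransition = false; }
}
```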
--
This is an automated message from the Apache Git Service.