apoorvmittal10 commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2194717592
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+ // The boolean determines if the record has achieved a final state of
ARCHIVED from which it cannot transition
+ // to any other state. This could happen because of LSO movement etc.
+ private boolean isMarkedArchived = false;
+ // The lock prevents concurrent state transitions possibly during
acknowledgements etc.
+ private final ReadWriteLock stateTransitionLock = new
ReentrantReadWriteLock();
Review Comment:
I don't think it's required here, we can just use `synchronized` for locks
as same offset/batch state access on multiple read paths should not happen.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3116,46 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
};
}
- private void archive(String newMemberId) {
- state = RecordState.ARCHIVED;
- memberId = newMemberId;
+ // Visible for testing.
+ void archive(String newMemberId) {
+ stateTransitionLock.writeLock().lock();
+ try {
+ if (rollbackState != null) {
+ isMarkedArchived = true;
+ }
+ state = RecordState.ARCHIVED;
+ memberId = newMemberId;
+ } finally {
+ stateTransitionLock.writeLock().unlock();
+ }
}
- private InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- rollbackState = new InFlightState(state, deliveryCount, memberId,
acquisitionLockTimeoutTask);
- return tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId);
+ // Visible for testing
+ InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+ stateTransitionLock.writeLock().lock();
+ try {
+ rollbackState = new InFlightState(state, deliveryCount,
memberId, acquisitionLockTimeoutTask);
+ return tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId);
Review Comment:
Should it happen as below:
```suggestion
InFlightState currentState = new InFlightState(state,
deliveryCount, memberId, acquisitionLockTimeoutTask);
InflightState updatedState = tryUpdateState(newState, ops,
maxDeliveryCount, newMemberId);
if (updatedState != null) {
rollbackState = currentState;
}
return updatedState;
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+ // The boolean determines if the record has achieved a final state of
ARCHIVED from which it cannot transition
+ // to any other state. This could happen because of LSO movement etc.
+ private boolean isMarkedArchived = false;
+ // The lock prevents concurrent state transitions possibly during
acknowledgements etc.
+ private final ReadWriteLock stateTransitionLock = new
ReentrantReadWriteLock();
Review Comment:
You should synscronize the `state()` method as well.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3091,6 +3102,8 @@ private InFlightState tryUpdateState(RecordState
newState, DeliveryCountOps ops,
log.error("Failed to update state of the records", e);
rollbackState = null;
Review Comment:
Should we remove this here now?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]