kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {
Review Comment:
> In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?

Yes, we want to address this case too. The issue can also happen during a clean preferred-leader-election:
```
Call stack: the replica (1002) has the full data, but its HW is invalid, so the fetch-offset
will be equal to LeaderLog(1001).highWatermark.

Leader (1001):
  KafkaApis.handleFetchRequest
    ReplicaManager.fetchMessages
      ReplicaManager.readFromLocalLog
        Partition.fetchRecords
          Partition.updateFollowerFetchState
            Partition.maybeExpandIsr
              Partition.submitAlterPartition
      ...
      ...
      ...
      # If there is not enough data to respond and there is no remote data,
      # we let the fetch request wait for new data and park it in the DelayedFetchPurgatory.

Another thread runs Preferred-Leader-Election in the controller (1003). Since replica 1002
joined the ISR list, it can be elected as the preferred leader. The controller sends
LeaderAndIsr requests to all the brokers.

  KafkaController.processReplicaLeaderElection
    KafkaController.onReplicaElection
      PartitionStateMachine.handleStateChanges
        PartitionStateMachine.doHandleStateChanges
          PartitionStateMachine.electLeaderForPartitions
            ControllerChannelManager.sendRequestsToBrokers

Replica 1002 gets elected as leader but has an invalid highWatermark, since it didn't process
the fetch-response from the previous leader 1001. It throws an OFFSET_OUT_OF_RANGE error when
processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition
fails, the remaining partitions in that request won't be processed.

  KafkaApis.handleLeaderAndIsrRequest
    ReplicaManager.becomeLeaderOrFollower
      ReplicaManager.makeLeaders
        Partition.makeLeader
          Partition.maybeIncrementLeaderHW
            UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
              UnifiedLog.fetchHighWatermarkMetadata

The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process
the LISR. The controller retries the LISR until broker 1002 becomes the leader for tp0. During
this time, producers won't be able to send messages, as node 1002 returns the
NOT_LEADER_FOR_PARTITION error-code to the producer.

During this time, if a follower sends a FETCH request to read from the current leader 1002, an
OFFSET_OUT_OF_RANGE error will be returned:

  KafkaApis.handleFetchRequest
    ReplicaManager.fetchMessages
      ReplicaManager.readFromLog
        Partition.fetchRecords
          Partition.readRecords
            UnifiedLog.read
              UnifiedLog.fetchHighWatermarkMetadata
                UnifiedLog.convertToOffsetMetadataOrThrow
                  LocalLog.convertToOffsetMetadataOrThrow
                    LocalLog.read
```
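
To make the bound change concrete, here is a minimal, self-contained sketch of the clamping logic that the diff above switches from `logStartOffset` to `_localLogStartOffset`. It is illustrative only: `LogOffsetMetadata` is reduced to a simplified stand-in, and `HighWatermarkBounds.bounded` is a hypothetical helper, not an actual `UnifiedLog` member.

```scala
// Illustrative sketch only: a simplified stand-in for Kafka's LogOffsetMetadata.
final case class LogOffsetMetadata(messageOffset: Long)

object HighWatermarkBounds {
  // Clamp the proposed high watermark to [local-log-start-offset, log-end-offset],
  // mirroring the bound change in the diff above (logStartOffset -> _localLogStartOffset).
  def bounded(proposed: LogOffsetMetadata,
              localLogStartOffset: Long,
              logEndOffsetMetadata: LogOffsetMetadata): LogOffsetMetadata = {
    if (proposed.messageOffset < localLogStartOffset)
      LogOffsetMetadata(localLogStartOffset)          // lower bound: local-log-start-offset
    else if (proposed.messageOffset >= logEndOffsetMetadata.messageOffset)
      logEndOffsetMetadata                            // upper bound: log-end-offset
    else
      proposed
  }
}
```

With a bound like this, a high watermark that ended up below the local-log-start-offset (for example, because that prefix of the log only exists in remote storage) is lifted to the local-log-start-offset, so resolving its offset metadata no longer points at a segment that is absent locally, which is what surfaces as OFFSET_OUT_OF_RANGE in the stacks above.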