kamalcph commented on code in PR #20428:
URL: https://github.com/apache/kafka/pull/20428#discussion_r2757622885
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest:
Optional[Integer],
fetchPartitionData:
PartitionData): Boolean = {
try {
- val newFetchState = fetchTierStateMachine.start(topicPartition,
fetchState, fetchPartitionData)
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+ val epochAndLogStartOffset = leader.fetchEarliestOffset(topicPartition,
fetchState.currentLeaderEpoch())
Review Comment:
`epochAndLogStartOffset` -> `leaderLogStartOffsetAndEpoch`
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest:
Optional[Integer],
fetchPartitionData:
PartitionData): Boolean = {
try {
- val newFetchState = fetchTierStateMachine.start(topicPartition,
fetchState, fetchPartitionData)
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
Review Comment:
Shall we rename `fetchFromLastTieredOffset` to
`isLastTieredOffsetFetchEnabled`?
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
*/
val offsetAndEpoch = leader.fetchLatestOffset(topicPartition,
currentLeaderEpoch)
val leaderEndOffset = offsetAndEpoch.offset
- if (leaderEndOffset < replicaEndOffset) {
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset,
replicaEndOffset)
+
+ if (fetchFromLastTieredOffset) {
+ val leaderStartOffset = leader.fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
Review Comment:
Can `leaderStartOffset` be renamed to `leaderStartOffsetAndEpoch`, and
`epochAndStartingOffset` to `earliestPendingUploadOffsetAndEpoch`?
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -182,7 +179,7 @@ private void buildProducerSnapshotFile(UnifiedLog
unifiedLog,
* fetching records from the leader. The return value is the next offset
to fetch from the leader, which is the
* next offset following the end offset of the remote log portion.
*/
- private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+ protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
Review Comment:
Why do we change the method access specifier from `private` to `protected`?
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
*/
val offsetAndEpoch = leader.fetchLatestOffset(topicPartition,
currentLeaderEpoch)
val leaderEndOffset = offsetAndEpoch.offset
- if (leaderEndOffset < replicaEndOffset) {
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset,
replicaEndOffset)
+
+ if (fetchFromLastTieredOffset) {
+ val leaderStartOffset = leader.fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+ val epochAndStartingOffset = earliestPendingUploadOffset(topicPartition,
currentLeaderEpoch, leaderStartOffset)
+ fetchTierStateMachine.start(topicPartition, topicId.asJava,
currentLeaderEpoch, epochAndStartingOffset, leaderStartOffset.offset())
Review Comment:
Replace `topicId.asJava` with `topicId.toJava` to avoid using deprecated
methods.
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,7 +156,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(-1L, -1)
}
case _ =>
- val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
logStartOffset.offset())
+ val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
Math.max(logStartOffset.offset(), localLogStartOffset.offset()))
Review Comment:
Shall we add a unit test for this case in `LocalLeaderEndPointTest`?
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
}
}
+ /**
+ * Determines the earliest offset for pending uploads, taking into account
+ * both local and remote storage conditions.
+ */
+ private def earliestPendingUploadOffset(topicPartition: TopicPartition,
currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch
= {
+ val earliestPendingUploadOffset =
leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+ if (earliestPendingUploadOffset.offset == -1L) {
+ val leaderLocalStartOffset =
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+ if (leaderLocalStartOffset.offset == leaderLogStartOffset.offset) {
+ return leaderLocalStartOffset
+ }
+ throw new OffsetNotAvailableException("Segments are uploaded to remote
storage, but the leader does not have the information about the uploaded
segments")
Review Comment:
Shall we make the error message more specific?
```
Segments are uploaded to remote storage, but the leader does not know the earliest pending upload offset.
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest:
Optional[Integer],
fetchPartitionData:
PartitionData): Boolean = {
try {
- val newFetchState = fetchTierStateMachine.start(topicPartition,
fetchState, fetchPartitionData)
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+ val epochAndLogStartOffset = leader.fetchEarliestOffset(topicPartition,
fetchState.currentLeaderEpoch())
+ val epochAndStartingOffset = if (fetchFromLastTieredOffset) {
Review Comment:
`epochAndStartingOffset` -> `fetchOffsetAndEpoch`
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
}
}
+ /**
+ * Determines the earliest offset for pending uploads, taking into account
+ * both local and remote storage conditions.
+ */
+ private def earliestPendingUploadOffset(topicPartition: TopicPartition,
currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch
= {
Review Comment:
Can this method be renamed to `fetchEarliestPendingUploadOffset` for
clarity? Also, please rename the parameter `leaderLogStartOffset` to
`leaderLogStartOffsetAndEpoch`.
##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -18,7 +18,6 @@ package kafka.server
import kafka.cluster.Partition
import kafka.log.LogManager
-
Review Comment:
Can this change (removing the blank line after the imports) be avoided?
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -85,21 +85,19 @@ public TierStateMachine(LeaderEndPoint leader,
/**
* Start the tier state machine for the provided topic partition.
*
- * @param topicPartition the topic partition
- * @param currentFetchState the current PartitionFetchState which will
- * be used to derive the return value
- * @param fetchPartitionData the data from the fetch response that
returned the offset moved to tiered storage error
- *
- * @return the new PartitionFetchState after the successful start of the
- * tier state machine
+ * @param topicPartition the topic partition for which the tier
state machine is to be started
+ * @param topicId the optional unique identifier of the
topic
+ * @param currentLeaderEpoch the current leader epoch of the partition
+ * @param epochAndStartingOffset the offset on the leader's local log
from which to start replicating logs
+ * @param leaderLogStartOffset the starting offset in the leader's log
+ * @return the new PartitionFetchState after the successful start of the
tier state machine
+ * @throws Exception if an error occurs during the process, such as issues
with remote storage
*/
PartitionFetchState start(TopicPartition topicPartition,
- PartitionFetchState currentFetchState,
- PartitionData fetchPartitionData) throws
Exception {
- OffsetAndEpoch epochAndLeaderLocalStartOffset =
leader.fetchEarliestLocalOffset(topicPartition,
currentFetchState.currentLeaderEpoch());
- int epoch = epochAndLeaderLocalStartOffset.epoch();
- long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
+ Optional<Uuid> topicId,
+ int currentLeaderEpoch,
+ OffsetAndEpoch epochAndStartingOffset,
Review Comment:
Shall we rename `epochAndStartingOffset` parameter to `fetchOffsetAndEpoch`
/ `fetchStartOffsetAndEpoch` or something similar?
##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1740,4 +1741,1157 @@ class AbstractFetcherThreadTest {
assertEquals(151, replicaState.logEndOffset)
assertEquals(151, replicaState.highWatermark)
}
+
+ @Test
Review Comment:
Please add Javadoc to the tests so that readers can understand what each one verifies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]