soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582791175
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
* @return the new PartitionFetchState if the tier state machine was
advanced, otherwise, return the currentFetchState
*/
Optional<PartitionFetchState> maybeAdvanceState(TopicPartition
topicPartition,
- PartitionFetchState
currentFetchState);
+ PartitionFetchState
currentFetchState) {
+ // This is currently a no-op but will be used for implementing async
tiering logic in KAFKA-13560.
+ return Optional.of(currentFetchState);
+ }
+
+ /**
+ * It tries to build the required state for this partition from leader and
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+ Integer currentLeaderEpoch,
+ Long leaderLocalLogStartOffset,
+ Integer
epochForLeaderLocalLogStartOffset,
+ Long leaderLogStartOffset,
+ UnifiedLog unifiedLog) throws
IOException, RemoteStorageException {
+
+ long nextOffset;
+
+ if (unifiedLog.remoteStorageSystemEnable() &&
unifiedLog.config().remoteStorageEnable()) {
+ if (replicaMgr.remoteLogManager().isEmpty()) throw new
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+ RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+ // Find the respective leader epoch for (leaderLocalLogStartOffset
- 1). We need to build the leader epoch cache
+ // until that offset
+ long previousOffsetToLeaderLocalLogStartOffset =
leaderLocalLogStartOffset - 1;
+ int targetEpoch;
+ // If the existing epoch is 0, no need to fetch from earlier epoch
as the desired offset(leaderLogStartOffset - 1)
+ // will have the same epoch.
+ if (epochForLeaderLocalLogStartOffset == 0) {
+ targetEpoch = epochForLeaderLocalLogStartOffset;
+ } else {
Review Comment:
@kamalcph what specific part of the review are you concerned with? My
thinking is that a refactoring is already in place with the merge of
`ReplicaAlterLogDirsTierStateMachine` and `ReplicaFetcherTierStateMachine`. In
my experience is that it's generally easier to reason about what's happening
after the code is simplified.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]