soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1580567800
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
Review Comment:
This can also be simplified:
```java
int targetEpoch = epochForLeaderLocalLogStartOffset;
// If the existing epoch is 0, no need to fetch from earlier epoch as the
// desired offset (leaderLogStartOffset - 1) will have the same epoch.
if (epochForLeaderLocalLogStartOffset != 0) {
...
}
```
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+                // Increment start offsets
+                unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+                // Build leader epoch cache.
+                List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+                if (unifiedLog.leaderEpochCache().isDefined()) {
+                    unifiedLog.leaderEpochCache().get().assign(epochs);
+                }
+
+                log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
+
+                // Reload producer snapshots.
+                unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+                unifiedLog.loadProducerState(nextOffset);
+                log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+                        "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+                        partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+            } else {
+                throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+                        ", currentLeaderEpoch: " + currentLeaderEpoch +
+                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+                        ", leaderLogStartOffset: " + leaderLogStartOffset +
+                        ", epoch: " + targetEpoch +
+                        "as the previous remote log segment metadata was not found");
+            }
+        } else {
+            // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
+            // is set as expected.
+            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
+        }
Review Comment:
This method would be easier to read if we moved this check up, threw the exception
early when `!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()`,
and dropped one indentation level for the happy path, like we already do for the
`replicaMgr.remoteLogManager().isEmpty()` check.
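For illustration only, a rough sketch of that shape, reusing the conditions and
exception messages from the hunk above (not the PR's code):
```java
// Sketch: fail fast, then keep the rest of the method one level less indented.
if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) {
    // Throw so the caller retries until tiered storage is set as expected.
    throw new RemoteStorageException("Couldn't build the state from remote store for partition "
            + topicPartition + ", as remote log storage is not yet enabled");
}
if (replicaMgr.remoteLogManager().isEmpty()) {
    throw new IllegalStateException("RemoteLogManager is not yet instantiated");
}
RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
// ... rest of the happy path ...
```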
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
Review Comment:
It looks like we can drop this else branch if you apply my previous
suggestion above.
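Roughly, with both suggestions applied, the whole block could read something like
this (sketch only, reusing the names from the hunk above):
```java
int targetEpoch = epochForLeaderLocalLogStartOffset;
// If the existing epoch is 0, the desired offset (leaderLocalLogStartOffset - 1) has the same epoch.
if (epochForLeaderLocalLogStartOffset != 0) {
    OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset =
            fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
    // Only override when the target offset lies within the earlier epoch's (exclusive) range.
    if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
        targetEpoch = earlierEpochEndOffset.leaderEpoch();
    }
}
```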
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+                // Increment start offsets
+                unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+                // Build leader epoch cache.
+                List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+                if (unifiedLog.leaderEpochCache().isDefined()) {
+                    unifiedLog.leaderEpochCache().get().assign(epochs);
+                }
+
+                log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
+
+                // Reload producer snapshots.
+                unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+                unifiedLog.loadProducerState(nextOffset);
+                log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+                        "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+                        partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+            } else {
+                throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+                        ", currentLeaderEpoch: " + currentLeaderEpoch +
+                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+                        ", leaderLogStartOffset: " + leaderLogStartOffset +
+                        ", epoch: " + targetEpoch +
+                        "as the previous remote log segment metadata was not found");
+            }
Review Comment:
Same here, we could drop the indentation level if we invert the condition:
```java
if (!maybeRlsm.isPresent()) {
throw new RemoteStorageException ...
}
...
```
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
Review Comment:
We can move the exception below into an `orElseThrow`:
```java
RemoteLogSegmentMetadata segmentMetadata =
        rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
                .orElseThrow(() -> new RemoteStorageException(...));
```
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
Review Comment:
KAFKA-13560 seems to have been stale for a while. When (and if?) it gets implemented,
it would be simple to add this async placeholder back. In case it doesn't, or even if
it simply takes a long time, I think it would make sense to keep the interface and the
definition in this class free from that future intention. We could drop this unused
method, rename `start()` to something like `nextState()`, and move any documentation
here that may be relevant in the future to the JIRA. What do you think?
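As a rough sketch of the trimmed shape (the name is only a suggestion, and the
parameter list here is illustrative rather than the actual `start()` signature):
```java
// Sketch only: a single state-transition method instead of start() + maybeAdvanceState().
PartitionFetchState nextState(TopicPartition topicPartition,
                              PartitionFetchState currentFetchState);
```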
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
Review Comment:
This declaration can be joined with the first assignment to the variable.
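i.e. declare it at the point of the first assignment (sketch, using the assignment
from the hunk above):
```java
// Assign nextOffset with the offset from which the next fetch should happen.
long nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
```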
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+                // Increment start offsets
+                unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+                // Build leader epoch cache.
+                List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+                if (unifiedLog.leaderEpochCache().isDefined()) {
+                    unifiedLog.leaderEpochCache().get().assign(epochs);
+                }
+
+                log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
+
+                // Reload producer snapshots.
+                unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+                unifiedLog.loadProducerState(nextOffset);
Review Comment:
Maybe this should also be part of `buildProducerSnapshotFile()`?
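For example, the call site could shrink to something like this if the helper takes
over the reload (the changed signature is only a suggestion, not the PR's code):
```java
// Restore the producer snapshot and reload producer state in one step.
buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, rlm);
```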
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
     */
    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch            start-offset
+                    //      0                    20
+                    //      1                    85
+                    //     <2> - gap no messages were appended in this leader epoch.
+                    //      3                    90
+                    //      4                    98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+                // Increment start offsets
+                unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+                // Build leader epoch cache.
+                List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+                if (unifiedLog.leaderEpochCache().isDefined()) {
+                    unifiedLog.leaderEpochCache().get().assign(epochs);
+                }
+
+                log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
Review Comment:
It seems `snapshotFile` can be declared and initialized inside
`buildProducerSnapshotFile`
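Something along these lines, assuming the helper takes the log and offset instead
of a pre-built `File` (a sketch, not the PR's code):
```java
private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
                                       long nextOffset,
                                       RemoteLogSegmentMetadata segmentMetadata,
                                       RemoteLogManager rlm) throws IOException, RemoteStorageException {
    // Derive the snapshot file location internally rather than receiving it from the caller.
    File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
    // ... existing logic that materializes the snapshot from the remote segment ...
}
```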
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]