This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96c8e86cdf8 KAFKA-19530 RemoteLogManager should record lag stats when
remote storage is offline (#20218)
96c8e86cdf8 is described below
commit 96c8e86cdf80368e50e9227b809b8a0ee2a9d587
Author: Ken Huang <[email protected]>
AuthorDate: Tue Jul 29 22:38:06 2025 +0800
KAFKA-19530 RemoteLogManager should record lag stats when remote storage is
offline (#20218)
When remote storage is offline, the segmentLag and bytesLag metrics are
not recorded. These metrics are useful for knowing how much data is
still pending upload while remote storage is down.
Reviewers: TaiJuWu <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
.../apache/kafka/server/log/remote/storage/RemoteLogManager.java | 7 +++++++
.../kafka/server/log/remote/storage/RemoteLogManagerTest.java | 5 +++++
2 files changed, 12 insertions(+)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 692b348f73e..10ab8ebcaca 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -982,6 +982,9 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
segmentIdsBeingCopied.add(segmentId);
try {
copyLogSegment(log,
candidateLogSegment.logSegment, segmentId,
candidateLogSegment.nextSegmentOffset);
+ } catch (Exception e) {
+ recordLagStats(log);
+ throw e;
} finally {
segmentIdsBeingCopied.remove(segmentId);
}
@@ -1088,6 +1091,10 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
logger.info("Copied {} to remote storage with segment-id: {}",
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
+ recordLagStats(log);
+ }
+
+ private void recordLagStats(UnifiedLog log) {
long bytesLag = log.onlyLocalLogSegmentsSize() -
log.activeSegment().size();
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
recordLagStats(bytesLag, segmentsLag);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 928688b55bb..bf2ef3a2fcf 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -691,6 +691,8 @@ public class RemoteLogManagerTest {
long lastStableOffset = 150L;
long logEndOffset = 150L;
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
+ when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
// leader epoch preparation
@@ -708,6 +710,7 @@ public class RemoteLogManagerTest {
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+ when(activeSegment.size()).thenReturn(2);
verify(oldSegment, times(0)).readNextOffset();
verify(activeSegment, times(0)).readNextOffset();
@@ -764,6 +767,8 @@ public class RemoteLogManagerTest {
assertEquals(1,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(1,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+ assertEquals(10,
brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
+ assertEquals(1,
brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
}
@Test