This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d93d1096c2 KAFKA-17108: Add EarliestPendingUpload offset spec in
ListOffsets API (#16584)
8d93d1096c2 is described below
commit 8d93d1096c254cd98743cd51ccacb2dc6a815efc
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Aug 27 08:34:31 2025 +0530
KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API
(#16584)
This is the first part of the implementation of
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset).
The purpose of this pull request is for the broker to start returning
the correct offset when it receives -6 (EARLIEST_PENDING_UPLOAD_TIMESTAMP)
as the timestamp in a ListOffsets API request.
Added unit tests for the new timestamp.
Reviewers: Kamal Chandraprakash <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +
.../org/apache/kafka/clients/admin/OffsetSpec.java | 10 +
.../admin/internals/ListOffsetsHandler.java | 7 +-
.../kafka/common/requests/ListOffsetsRequest.java | 11 +-
.../common/message/ListOffsetsRequest.json | 4 +-
.../common/message/ListOffsetsResponse.json | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 28 +++
.../common/requests/ListOffsetsRequestTest.java | 10 +-
.../main/scala/kafka/server/ReplicaManager.scala | 3 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 227 ++++++++++++++++++++-
.../kafka/server/common/MetadataVersion.java | 4 +-
.../kafka/server/common/MetadataVersionTest.java | 4 +-
.../kafka/storage/internals/log/UnifiedLog.java | 27 +++
.../org/apache/kafka/tools/GetOffsetShell.java | 8 +-
.../kafka/tools/GetOffsetShellParsingTest.java | 2 +-
.../org/apache/kafka/tools/GetOffsetShellTest.java | 24 +++
16 files changed, 358 insertions(+), 17 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 270a7124826..90f83eac935 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -5154,6 +5154,8 @@ public class KafkaAdminClient extends AdminClient {
return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
+ } else if (offsetSpec instanceof OffsetSpec.EarliestPendingUploadSpec)
{
+ return ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 68f94cc493e..ad73c8d51f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -28,6 +28,7 @@ public class OffsetSpec {
public static class MaxTimestampSpec extends OffsetSpec { }
public static class EarliestLocalSpec extends OffsetSpec { }
public static class LatestTieredSpec extends OffsetSpec { }
+ public static class EarliestPendingUploadSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@@ -91,4 +92,13 @@ public class OffsetSpec {
public static OffsetSpec latestTiered() {
return new LatestTieredSpec();
}
+
+ /**
+ * Used to retrieve the earliest offset of records that are pending upload
to remote storage.
+ * <br/>
+ * Note: When tiered storage is not enabled, we will return unknown offset.
+ */
+ public static OffsetSpec earliestPendingUpload() {
+ return new EarliestPendingUploadSpec();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index f7c495d7fd8..a46d6f24a7b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -103,12 +103,17 @@ public final class ListOffsetsHandler extends
Batched<TopicPartition, ListOffset
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ boolean requireEarliestPendingUploadTimestamp = keys
+ .stream()
+ .anyMatch(key -> offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
int timeoutMs = options.timeoutMs() != null ? options.timeoutMs() :
defaultApiTimeoutMs;
return ListOffsetsRequest.Builder.forConsumer(true,
options.isolationLevel(),
supportsMaxTimestamp,
requireEarliestLocalTimestamp,
- requireTieredStorageTimestamp)
+ requireTieredStorageTimestamp,
+ requireEarliestPendingUploadTimestamp)
.setTargetTimes(new ArrayList<>(topicsByName.values()))
.setTimeoutMs(timeoutMs);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 7415412d050..5862ebdfafc 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
public static final long LATEST_TIERED_TIMESTAMP = -5L;
+ public static final long EARLIEST_PENDING_UPLOAD_TIMESTAMP = -6L;
+
public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;
@@ -58,16 +60,19 @@ public class ListOffsetsRequest extends AbstractRequest {
public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel) {
- return forConsumer(requireTimestamp, isolationLevel, false, false,
false);
+ return forConsumer(requireTimestamp, isolationLevel, false, false,
false, false);
}
public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel,
boolean requireMaxTimestamp,
boolean
requireEarliestLocalTimestamp,
- boolean
requireTieredStorageTimestamp) {
+ boolean
requireTieredStorageTimestamp,
+ boolean
requireEarliestPendingUploadTimestamp) {
short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion();
- if (requireTieredStorageTimestamp)
+ if (requireEarliestPendingUploadTimestamp)
+ minVersion = 11;
+ else if (requireTieredStorageTimestamp)
minVersion = 9;
else if (requireEarliestLocalTimestamp)
minVersion = 8;
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json
b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 6f8ff7d6cf9..1a2de6ca30a 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -40,7 +40,9 @@
// Version 9 enables listing offsets by last tiered offset (KIP-1005).
//
// Version 10 enables async remote list offsets support (KIP-1075)
- "validVersions": "1-10",
+ //
+ // Version 11 enables listing offsets by earliest pending upload offset
(KIP-1023)
+ "validVersions": "1-11",
"flexibleVersions": "6+",
"latestVersionUnstable": false,
"fields": [
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json
b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 7f9588847b9..1407273bf4d 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -40,7 +40,9 @@
// Version 9 enables listing offsets by last tiered offset (KIP-1005).
//
// Version 10 enables async remote list offsets support (KIP-1075)
- "validVersions": "1-10",
+ //
+ // Version 11 enables listing offsets by earliest pending upload offset
(KIP-1023)
+ "validVersions": "1-11",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+",
"ignorable": true,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3e093c5029a..e7fa11177d3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -8730,6 +8730,34 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testListOffsetsEarliestPendingUploadSpecSpecMinVersion()
throws Exception {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new
Node[]{node}));
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+ AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.adminClient().listOffsets(Collections.singletonMap(tp0,
OffsetSpec.earliestPendingUpload()));
+
+ TestUtils.waitForCondition(() ->
env.kafkaClient().requests().stream().anyMatch(request ->
+ request.requestBuilder().apiKey().messageType ==
ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion()
== 11
+ ), "no listOffsets request has the expected oldestAllowedVersion");
+ }
+ }
+
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE)),
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index 2cf4cbc00c9..48542c1a2fd 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -127,13 +127,16 @@ public class ListOffsetsRequestTest {
.forConsumer(false, IsolationLevel.READ_COMMITTED);
ListOffsetsRequest.Builder maxTimestampRequestBuilder =
ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false,
false);
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false,
false, false);
ListOffsetsRequest.Builder requireEarliestLocalTimestampRequestBuilder
= ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true,
false);
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true,
false, false);
ListOffsetsRequest.Builder requireTieredStorageTimestampRequestBuilder
= ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false,
true);
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false,
true, false);
+
+ ListOffsetsRequest.Builder
requireEarliestPendingUploadTimestampRequestBuilder = ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false,
false, true);
assertEquals((short) 1, consumerRequestBuilder.oldestAllowedVersion());
assertEquals((short) 1,
requireTimestampRequestBuilder.oldestAllowedVersion());
@@ -141,5 +144,6 @@ public class ListOffsetsRequestTest {
assertEquals((short) 7,
maxTimestampRequestBuilder.oldestAllowedVersion());
assertEquals((short) 8,
requireEarliestLocalTimestampRequestBuilder.oldestAllowedVersion());
assertEquals((short) 9,
requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion());
+ assertEquals((short) 11,
requireEarliestPendingUploadTimestampRequestBuilder.oldestAllowedVersion());
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 202590ca6f4..070b3e544a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -172,7 +172,8 @@ object ReplicaManager {
ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
- ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort,
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP -> 11.toShort
)
def createLogReadResult(highWatermark: Long,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index d30d5a1040e..da54113ae5c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation,
UnexpectedAppendOffs
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils,
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment,
LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
RecordValidationException, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics,
BrokerTopicStats}
import org.junit.jupiter.api.Assertions._
@@ -2416,6 +2416,193 @@ class UnifiedLogTest {
KafkaConfig.fromProps(props)
}
+ @Test
+ def testFetchEarliestPendingUploadTimestampNoRemoteStorage(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200,
indexIntervalBytes = 1)
+ val log = createLog(logDir, logConfig)
+
+ // Test initial state before any records
+ assertFetchOffsetBySpecialTimestamp(log, None, new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+
+ // Append records
+ val _ = prepareLogWithSequentialRecords(log, recordCount = 2)
+
+ // Test state after records are appended
+ assertFetchOffsetBySpecialTimestamp(log, None, new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
+ @Test
+ def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = {
+ val logStartOffset = 0
+ val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+ val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+ val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+ val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
+
+ doAnswer(ans => {
+ val timestamp = ans.getArgument(1).asInstanceOf[Long]
+ Optional.of(timestamp)
+ .filter(_ == timestampAndEpochs.head.timestamp)
+ .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L,
Optional.of(timestampAndEpochs.head.leaderEpoch)))
+
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+ anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+ // Offset 0 (first timestamp) is in remote storage and deleted locally.
Offset 1 (second timestamp) is in local storage.
+ log.updateLocalLogStartOffset(1)
+ log.updateHighestOffsetInRemoteStorage(0)
+
+ // In the assertions below we test that offset 0 (first timestamp) is only
in remote and offset 1 (second timestamp) is in local storage.
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
+ @Test
+ def
testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion(): Unit
= {
+ val logStartOffset = 0
+ val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+ val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+ val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+ val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
+
+ // Offsets upto 1 are in remote storage
+ doAnswer(ans => {
+ val timestamp = ans.getArgument(1).asInstanceOf[Long]
+ Optional.of(
+ timestamp match {
+ case x if x == firstTimestamp => new TimestampAndOffset(x, 0L,
Optional.of(firstLeaderEpoch))
+ case x if x == secondTimestamp => new TimestampAndOffset(x, 1L,
Optional.of(secondLeaderEpoch))
+ case _ => null
+ }
+ )
+
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+ anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+ // Offsets 0, 1 (first and second timestamps) are in remote storage and
not deleted locally.
+ log.updateLocalLogStartOffset(0)
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ // In the assertions below we test that offset 0 (first timestamp) and
offset 1 (second timestamp) are on both remote and local storage
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
+ @Test
+ def testFetchEarliestPendingUploadTimestampNoSegmentsUploaded(): Unit = {
+ val logStartOffset = 0
+ val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+ val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+ val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+ val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
+
+ // No offsets are in remote storage
+ doAnswer(_ => Optional.empty[TimestampAndOffset]())
+
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+ anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+ // Offsets 0, 1, 2 (first, second and third timestamps) are in local
storage only and not uploaded to remote storage.
+ log.updateLocalLogStartOffset(0)
+ log.updateHighestOffsetInRemoteStorage(-1)
+
+ // In the assertions below we test that offset 0 (first timestamp), offset
1 (second timestamp) and offset 2 (third timestamp) are only on the local
storage.
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
+ @Test
+ def testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote():
Unit = {
+ val logStartOffset = 100
+ val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+ val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+ val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+ val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
+
+ // Offsets 100, 101, 102 (first, second and third timestamps) are in local
storage and not uploaded to remote storage.
+ // Tiered storage copy was disabled and then enabled again, because of
which the remote log segments are deleted but
+ // the highest offset in remote storage has become stale
+ doAnswer(_ => Optional.empty[TimestampAndOffset]())
+
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+ anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+ log.updateLocalLogStartOffset(100)
+ log.updateHighestOffsetInRemoteStorage(50)
+
+ // In the assertions below we test that offset 100 (first timestamp),
offset 101 (second timestamp) and offset 102 (third timestamp) are only on the
local storage.
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L,
Optional.empty()),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP)
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
+ private def prepare(logStartOffset: Int): (RemoteLogManager, UnifiedLog,
Seq[TimestampAndEpoch]) = {
+ val config: KafkaConfig = createKafkaConfigWithRLM
+ val purgatory = new
DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets",
config.brokerId)
+ val remoteLogManager = spy(new
RemoteLogManager(config.remoteLogManagerConfig,
+ 0,
+ logDir.getAbsolutePath,
+ "clusterId",
+ mockTime,
+ _ => Optional.empty[UnifiedLog](),
+ (_, _) => {},
+ brokerTopicStats,
+ new Metrics(),
+ Optional.empty))
+ remoteLogManager.setDelayedOperationPurgatory(purgatory)
+
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200,
indexIntervalBytes = 1, remoteLogStorageEnable = true)
+ val log = createLog(logDir, logConfig, logStartOffset = logStartOffset,
remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
+
+ // Verify earliest pending upload offset for empty log
+ assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset,
Optional.empty()),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+
+ val timestampAndEpochs = prepareLogWithSequentialRecords(log, recordCount
= 3)
+ (remoteLogManager, log, timestampAndEpochs)
+ }
+
/**
* Test the Log truncate operations
*/
@@ -4786,6 +4973,44 @@ class UnifiedLogTest {
(log, segmentWithOverflow)
}
+
+ private def assertFetchOffsetByTimestamp(log: UnifiedLog,
remoteLogManagerOpt: Option[RemoteLogManager], expected:
Option[TimestampAndOffset], timestamp: Long): Unit = {
+ val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
+ val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp,
remoteOffsetReader)
+ assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
+ offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
+ assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
+
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
+ assertEquals(expected.get,
offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
+ }
+
+ private def assertFetchOffsetBySpecialTimestamp(log: UnifiedLog,
remoteLogManagerOpt: Option[RemoteLogManager], expected: TimestampAndOffset,
timestamp: Long): Unit = {
+ val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
+ val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp,
remoteOffsetReader)
+ assertEquals(new OffsetResultHolder(expected), offsetResultHolder)
+ }
+
+ private def getRemoteOffsetReader(remoteLogManagerOpt: Option[Any]):
Optional[AsyncOffsetReader] = {
+ remoteLogManagerOpt match {
+ case Some(remoteLogManager) =>
Optional.of(remoteLogManager.asInstanceOf[AsyncOffsetReader])
+ case None => Optional.empty[AsyncOffsetReader]()
+ }
+ }
+
+ private def prepareLogWithSequentialRecords(log: UnifiedLog, recordCount:
Int): Seq[TimestampAndEpoch] = {
+ val firstTimestamp = mockTime.milliseconds()
+
+ (0 until recordCount).map { i =>
+ val timestampAndEpoch = TimestampAndEpoch(firstTimestamp + i, i)
+ log.appendAsLeader(
+ TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
timestamp = timestampAndEpoch.timestamp),
+ timestampAndEpoch.leaderEpoch
+ )
+ timestampAndEpoch
+ }
+ }
+
+ case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
}
object UnifiedLogTest {
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index dd7c5937bdc..ceca9a6a7de 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -283,7 +283,9 @@ public enum MetadataVersion {
}
public short listOffsetRequestVersion() {
- if (this.isAtLeast(IBP_4_0_IV3)) {
+ if (this.isAtLeast(IBP_4_2_IV1)) {
+ return 11;
+ } else if (this.isAtLeast(IBP_4_0_IV3)) {
return 10;
} else if (this.isAtLeast(IBP_3_9_IV0)) {
return 9;
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 508d4bd900b..49a200f6225 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -266,8 +266,8 @@ class MetadataVersionTest {
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testListOffsetsValueVersion(MetadataVersion metadataVersion) {
- final short expectedVersion = 10;
- if (metadataVersion.isAtLeast(IBP_4_0_IV3)) {
+ final short expectedVersion = 11;
+ if (metadataVersion.isAtLeast(IBP_4_2_IV1)) {
assertEquals(expectedVersion,
metadataVersion.listOffsetRequestVersion());
} else {
assertTrue(metadataVersion.listOffsetRequestVersion() <
expectedVersion);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index ca32e4f086a..769f59d56dc 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1667,6 +1667,8 @@ public class UnifiedLog implements AutoCloseable {
} else {
return new OffsetResultHolder(new
FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)));
}
+ } else if (targetTimestamp ==
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) {
+ return
fetchEarliestPendingUploadOffset(remoteOffsetReader);
} else if (targetTimestamp ==
ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions.
List<LogSegment> segments = logSegments();
@@ -1709,6 +1711,31 @@ public class UnifiedLog implements AutoCloseable {
});
}
+ private OffsetResultHolder
fetchEarliestPendingUploadOffset(Optional<AsyncOffsetReader>
remoteOffsetReader) {
+ if (remoteLogEnabled()) {
+ long curHighestRemoteOffset = highestOffsetInRemoteStorage();
+
+ if (curHighestRemoteOffset == -1L) {
+ if (localLogStartOffset() == logStartOffset()) {
+ // No segments have been uploaded yet
+ return
fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
remoteOffsetReader);
+ } else {
+ // Leader currently does not know about the already
uploaded segments
+ return new OffsetResultHolder(Optional.of(new
FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L,
Optional.of(-1))));
+ }
+ } else {
+ long earliestPendingUploadOffset =
Math.max(curHighestRemoteOffset + 1, logStartOffset());
+ OptionalInt epochForOffset =
leaderEpochCache.epochForOffset(earliestPendingUploadOffset);
+ Optional<Integer> epochResult = epochForOffset.isPresent()
+ ? Optional.of(epochForOffset.getAsInt())
+ : Optional.empty();
+ return new OffsetResultHolder(new
FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP,
earliestPendingUploadOffset, epochResult));
+ }
+ } else {
+ return new OffsetResultHolder(new
FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)));
+ }
+ }
+
/**
* Checks if the log is empty.
* @return Returns True when the log is empty. Otherwise, false.
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index 8cc9428afbd..ae16d11d8ed 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -126,7 +126,7 @@ public class GetOffsetShell {
.ofType(String.class);
timeOpt = parser.accepts("time", "timestamp of the offsets before
that. [Note: No offset is returned, if the timestamp greater than recently
committed record timestamp is given.]")
.withRequiredArg()
- .describedAs("<timestamp> / -1 or latest / -2 or earliest
/ -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered")
+ .describedAs("<timestamp> / -1 or latest / -2 or earliest
/ -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or
earliest-pending-upload")
.ofType(String.class)
.defaultsTo("latest");
commandConfigOpt = parser.accepts("command-config", "Property file
containing configs to be passed to Admin Client.")
@@ -276,6 +276,8 @@ public class GetOffsetShell {
return OffsetSpec.earliestLocal();
case "latest-tiered":
return OffsetSpec.latestTiered();
+ case "earliest-pending-upload":
+ return OffsetSpec.earliestPendingUpload();
default:
long timestamp;
@@ -283,7 +285,7 @@ public class GetOffsetShell {
timestamp = Long.parseLong(listOffsetsTimestamp);
} catch (NumberFormatException e) {
throw new TerseException("Malformed time argument " +
listOffsetsTimestamp + ". " +
- "Please use -1 or latest / -2 or earliest / -3 or
max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long
format timestamp");
+ "Please use -1 or latest / -2 or earliest / -3 or
max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or
earliest-pending-upload, or a specified long format timestamp");
}
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
@@ -296,6 +298,8 @@ public class GetOffsetShell {
return OffsetSpec.earliestLocal();
} else if (timestamp ==
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
return OffsetSpec.latestTiered();
+ } else if (timestamp ==
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) {
+ return OffsetSpec.earliestPendingUpload();
} else {
return OffsetSpec.forTimestamp(timestamp);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
index 53c1c4d79c9..db53695a7be 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
@@ -247,7 +247,7 @@ public class GetOffsetShellParsingTest {
@Test
public void testInvalidOffset() {
assertEquals("Malformed time argument foo. " +
- "Please use -1 or latest / -2 or earliest / -3 or
max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long
format timestamp",
+ "Please use -1 or latest / -2 or earliest / -3 or
max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or
earliest-pending-upload, or a specified long format timestamp",
assertThrows(TerseException.class, () ->
GetOffsetShell.parseOffsetSpec("foo")).getMessage());
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 9986daa7f3b..c1c7b27639a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -367,6 +367,30 @@ public class GetOffsetShellTest {
}
}
+ @ClusterTemplate("withRemoteStorage")
+ public void testGetOffsetsByEarliestTieredSpec() throws
InterruptedException {
+ setUp();
+ setUpRemoteLogTopics();
+
+ for (String time : new String[] {"-6", "earliest-pending-upload"}) {
+ // test topics disable remote log storage
+ // as remote log is disabled, the broker returns an unknown offset for each
topic partition and these
+ // unknown offsets are ignored by GetOffsetShell, hence we have
empty result here.
+ assertEquals(List.of(),
+ executeAndParse("--topic-partitions", "topic\\d+:0", "--time",
time));
+
+ // test topics enable remote log storage
+ TestUtils.waitForCondition(() ->
+ List.of(
+ new Row("topicRLS1", 0, 0L),
+ new Row("topicRLS2", 0, 1L),
+ new Row("topicRLS3", 0, 2L),
+ new Row("topicRLS4", 0, 3L))
+ .equals(executeAndParse("--topic-partitions",
"topicRLS.*:0", "--time", time)),
+ "testGetOffsetsByEarliestTieredSpec result not match");
+ }
+ }
+
@ClusterTest
public void testGetOffsetsByTimestamp() {
setUp();