showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702701568
##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
return "topic" + i;
}
+ private String getRemoteLogStorageEnabledTopicName(int i) {
+ return "topicRLS" + i;
+ }
+
private void setUp() {
+ setupTopics(this::getTopicName, Collections.emptyMap());
+ sendProducerRecords(this::getTopicName);
+ }
+
+ private void setUpRemoteLogTopics() {
Review Comment:
I'll add a comment for this method. For example:
```
In this method, we'll create 4 topics and produce records to the log like
this:
topicRLS1 -> 1 segment
topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote
storage)
topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote
storage)
topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote
storage)
```
##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
return "topic" + i;
}
+ private String getRemoteLogStorageEnabledTopicName(int i) {
+ return "topicRLS" + i;
+ }
+
private void setUp() {
+ setupTopics(this::getTopicName, Collections.emptyMap());
+ sendProducerRecords(this::getTopicName);
+ }
+
+ private void setUpRemoteLogTopics() {
+ Map<String, String> rlsConfigs = new HashMap<>();
+ rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+ rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+ rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+ setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+ sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+ }
+
+ private void setupTopics(Function<Integer, String> topicName, Map<String,
String> configs) {
try (Admin admin = cluster.createAdminClient()) {
List<NewTopic> topics = new ArrayList<>();
- IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new
NewTopic(getTopicName(i), i, (short) 1)));
+ IntStream.range(0, topicCount + 1).forEach(i ->
+ topics.add(new NewTopic(topicName.apply(i), i, (short)
1).configs(configs)));
admin.createTopics(topics);
}
+ }
+ private void sendProducerRecords(Function<Integer, String> topicName) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
try (KafkaProducer<String, String> producer = new
KafkaProducer<>(props)) {
IntStream.range(0, topicCount + 1)
- .forEach(i -> IntStream.range(0, i * i)
- .forEach(msgCount -> {
- assertDoesNotThrow(() -> producer.send(
- new ProducerRecord<>(getTopicName(i),
msgCount % i, null, "val" + msgCount)).get());
- })
- );
+ .forEach(i -> IntStream.range(0, i * i)
+ .forEach(msgCount -> assertDoesNotThrow(() ->
producer.send(
+ new ProducerRecord<>(topicName.apply(i),
msgCount % i, null, "val" + msgCount)).get())));
}
}
+ private static List<ClusterConfig> withRemoteStorage() {
+ Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
"1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
"1000");
+ serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG,
"1000");
+
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+ Map<String, String> zkProperties = new HashMap<>(serverProperties);
+
zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
"PLAINTEXT");
+
+ Map<String, String> raftProperties = new HashMap<>(serverProperties);
+
raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
"EXTERNAL");
Review Comment:
Could we add a comment here to explain why we need 2 different settings?
##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -274,6 +333,45 @@ public void testGetOffsetsByMaxTimestamp() {
}
}
+ @ClusterTemplate("withRemoteStorage")
+ public void testGetOffsetsByEarliestLocalSpec() throws
InterruptedException {
+ setUpRemoteLogTopics();
+
+ for (String time : new String[] {"-4", "earliest-local"}) {
+ TestUtils.waitForCondition(() ->
+ Arrays.asList(
+ new Row("topicRLS1", 0, 0L),
+ new Row("topicRLS2", 0, 1L),
+ new Row("topicRLS3", 0, 2L),
+ new Row("topicRLS4", 0, 3L))
+ .equals(executeAndParse("--topic-partitions",
"topicRLS.*:0", "--time", time)),
+ "testGetOffsetsByEarliestLocalSpec result not match");
+ }
+ }
+
+ @ClusterTemplate("withRemoteStorage")
+ public void testGetOffsetsByLatestTieredSpec() throws InterruptedException
{
+ setUp();
+ setUpRemoteLogTopics();
+
+ for (String time : new String[] {"-5", "latest-tiered"}) {
+ // test topics disable remote log storage
+ // as remote log not enabled, broker return unknown offset for
each topic partition and these
+ // unknown offsets are ignored by GetOffsetShell hence we have
empty result here.
+ assertEquals(Collections.emptyList(),
+ executeAndParse("--topic-partitions", "topic\\d+:0",
"--time", time));
+
+ // test topics enable remote log storage
+ TestUtils.waitForCondition(() ->
+ Arrays.asList(
+ new Row("topicRLS2", 0, 0L),
+ new Row("topicRLS3", 0, 1L),
+ new Row("topicRLS4", 0, 2L))
Review Comment:
I'll add one more line to the comment to explain why `topicRLS1` has no
result:
`topicRLS1 has no result because no log segments have been uploaded to
the remote storage`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]