showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702701568


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {

Review Comment:
   I'll add a comment for this method, e.g.:
   ```
   In this method, we'll create 4 topics and produce records to the log like 
this:
   topicRLS1 -> 1 segment
   topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote 
storage)
   topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote 
storage)
   topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote 
storage)
   ```



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, 
String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                .forEach(i -> IntStream.range(0, i * i)
-                        .forEach(msgCount -> {
-                            assertDoesNotThrow(() -> producer.send(
-                                    new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-                        })
-                );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+                                    new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+        
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+        
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+        
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+        serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, 
"1000");
+        
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+        Map<String, String> zkProperties = new HashMap<>(serverProperties);
+        
zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "PLAINTEXT");
+
+        Map<String, String> raftProperties = new HashMap<>(serverProperties);
+        
raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "EXTERNAL");

Review Comment:
   Could we add a comment here to explain why we need two different settings?



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -274,6 +333,45 @@ public void testGetOffsetsByMaxTimestamp() {
         }
     }
 
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByEarliestLocalSpec() throws 
InterruptedException {
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-4", "earliest-local"}) {
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS1", 0, 0L),
+                            new Row("topicRLS2", 0, 1L),
+                            new Row("topicRLS3", 0, 2L),
+                            new Row("topicRLS4", 0, 3L))
+                            .equals(executeAndParse("--topic-partitions", 
"topicRLS.*:0", "--time", time)),
+                    "testGetOffsetsByEarliestLocalSpec result not match");
+        }
+    }
+
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByLatestTieredSpec() throws InterruptedException 
{
+        setUp();
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-5", "latest-tiered"}) {
+            // test topics disable remote log storage
+            // as remote log not enabled, broker return unknown offset for 
each topic partition and these
+            // unknown offsets are ignored by GetOffsetShell hence we have 
empty result here.
+            assertEquals(Collections.emptyList(),
+                    executeAndParse("--topic-partitions", "topic\\d+:0", 
"--time", time));
+
+            // test topics enable remote log storage
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS2", 0, 0L),
+                            new Row("topicRLS3", 0, 1L),
+                            new Row("topicRLS4", 0, 2L))

Review Comment:
   I'll add one more line to the comment to explain why `topicRLS1` has no 
result:
   `topicRLS1 has no result because there's no log segments being uploaded to 
the remote storage`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to