chia7712 commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702865629


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -70,31 +85,80 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        // In this method, we'll create 4 topics and produce records to the log like this:
+        // topicRLS1 -> 1 segment
+        // topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote storage)
+        // topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote storage)
+        // topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote storage)
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                .forEach(i -> IntStream.range(0, i * i)
-                        .forEach(msgCount -> {
-                            assertDoesNotThrow(() -> producer.send(
-                                    new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
-                        })
-                );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
+                                    new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+        serverProperties.put(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, "1");
+        serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");
+
+        return Arrays.asList(

Review Comment:
   We can set the listener name for all types even though `setListenerName` is a no-op for kraft/co-kraft mode. That simplifies the code and is still valid.
   ```java
           return Collections.singletonList(
                   ClusterConfig.defaultBuilder()
                           .setTypes(new HashSet<>(Arrays.asList(ZK, KRAFT, CO_KRAFT)))
                           .setServerProperties(serverProperties)
                           .setListenerName("EXTERNAL")
                           .build());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##########
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
     }
 
     /**
-     * Used to retrieve the offset with the local log start offset,
-     * log start offset is the offset of a log above which reads are guaranteed to be served
-     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
-     * as the earliest timestamp
+     * Used to retrieve the local log start offset.
+     * Local log start offset is the offset of a log above which reads
+     * are guaranteed to be served from the disk of the leader broker.
+     * <br/>
+     * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
      */
     public static OffsetSpec earliestLocalSpec() {

Review Comment:
   This is unrelated to this PR. The other specs don't use `Spec` as a postfix. Maybe we should align the naming before release?



##########
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##########
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
     }
 
     /**
-     * Used to retrieve the offset with the local log start offset,
-     * log start offset is the offset of a log above which reads are guaranteed to be served
-     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
-     * as the earliest timestamp
+     * Used to retrieve the local log start offset.
+     * Local log start offset is the offset of a log above which reads
+     * are guaranteed to be served from the disk of the leader broker.
+     * <br/>
+     * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
      */
     public static OffsetSpec earliestLocalSpec() {
         return new EarliestLocalSpec();
     }
 
     /**
-     * Used to retrieve the offset with the highest offset of data stored in remote storage,
-     * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
+     * Used to retrieve the highest offset of data stored in remote storage.
+     * <br/>
+     * Note: When tiered storage is not enabled, we will return an unknown offset.

Review Comment:
   Could you please add a test for this scenario?



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -283,6 +283,10 @@ private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseExce
                 return OffsetSpec.latest();
             case "max-timestamp":
                 return OffsetSpec.maxTimestamp();
+            case "earliest-local":

Review Comment:
   Could you please update docs?
   
https://github.com/apache/kafka/blob/2c9d7afe4c60656d10f5b15367e3fb65173d0372/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java#L135
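   For illustration only, here is a minimal, self-contained sketch of how the new string value slots into the existing `--time` parsing switch. The class name and string return values are hypothetical stand-ins, not Kafka's actual `GetOffsetShell`/`OffsetSpec` API, and the `latest-tiered` case is an assumption based on KIP-1005:

   ```java
   import java.util.Locale;

   // Hypothetical stand-in for GetOffsetShell#parseOffsetSpec: maps the --time
   // option string to the name of the offset spec it would select.
   public class OffsetSpecParseSketch {
       static String parseOffsetSpec(String timestamp) {
           switch (timestamp.toLowerCase(Locale.ROOT)) {
               case "earliest":       return "EarliestSpec";
               case "latest":         return "LatestSpec";
               case "max-timestamp":  return "MaxTimestampSpec";
               case "earliest-local": return "EarliestLocalSpec"; // added by this PR
               case "latest-tiered":  return "LatestTierSpec";    // assumed, per KIP-1005
               // any other value is treated as a literal timestamp in epoch millis
               default:               return "TimestampSpec(" + timestamp + ")";
           }
       }

       public static void main(String[] args) {
           System.out.println(parseOffsetSpec("earliest-local"));
           System.out.println(parseOffsetSpec("1690000000000"));
       }
   }
   ```

   The docs update would presumably enumerate these accepted values in the `--time` option help text at the linked line.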



##########
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##########
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
     }
 
     /**
-     * Used to retrieve the offset with the local log start offset,
-     * log start offset is the offset of a log above which reads are guaranteed to be served
-     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
-     * as the earliest timestamp
+     * Used to retrieve the local log start offset.
+     * Local log start offset is the offset of a log above which reads
+     * are guaranteed to be served from the disk of the leader broker.
+     * <br/>
+     * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.

Review Comment:
   Could you please add a test for it?



##########
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##########
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
     }
 
     /**
-     * Used to retrieve the offset with the local log start offset,
-     * log start offset is the offset of a log above which reads are guaranteed to be served
-     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
-     * as the earliest timestamp
+     * Used to retrieve the local log start offset.
+     * Local log start offset is the offset of a log above which reads
+     * are guaranteed to be served from the disk of the leader broker.
+     * <br/>
+     * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
      */
     public static OffsetSpec earliestLocalSpec() {
         return new EarliestLocalSpec();
     }
 
     /**
-     * Used to retrieve the offset with the highest offset of data stored in remote storage,
-     * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
+     * Used to retrieve the highest offset of data stored in remote storage.
+     * <br/>
+     * Note: When tiered storage is not enabled, we will return an unknown offset.
      */
     public static OffsetSpec latestTierSpec() {
         return new LatestTierSpec();

Review Comment:
   This is not related to this PR, but KIP-1005 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset) calls it `LatestTieredSpec` rather than `LatestTierSpec`.
   
   ping @FrankYang0529 as you are the author of #16781



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to