jsancio commented on code in PR #16094:
URL: https://github.com/apache/kafka/pull/16094#discussion_r1616334909


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -452,21 +460,68 @@ object StorageTool extends Logging {
       stream.println("All of the log directories are already formatted.")
     } else {
       metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
+        System.out.println("logDir : "+ logDir)
         copier.setLogDirProps(logDir, new 
MetaProperties.Builder(metaProperties).
           setDirectoryId(copier.generateValidDirectoryId()).
           build())
         copier.setPreWriteHandler((logDir, _, _) => {
           stream.println(s"Formatting $logDir with metadata.version 
$metadataVersion.")
           Files.createDirectories(Paths.get(logDir))
           val bootstrapDirectory = new BootstrapDirectory(logDir, 
Optional.empty())
+          System.out.println("bootstrapDirectory : "+ bootstrapDirectory)
           bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
         })
         copier.setWriteErrorHandler((logDir, e) => {
           throw new TerseFailure(s"Error while writing meta.properties file 
$logDir: ${e.getMessage}")
         })
         copier.writeLogDirChanges()
+        // Write new file checkpoint file if standalone mode
+        if(standaloneMode){
+          writeCheckpointFile(logDir)
+        }
       })
     }
     0
   }
+
+  def writeCheckpointFile(logDir: String): Unit = {
+    val snapshotCheckpointDir = logDir + "/__cluster_metadata-0"
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, 
new OffsetAndEpoch(0, 0))
+
+    val builder = new RecordsSnapshotWriter.Builder()
+      .setKraftVersion(1)
+      .setVoterSet(Optional.empty())

Review Comment:
   If you create and set the `VoterSet` to contain this one replica, the 
builder will write the necessary `KRaftVersionRecord` and `VotersRecord`. To 
create a voter set you need the default listener name (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L188),
 the replica id (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L222),
 the directory id (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/SharedServer.scala#L260)
 and the advertised hostname and port (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L1237-L1243).
   
   Looking at this code 
(https://github.com/apache/kafka/commit/e8d32563f323a245b3127394d97d6356e84f0294#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R2190-R2193),
 we are going to have to relax this to allow advertised listeners as long as 
`controller.quorum.voters` is not set.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -452,21 +460,68 @@ object StorageTool extends Logging {
       stream.println("All of the log directories are already formatted.")
     } else {
       metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
+        System.out.println("logDir : "+ logDir)
         copier.setLogDirProps(logDir, new 
MetaProperties.Builder(metaProperties).
           setDirectoryId(copier.generateValidDirectoryId()).
           build())
         copier.setPreWriteHandler((logDir, _, _) => {
           stream.println(s"Formatting $logDir with metadata.version 
$metadataVersion.")
           Files.createDirectories(Paths.get(logDir))
           val bootstrapDirectory = new BootstrapDirectory(logDir, 
Optional.empty())
+          System.out.println("bootstrapDirectory : "+ bootstrapDirectory)
           bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
         })
         copier.setWriteErrorHandler((logDir, e) => {
           throw new TerseFailure(s"Error while writing meta.properties file 
$logDir: ${e.getMessage}")
         })
         copier.writeLogDirChanges()
+        // Write new file checkpoint file if standalone mode
+        if(standaloneMode){
+          writeCheckpointFile(logDir)
+        }
       })
     }
     0
   }
+
+  def writeCheckpointFile(logDir: String): Unit = {
+    val snapshotCheckpointDir = logDir + "/__cluster_metadata-0"
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, 
new OffsetAndEpoch(0, 0))
+
+    val builder = new RecordsSnapshotWriter.Builder()
+      .setKraftVersion(1)
+      .setVoterSet(Optional.empty())
+      .setRawSnapshotWriter(rawSnapshotWriter).build(new MetadataRecordSerde)
+
+    // Append control records: KRaftVersionRecord and VotersRecord
+    appendControlRecords(builder)

Review Comment:
   This actually writes those records as "metadata" records and not as 
control records.
   
   You don't need to call this method as the 
`RecordsSnapshotWriter.Builder.build(RecordSerde)` will write the necessary 
control records if you configure the builder correctly by calling 
`setKraftVersion` and `setVoterSet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to