soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1627741886
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
object StorageTool extends Logging {
- def main(args: Array[String]): Unit = {
- try {
- val namespace = parseArguments(args)
- val command = namespace.getString("command")
- val config = Option(namespace.getString("config")).flatMap(
- p => Some(new KafkaConfig(Utils.loadProps(p))))
- command match {
- case "info" =>
- val directories = configToLogDirectories(config.get)
- val selfManagedMode = configToSelfManagedMode(config.get)
- Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
- case "format" =>
- val directories = configToLogDirectories(config.get)
- val clusterId = namespace.getString("cluster_id")
- val metaProperties = new MetaProperties.Builder().
- setVersion(MetaPropertiesVersion.V1).
- setClusterId(clusterId).
- setNodeId(config.get.nodeId).
- build()
- val metadataRecords : ArrayBuffer[ApiMessageAndVersion] =
ArrayBuffer()
- val specifiedFeatures: util.List[String] =
namespace.getList("feature")
- val releaseVersionFlagSpecified =
namespace.getString("release_version") != null
- if (releaseVersionFlagSpecified && specifiedFeatures != null) {
- throw new TerseFailure("Both --release-version and --feature were
set. Only one of the two flags can be set.")
- }
- val featureNamesAndLevelsMap =
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
- val metadataVersion = getMetadataVersion(namespace,
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
- validateMetadataVersion(metadataVersion, config)
- // Get all other features, validate, and create records for them
- // Use latest default for features if --release-version is not specified
- generateFeatureRecords(
- metadataRecords,
- metadataVersion,
- featureNamesAndLevelsMap,
- Features.PRODUCTION_FEATURES.asScala.toList,
- config.get.unstableFeatureVersionsEnabled,
- releaseVersionFlagSpecified
- )
-
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
- if (!metadataVersion.isScramSupported) {
- throw new TerseFailure(s"SCRAM is only supported in
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
- }
- for (record <- userScramCredentialRecords) {
- metadataRecords.append(new ApiMessageAndVersion(record,
0.toShort))
- }
- })
-
- val bootstrapMetadata = buildBootstrapMetadata(metadataVersion,
Some(metadataRecords), "format command")
- val ignoreFormatted = namespace.getBoolean("ignore_formatted")
- if (!configToSelfManagedMode(config.get)) {
- throw new TerseFailure("The kafka configuration file appears to be
for " +
- "a legacy cluster. Formatting is only supported for clusters in
KRaft mode.")
- }
- Exit.exit(formatCommand(System.out, directories, metaProperties,
bootstrapMetadata,
- metadataVersion,ignoreFormatted))
- case "random-uuid" =>
- System.out.println(Uuid.randomUuid)
- Exit.exit(0)
+ /**
+ * Executes the command according to the given arguments and returns the appropriate exit code.
+ * @param args The command line arguments
+ * @return The exit code
+ */
+ def runMain(args: Array[String]): Int = {
Review Comment:
I hadn't noticed this yet. Thanks for pointing this out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]