mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1624370212
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
object StorageTool extends Logging {
+
+ /**
+ * Executes the command according to the given arguments and returns the
appropriate exit code.
+ * @param args The command line arguments
+ * @return The exit code
+ */
+ def runMain(args: Array[String]): Int = {
+ val namespace = parseArguments(args)
+ val command = namespace.getString("command")
+ val config = parseConfig(namespace.getString("config"))
+ command match {
+ case "info" =>
+ val directories = configToLogDirectories(config)
+ val selfManagedMode = configToSelfManagedMode(config)
+ infoCommand(System.out, selfManagedMode, directories)
+
+ case "format" =>
+ runFormatCommand(namespace, config)
+
+ case "random-uuid" =>
+ System.out.println(Uuid.randomUuid)
+ 0
+ case _ =>
+ throw new RuntimeException(s"Unknown command $command")
+ }
+ }
+
def main(args: Array[String]): Unit = {
+ var exitCode: Integer = 0
+ var message: Option[String] = None
try {
- val namespace = parseArguments(args)
- val command = namespace.getString("command")
- val config = Option(namespace.getString("config")).flatMap(
- p => Some(new KafkaConfig(Utils.loadProps(p))))
- command match {
- case "info" =>
- val directories = configToLogDirectories(config.get)
- val selfManagedMode = configToSelfManagedMode(config.get)
- Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
- case "format" =>
- val directories = configToLogDirectories(config.get)
- val clusterId = namespace.getString("cluster_id")
- val metaProperties = new MetaProperties.Builder().
- setVersion(MetaPropertiesVersion.V1).
- setClusterId(clusterId).
- setNodeId(config.get.nodeId).
- build()
- val metadataRecords : ArrayBuffer[ApiMessageAndVersion] =
ArrayBuffer()
- val specifiedFeatures: util.List[String] =
namespace.getList("feature")
- val releaseVersionFlagSpecified =
namespace.getString("release_version") != null
- if (releaseVersionFlagSpecified && specifiedFeatures != null) {
- throw new TerseFailure("Both --release-version and --feature were
set. Only one of the two flags can be set.")
- }
- val featureNamesAndLevelsMap =
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
- val metadataVersion = getMetadataVersion(namespace,
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
- validateMetadataVersion(metadataVersion, config)
- // Get all other features, validate, and create records for them
- // Use latest default for features if --release-version is not
specified
- generateFeatureRecords(
- metadataRecords,
- metadataVersion,
- featureNamesAndLevelsMap,
- Features.PRODUCTION_FEATURES.asScala.toList,
- config.get.unstableFeatureVersionsEnabled,
- releaseVersionFlagSpecified
- )
-
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
- if (!metadataVersion.isScramSupported) {
- throw new TerseFailure(s"SCRAM is only supported in
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
- }
- for (record <- userScramCredentialRecords) {
- metadataRecords.append(new ApiMessageAndVersion(record,
0.toShort))
- }
- })
-
- val bootstrapMetadata = buildBootstrapMetadata(metadataVersion,
Some(metadataRecords), "format command")
- val ignoreFormatted = namespace.getBoolean("ignore_formatted")
- if (!configToSelfManagedMode(config.get)) {
- throw new TerseFailure("The kafka configuration file appears to be
for " +
- "a legacy cluster. Formatting is only supported for clusters in
KRaft mode.")
- }
- Exit.exit(formatCommand(System.out, directories, metaProperties,
bootstrapMetadata,
- metadataVersion,ignoreFormatted))
+ exitCode = runMain(args)
+ } catch {
+ case e: TerseFailure =>
+ exitCode = 1
+ message = Some(e.getMessage)
+ }
+ message.foreach(System.err.println)
+ Exit.exit(exitCode, message)
+ }
- case "random-uuid" =>
- System.out.println(Uuid.randomUuid)
- Exit.exit(0)
+ private def parseConfig(configFilename: String): KafkaConfig = {
+ try {
+ new KafkaConfig(Utils.loadProps(configFilename))
+ } catch {
+ case e: IllegalArgumentException => throw new TerseFailure(s"Invalid
configuration: ${e.getMessage}")
Review Comment:
Wrapping `IllegalArgumentException` slightly changes the behavior in case of
bad configurations. For example, if I set `log.dirs=`, on trunk I get the full
stack trace:
```
Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: At least one log directory must be defined via log.dirs or log.dir.
at scala.Predef$.require(Predef.scala:337)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2316)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2285)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1634)
at kafka.tools.StorageTool$.$anonfun$main$1(StorageTool.scala:52)
at scala.Option.flatMap(Option.scala:283)
at kafka.tools.StorageTool$.main(StorageTool.scala:52)
at kafka.tools.StorageTool.main(StorageTool.scala)
```
With this PR I get:
```
Invalid configuration: requirement failed: At least one log directory must
be defined via log.dirs or log.dir.
```
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
object StorageTool extends Logging {
+
+ /**
+ * Executes the command according to the given arguments and returns the
appropriate exit code.
+ * @param args The command line arguments
+ * @return The exit code
+ */
+ def runMain(args: Array[String]): Int = {
+ val namespace = parseArguments(args)
+ val command = namespace.getString("command")
+ val config = parseConfig(namespace.getString("config"))
Review Comment:
We can't call `parseConfig` unconditionally: when the command is `random-uuid`,
it fails because `KafkaConfig` requires some configs to be set and we don't
provide any:
```
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Exception in thread "main" org.apache.kafka.common.config.ConfigException:
Missing required configuration `zookeeper.connect` which has no default value.
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1223)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1214)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:544)
at kafka.tools.StorageTool$.parseConfig(StorageTool.scala:91)
at kafka.tools.StorageTool$.runMain(StorageTool.scala:57)
at kafka.tools.StorageTool$.main(StorageTool.scala:79)
at kafka.tools.StorageTool.main(StorageTool.scala)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]