brandboat commented on code in PR #16646:
URL: https://github.com/apache/kafka/pull/16646#discussion_r1697116056


##########
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##########
@@ -494,4 +473,60 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
         }
     }
 
+    static final class ConfigPostProcessor {
+        final String topicName;
+        final long numRecords;
+        final Integer recordSize;
+        final double throughput;
+        final boolean payloadMonotonic;
+        final Properties producerProps;
+        final boolean shouldPrintMetrics;
+        final Long transactionDurationMs;
+        final boolean transactionsEnabled;
+        final List<byte[]> payloadByteList;
+
+        public ConfigPostProcessor(ArgumentParser parser, String[] args) 
throws IOException, ArgumentParserException {
+            Namespace namespace = parser.parseArgs(args);
+            this.topicName = namespace.getString("topic");
+            this.numRecords = namespace.getLong("numRecords");
+            this.recordSize = namespace.getInt("recordSize");
+            this.throughput = namespace.getDouble("throughput");
+            this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
+            this.shouldPrintMetrics = namespace.getBoolean("printMetrics");
+
+            List<String> producerConfigs = namespace.getList("producerConfig");
+            String producerConfigFile = 
namespace.getString("producerConfigFile");
+            String payloadFilePath = namespace.getString("payloadFile");
+            Long transactionDurationMsArg = 
namespace.getLong("transactionDurationMs");
+            String transactionIdArg = namespace.getString("transactionalId");
+            if (producerConfigs == null && producerConfigFile == null) {
+                throw new ArgumentParserException("Either --producer-props or 
--producer.config must be specified.", parser);
+            }
+            if (transactionDurationMsArg != null && transactionDurationMsArg 
<= 0) {
+                throw new ArgumentParserException("--transaction-duration-ms 
should > 0", parser);
+            }
+
+            // since default value gets printed with the help text, we are 
escaping \n there and replacing it with correct value here.
+            String payloadDelimiter = 
namespace.getString("payloadDelimiter").equals("\\n")
+                    ? "\n" : namespace.getString("payloadDelimiter");
+            this.payloadByteList = readPayloadFile(payloadFilePath, 
payloadDelimiter);
+            this.producerProps = readProps(producerConfigs, 
producerConfigFile);
+            // setup transaction related configs
+            this.transactionsEnabled = transactionDurationMsArg != null
+                    || transactionIdArg != null
+                    || 
producerProps.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            if (transactionsEnabled) {
+                Optional<String> txIdInProps =
+                        
Optional.ofNullable(producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
+                                .map(Object::toString);
+                String transactionId = 
Optional.ofNullable(transactionIdArg).orElse(txIdInProps.orElse(DEFAULT_TRANSACTION_ID));

Review Comment:
   Use unique transaction id by default make sense to me, I didn't change this 
since this is the original implementation see 
https://issues.apache.org/jira/browse/KAFKA-5491. And I didn't find the reason 
we use same transaction id in the beginning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to