Rancho-7 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2280908402
##########
tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java:
##########
@@ -29,50 +33,248 @@
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class EndToEndLatencyTest {
+ private static final byte[] RECORD_VALUE =
"record-sent".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] RECORD_VALUE_DIFFERENT =
"record-received".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] RECORD_KEY =
"key-sent".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] RECORD_KEY_DIFFERENT =
"key-received".getBytes(StandardCharsets.UTF_8);
+ private static final String HEADER_KEY = "header-key-sent";
+ private static final String HEADER_KEY_DIFFERENT = "header-key-received";
+ private static final byte[] HEADER_VALUE =
"header-value-sent".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] HEADER_VALUE_DIFFERENT =
"header-value-received".getBytes(StandardCharsets.UTF_8);
+
+ // legacy format test arguments
+ private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = {
+ "localhost:9092", "test", "10000", "1", "200", "propsfile.properties",
"random"
+ };
+
+ private static class ArgsBuilder {
+ private final Map<String, String> params = new LinkedHashMap<>();
+
+ private ArgsBuilder() {
+ params.put("--bootstrap-server", "localhost:9092");
+ params.put("--topic", "test-topic");
+ params.put("--num-records", "100");
+ params.put("--producer-acks", "1");
+ params.put("--record-size", "200");
+ }
+
+ public static ArgsBuilder defaults() {
+ return new ArgsBuilder();
+ }
+
+ public ArgsBuilder with(String param, String value) {
+ params.put(param, value);
+ return this;
+ }
+
+ public String[] build() {
+ return params.entrySet().stream()
+ .flatMap(entry -> Stream.of(entry.getKey(),
entry.getValue()))
+ .toArray(String[]::new);
+ }
+
+ public ArgsBuilder withNegative(String param) {
+ return with(param, "-1");
+ }
+
+ public ArgsBuilder withZero(String param) {
+ return with(param, "0");
+ }
+ }
+
@Mock
KafkaConsumer<byte[], byte[]> consumer;
@Mock
ConsumerRecords<byte[], byte[]> records;
@Test
- public void shouldFailWhenSuppliedUnexpectedArgs() {
- String[] args = new String[] {"localhost:9092", "test", "10000", "1",
"200", "propsfile.properties", "random"};
- assertThrows(TerseException.class, () ->
EndToEndLatency.execute(args));
+ public void testInvalidArgs() {
+ testUnexpectedArgsWithLegacyFormat();
+ testInvalidProducerAcks();
+ testInvalidNumRecords();
+ testInvalidRecordSize();
+ testInvalidRecordKey();
+ testInvalidNumHeaders();
+ testInvalidRecordHeaderKey();
+ testInvalidRecordHeaderValue();
+ }
+
+ private void testUnexpectedArgsWithLegacyFormat() {
+ assertThrows(TerseException.class, () ->
EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED));
+ }
+
+ private void testInvalidNumRecords() {
+ String expectedMsg = "Value for --num-records must be a positive
integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+ ArgsBuilder.defaults().withNegative("--num-records").build(),
expectedMsg);
+ }
+
+ private void testInvalidRecordSize() {
+ String expectedMsg = "Value for --record-size must be a non-negative
integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+ ArgsBuilder.defaults().withNegative("--record-size").build(),
expectedMsg);
+ }
+
+ private void testInvalidRecordKey() {
+ String expectedMsg = "Value for --record-key-size must be a
non-negative integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+ ArgsBuilder.defaults().withNegative("--record-key-size").build(),
expectedMsg);
+ }
+
+ private void testInvalidNumHeaders() {
+ String expectedMsg = "Value for --num-headers must be a non-negative
integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+ ArgsBuilder.defaults().withNegative("--num-headers").build(),
expectedMsg);
+ }
+
+ private void testInvalidRecordHeaderKey() {
+ String expectedMsg = "Value for --record-header-key-size must be a
non-negative integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+
ArgsBuilder.defaults().withNegative("--record-header-key-size").build(),
expectedMsg);
+ }
+
+ private void testInvalidRecordHeaderValue() {
+ String expectedMsg = "Value for --record-header-size must be a
non-negative integer.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+
ArgsBuilder.defaults().withNegative("--record-header-size").build(),
expectedMsg);
+ }
+
+ private void testInvalidProducerAcks() {
+ String expectedMsg = "Invalid value for --producer-acks. Latency
testing requires synchronous acknowledgement. Please use '1' or 'all'.";
+ assertInitializeInvalidOptionsExitCodeAndMsg(
+ ArgsBuilder.defaults().withZero("--producer-acks").build(),
expectedMsg);
+ }
+
+ private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args,
String expectedMsg) {
+ Exit.setExitProcedure((exitCode, message) -> {
+ assertEquals(1, exitCode);
+ assertTrue(message.contains(expectedMsg));
+ throw new RuntimeException();
+ });
+ try {
+ assertThrows(RuntimeException.class, () ->
EndToEndLatency.execute(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
}
@Test
- public void shouldFailWhenProducerAcksAreNotSynchronised() {
- String[] args = new String[] {"localhost:9092", "test", "10000", "0",
"200"};
- assertThrows(IllegalArgumentException.class, () ->
EndToEndLatency.execute(args));
+ public void testConvertLegacyArgs() throws Exception {
Review Comment:
@m1a2st Thanks for catching this, updated it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]