chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1590605162
##########
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##########
@@ -143,11 +143,12 @@ private void processClusterTest(ExtensionContext context,
ClusterTest annot, Clu
Type type = annot.clusterType() == Type.DEFAULT ?
defaults.clusterType() : annot.clusterType();
Map<String, String> serverProperties = new HashMap<>();
+ Map<Integer, Map<String, String>> perServerProperties = new
HashMap<>();
Review Comment:
Maybe we can rewrite them using streams and lambdas.
```java
Map<String, String> serverProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() == -1)
.collect(Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b));
Map<Integer, Map<String, String>> perServerProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() != -1)
.collect(Collectors.groupingBy(ClusterConfigProperty::id,
Collectors.mapping(Function.identity(),
Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b))));
```
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -105,17 +109,27 @@ public TestKitNodes build() {
List<Integer> controllerNodeIds =
IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
.boxed()
.collect(Collectors.toList());
- List<Integer> brokerNodeIds = IntStream.range(startBrokerId(),
startBrokerId() + numBrokerNodes)
+ List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET,
BROKER_ID_OFFSET + numBrokerNodes)
.boxed()
.collect(Collectors.toList());
+ Set<Integer> unknownIds = perServerProperties.keySet().stream()
Review Comment:
We can convert the `Integer` to `String` here, so that the following error
message can use `String.join` to simplify the code.
##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,30 +84,55 @@ public void testClusterTemplate() {
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
- @ClusterConfigProperty(key = "spam", value = "eggs")
+ @ClusterConfigProperty(key = "spam", value = "eggs"),
+ @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), //
this one will be ignored as there is no broker id is 86400
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
- @ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
+ @ClusterConfigProperty(key = "default.key", value =
"overwrite.value"),
+ @ClusterConfigProperty(id = 0, key = "queued.max.requests", value
= "200"),
+ @ClusterConfigProperty(id = 3000, key = "queued.max.requests",
value = "300")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
- @ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
+ @ClusterConfigProperty(key = "default.key", value =
"overwrite.value"),
+ @ClusterConfigProperty(id = 0, key = "queued.max.requests", value
= "200")
})
})
- public void testClusterTests() {
- if
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+ public void testClusterTests() throws ExecutionException,
InterruptedException {
+ if (!clusterInstance.isKRaftTest()) {
Assertions.assertEquals("bar",
clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs",
clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("default.value",
clusterInstance.config().serverProperties().get("default.key"));
- } else if
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ Map<ConfigResource, Config> configs =
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+ Assertions.assertEquals(1, configs.size());
+ Assertions.assertEquals("100",
configs.get(configResource).get("queued.max.requests").value());
Review Comment:
Could you please add comments for those assertions?
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -140,15 +154,11 @@ public TestKitNodes build() {
brokerNodes);
}
- private int startBrokerId() {
- return 0;
- }
-
private int startControllerId() {
Review Comment:
Could we inline this function? For example: `int controllerId = combined ?
BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;`
##########
core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java:
##########
@@ -27,6 +27,27 @@
@Target({ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ClusterConfigProperty {
+ /**
+ * The config applies to the controller/broker with specified id. Default
is -1, indicating the property applied to
+ * all controller/broker servers. Note that the "controller" here refers
to the KRaft quorum controller.
+ * The id can vary depending on the different {@link
kafka.test.annotation.Type}.
+ * <ul>
+ * <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts
from
+ * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by
1
+ * with each additional broker, and there is no controller server under
this mode. </li>
+ * <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id
starts from
+ * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller
id
+ * starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET
3000}
+ * and increases by 1 with each addition broker/controller.</li>
+ * <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id
and controller id both start from
+ * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}
+ * and increases by 1 with each additional broker/controller.</li>
+ * </ul>
+ *
+ * If the id doesn't correspond to any broker/controller server, throw
RuntimeException
Review Comment:
It seems to me `IllegalArgumentException` is more suitable, since the failure
is caused by an "illegal argument".
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -105,17 +109,27 @@ public TestKitNodes build() {
List<Integer> controllerNodeIds =
IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
.boxed()
.collect(Collectors.toList());
- List<Integer> brokerNodeIds = IntStream.range(startBrokerId(),
startBrokerId() + numBrokerNodes)
+ List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET,
BROKER_ID_OFFSET + numBrokerNodes)
.boxed()
.collect(Collectors.toList());
+ Set<Integer> unknownIds = perServerProperties.keySet().stream()
+ .filter(id -> !controllerNodeIds.contains(id))
+ .filter(id -> !brokerNodeIds.contains(id))
+ .collect(Collectors.toSet());
+ if (!unknownIds.isEmpty()) {
+ throw new RuntimeException(String.format("Unknown server id %s
in perServerProperties",
Review Comment:
Could you please also include the existing ids in the error message?
##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,30 +84,55 @@ public void testClusterTemplate() {
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
- @ClusterConfigProperty(key = "spam", value = "eggs")
+ @ClusterConfigProperty(key = "spam", value = "eggs"),
+ @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), //
this one will be ignored as there is no broker id is 86400
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
- @ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
+ @ClusterConfigProperty(key = "default.key", value =
"overwrite.value"),
+ @ClusterConfigProperty(id = 0, key = "queued.max.requests", value
= "200"),
+ @ClusterConfigProperty(id = 3000, key = "queued.max.requests",
value = "300")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT,
serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
- @ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
+ @ClusterConfigProperty(key = "default.key", value =
"overwrite.value"),
+ @ClusterConfigProperty(id = 0, key = "queued.max.requests", value
= "200")
})
})
- public void testClusterTests() {
- if
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+ public void testClusterTests() throws ExecutionException,
InterruptedException {
+ if (!clusterInstance.isKRaftTest()) {
Assertions.assertEquals("bar",
clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs",
clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("default.value",
clusterInstance.config().serverProperties().get("default.key"));
- } else if
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ Map<ConfigResource, Config> configs =
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+ Assertions.assertEquals(1, configs.size());
+ Assertions.assertEquals("100",
configs.get(configResource).get("queued.max.requests").value());
+ }
+ } else {
Assertions.assertEquals("baz",
clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggz",
clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("overwrite.value",
clusterInstance.config().serverProperties().get("default.key"));
- } else {
- Assertions.fail("Unknown cluster type " +
clusterInstance.clusterType());
+
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ Map<ConfigResource, Config> configs =
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+ Assertions.assertEquals(1, configs.size());
+ Assertions.assertEquals("200",
configs.get(configResource).get("queued.max.requests").value());
+ }
+ if (clusterInstance.config().clusterType().equals(Type.KRAFT)) {
Review Comment:
`clusterInstance.config().clusterType() == Type.KRAFT`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]