chia7712 commented on code in PR #20002:
URL: https://github.com/apache/kafka/pull/20002#discussion_r2160018390
##########
tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java:
##########
@@ -506,7 +507,9 @@ public void
testUpdateBrokerConfigNotAffectedByInvalidConfig() {
"--entity-type", "brokers",
"--entity-default"))));
kafka.utils.TestUtils.waitUntilTrue(
- () ->
cluster.brokerSocketServers().stream().allMatch(broker ->
broker.config().getInt("log.cleaner.threads") == 2),
+ () -> cluster.brokers().values().stream()
Review Comment:
```java
() -> cluster.brokers().values().stream()
.map(KafkaBroker::config)
.allMatch(config ->
config.getInt("log.cleaner.threads") == 2),
```
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates
two blocks
- val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker
=> {
+ val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
IntStream.range(0, 1001).parallel().mapToObj( _ =>
- nextProducerId(broker, clusterInstance.clientListener())
+ nextProducerId(broker.socketServer, clusterInstance.clientListener())
Review Comment:
Could you pleas change the arguments of `nextProducerId` to "port"? that can
simplify the method and then we don't need to use `SocketServer`
##########
tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java:
##########
@@ -444,7 +445,7 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
@ClusterTest
public void testUpdateInvalidBrokerConfigs() {
updateAndCheckInvalidBrokerConfig(Optional.empty());
-
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+
updateAndCheckInvalidBrokerConfig(cluster.brokers().values().stream().map(broker
-> broker.socketServer().config().brokerId() + "").findFirst());
Review Comment:
```java
updateAndCheckInvalidBrokerConfig(Optional.of(String.valueOf((cluster.brokers().entrySet().iterator().next().getKey()))));
```
##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -38,9 +38,9 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName:
ListenerName): ApiVersionsResponse = {
Review Comment:
Could you please consider adding `controllerBoundPorts` to `ClusterInstance`?
```java
default List<Integer> controllerBoundPorts() {
return controllers().values().stream()
.map(ControllerServer::socketServer)
.map(s -> s.boundPort(controllerListenerName()))
.toList();
}
```
then `sendApiVersionsRequest` could be removed as the users could
straightforward use `IntegrationTestUtils.connectAndReceive`.
```java
val apiVersionsResponse =
IntegrationTestUtils.connectAndReceive(apiVersionsRequest,
cluster.controllerBoundPorts().get(0))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]