frankvicky commented on code in PR #20002:
URL: https://github.com/apache/kafka/pull/20002#discussion_r2160315482
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates two blocks
- val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
+ val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
IntStream.range(0, 1001).parallel().mapToObj( _ =>
Review Comment:
ditto (nit: redundant space)
```suggestion
IntStream.range(0, 1001).parallel().mapToObj(_ =>
```
##########
core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala:
##########
@@ -217,7 +217,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
else
InetAddress.getByName(entityName)
var currentServerQuota = 0
- currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
+ currentServerQuota = cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
Review Comment:
Hmm, it seems we could add an abstract method to `ClusterInstance`.
That way, callers could avoid using `socketServer` directly.
For example:
```java
@Override
public int getConnectionQuota(InetAddress address) {
SocketServer socketServer = brokers().values().stream()
.map(KafkaBroker::socketServer)
.findFirst()
.orElseThrow();
// Use the same accessor as the test above: connectionRateForIp
return socketServer.connectionQuotas().connectionRateForIp(address);
}
```
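To make the shape of that suggestion concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `Quotas`, `Server`, `Broker`, and `Cluster` are hypothetical stand-ins for `ConnectionQuotas`, `SocketServer`, `KafkaBroker`, and `ClusterInstance`, and the helper name `clusterWithRate` is invented for the demo. It also assumes a default method on the interface rather than an abstract method plus an override, which is one possible way to do it.

```java
import java.net.InetAddress;
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectionQuotaSketch {
    // Hypothetical stand-ins for the Kafka types; not the real classes.
    interface Quotas { int connectionRateForIp(InetAddress address); }
    interface Server { Quotas connectionQuotas(); }
    interface Broker { Server socketServer(); }

    // Hypothetical slice of ClusterInstance: callers ask the cluster for the
    // quota and never touch the socket server themselves.
    interface Cluster {
        Map<Integer, Broker> brokers();

        default int connectionRateForIp(InetAddress address) {
            return brokers().values().stream()
                .map(Broker::socketServer)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no broker is running"))
                .connectionQuotas()
                .connectionRateForIp(address);
        }
    }

    // Builds a one-broker cluster whose quota lookup always returns `rate`.
    static Cluster clusterWithRate(int rate) {
        Quotas quotas = address -> rate;
        Server server = () -> quotas;
        Map<Integer, Broker> brokers = new LinkedHashMap<>();
        brokers.put(0, () -> server);
        return () -> brokers;
    }

    public static void main(String[] args) {
        Cluster cluster = clusterWithRate(25);
        // prints 25
        System.out.println(cluster.connectionRateForIp(InetAddress.getLoopbackAddress()));
    }
}
```

With something along these lines, the Scala test could call a single method on `cluster` instead of chaining through `socketServer.connectionQuotas`.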
##########
test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java:
##########
@@ -189,9 +191,9 @@ public void testClusterTestWithDisksPerBroker() throws ExecutionException, Inter
@ClusterTest(autoStart = AutoStart.NO)
public void testNoAutoStart() {
- Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
+ Assertions.assertThrows(RuntimeException.class, () -> clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst());
clusterInstance.start();
- assertNotNull(clusterInstance.anyBrokerSocketServer());
+ assertTrue(clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst().isPresent());
Review Comment:
As with the comment above, we could keep the helper on `ClusterInstance` and
change `anyBrokerSocketServer` to return a boolean:
```java
public boolean anyBrokerSocketServer() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.findAny()
.isPresent();
}
```
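A small sketch of how such a boolean helper behaves may help when deciding what the test should assert. It uses hypothetical stand-in types (`Server`, `Broker`) and takes the broker map as a parameter instead of calling the real `brokers()`, so it says nothing about whether the real `ClusterInstance` throws before `start()`.

```java
import java.util.HashMap;
import java.util.Map;

class BrokerPresenceSketch {
    // Hypothetical stand-ins for SocketServer and KafkaBroker.
    interface Server {}
    interface Broker { Server socketServer(); }

    // Same stream pipeline as the suggestion, applied to a plain map.
    static boolean anyBrokerSocketServer(Map<Integer, Broker> brokers) {
        return brokers.values().stream()
            .map(Broker::socketServer)
            .findAny()
            .isPresent();
    }

    public static void main(String[] args) {
        Map<Integer, Broker> brokers = new HashMap<>();
        // No brokers yet: the helper returns false, it does not throw.
        System.out.println(anyBrokerSocketServer(brokers)); // prints false

        Server server = new Server() {};
        brokers.put(0, () -> server);
        System.out.println(anyBrokerSocketServer(brokers)); // prints true
    }
}
```

In this sketch an empty map yields `false` rather than an exception. If the real `brokers()` also returns an empty map before start (an assumption, not checked against the PR), the `assertThrows` in `testNoAutoStart` would need to become an `assertFalse`.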
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates two blocks
- val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
+ val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
Review Comment:
nit: redundant space
```suggestion
val ids = clusterInstance.brokers().values().stream().flatMap(broker => {
```