chia7712 commented on code in PR #19776:
URL: https://github.com/apache/kafka/pull/19776#discussion_r2116362068
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -205,11 +205,12 @@ class ProducerIntegrationTest {
.setProducerId(RecordBatch.NO_PRODUCER_ID)
.setTransactionalId(null)
.setTransactionTimeoutMs(10)
- val request = new InitProducerIdRequest.Builder(data).build()
+ val request = new InitProducerIdRequest.Builder(data).build()
Review Comment:
```scala
val request = new InitProducerIdRequest.Builder(data).build()
val port = broker.boundPort(listener)
response =
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
shouldRetry = response.data.errorCode ==
Errors.COORDINATOR_LOAD_IN_PROGRESS.code
```
##########
core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala:
##########
@@ -79,10 +80,13 @@ class AllocateProducerIdsRequestTest(cluster:
ClusterInstance) {
controllerSocketServer: SocketServer,
request: AllocateProducerIdsRequest
): AllocateProducerIdsResponse = {
+
+ val listenerName = cluster.controllerListenerName
+ val port = controllerSocketServer.boundPort(listenerName)
+
IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
Review Comment:
```scala
IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
request,
controllerSocketServer.boundPort(cluster.controllerListenerName)
)
```
##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
} else {
cluster.brokerSocketServers().asScala.head
}
- IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request,
socket, listenerName)
+
+ val port = socket.boundPort(listenerName)
+
+ IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, port)
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest):
ApiVersionsResponse = {
val overrideHeader =
IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
- val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
+ val socketServer = cluster.brokerSocketServers().asScala.head
Review Comment:
With that helper, we can use `val socket =
IntegrationTestUtils.connect(cluster.boundPorts().asScala.head)` to streamline
the code, right?
##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
} else {
cluster.brokerSocketServers().asScala.head
}
- IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request,
socket, listenerName)
+
+ val port = socket.boundPort(listenerName)
Review Comment:
```scala
IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request,
socket.boundPort(listenerName))
```
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -492,12 +492,16 @@ class KRaftClusterTest {
}
private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
- listenerName:
ListenerName): DescribeClusterResponse =
- connectAndReceive[DescribeClusterResponse](
- request = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
- destination = destination,
- listenerName = listenerName
- )
+ listenerName:
ListenerName): DescribeClusterResponse = {
+
+ val port = destination.boundPort(listenerName)
Review Comment:
```scala
connectAndReceive[DescribeClusterResponse](new
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
destination.boundPort(listenerName))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org