apoorvmittal10 commented on code in PR #19640:
URL: https://github.com/apache/kafka/pull/19640#discussion_r2074025568
##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedResponseData.results.asScala.toSet,
deleteGroupsResponse.data.results.asScala.toSet)
}
+ protected def createSocket(): Socket = {
+ IntegrationTestUtils.connect(
+ cluster.anyBrokerSocketServer(),
+ cluster.clientListener()
+ )
+ }
+
+ protected def createSocket(destination: Int): Socket = {
Review Comment:
Shouldn't it be `connect`?
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -59,11 +59,15 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1))
)
+ val socket: Socket = createSocket()
+
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
sendAndReceiveFromExistingSocket[ShareFetchResponse](shareFetchRequest, socket)
assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
+
+ socket.close()
Review Comment:
Is there a way we can have a check in `tearDown` that all open sockets are
closed from the tests?
##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedResponseData.results.asScala.toSet,
deleteGroupsResponse.data.results.asScala.toSet)
}
+ protected def createSocket(): Socket = {
Review Comment:
Shouldn't it be `connectAny`?
##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -934,20 +937,10 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
)
}
- protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+ protected def sendAndReceiveFromExistingSocket[T <: AbstractResponse](
request: AbstractRequest,
- destination: Int
- )(implicit classTag: ClassTag[T]): T = {
- val socket = IntegrationTestUtils.connect(brokerSocketServer(destination),
cluster.clientListener())
- openSockets += socket
- IntegrationTestUtils.sendAndReceive[T](request, socket)
- }
-
- protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
- request: AbstractRequest
+ socket: Socket
)(implicit classTag: ClassTag[T]): T = {
- val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(),
cluster.clientListener())
- openSockets += socket
IntegrationTestUtils.sendAndReceive[T](request, socket)
}
Review Comment:
Why do you need this method and can't use
`IntegrationTestUtils.sendAndReceive[T](request, socket)` directly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]