OmniaGM commented on code in PR #16659:
URL: https://github.com/apache/kafka/pull/16659#discussion_r1688336616


##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();

Review Comment:
   BTW we have `Utils.mkMap` that can be used to define this in one statement 
instead of two 
   ```
   Utils.mkMap` that can be used to define like 
   
`Utils.mkMap(Utils.mkEntry(ListenerName.normalised("listener"),InetSocketAddress.createUnresolved("localhost",
 9092)));
   ```
   
   or use simple java 
   
   ```
   new HashMap<>() {{
   put(ListenerName.normalised("listener"), 
InetSocketAddress.createUnresolved("localhost", 9092));
   ```
   



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();

Review Comment:
   Maybe we can combine these two lines to `assertEquals(0, 
Endpoints.empty().size());` and drop size variable 



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);
+        assertEquals(endpoints.hashCode(), sameEndpoints.hashCode());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithDifferentEndpoints() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Map<ListenerName, InetSocketAddress> anotherEndpointMap = new 
HashMap<>();
+        anotherEndpointMap.put(
+                ListenerName.normalised("another"),
+                InetSocketAddress.createUnresolved("localhost", 9093)
+        );
+        Endpoints differentEndpoints = 
Endpoints.fromInetSocketAddresses(anotherEndpointMap);
+
+        assertNotEquals(endpoints, differentEndpoints);
+        assertNotEquals(endpoints.hashCode(), differentEndpoints.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        ListenerName listener = ListenerName.normalised("listener");
+        ListenerName listener1 = ListenerName.normalised("listener1");
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 9092);
+        InetSocketAddress address1 = 
InetSocketAddress.createUnresolved("localhost", 9093);
+        endpointMap.put(listener, address);
+        endpointMap.put(listener1, address1);
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        String expectedString = String.format(
+                "Endpoints(endpoints={%s=%s, %s=%s})",
+                listener,
+                address,
+                listener1,
+                address1
+        );
+
+        assertEquals(expectedString, endpoints.toString());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(1, leaderEndpoints.size());
+
+        Iterator<BeginQuorumEpochRequestData.LeaderEndpoint> iterator = 
leaderEndpoints.iterator();
+        BeginQuorumEpochRequestData.LeaderEndpoint endpoint = iterator.next();
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(0, leaderEndpoints.size());

Review Comment:
   `leaderEndpoints` in the assert can be replaced by its value 
`endpoints.toBeginQuorumEpochRequest()`



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();

Review Comment:
   Also `ListenerName.normalised("listener")` and  
`InetSocketAddress.createUnresolved("localhost", 9092)` seem to be duplicated 
in every test maybe we can define them on top of the test.



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);
+        assertEquals(endpoints.hashCode(), sameEndpoints.hashCode());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithDifferentEndpoints() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Map<ListenerName, InetSocketAddress> anotherEndpointMap = new 
HashMap<>();
+        anotherEndpointMap.put(
+                ListenerName.normalised("another"),
+                InetSocketAddress.createUnresolved("localhost", 9093)
+        );
+        Endpoints differentEndpoints = 
Endpoints.fromInetSocketAddresses(anotherEndpointMap);
+
+        assertNotEquals(endpoints, differentEndpoints);
+        assertNotEquals(endpoints.hashCode(), differentEndpoints.hashCode());
+    }
+
+    @Test
+    void testToString() {

Review Comment:
   What the value of testing `toString`? what public interfaces will `toString` 
impact if it change to any other string? 



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());

Review Comment:
   the assert and endpoints variable line can be merged into one line 
`assertFalse(Endpoints.fromInetSocketAddresses(endpointMap).isEmpty());`



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);

Review Comment:
   Am not sure what the value of this test? can you explain it a bit as I 
recall we rarely test `hashCode` method.



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);
+        assertEquals(endpoints.hashCode(), sameEndpoints.hashCode());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithDifferentEndpoints() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Map<ListenerName, InetSocketAddress> anotherEndpointMap = new 
HashMap<>();
+        anotherEndpointMap.put(
+                ListenerName.normalised("another"),
+                InetSocketAddress.createUnresolved("localhost", 9093)
+        );
+        Endpoints differentEndpoints = 
Endpoints.fromInetSocketAddresses(anotherEndpointMap);
+
+        assertNotEquals(endpoints, differentEndpoints);
+        assertNotEquals(endpoints.hashCode(), differentEndpoints.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        ListenerName listener = ListenerName.normalised("listener");
+        ListenerName listener1 = ListenerName.normalised("listener1");
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 9092);
+        InetSocketAddress address1 = 
InetSocketAddress.createUnresolved("localhost", 9093);
+        endpointMap.put(listener, address);
+        endpointMap.put(listener1, address1);
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        String expectedString = String.format(
+                "Endpoints(endpoints={%s=%s, %s=%s})",
+                listener,
+                address,
+                listener1,
+                address1
+        );
+
+        assertEquals(expectedString, endpoints.toString());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(1, leaderEndpoints.size());
+
+        Iterator<BeginQuorumEpochRequestData.LeaderEndpoint> iterator = 
leaderEndpoints.iterator();
+        BeginQuorumEpochRequestData.LeaderEndpoint endpoint = iterator.next();
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(0, leaderEndpoints.size());
+    }
+
+    @Test
+    void testEmpty() {
+        Endpoints emptyEndpoints = Endpoints.empty();
+
+        
assertEquals(Endpoints.fromInetSocketAddresses(Collections.emptyMap()), 
emptyEndpoints);
+    }
+
+    @Test
+    void testFromInetSocketAddressesWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(1, endpoints.size());
+    }
+
+    @Test
+    void testFromVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        List<VotersRecord.Endpoint> votersEndpoints = new ArrayList<>();
+        votersEndpoints.add(
+                new VotersRecord.Endpoint()
+                        .setName("listener")
+                        .setHost("localhost")
+                        .setPort(9092)
+        );
+
+        Endpoints createdEndpoints = 
Endpoints.fromVotersRecordEndpoints(votersEndpoints);
+
+        assertEquals(endpoints, createdEndpoints);
+    }
+
+    @Test
+    void testFromVotersRecordEndpointsWithEmptyEndpoint() {
+        List<VotersRecord.Endpoint> votersEndpoints = new ArrayList<>();

Review Comment:
   Can't `votersEndpoints` be `Collections.emptyList` instead? Then  
`createdEndpoints` can be 
`Endpoints.fromVotersRecordEndpoints(Collections.emptyList());`



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);
+        assertEquals(endpoints.hashCode(), sameEndpoints.hashCode());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithDifferentEndpoints() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Map<ListenerName, InetSocketAddress> anotherEndpointMap = new 
HashMap<>();
+        anotherEndpointMap.put(
+                ListenerName.normalised("another"),
+                InetSocketAddress.createUnresolved("localhost", 9093)
+        );
+        Endpoints differentEndpoints = 
Endpoints.fromInetSocketAddresses(anotherEndpointMap);
+
+        assertNotEquals(endpoints, differentEndpoints);
+        assertNotEquals(endpoints.hashCode(), differentEndpoints.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        ListenerName listener = ListenerName.normalised("listener");
+        ListenerName listener1 = ListenerName.normalised("listener1");
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 9092);
+        InetSocketAddress address1 = 
InetSocketAddress.createUnresolved("localhost", 9093);
+        endpointMap.put(listener, address);
+        endpointMap.put(listener1, address1);
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        String expectedString = String.format(
+                "Endpoints(endpoints={%s=%s, %s=%s})",
+                listener,
+                address,
+                listener1,
+                address1
+        );
+
+        assertEquals(expectedString, endpoints.toString());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(1, leaderEndpoints.size());
+
+        Iterator<BeginQuorumEpochRequestData.LeaderEndpoint> iterator = 
leaderEndpoints.iterator();
+        BeginQuorumEpochRequestData.LeaderEndpoint endpoint = iterator.next();

Review Comment:
   maybe better name here is `leaderEndpoint`? as `endpoints` above was 
referring to `Endpoints`



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+

Review Comment:
   I can see that we keep calling `ListenerName.normalised("listener")` once 
for `endpointMap` and second time to asset the `ListenerName::value` or to pass 
it to `Endpoints::address`. Why not extract it to a variable at the beginning 
of the test?  same can go to `InetSocketAddress.createUnresolved("localhost", 
9092)`



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    void testIsEmptyWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertFalse(endpoints.isEmpty());
+    }
+
+    @Test
+    void testIsEmptyWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        assertEquals(0, endpoints.size());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithSameEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Endpoints sameEndpoints = 
Endpoints.fromInetSocketAddresses(endpointMap);
+
+        assertEquals(endpoints, sameEndpoints);
+        assertEquals(endpoints.hashCode(), sameEndpoints.hashCode());
+    }
+
+    @Test
+    void testEqualsAndHashCodeWithDifferentEndpoints() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Map<ListenerName, InetSocketAddress> anotherEndpointMap = new 
HashMap<>();
+        anotherEndpointMap.put(
+                ListenerName.normalised("another"),
+                InetSocketAddress.createUnresolved("localhost", 9093)
+        );
+        Endpoints differentEndpoints = 
Endpoints.fromInetSocketAddresses(anotherEndpointMap);
+
+        assertNotEquals(endpoints, differentEndpoints);
+        assertNotEquals(endpoints.hashCode(), differentEndpoints.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        ListenerName listener = ListenerName.normalised("listener");
+        ListenerName listener1 = ListenerName.normalised("listener1");
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 9092);
+        InetSocketAddress address1 = 
InetSocketAddress.createUnresolved("localhost", 9093);
+        endpointMap.put(listener, address);
+        endpointMap.put(listener1, address1);
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        String expectedString = String.format(
+                "Endpoints(endpoints={%s=%s, %s=%s})",
+                listener,
+                address,
+                listener1,
+                address1
+        );
+
+        assertEquals(expectedString, endpoints.toString());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(1, leaderEndpoints.size());
+
+        Iterator<BeginQuorumEpochRequestData.LeaderEndpoint> iterator = 
leaderEndpoints.iterator();
+        BeginQuorumEpochRequestData.LeaderEndpoint endpoint = iterator.next();
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testToBeginQuorumEpochRequestWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        BeginQuorumEpochRequestData.LeaderEndpointCollection leaderEndpoints = 
endpoints.toBeginQuorumEpochRequest();
+
+        assertEquals(0, leaderEndpoints.size());
+    }
+
+    @Test
+    void testEmpty() {
+        Endpoints emptyEndpoints = Endpoints.empty();
+
+        
assertEquals(Endpoints.fromInetSocketAddresses(Collections.emptyMap()), 
emptyEndpoints);

Review Comment:
   What is the value of this test if we are testing `Endpoints::size`?



##########
raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class EndpointsTest {
+    @Test
+    void testAddressWithValidEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("listener"));
+
+        
assertEquals(Optional.of(InetSocketAddress.createUnresolved("localhost", 
9092)), address);
+    }
+
+    @Test
+    void testAddressWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Optional<InetSocketAddress> address = 
endpoints.address(ListenerName.normalised("nonExistentListener"));
+
+        assertEquals(Optional.empty(), address);
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        VotersRecord.Endpoint endpoint = iterator.next();
+
+        assertEquals(ListenerName.normalised("listener").value(), 
endpoint.name());
+        assertEquals("localhost", endpoint.host());
+        assertEquals(9092, endpoint.port());
+    }
+
+    @Test
+    void testVotersRecordEndpointsWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        Iterator<VotersRecord.Endpoint> iterator = 
endpoints.votersRecordEndpoints();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    void testSizeWithOneEndpoint() {
+        Map<ListenerName, InetSocketAddress> endpointMap = new HashMap<>();
+        endpointMap.put(
+                ListenerName.normalised("listener"),
+                InetSocketAddress.createUnresolved("localhost", 9092)
+        );
+        Endpoints endpoints = Endpoints.fromInetSocketAddresses(endpointMap);
+
+        int size = endpoints.size();
+
+        assertEquals(1, size);
+    }
+
+    @Test
+    void testSizeWithEmptyEndpoint() {
+        Endpoints endpoints = Endpoints.empty();
+
+        int size = endpoints.size();

Review Comment:
   I feel `testSizeWithOneEndpoint` and `testSizeWithEmptyEndpoint` can be 
merged into one test and `assertEquals(1, 
Endpoints.fromInetSocketAddresses(endpointMap).size());`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to