chia7712 commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2873784701


##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1186,87 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public List<String> bootstrapServers;
+        public final ClientDnsLookup clientDnsLookup;
+        public final long bootstrapResolveTimeoutMs;
+        private boolean isBootstrapDisabled;
+
+        public BootstrapConfiguration(final List<String> bootstrapServers,
+                                      final ClientDnsLookup clientDnsLookup,
+                                      final long bootstrapResolveTimeoutMs) {
+            this.bootstrapServers = bootstrapServers;
+            this.clientDnsLookup = clientDnsLookup;
+            this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+            this.isBootstrapDisabled = false;
+        }
+
+        public static BootstrapConfiguration disabled() {
+            BootstrapConfiguration bootstrapConfiguration = new 
BootstrapConfiguration(List.of(), null, 0);
+            bootstrapConfiguration.disableBootstrap();
+            return bootstrapConfiguration;
+        }
+
+        public void disableBootstrap() {
+            this.isBootstrapDisabled = true;
+        }
+    }
+
+    private class BootstrapState {
+        private final Timer timer;
+        private final List<String> bootstrapServers;
+        private final ClientDnsLookup clientDnsLookup;
+        private final long dnsResolutionTimeoutMs;
+        private final boolean isDisabled;
+
+        BootstrapState(BootstrapConfiguration bootstrapConfiguration) {
+            this.dnsResolutionTimeoutMs = 
bootstrapConfiguration.bootstrapResolveTimeoutMs;
+            this.timer = 
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+            this.bootstrapServers = bootstrapConfiguration.bootstrapServers;
+            this.clientDnsLookup = bootstrapConfiguration.clientDnsLookup;
+            this.isDisabled = bootstrapConfiguration.isBootstrapDisabled;
+        }
+
+        List<InetSocketAddress> tryResolveAddresses(final long currentTimeMs) {
+            timer.update(currentTimeMs);
+            List<InetSocketAddress> addresses = 
ClientUtils.validateAddresses(bootstrapServers, clientDnsLookup);
+            if (!addresses.isEmpty()) {
+                timer.reset(dnsResolutionTimeoutMs);
+                return addresses;
+            }
+
+            if (timer.isExpired()) {
+                throw new BootstrapResolutionException("Timeout while 
attempting to resolve bootstrap " +
+                        "servers. ");
+            }
+            return ClientUtils.validateAddresses(bootstrapServers, 
clientDnsLookup);

Review Comment:
   `tryResolveAddresses` and `ensureBootstrapped` do similar things, 
so could you try to refactor them?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1186,87 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public List<String> bootstrapServers;

Review Comment:
   We could add the final modifier to these fields, right?



##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -53,6 +53,67 @@ public final class ClientUtils {
     private ClientUtils() {
     }
 
+    /**
+     * Resolves a single URL to one or more InetSocketAddress based on the DNS 
lookup strategy.
+     *
+     * @param url the original URL string (for logging)
+     * @param host the hostname extracted from the URL
+     * @param port the port extracted from the URL
+     * @param clientDnsLookup the DNS lookup strategy
+     * @return list of resolved addresses (may be empty if addresses are 
unresolved)
+     * @throws UnknownHostException if DNS resolution fails
+     */
+    private static List<InetSocketAddress> resolveAddress(
+            String url,
+            String host,
+            Integer port,
+            ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+
+        List<InetSocketAddress> addresses = new ArrayList<>();
+
+        if (clientDnsLookup == 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+            InetAddress[] inetAddresses = InetAddress.getAllByName(host);

Review Comment:
   `InetAddress.getAllByName` is a blocking call, which will cause the `poll` 
loop to block. Did the KIP mention this behavior?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -629,6 +643,7 @@ private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         ensureActive();
+        ensureBootstrapped(now);

Review Comment:
   What happens if the resolution keeps failing? Would it lead to a busy-loop?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1186,87 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public List<String> bootstrapServers;
+        public final ClientDnsLookup clientDnsLookup;
+        public final long bootstrapResolveTimeoutMs;
+        private boolean isBootstrapDisabled;
+
+        public BootstrapConfiguration(final List<String> bootstrapServers,
+                                      final ClientDnsLookup clientDnsLookup,
+                                      final long bootstrapResolveTimeoutMs) {
+            this.bootstrapServers = bootstrapServers;
+            this.clientDnsLookup = clientDnsLookup;
+            this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+            this.isBootstrapDisabled = false;
+        }
+
+        public static BootstrapConfiguration disabled() {
+            BootstrapConfiguration bootstrapConfiguration = new 
BootstrapConfiguration(List.of(), null, 0);
+            bootstrapConfiguration.disableBootstrap();
+            return bootstrapConfiguration;
+        }
+
+        public void disableBootstrap() {
+            this.isBootstrapDisabled = true;
+        }
+    }
+
+    private class BootstrapState {
+        private final Timer timer;
+        private final List<String> bootstrapServers;
+        private final ClientDnsLookup clientDnsLookup;
+        private final long dnsResolutionTimeoutMs;
+        private final boolean isDisabled;
+
+        BootstrapState(BootstrapConfiguration bootstrapConfiguration) {
+            this.dnsResolutionTimeoutMs = 
bootstrapConfiguration.bootstrapResolveTimeoutMs;
+            this.timer = 
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+            this.bootstrapServers = bootstrapConfiguration.bootstrapServers;
+            this.clientDnsLookup = bootstrapConfiguration.clientDnsLookup;
+            this.isDisabled = bootstrapConfiguration.isBootstrapDisabled;
+        }
+
+        List<InetSocketAddress> tryResolveAddresses(final long currentTimeMs) {
+            timer.update(currentTimeMs);
+            List<InetSocketAddress> addresses = 
ClientUtils.validateAddresses(bootstrapServers, clientDnsLookup);
+            if (!addresses.isEmpty()) {
+                timer.reset(dnsResolutionTimeoutMs);
+                return addresses;
+            }
+
+            if (timer.isExpired()) {
+                throw new BootstrapResolutionException("Timeout while 
attempting to resolve bootstrap " +
+                        "servers. ");
+            }
+            return ClientUtils.validateAddresses(bootstrapServers, 
clientDnsLookup);
+        }
+
+        boolean isDisabled() {
+            return isDisabled;
+        }
+
+        boolean isTimerExpired() {

Review Comment:
   unused method



##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -53,6 +53,67 @@ public final class ClientUtils {
     private ClientUtils() {
     }
 
+    /**
+     * Resolves a single URL to one or more InetSocketAddress based on the DNS 
lookup strategy.
+     *
+     * @param url the original URL string (for logging)
+     * @param host the hostname extracted from the URL
+     * @param port the port extracted from the URL
+     * @param clientDnsLookup the DNS lookup strategy
+     * @return list of resolved addresses (may be empty if addresses are 
unresolved)
+     * @throws UnknownHostException if DNS resolution fails
+     */
+    private static List<InetSocketAddress> resolveAddress(
+            String url,

Review Comment:
   Please fix the indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to