[ 
https://issues.apache.org/jira/browse/HADOOP-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015433#comment-18015433
 ] 

ASF GitHub Bot commented on HADOOP-19609:
-----------------------------------------

anujmodi2021 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2284234190


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -330,7 +330,8 @@ public void close() throws IOException {
     } catch (ExecutionException e) {
       LOG.error("Error freeing leases", e);
     } finally {
-      IOUtils.cleanupWithLogger(LOG, getClient());
+      IOUtils.cleanupWithLogger(LOG, getClientHandler().getDfsClient(),

Review Comment:
   Can we simply call close() on the clientHandler here, and have the client
handler close both clients internally?
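
   A minimal sketch of that suggestion (assuming AbfsClientHandler holds both
clients and can implement Closeable; this is not the actual patch):

     // In AbfsClientHandler: close both clients from one place.
     @Override
     public void close() throws IOException {
       IOUtils.cleanupWithLogger(LOG, getDfsClient(), getBlobClient());
     }

     // The caller in AzureBlobFileSystemStore.close() then shrinks to:
     IOUtils.cleanupWithLogger(LOG, getClientHandler());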



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
       throw new PathIOException(getUrl().toString(),
           "Unsupported HTTP method: " + getMethod());
     }
+
+    for (AbfsHttpHeader header : requestHeaders) {

Review Comment:
   Why this change?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
   public static final long THOUSAND = 1000L;
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
-      = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+      = HttpOperationType.APACHE_HTTP_CLIENT;

Review Comment:
   Let's revert this.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
       final int length) throws IOException {
     try {
       if (!isPayloadRequest) {
-        prepareRequest();

Review Comment:
   Why removed?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase

Review Comment:
   +1
   Add javadoc explaining why these 2 conditions cannot be combined.
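
   A possible javadoc sketch for that reasoning (assuming the intent is that
Apache HttpClient derives Content-Length from the request entity itself):

     /**
      * Sets a header on the underlying request. Content-Length is skipped
      * for entity-enclosing requests (HttpEntityEnclosingRequestBase)
      * because Apache HttpClient computes it from the entity; setting it
      * explicitly would produce a duplicate Content-Length header.
      */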



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;
 
+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;

Review Comment:
   The variable name should indicate that it is a number or count.
   
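   For example (illustrative rename only):

     /**
      * Number of connections to be created during cache refresh.
      */
     private final int cacheRefreshConnectionCount;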



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
       public boolean cancel() {
         return false;
       }
+
+      /**
+       * Trigger a background warm-up of the connection cache if needed.
+       * This method checks if the cache size is small and if caching is not already in progress.
+       * If so, it starts a new thread to cache extra connections.
+       */
+      private void triggerConnectionWarmupIfNeeded() {
+        if (kac.size() <= 2 && !isCaching.get()) {
+          // Use a single-threaded executor or thread pool instead of raw thread

Review Comment:
   Also, please give the thread a relevant name. This helps in identifying
threads when debugging issues.
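
   For example, a sketch with a named daemon thread (the runnable name
cacheExtraConnections is illustrative, not from the patch):

     Thread warmupThread = new Thread(this::cacheExtraConnections,
         "abfs-connection-warmup");
     warmupThread.setDaemon(true);
     warmupThread.start();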



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;
 
+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;
+
+  /**
+   * Connection timeout for establishing a new connection.
+   */
+  private final int connectionTimeout;
+
+  private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+  private final Object connectionLock = new Object();
+
+  private HttpHost baseHost;
+
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
     this.httpConnectionFactory = connectionFactory;
+    this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
+    if (abfsConfiguration.getCacheWarmupConnections() > 0) {

Review Comment:
   Same for this and all the other variables. If they indicate a number, it is
better to name them `numOfCacheWarmupConnections` or
`cacheWarmupConnectionsCount`.
   
   Please check this throughout the patch, in all the classes, for all such
variables.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
     }
   }
 
-  /**
-   * Sets the header on the request.
-   */
-  private void prepareRequest() {
-    final boolean isEntityBasedRequest
-        = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
-    for (AbfsHttpHeader header : getRequestHeaders()) {
-      if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
-        continue;
-      }
-      httpRequestBase.setHeader(header.getName(), header.getValue());
-    }
-  }
-
   /**{@inheritDoc}*/
   @Override
   public String getRequestProperty(String name) {
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   Why needed?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+        && CONTENT_LENGTH.equals(key)) {
+      return;
     }
+    httpRequestBase.setHeader(key, value);
   }
 
   /**{@inheritDoc}*/
   @Override
   Map<String, List<String>> getRequestProperties() {
     Map<String, List<String>> map = new HashMap<>();
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   Why this change?
   Why are we switching from the more specific implementation to a parent
class? AbfsHttpHeader is supposed to be more specific and sufficient for all
ABFS needs.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
       public boolean cancel() {
         return false;
       }
+
+      /**
+       * Trigger a background warm-up of the connection cache if needed.
+       * This method checks if the cache size is small and if caching is not already in progress.
+       * If so, it starts a new thread to cache extra connections.
+       */
+      private void triggerConnectionWarmupIfNeeded() {
+        if (kac.size() <= 2 && !isCaching.get()) {

Review Comment:
   +1



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute route,
        */
       @Override
       public HttpClientConnection get(final long timeout,
-          final TimeUnit timeUnit)
-          throws InterruptedException, ExecutionException,
-          ConnectionPoolTimeoutException {
+          final TimeUnit timeUnit) throws ExecutionException {
         String requestId = UUID.randomUUID().toString();
         logDebug("Connection requested for request {}", requestId);
+        if (!route.getTargetHost().equals(baseHost)) {
+          // If the route target host does not match the base host, create a new connection
+          logDebug("Route target host {} does not match base host {}, creating new connection",
+              route.getTargetHost(), baseHost);
+          return createNewConnection();
+        }
         try {
-          HttpClientConnection clientConn = kac.get();
-          if (clientConn != null) {
-            logDebug("Connection retrieved from KAC: {} for requestId: {}",
-                clientConn, requestId);
-            return clientConn;
+          HttpClientConnection conn = kac.get();
+
+          // If a valid connection is available, return it and trigger background warm-up if needed
+          if (conn != null) {
+            triggerConnectionWarmupIfNeeded();

Review Comment:
   Why does this not need to be inside a synchronized block?
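
   One way to make the check-and-trigger atomic without a synchronized block,
sketched against the existing isCaching flag (startWarmupThread is a
hypothetical helper):

     private void triggerConnectionWarmupIfNeeded() {
       // compareAndSet is an atomic test-and-set, so at most one caller
       // wins the false -> true transition and starts the warm-up.
       if (kac.size() <= 2 && isCaching.compareAndSet(false, true)) {
         startWarmupThread();
       }
     }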



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;
 
+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;
+
+  /**
+   * Connection timeout for establishing a new connection.
+   */
+  private final int connectionTimeout;
+
+  private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+  private final Object connectionLock = new Object();
+
+  private HttpHost baseHost;
+
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
     this.httpConnectionFactory = connectionFactory;
+    this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
+    if (abfsConfiguration.getCacheWarmupConnections() > 0) {

Review Comment:
   Should we discuss that?
   Shouldn't the cache be shared between both clients? We expect only one
client at a time to be working, so they should share the cache, IMO.
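
   A sketch of the shared-cache wiring being suggested (constructor parameters
follow the diff above; the single KeepAliveCache instance is the point, and
the KeepAliveCache constructor shown is an assumption):

     // One KeepAliveCache instance shared by both connection managers.
     KeepAliveCache sharedKac = new KeepAliveCache(abfsConfiguration);
     AbfsConnectionManager dfsConnMgr = new AbfsConnectionManager(
         socketFactoryRegistry, connectionFactory, sharedKac,
         abfsConfiguration, dfsBaseUrl);
     AbfsConnectionManager blobConnMgr = new AbfsConnectionManager(
         socketFactoryRegistry, connectionFactory, sharedKac,
         abfsConfiguration, blobBaseUrl);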





> ABFS: Apache Client Connection Pool Relook
> ------------------------------------------
>
>                 Key: HADOOP-19609
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19609
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.1
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
