[ https://issues.apache.org/jira/browse/HADOOP-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18016433#comment-18016433 ]

ASF GitHub Bot commented on HADOOP-19609:
-----------------------------------------

bhattmanish98 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2302908266


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;
 
+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;
+
+  /**
+   * Connection timeout for establishing a new connection.
+   */
+  private final int connectionTimeout;
+
+  private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+  private final Object connectionLock = new Object();
+
+  private HttpHost baseHost;
+
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
     this.httpConnectionFactory = connectionFactory;
+    this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
+    if (abfsConfiguration.getCacheWarmupConnections() > 0) {

Review Comment:
   Sure, we can discuss this.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase

Review Comment:
   Sure, will add the Javadoc for this.
   



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
   public static final long THOUSAND = 1000L;
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
-      = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+      = HttpOperationType.APACHE_HTTP_CLIENT;

Review Comment:
   Reverted



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
       final int length) throws IOException {
     try {
       if (!isPayloadRequest) {
-        prepareRequest();

Review Comment:
   As noted in the previous comment, we are organizing the data in a clear and structured way in the constructor itself, so we won't have to prepare it separately before sending the request.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+        && CONTENT_LENGTH.equals(key)) {
+      return;
     }
+    httpRequestBase.setHeader(key, value);
   }
 
   /**{@inheritDoc}*/
   @Override
   Map<String, List<String>> getRequestProperties() {
     Map<String, List<String>> map = new HashMap<>();
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   This method returns all headers that were passed to the server for a specific request. With the previous implementation, duplicate headers (including duplicates that differ only in case) could cause the method to return headers that were never actually sent. Reading from httpRequestBase.getAllHeaders() aligns the behavior with the JDK implementation:
   https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L103
   Since httpRequestBase.getAllHeaders() returns Header, Header is used instead of AbfsHttpHeader.
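   A minimal sketch of the shape this gives (names here are illustrative stand-ins, not the actual Apache HttpClient or ABFS types): every header instance set on the request is folded into the Map<String, List<String>> that getRequestProperties() returns, so duplicates that were actually sent appear as multiple values under one key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequestPropertiesSketch {

    // Minimal stand-in for org.apache.http.Header (a name/value pair).
    static final class SimpleHeader {
        final String name;
        final String value;
        SimpleHeader(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // Fold the headers actually set on the request into the
    // Map<String, List<String>> shape that getRequestProperties() returns.
    static Map<String, List<String>> toRequestProperties(SimpleHeader[] headers) {
        Map<String, List<String>> map = new HashMap<>();
        for (SimpleHeader h : headers) {
            map.computeIfAbsent(h.name, k -> new ArrayList<>()).add(h.value);
        }
        return map;
    }
}
```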



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;
 
+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;

Review Comment:
   Will do the change



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
     }
   }
 
-  /**
-   * Sets the header on the request.
-   */
-  private void prepareRequest() {
-    final boolean isEntityBasedRequest
-        = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
-    for (AbfsHttpHeader header : getRequestHeaders()) {
-      if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
-        continue;
-      }
-      httpRequestBase.setHeader(header.getName(), header.getValue());
-    }
-  }
-
   /**{@inheritDoc}*/
   @Override
   public String getRequestProperty(String name) {
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   Same as above



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute route,
        */
       @Override
       public HttpClientConnection get(final long timeout,
-          final TimeUnit timeUnit)
-          throws InterruptedException, ExecutionException,
-          ConnectionPoolTimeoutException {
+          final TimeUnit timeUnit) throws ExecutionException {
         String requestId = UUID.randomUUID().toString();
         logDebug("Connection requested for request {}", requestId);
+        if (!route.getTargetHost().equals(baseHost)) {
+          // If the route target host does not match the base host, create a new connection
+          logDebug("Route target host {} does not match base host {}, creating new connection",
+              route.getTargetHost(), baseHost);
+          return createNewConnection();
+        }
         try {
-          HttpClientConnection clientConn = kac.get();
-          if (clientConn != null) {
-            logDebug("Connection retrieved from KAC: {} for requestId: {}",
-                clientConn, requestId);
-            return clientConn;
+          HttpClientConnection conn = kac.get();
+
+          // If a valid connection is available, return it and trigger background warm-up if needed
+          if (conn != null) {
+            triggerConnectionWarmupIfNeeded();

Review Comment:
   The implementation of triggerConnectionWarmupIfNeeded ensures that only one thread executes it while the others exit the method immediately, so I don't believe a synchronized block is necessary in this case.
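   The single-executor guarantee described above can be sketched with the compareAndSet pattern on an AtomicBoolean (this is an illustrative sketch of the gating idea, not the actual AbfsConnectionManager code): exactly one caller wins the CAS and performs the refresh; concurrent callers fail the CAS and return at once.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class WarmupGate {

    private final AtomicBoolean isCaching = new AtomicBoolean(false);

    // Only the caller that flips isCaching from false to true runs the
    // refresh; every concurrent caller fails the compareAndSet and exits
    // immediately, so no synchronized block is needed around the call.
    boolean triggerIfNeeded(Runnable refresh) {
        if (!isCaching.compareAndSet(false, true)) {
            return false;
        }
        try {
            refresh.run();
            return true;
        } finally {
            isCaching.set(false); // allow the next warm-up cycle
        }
    }
}
```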



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
       throw new PathIOException(getUrl().toString(),
           "Unsupported HTTP method: " + getMethod());
     }
+
+    for (AbfsHttpHeader header : requestHeaders) {

Review Comment:
   In the current implementation of the Apache client, we perform the header case check right before sending the request (in the prepareRequest method). However, if a user provides two headers with the same key but different cases, we might end up signing the request with a different header than intended; when the server validates the request, it could then see a different set of headers than the one that was signed. To ensure consistency, this change mirrors the approach already used for the JDK client:
   https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L91
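   The consistency concern can be illustrated with a case-insensitive map (an illustrative sketch only, not the PR's code): when header names that differ only in case are collapsed up front, a later variant replaces the earlier entry, so the set used for signing and the set actually sent to the server agree.

```java
import java.util.Map;
import java.util.TreeMap;

class HeaderCaseSketch {

    // Collapse headers whose names differ only in case, so the headers
    // used for request signing match the headers actually transmitted.
    static Map<String, String> normalize(String[][] rawHeaders) {
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (String[] h : rawHeaders) {
            // A later case-variant of the same name replaces the earlier value.
            headers.put(h[0], h[1]);
        }
        return headers;
    }
}
```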





> ABFS: Apache Client Connection Pool Relook
> ------------------------------------------
>
>                 Key: HADOOP-19609
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19609
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.1
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
