[
https://issues.apache.org/jira/browse/HADOOP-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18016433#comment-18016433
]
ASF GitHub Bot commented on HADOOP-19609:
-----------------------------------------
bhattmanish98 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2302908266
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements
HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
+
+ /**
+ * Connection timeout for establishing a new connection.
+ */
+ private final int connectionTimeout;
+
+ private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+ private final Object connectionLock = new Object();
+
+ private HttpHost baseHost;
+
AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
- AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+ AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+ final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
this.httpConnectionFactory = connectionFactory;
+ this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
+ if (abfsConfiguration.getCacheWarmupConnections() > 0) {
Review Comment:
Sure, we can discuss this.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
Review Comment:
Sure, will add the Javadoc for this.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
- = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+ = HttpOperationType.APACHE_HTTP_CLIENT;
Review Comment:
Reverted
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
final int length) throws IOException {
try {
if (!isPayloadRequest) {
- prepareRequest();
Review Comment:
As noted in the previous comment, we are organizing the data in a clear and
structured way in the constructor itself so that we won't have to prepare it
separately before sending the request.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+ && CONTENT_LENGTH.equals(key)) {
+ return;
}
+ httpRequestBase.setHeader(key, value);
}
/**{@inheritDoc}*/
@Override
Map<String, List<String>> getRequestProperties() {
Map<String, List<String>> map = new HashMap<>();
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
This method returns all headers that were actually passed to the server in a
specific request. Previously, if duplicate headers were present (including
duplicates differing only in case), the method could return headers that were
never actually sent, because case-insensitive duplicates are resolved before
sending. This change was made to align with the JDK implementation.
https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L103
Since httpRequestBase.getAllHeaders() returns Header objects, Header is used
instead of AbfsHttpHeader.
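The behavior described above — returning only the headers actually set on the
request, grouped by name into a map — can be sketched as follows. This is a
plain-Java stand-in with illustrative names, not the PR's code; the real
method iterates httpRequestBase.getAllHeaders():

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the getRequestProperties change: group the
// (name, value) pairs actually present on the request into a
// Map<String, List<String>>, as the JDK-based implementation does.
public class RequestProperties {
    public static Map<String, List<String>> toMap(String[][] headers) {
        Map<String, List<String>> map = new HashMap<>();
        for (String[] header : headers) {
            map.computeIfAbsent(header[0], k -> new ArrayList<>())
               .add(header[1]);
        }
        return map;
    }

    public static void main(String[] args) {
        String[][] sent = {
            {"x-ms-version", "2021-06-08"},
            {"Accept", "application/json"}
        };
        System.out.println(toMap(sent).get("Accept"));
    }
}
```

Because the map is built from the request object itself, headers that were
replaced by a case-insensitive duplicate never appear in the result.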
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements
HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
Review Comment:
Will do the change
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
}
}
- /**
- * Sets the header on the request.
- */
- private void prepareRequest() {
- final boolean isEntityBasedRequest
- = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
- for (AbfsHttpHeader header : getRequestHeaders()) {
- if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
- continue;
- }
- httpRequestBase.setHeader(header.getName(), header.getValue());
- }
- }
-
/**{@inheritDoc}*/
@Override
public String getRequestProperty(String name) {
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
Same as above
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute
route,
*/
@Override
public HttpClientConnection get(final long timeout,
- final TimeUnit timeUnit)
- throws InterruptedException, ExecutionException,
- ConnectionPoolTimeoutException {
+ final TimeUnit timeUnit) throws ExecutionException {
String requestId = UUID.randomUUID().toString();
logDebug("Connection requested for request {}", requestId);
+ if (!route.getTargetHost().equals(baseHost)) {
+ // If the route target host does not match the base host, create a new connection
+ logDebug("Route target host {} does not match base host {}, creating new connection",
+ route.getTargetHost(), baseHost);
+ return createNewConnection();
+ }
try {
- HttpClientConnection clientConn = kac.get();
- if (clientConn != null) {
- logDebug("Connection retrieved from KAC: {} for requestId: {}",
- clientConn, requestId);
- return clientConn;
+ HttpClientConnection conn = kac.get();
+
+ // If a valid connection is available, return it and trigger background warm-up if needed
+ if (conn != null) {
+ triggerConnectionWarmupIfNeeded();
Review Comment:
The implementation of triggerConnectionWarmupIfNeeded already ensures that only
one thread executes the warm-up, while other threads exit the method
immediately. Therefore, I don't believe a synchronization block is necessary in
this case.
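The single-executor guarantee described above can be sketched with a
compareAndSet guard. This is a hypothetical illustration (class and field
names other than isCaching are made up, and the warm-up body is a stand-in),
assuming the isCaching AtomicBoolean from the diff is what gates the warm-up:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a CAS guard: the one caller that flips the flag from
// false to true performs the warm-up; concurrent callers see the flag
// already set and return immediately, so no synchronized block is needed.
public class WarmupGuard {
    private final AtomicBoolean isCaching = new AtomicBoolean(false);
    private final AtomicInteger warmupRuns = new AtomicInteger();

    public void triggerConnectionWarmupIfNeeded() {
        // Only the caller that transitions false -> true proceeds.
        if (!isCaching.compareAndSet(false, true)) {
            return;
        }
        try {
            // Stand-in for creating cacheRefreshConnections connections.
            warmupRuns.incrementAndGet();
        } finally {
            isCaching.set(false);
        }
    }

    public int warmupRuns() {
        return warmupRuns.get();
    }

    public static void main(String[] args) {
        WarmupGuard guard = new WarmupGuard();
        guard.triggerConnectionWarmupIfNeeded();
        System.out.println(guard.warmupRuns());
    }
}
```

The compareAndSet call is atomic, so at most one thread at a time can hold the
true state; losers of the race skip the warm-up rather than block on it.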
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
throw new PathIOException(getUrl().toString(),
"Unsupported HTTP method: " + getMethod());
}
+
+ for (AbfsHttpHeader header : requestHeaders) {
Review Comment:
In the current implementation of the Apache client, we perform header case
checking right before sending the request (in the prepareRequest method).
However, if a user provides two headers with the same key but different cases,
we might end up signing the request with a different header than intended. As a
result, when the server validates the request, it could receive a different set
of headers than were signed. To ensure consistency, similar to our approach
with the JDK client, this change has been implemented.
https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L91
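The mismatch described above can be illustrated with a small, hypothetical
normalization helper (the class and method names here are made up for the
sketch): if case-insensitive duplicates are resolved once, up front, the
header set that gets signed is guaranteed to be the header set that gets sent.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: resolve case-insensitive duplicate header names
// before signing, last value winning, which mirrors the replace-on-set
// behavior of typical HTTP header stores.
public class HeaderNormalizer {
    public static Map<String, String> normalize(Map<String, String> raw) {
        Map<String, String> resolved = new LinkedHashMap<>();
        raw.forEach((name, value) ->
            resolved.put(name.toLowerCase(Locale.ROOT), value));
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("Content-Type", "application/json");
        raw.put("content-type", "text/plain");
        // Both entries collapse to one header; only one value is sent.
        System.out.println(HeaderNormalizer.normalize(raw).size());
    }
}
```

Signing over the normalized map instead of the raw list removes the window in
which the signed headers and the transmitted headers could diverge.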
> ABFS: Apache Client Connection Pool Relook
> ------------------------------------------
>
> Key: HADOOP-19609
> URL: https://issues.apache.org/jira/browse/HADOOP-19609
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]