bhattmanish98 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2302908266
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements
HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
+
+ /**
+ * Connection timeout for establishing a new connection.
+ */
+ private final int connectionTimeout;
+
+ private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+ private final Object connectionLock = new Object();
+
+ private HttpHost baseHost;
+
AbfsConnectionManager(Registry<ConnectionSocketFactory>
socketFactoryRegistry,
- AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+ AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+ final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
this.httpConnectionFactory = connectionFactory;
+ this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
+ if (abfsConfiguration.getCacheWarmupConnections() > 0) {
Review Comment:
Sure, we can discuss this.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
Review Comment:
Sure, will add the Javadoc for this.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
- = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+ = HttpOperationType.APACHE_HTTP_CLIENT;
Review Comment:
Reverted
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
final int length) throws IOException {
try {
if (!isPayloadRequest) {
- prepareRequest();
Review Comment:
As noted in the previous comment, the headers are now set in the constructor itself,
organized in a clear and structured way, so they no longer need to be prepared
separately before sending the request.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+ && CONTENT_LENGTH.equals(key)) {
+ return;
}
+ httpRequestBase.setHeader(key, value);
}
/**{@inheritDoc}*/
@Override
Map<String, List<String>> getRequestProperties() {
Map<String, List<String>> map = new HashMap<>();
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
This method returns all headers that were set on a specific request. Previously,
if duplicate headers were present (including names differing only in case), the
method could return headers that were never actually sent to the server. This
change aligns the behavior with the JDK implementation:
https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L103
Since httpRequestBase.getAllHeaders() returns Header, Header is used instead
of AbfsHttpHeader.
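
For context, the aligned behavior can be sketched as follows. Note that `Header`
here is a minimal stand-in for `org.apache.http.Header`, and `toRequestProperties`
is a hypothetical helper written for illustration, not the actual ABFS method:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderMapSketch {

  // Minimal stand-in for org.apache.http.Header, for illustration only.
  record Header(String name, String value) {}

  // Mirrors the changed getRequestProperties(): every header instance set on
  // the request is reported, grouped by its exact (case-sensitive) name.
  static Map<String, List<String>> toRequestProperties(Header[] allHeaders) {
    Map<String, List<String>> map = new HashMap<>();
    for (Header header : allHeaders) {
      map.computeIfAbsent(header.name(), k -> new ArrayList<>())
          .add(header.value());
    }
    return map;
  }

  public static void main(String[] args) {
    Header[] headers = {
        new Header("x-ms-version", "2021-06-08"),
        new Header("Accept", "application/json"),
        new Header("accept", "*/*")  // differs only in case: kept separate
    };
    Map<String, List<String>> props = toRequestProperties(headers);
    System.out.println(props.get("Accept"));  // [application/json]
    System.out.println(props.size());         // 3
  }
}
```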
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements
HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
Review Comment:
Will make the change.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
}
}
- /**
- * Sets the header on the request.
- */
- private void prepareRequest() {
- final boolean isEntityBasedRequest
- = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
- for (AbfsHttpHeader header : getRequestHeaders()) {
- if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
- continue;
- }
- httpRequestBase.setHeader(header.getName(), header.getValue());
- }
- }
-
/**{@inheritDoc}*/
@Override
public String getRequestProperty(String name) {
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
Same as above
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute
route,
*/
@Override
public HttpClientConnection get(final long timeout,
- final TimeUnit timeUnit)
- throws InterruptedException, ExecutionException,
- ConnectionPoolTimeoutException {
+ final TimeUnit timeUnit) throws ExecutionException {
String requestId = UUID.randomUUID().toString();
logDebug("Connection requested for request {}", requestId);
+ if (!route.getTargetHost().equals(baseHost)) {
+ // If the route target host does not match the base host, create a new connection
+ logDebug("Route target host {} does not match base host {}, creating new connection",
+ route.getTargetHost(), baseHost);
+ return createNewConnection();
+ }
try {
- HttpClientConnection clientConn = kac.get();
- if (clientConn != null) {
- logDebug("Connection retrieved from KAC: {} for requestId: {}",
- clientConn, requestId);
- return clientConn;
+ HttpClientConnection conn = kac.get();
+
+ // If a valid connection is available, return it and trigger background warm-up if needed
+ if (conn != null) {
+ triggerConnectionWarmupIfNeeded();
Review Comment:
The implementation of triggerConnectionWarmupIfNeeded ensures that only one
thread executes it, while the others exit the method immediately. Therefore, I
don't believe a synchronization block is necessary in this case.
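
A minimal sketch of the guard pattern being described (the field and method names
mirror the PR, but the body is an illustrative placeholder, not the real warm-up
logic):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WarmupGuardSketch {

  // Only the thread that wins the compareAndSet proceeds with the cache
  // refresh; every other concurrent caller returns immediately, so no
  // synchronized block is needed around the warm-up.
  final AtomicBoolean isCaching = new AtomicBoolean(false);
  int warmupRuns = 0;

  void triggerConnectionWarmupIfNeeded() {
    if (!isCaching.compareAndSet(false, true)) {
      return;  // a warm-up is already in flight; nothing to do
    }
    try {
      warmupRuns++;  // stands in for creating cacheRefreshConnections connections
    } finally {
      isCaching.set(false);  // re-arm the guard once this refresh completes
    }
  }

  public static void main(String[] args) {
    WarmupGuardSketch mgr = new WarmupGuardSketch();
    mgr.isCaching.set(true);  // simulate another thread mid-warm-up
    mgr.triggerConnectionWarmupIfNeeded();
    System.out.println(mgr.warmupRuns);  // 0: this caller exited early
    mgr.isCaching.set(false);
    mgr.triggerConnectionWarmupIfNeeded();
    System.out.println(mgr.warmupRuns);  // 1
  }
}
```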
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
throw new PathIOException(getUrl().toString(),
"Unsupported HTTP method: " + getMethod());
}
+
+ for (AbfsHttpHeader header : requestHeaders) {
Review Comment:
In the current Apache client implementation, header case checking happens just
before the request is sent (in the prepareRequest method). However, if a user
provides two headers whose keys differ only in case, we might end up signing the
request with a different header than intended, so when the server validates the
request it could see a different set of headers than was signed. To keep signing
and sending consistent, this change mirrors the approach already used for the
JDK client:
https://github.com/apache/hadoop/blob/0244dbddb38495d44f3dd8544e9b2bc17276dc19/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java#L91
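
As an illustration of the idea (not the actual constructor code), collapsing
case-insensitive duplicates up front could look like this, using a `TreeMap`
with `String.CASE_INSENSITIVE_ORDER`; `dedupeCaseInsensitive` is a hypothetical
helper:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HeaderDedupSketch {

  // Minimal stand-in for AbfsHttpHeader, for illustration only.
  record Header(String name, String value) {}

  // Collapses headers whose names differ only in case, keeping the last
  // occurrence, so the signed header set and the sent header set match.
  static Map<String, String> dedupeCaseInsensitive(List<Header> requestHeaders) {
    Map<String, String> deduped = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (Header h : requestHeaders) {
      deduped.put(h.name(), h.value());  // later duplicate overwrites earlier
    }
    return deduped;
  }

  public static void main(String[] args) {
    Map<String, String> headers = dedupeCaseInsensitive(List.of(
        new Header("Content-Type", "text/plain"),
        new Header("content-type", "application/json")));
    System.out.println(headers.size());               // 1
    System.out.println(headers.get("Content-Type"));  // application/json
  }
}
```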
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]