[
https://issues.apache.org/jira/browse/HADOOP-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015433#comment-18015433
]
ASF GitHub Bot commented on HADOOP-19609:
-----------------------------------------
anujmodi2021 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2284234190
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -330,7 +330,8 @@ public void close() throws IOException {
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, getClient());
+ IOUtils.cleanupWithLogger(LOG, getClientHandler().getDfsClient(),
Review Comment:
Can we simply have a close of the clientHandler called here, and inside the client handler call close for both the clients?
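Something along these lines could work (a rough sketch; assumes AbfsClientHandler can implement Closeable, and `getBlobClient()` is an illustrative name for the second client's getter):

```java
// In AzureBlobFileSystemStore.close():
} finally {
  IOUtils.cleanupWithLogger(LOG, getClientHandler());
}

// In AbfsClientHandler (sketch; assumes it implements java.io.Closeable):
@Override
public void close() throws IOException {
  // Clean up both clients in one place instead of at the call site.
  IOUtils.cleanupWithLogger(LOG, getDfsClient(), getBlobClient());
}
```

That keeps the cleanup logic in one place if more clients are ever added.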
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
throw new PathIOException(getUrl().toString(),
"Unsupported HTTP method: " + getMethod());
}
+
+ for (AbfsHttpHeader header : requestHeaders) {
Review Comment:
Why this change?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
- = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+ = HttpOperationType.APACHE_HTTP_CLIENT;
Review Comment:
Let's revert this.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
final int length) throws IOException {
try {
if (!isPayloadRequest) {
- prepareRequest();
Review Comment:
Why removed?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
Review Comment:
+1
Add javadoc for why these 2 conditions cannot be together.
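For example, something like this (a sketch; the ProtocolException behaviour is standard Apache HttpClient, the wording is illustrative):

```java
/**
 * {@inheritDoc}
 * <p>
 * For entity-enclosing requests (PUT/POST), Apache HttpClient derives
 * Content-Length from the attached HttpEntity, and its RequestContent
 * interceptor throws a ProtocolException if the header is also set
 * explicitly. Hence, when both conditions hold, the header must be
 * skipped; in every other case the header is set on the request.
 */
```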
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
Review Comment:
Variable name should say it's a number or count.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
public boolean cancel() {
return false;
}
+
+ /**
+ * Trigger a background warm-up of the connection cache if needed.
+ * This method checks if the cache size is small and if caching is not already in progress.
+ * If so, it starts a new thread to cache extra connections.
+ */
+ private void triggerConnectionWarmupIfNeeded() {
+ if (kac.size() <= 2 && !isCaching.get()) {
+ // Use a single-threaded executor or thread pool instead of raw thread
Review Comment:
Also, please give a relevant name to the thread. This helps in identifying threads when debugging any issue.
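For instance (a sketch; the executor placement and the `cacheExtraConnections` helper are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Single-threaded executor whose ThreadFactory gives the warm-up
// thread a recognizable name in thread dumps.
private static final ExecutorService WARMUP_EXECUTOR =
    Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "abfs-connection-cache-warmup");
      t.setDaemon(true);
      return t;
    });

// Inside triggerConnectionWarmupIfNeeded():
WARMUP_EXECUTOR.submit(this::cacheExtraConnections);
```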
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
+
+ /**
+ * Connection timeout for establishing a new connection.
+ */
+ private final int connectionTimeout;
+
+ private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+ private final Object connectionLock = new Object();
+
+ private HttpHost baseHost;
+
AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
- AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+ AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+ final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
this.httpConnectionFactory = connectionFactory;
+ this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
+ if (abfsConfiguration.getCacheWarmupConnections() > 0) {
Review Comment:
Same for this and all the variables. If they indicate a number, better to
name them as `numOfCacheWarmupConnections` or `cacheWarmupConnectionsCount`.
Please check this throughout the patch, in all the classes, for all the
variables.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
}
}
- /**
- * Sets the header on the request.
- */
- private void prepareRequest() {
- final boolean isEntityBasedRequest
- = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
- for (AbfsHttpHeader header : getRequestHeaders()) {
- if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
- continue;
- }
- httpRequestBase.setHeader(header.getName(), header.getValue());
- }
- }
-
/**{@inheritDoc}*/
@Override
public String getRequestProperty(String name) {
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
Why needed?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
- List<AbfsHttpHeader> headers = getRequestHeaders();
- if (headers != null) {
- headers.add(new AbfsHttpHeader(key, value));
+ if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+ && CONTENT_LENGTH.equals(key)) {
+ return;
}
+ httpRequestBase.setHeader(key, value);
}
/**{@inheritDoc}*/
@Override
Map<String, List<String>> getRequestProperties() {
Map<String, List<String>> map = new HashMap<>();
- for (AbfsHttpHeader header : getRequestHeaders()) {
+ for (Header header : httpRequestBase.getAllHeaders()) {
Review Comment:
Why this change?
Why are we changing a more specific implementation to a parent class?
`AbfsHttpHeader` is supposed to be more specific and sufficient for all ABFS
needs.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
public boolean cancel() {
return false;
}
+
+ /**
+ * Trigger a background warm-up of the connection cache if needed.
+ * This method checks if the cache size is small and if caching is not already in progress.
+ * If so, it starts a new thread to cache extra connections.
+ */
+ private void triggerConnectionWarmupIfNeeded() {
+ if (kac.size() <= 2 && !isCaching.get()) {
Review Comment:
+1
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute route,
*/
@Override
public HttpClientConnection get(final long timeout,
- final TimeUnit timeUnit)
- throws InterruptedException, ExecutionException,
- ConnectionPoolTimeoutException {
+ final TimeUnit timeUnit) throws ExecutionException {
String requestId = UUID.randomUUID().toString();
logDebug("Connection requested for request {}", requestId);
+ if (!route.getTargetHost().equals(baseHost)) {
+ // If the route target host does not match the base host, create a new connection
+ logDebug("Route target host {} does not match base host {}, creating new connection",
+ route.getTargetHost(), baseHost);
+ return createNewConnection();
+ }
try {
- HttpClientConnection clientConn = kac.get();
- if (clientConn != null) {
- logDebug("Connection retrieved from KAC: {} for requestId: {}",
- clientConn, requestId);
- return clientConn;
+ HttpClientConnection conn = kac.get();
+
+ // If a valid connection is available, return it and trigger background warm-up if needed
+ if (conn != null) {
+ triggerConnectionWarmupIfNeeded();
Review Comment:
Why is this not needed in a synchronized block?
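If the intent is to avoid a synchronized block, one lock-free option is to claim the existing isCaching flag with compareAndSet, so only one warm-up can be in flight (a sketch; `WARMUP_EXECUTOR` and `cacheExtraConnections` are illustrative):

```java
private void triggerConnectionWarmupIfNeeded() {
  // compareAndSet succeeds for exactly one caller, so concurrent
  // callers cannot start duplicate warm-ups without any locking.
  if (kac.size() <= 2 && isCaching.compareAndSet(false, true)) {
    WARMUP_EXECUTOR.submit(() -> {
      try {
        cacheExtraConnections(); // illustrative helper that refills the cache
      } finally {
        isCaching.set(false);
      }
    });
  }
}
```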
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
*/
private final HttpClientConnectionOperator connectionOperator;
+ /**
+ * Number of connections to be created during cache refresh.
+ */
+ private final int cacheRefreshConnections;
+
+ /**
+ * Connection timeout for establishing a new connection.
+ */
+ private final int connectionTimeout;
+
+ private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+ private final Object connectionLock = new Object();
+
+ private HttpHost baseHost;
+
AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
- AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+ AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+ final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
this.httpConnectionFactory = connectionFactory;
+ this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
+ if (abfsConfiguration.getCacheWarmupConnections() > 0) {
Review Comment:
Should we discuss that?
Should the cache not be shared among both the clients? We expect only one
client at a time to be working, so they should share the cache, IMO.
> ABFS: Apache Client Connection Pool Relook
> ------------------------------------------
>
> Key: HADOOP-19609
> URL: https://issues.apache.org/jira/browse/HADOOP-19609
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>