[
https://issues.apache.org/jira/browse/HADOOP-19472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18032386#comment-18032386
]
ASF GitHub Bot commented on HADOOP-19472:
-----------------------------------------
anmolanmol1234 commented on code in PR #7669:
URL: https://github.com/apache/hadoop/pull/7669#discussion_r2454234411
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java:
##########
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+
+/**
+ * Manages a thread pool for writing operations, adjusting the pool size based
on CPU utilization.
+ */
+public final class WriteThreadPoolSizeManager implements Closeable {
+
+ /* Maximum allowed size for the thread pool. */
+ private final int maxThreadPoolSize;
+ /* Executor for periodically monitoring CPU usage. */
+ private final ScheduledExecutorService cpuMonitorExecutor;
+ /* Thread pool whose size is dynamically managed. */
+ private volatile ExecutorService boundedThreadPool;
+ /* Lock to ensure thread-safe updates to the thread pool. */
+ private final Lock lock = new ReentrantLock();
+ /* New computed max size for the thread pool after adjustment. */
+ private volatile int newMaxPoolSize;
+ /* Logger instance for logging events from WriteThreadPoolSizeManager. */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ WriteThreadPoolSizeManager.class);
+ /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
+ private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
+ POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
+ /* Name of the filesystem associated with this manager. */
+ private final String filesystemName;
+ /* Initial size for the thread pool when created. */
+ private final int initialPoolSize;
+ /* Initially available heap memory. */
+ private final long initialAvailableHeapMemory;
+ /* The configuration instance. */
+ private final AbfsConfiguration abfsConfiguration;
+
+ /**
+ * Private constructor to initialize the write thread pool and CPU monitor
executor
+ * based on system resources and ABFS configuration.
+ *
+ * @param filesystemName Name of the ABFS filesystem.
+ * @param abfsConfiguration Configuration containing pool size parameters.
+ */
+ private WriteThreadPoolSizeManager(String filesystemName,
+ AbfsConfiguration abfsConfiguration) {
+ this.filesystemName = filesystemName;
+ this.abfsConfiguration = abfsConfiguration;
+ int availableProcessors = Runtime.getRuntime().availableProcessors();
+ /* Get the heap space available when the instance is created */
+ this.initialAvailableHeapMemory = getAvailableHeapMemory();
+ /* Compute the max pool size */
+ int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors,
initialAvailableHeapMemory);
+
+ /* Get the initial pool size from config, fallback to at least 1 */
+ this.initialPoolSize = Math.max(1,
+ abfsConfiguration.getWriteMaxConcurrentRequestCount());
+
+ /* Set the upper bound for the thread pool size */
+ this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
+
+ /* Initialize the bounded thread pool executor */
+ this.boundedThreadPool = Executors.newFixedThreadPool(initialPoolSize);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+ executor.setKeepAliveTime(
+ abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+ executor.allowCoreThreadTimeOut(true);
+
+ /* Create a scheduled executor for CPU monitoring and pool adjustment */
+ this.cpuMonitorExecutor = Executors.newScheduledThreadPool(
+ abfsConfiguration.getWriteCorePoolSize());
+ }
+
+ /**
+  * Returns the ABFS configuration this manager was constructed with.
+  *
+  * @return the configuration instance held by this manager.
+  */
+ public AbfsConfiguration getAbfsConfiguration() {
+   return this.abfsConfiguration;
+ }
+
+ /**
+ * Calculates the max thread pool size using a multiplier based on
+ * memory per core. Higher memory per core results in a larger multiplier.
+ *
+ * @param availableProcessors Number of CPU cores.
+ * @return Computed max thread pool size.
+ */
+ private int getComputedMaxPoolSize(final int availableProcessors, long
initialAvailableHeapMemory) {
+ LOG.debug("The available heap space in GB {} ",
initialAvailableHeapMemory);
+ LOG.debug("The number of available processors is {} ",
availableProcessors);
+ int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory,
availableProcessors);
+ LOG.debug("The max thread pool size is {} ", maxpoolSize);
+ return maxpoolSize;
+ }
+
+ /**
+ * Calculates the available heap memory in gigabytes.
+ * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap
memory
+ * allowed for the JVM and subtracts the currently used memory (total - free)
+ * to determine how much heap memory is still available.
+ * The result is rounded up to the nearest gigabyte.
+ *
+ * @return the available heap memory in gigabytes
+ */
+ private long getAvailableHeapMemory() {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = runtime.maxMemory();
+ long usedMemory = runtime.totalMemory() - runtime.freeMemory();
+ long availableHeapBytes = maxMemory - usedMemory;
+ return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+
+ /**
+  * Returns the thread count for the current heap tier: the number of CPU
+  * cores multiplied by the configured low, medium or high tier multiplier.
+  * The tier is chosen by comparing the available heap against
+  * {@code LOW_HEAP_SPACE_FACTOR} and {@code MEDIUM_HEAP_SPACE_FACTOR}.
+  *
+  * @param availableHeapGB Available heap space, in GB.
+  * @param availableProcessors Number of CPU cores.
+  * @return cores multiplied by the multiplier of the matching tier.
+  */
+ private int getMemoryTierMaxThreads(long availableHeapGB,
+     int availableProcessors) {
+   /* Low tier: heap at or below the low threshold. */
+   if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
+     return availableProcessors
+         * abfsConfiguration.getLowTierMemoryMultiplier();
+   }
+   /* Medium tier: above low, at or below the medium threshold. */
+   if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
+     return availableProcessors
+         * abfsConfiguration.getMediumTierMemoryMultiplier();
+   }
+   /* High tier: everything above the medium threshold. */
+   return availableProcessors
+       * abfsConfiguration.getHighTierMemoryMultiplier();
+ }
+
+ /**
+  * Looks up, or creates on demand, the manager registered for a filesystem.
+  * A cached manager is reused only while its thread pool is present and has
+  * not been shut down; otherwise a new manager is created and replaces the
+  * map entry for that filesystem name.
+  *
+  * @param filesystemName the name of the filesystem.
+  * @param abfsConfiguration the configuration for the ABFS.
+  *
+  * @return the manager instance for the given filesystem.
+  */
+ public static synchronized WriteThreadPoolSizeManager getInstance(
+     String filesystemName, AbfsConfiguration abfsConfiguration) {
+   WriteThreadPoolSizeManager manager =
+       POOL_SIZE_MANAGER_MAP.get(filesystemName);
+
+   /* No entry, or an entry whose pool is gone or shut down: replace it. */
+   if (manager == null || manager.boundedThreadPool == null
+       || manager.boundedThreadPool.isShutdown()) {
+     LOG.debug(
+         "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
+         filesystemName);
+     manager = new WriteThreadPoolSizeManager(filesystemName,
+         abfsConfiguration);
+     POOL_SIZE_MANAGER_MAP.put(filesystemName, manager);
+   }
+   return manager;
+ }
+
+ /**
+ * Adjusts the thread pool size to the specified maximum pool size.
+ *
+ * @param newMaxPoolSize the new maximum pool size.
+ */
+ private void adjustThreadPoolSize(int newMaxPoolSize) {
+ synchronized (this) {
+ ThreadPoolExecutor threadPoolExecutor
+ = ((ThreadPoolExecutor) boundedThreadPool);
+ int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
+
+ if (newMaxPoolSize >= currentCorePoolSize) {
+ threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+ threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+ } else {
+ threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+ threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+ }
+
+ LOG.debug("The thread pool size is: {} ", newMaxPoolSize);
+ LOG.debug("The pool size is: {} ", threadPoolExecutor.getPoolSize());
+ LOG.debug("The active thread count is: {}",
threadPoolExecutor.getActiveCount());
+ }
+ }
+
+ /**
+  * Starts monitoring the CPU utilization and adjusts the thread pool size
+  * accordingly. The check is scheduled at a fixed rate with no initial
+  * delay; the period, in seconds, is the configured write CPU monitoring
+  * interval.
+  *
+  * <p>If an adjustment is interrupted, the thread's interrupt status is
+  * restored before a {@link RuntimeException} is thrown from the task.
+  * Per the {@code scheduleAtFixedRate} contract, a task that throws has
+  * its subsequent executions suppressed, so monitoring stops at that
+  * point.
+  */
+ synchronized void startCPUMonitoring() {
+   cpuMonitorExecutor.scheduleAtFixedRate(() -> {
+     double cpuUtilization = getCpuUtilization();
+     LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
+     try {
+       adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
+     } catch (InterruptedException e) {
+       /*
+        * Catching InterruptedException clears the interrupt flag; set it
+        * again so the executor's worker thread still sees the request.
+        */
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(String.format(
+           "Thread pool size adjustment interrupted for filesystem %s",
+           filesystemName), e);
+     }
+   }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
+       TimeUnit.SECONDS);
+ }
+
+ /**
+ * Gets the current CPU utilization.
+ *
+ * @return the CPU utilization as a percentage (0.0 to 1.0).
+ */
+ private double getCpuUtilization() {
+ OperatingSystemMXBean osBean =
ManagementFactory.getOperatingSystemMXBean();
+ if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
+ com.sun.management.OperatingSystemMXBean sunOsBean
+ = (com.sun.management.OperatingSystemMXBean) osBean;
+ double cpuLoad = sunOsBean.getSystemCpuLoad();
+ if (cpuLoad >= 0) {
Review Comment:
taken
> ABFS: Enhance performance of ABFS driver for write-heavy workloads
> ------------------------------------------------------------------
>
> Key: HADOOP-19472
> URL: https://issues.apache.org/jira/browse/HADOOP-19472
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.1
> Reporter: Anmol Asrani
> Assignee: Anmol Asrani
> Priority: Minor
> Labels: pull-request-available
> Fix For: 3.4.1
>
>
> The goal of this work item is to enhance the performance of ABFS Driver for
> write-heavy workloads by improving concurrency within writes.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]