This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e0a76464c HDDS-12486. Warmup KMS encrypted keys when OM starts (#8081)
4e0a76464c is described below

commit 4e0a76464cb43603ed5b3417928278701f010548
Author: Aryan Gupta <[email protected]>
AuthorDate: Tue Mar 25 02:25:49 2025 +0530

    HDDS-12486. Warmup KMS encrypted keys when OM starts (#8081)
    
    Co-authored-by: Aryan Gupta <[email protected]>
---
 .../common/src/main/resources/ozone-default.xml    |  26 +++++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  13 +++
 .../client/rpc/TestOzoneAtRestEncryption.java      |  32 ++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 119 +++++++++++++++++++++
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   5 +
 5 files changed, 195 insertions(+)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 155eb48bf7..7deebfb787 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4658,4 +4658,30 @@
       default value of DEFAULT_RACK is returned for all node names.
     </description>
   </property>
+
+  <property>
+    <name>ozone.om.edekcacheloader.interval.ms</name>
+    <value>1000</value>
+    <description>When a KeyProvider is configured, the interval in
+      milliseconds between attempts to warm up the EDEK cache when OM starts
+      up. All EDEKs are loaded from KMS into the provider cache. The EDEK
+      cache loader keeps retrying until it succeeds, the configured maximum
+      number of retries is reached, or OM leaves the active state.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.edekcacheloader.initial.delay.ms</name>
+    <value>3000</value>
+    <description>When a KeyProvider is configured, the delay in milliseconds
+      before the first attempt to warm up the EDEK cache on OM startup.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.edekcacheloader.max-retries</name>
+    <value>10</value>
+    <description>When a KeyProvider is configured, the maximum number of
+      times the loader tries to warm up the EDEK cache on OM startup before
+      giving up, if no attempt has succeeded.
+    </description>
+  </property>
 </configuration>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index a6d849a127..77f3f17244 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -619,4 +619,17 @@ private OMConfigKeys() {
   public static final String OZONE_OM_MAX_BUCKET =
       "ozone.om.max.buckets";
   public static final int OZONE_OM_MAX_BUCKET_DEFAULT = 100000;
+
+  /**
+   * Config key: delay, in milliseconds, before the first attempt to warm up
+   * the EDEK cache.
+   */
+  public static final String OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY =
+      "ozone.om.edekcacheloader.initial.delay.ms";
+
+  /** Default initial delay before the first warm-up attempt: 3000 ms. */
+  public static final int OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 
3000;
+
+  /**
+   * Config key: interval, in milliseconds, between EDEK cache warm-up
+   * attempts.
+   */
+  public static final String OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY = 
"ozone.om.edekcacheloader.interval.ms";
+
+  /** Default interval between warm-up attempts: 1000 ms. */
+  public static final int OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
+
+  /** Config key: maximum number of EDEK cache warm-up attempts. */
+  public static final String OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY =
+      "ozone.om.edekcacheloader.max-retries";
+  /** Default maximum number of warm-up attempts: 10. */
+  public static final int OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT = 10;
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 709f3f5157..0bd501b7d7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -59,10 +59,12 @@
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.function.BooleanSupplier;
 import javax.xml.bind.DatatypeConverter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
 import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -95,6 +97,7 @@
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -106,6 +109,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.test.Whitebox;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.tag.Flaky;
 import org.apache.ozone.test.tag.Unhealthy;
@@ -211,6 +215,34 @@ static void reInitClient() throws IOException {
     store = ozClient.getObjectStore();
   }
 
+  /**
+   * Restarts the OM and waits until the KMS client provider again reports a
+   * non-empty EDEK queue for {@code TEST_KEY}.
+   */
+  @Test
+  public void testWarmupEDEKCacheOnStartup() throws Exception {
+    createVolumeAndBucket("vol", "buck", BucketLayout.OBJECT_STORE);
+
+    // Sanity check: the EDEK queue for TEST_KEY is non-empty before restart.
+    final KMSClientProvider providerBeforeRestart = getKMSClientProvider();
+    assertTrue(providerBeforeRestart.getEncKeyQueueSize(TEST_KEY) > 0);
+
+    // Remove the initial delay so the loader starts right after the restart.
+    conf.setInt(OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, 0);
+    cluster.restartOzoneManager();
+
+    // Look the provider up on every poll: it is read from the current OM.
+    final BooleanSupplier edekQueueFilled =
+        () -> getKMSClientProvider().getEncKeyQueueSize(TEST_KEY) > 0;
+    GenericTestUtils.waitFor(edekQueueFilled, 1000, 60000);
+  }
+
+  /**
+   * Returns the single {@link KMSClientProvider} behind the OM's KMS provider.
+   * The load-balancing provider is read reflectively from the "extension"
+   * field of the object returned by {@code getKmsProvider()}.
+   *
+   * @return the only provider held by the load-balancing provider
+   */
+  private KMSClientProvider getKMSClientProvider() {
+    LoadBalancingKMSClientProvider lbkmscp =
+        (LoadBalancingKMSClientProvider) Whitebox.getInternalState(
+            cluster.getOzoneManager().getKmsProvider(), "extension");
+    KMSClientProvider[] providers = lbkmscp.getProviders();
+    // Use a test assertion rather than the `assert` keyword, which is skipped
+    // when JVM assertions are disabled.
+    assertTrue(providers.length == 1);
+    return providers[0];
+  }
+
 
   @ParameterizedTest
   @EnumSource
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index eb5d83b5a8..5200848485 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -54,6 +54,12 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
@@ -95,6 +101,7 @@
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
 import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
 import static org.apache.hadoop.util.ExitUtil.terminate;
+import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.apache.ozone.graph.PrintableGraph.GraphType.FILE_NAME;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -103,6 +110,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.ProtocolMessageEnum;
 import java.io.BufferedWriter;
@@ -135,6 +143,8 @@
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -386,6 +396,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private KeyManager keyManager;
   private PrefixManagerImpl prefixManager;
   private final UpgradeFinalizer<OzoneManager> upgradeFinalizer;
+  private ExecutorService edekCacheLoader = null;
 
   /**
    * OM super user / admin list.
@@ -725,6 +736,110 @@ private OzoneManager(OzoneConfiguration conf, 
StartupOption startupOption)
     omHostName = HddsUtils.getHostName(conf);
   }
 
+  /**
+   * Starts a background warm-up of the EDEK cache if a KMS provider is
+   * configured; otherwise does nothing.
+   * <p>
+   * Initial delay, retry interval and maximum retries are read from
+   * {@code conf}. Any loader executor left over from an earlier call is shut
+   * down before a new one is created.
+   *
+   * @param conf configuration to read the EDEK cache loader settings from
+   */
+  public void initializeEdekCache(OzoneConfiguration conf) {
+    if (kmsProvider == null) {
+      // No KMS provider configured: nothing to warm up.
+      return;
+    }
+
+    int edekCacheLoaderDelay = conf.getInt(
+        OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY,
+        OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
+    int edekCacheLoaderInterval = conf.getInt(
+        OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY,
+        OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
+    int edekCacheLoaderMaxRetries = conf.getInt(
+        OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY,
+        OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT);
+
+    // This method runs each time this OM becomes leader. Stop the previous
+    // loader (interrupting it if still running) so its thread is not leaked.
+    if (edekCacheLoader != null) {
+      edekCacheLoader.shutdownNow();
+    }
+    edekCacheLoader = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Warm Up EDEK Cache Thread #%d")
+            .build());
+    warmUpEdekCache(edekCacheLoader, edekCacheLoaderDelay,
+        edekCacheLoaderInterval, edekCacheLoaderMaxRetries);
+  }
+
+  static class EDEKCacheLoader implements Runnable {
+    private final String[] keyNames;
+    private final KeyProviderCryptoExtension kp;
+    private int initialDelay;
+    private int retryInterval;
+    private int maxRetries;
+
+    EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp,
+        final int delay, final int interval, final int maxRetries) {
+      this.keyNames = names;
+      this.kp = kp;
+      this.initialDelay = delay;
+      this.retryInterval = interval;
+      this.maxRetries = maxRetries;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Warming up {} EDEKs... (initialDelay={}, "
+              + "retryInterval={}, maxRetries={})", keyNames.length, 
initialDelay, retryInterval,
+          maxRetries);
+      try {
+        Thread.sleep(initialDelay);
+      } catch (InterruptedException ie) {
+        LOG.info("EDEKCacheLoader interrupted before warming up.");
+        return;
+      }
+
+      boolean success = false;
+      int retryCount = 0;
+      IOException lastSeenIOE = null;
+      long warmUpEDEKStartTime = monotonicNow();
+
+      while (!success && retryCount < maxRetries) {
+        try {
+          kp.warmUpEncryptedKeys(keyNames);
+          LOG.info("Successfully warmed up {} EDEKs.", keyNames.length);
+          success = true;
+        } catch (IOException ioe) {
+          lastSeenIOE = ioe;
+          LOG.info("Failed to warm up EDEKs.", ioe);
+        } catch (Exception e) {
+          LOG.error("Cannot warm up EDEKs.", e);
+          throw e;
+        }
+
+        if (!success) {
+          try {
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException ie) {
+            LOG.info("EDEKCacheLoader interrupted during retry.");
+            break;
+          }
+          retryCount++;
+        }
+      }
+
+      long warmUpEDEKTime = monotonicNow() - warmUpEDEKStartTime;
+      LOG.debug("Time taken to load EDEK keys to the cache: {}", 
warmUpEDEKTime);
+      if (!success) {
+        LOG.warn("Max retry {} reached, unable to warm up EDEKs.", maxRetries);
+        if (lastSeenIOE != null) {
+          LOG.warn("Last seen exception:", lastSeenIOE);
+        }
+      }
+    }
+  }
+
+  public void warmUpEdekCache(final ExecutorService executor, final int delay, 
final int interval, int maxRetries) {
+    Set<String> keys = new HashSet<>();
+    try (
+        TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>> 
iterator =
+            metadataManager.getBucketTable().iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
+        if (entry.getValue().getEncryptionKeyInfo() != null) {
+          String encKey = entry.getValue().getEncryptionKeyInfo().getKeyName();
+          keys.add(encKey);
+        }
+      }
+    } catch (IOException ex) {
+      LOG.error("Error while retrieving encryption keys for warming up EDEK 
cache", ex);
+    }
+    String[] edeks = new String[keys.size()];
+    edeks = keys.toArray(edeks);
+    executor.execute(new EDEKCacheLoader(edeks, getKmsProvider(), delay, 
interval, maxRetries));
+  }
+
   public boolean isStopped() {
     return omState == State.STOPPED;
   }
@@ -2297,6 +2412,10 @@ public boolean stop() {
       if (versionManager != null) {
         versionManager.close();
       }
+
+      if (edekCacheLoader != null) {
+        edekCacheLoader.shutdown();
+      }
       return true;
     } catch (Exception e) {
       LOG.error("OzoneManager stop failed.", e);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 0394ee08c7..dce2951b95 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -159,6 +159,11 @@ public SnapshotInfo getLatestSnapshot() {
   @Override
   public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
                                   RaftPeerId newLeaderId) {
+    RaftPeerId currentPeerId = groupMemberId.getPeerId();
+    if (newLeaderId.equals(currentPeerId)) {
+      // warmup cache
+      ozoneManager.initializeEdekCache(ozoneManager.getConfiguration());
+    }
     // Initialize OMHAMetrics
     ozoneManager.omHAMetricsInit(newLeaderId.toString());
     LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to