This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fc4a9a460 HDDS-12040. `ozone freon cr` fails with NPE in ReplicationSupervisor (#7776)
4fc4a9a460 is described below

commit 4fc4a9a460d418613be25969601baa4e015b8d85
Author: len548 <[email protected]>
AuthorDate: Thu Jan 30 13:25:20 2025 +0100

    HDDS-12040. `ozone freon cr` fails with NPE in ReplicationSupervisor (#7776)
---
 .../replication/ReplicationSupervisor.java         | 14 +++++++-
 .../ozone/freon/ClosedContainerReplicator.java     | 40 ++++++++++++++++------
 2 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 9513cac84e..8374e45e17 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -232,14 +232,24 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor,
    * Queue an asynchronous download of the given container.
    */
   public void addTask(AbstractReplicationTask task) {
+    if (queueHasRoomFor(task)) {
+      initCounters(task);
+      addToQueue(task);
+    }
+  }
+
+  private boolean queueHasRoomFor(AbstractReplicationTask task) {
     final int max = maxQueueSize;
     if (getTotalInFlightReplications() >= max) {
       LOG.warn("Ignored {} command for container {} in Replication Supervisor"
               + "as queue reached max size of {}.",
           task.getClass(), task.getContainerId(), max);
-      return;
+      return false;
     }
+    return true;
+  }
 
+  public void initCounters(AbstractReplicationTask task) {
     if (requestCounter.get(task.getMetricName()) == null) {
       synchronized (this) {
         if (requestCounter.get(task.getMetricName()) == null) {
@@ -255,7 +265,9 @@ public void addTask(AbstractReplicationTask task) {
         }
       }
     }
+  }
 
+  private void addToQueue(AbstractReplicationTask task) {
     if (inFlight.add(task)) {
       if (task.getPriority() != ReplicationCommandPriority.LOW) {
         // Low priority tasks are not included in the replication queue sizes
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1c4f3601b3..f641d4384b 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -25,13 +25,15 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStore;
@@ -44,7 +46,10 @@
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.kohsuke.MetaInfServices;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -90,6 +95,9 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
 
   private List<ReplicationTask> replicationTasks;
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosedContainerReplicator.class);
+
   @Override
   public Void call() throws Exception {
     try {
@@ -124,16 +132,9 @@ public Void replicate() throws Exception {
     replicationTasks = new ArrayList<>();
 
     for (ContainerInfo container : containerInfos) {
-
-      final ContainerWithPipeline containerWithPipeline =
-          containerOperationClient
-              .getContainerWithPipeline(container.getContainerID());
-
       if (container.getState() == LifeCycleState.CLOSED) {
-
-        final List<DatanodeDetails> datanodesWithContainer =
-            containerWithPipeline.getPipeline().getNodes();
-
+        final Pipeline pipeline = containerOperationClient.getPipeline(container.getPipelineID().getProtobuf());
+        final List<DatanodeDetails> datanodesWithContainer = pipeline.getNodes();
         final List<String> datanodeUUIDs =
             datanodesWithContainer
                 .stream().map(DatanodeDetails::getUuidString)
@@ -183,6 +184,8 @@ private void checkDestinationDirectory(String dirUrl) throws IOException {
 
   private void initializeReplicationSupervisor(
       ConfigurationSource conf, int queueSize) throws IOException {
+    String scmID = UUID.randomUUID().toString();
+    String clusterID = UUID.randomUUID().toString();
     String fakeDatanodeUuid = datanode;
 
     if (fakeDatanodeUuid.isEmpty()) {
@@ -198,6 +201,20 @@ private void initializeReplicationSupervisor(
     MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf,
         null, StorageVolume.VolumeType.DATA_VOLUME, null);
 
+    if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+      MutableVolumeSet dbVolumeSet =
+          HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
+          new MutableVolumeSet(fakeDatanodeUuid, conf, null,
+              StorageVolume.VolumeType.DB_VOLUME, null);
+      // load rocksDB with readOnly mode, otherwise it will fail.
+      HddsVolumeUtil.loadAllHddsVolumeDbStore(
+          volumeSet, dbVolumeSet, false, LOG);
+
+      for (StorageVolume volume : volumeSet.getVolumesList()) {
+        StorageVolumeUtil.checkVolume(volume, scmID, clusterID, conf, LOG, dbVolumeSet);
+      }
+    }
+
     Map<ContainerType, Handler> handlers = new HashMap<>();
 
     for (ContainerType containerType : ContainerType.values()) {
@@ -211,7 +228,7 @@ private void initializeReplicationSupervisor(
               metrics,
               containerReplicaProto -> {
               });
-      handler.setClusterID(UUID.randomUUID().toString());
+      handler.setClusterID(clusterID);
       handlers.put(containerType, handler);
     }
 
@@ -238,6 +255,7 @@ private void replicateContainer(long counter) throws Exception {
     timer.time(() -> {
       final ReplicationTask replicationTask =
           replicationTasks.get((int) counter);
+      supervisor.initCounters(replicationTask);
       supervisor.new TaskRunner(replicationTask).run();
       return null;
     });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to