This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4fc4a9a460 HDDS-12040. `ozone freon cr` fails with NPE in ReplicationSupervisor (#7776)
4fc4a9a460 is described below
commit 4fc4a9a460d418613be25969601baa4e015b8d85
Author: len548 <[email protected]>
AuthorDate: Thu Jan 30 13:25:20 2025 +0100
HDDS-12040. `ozone freon cr` fails with NPE in ReplicationSupervisor (#7776)
---
.../replication/ReplicationSupervisor.java | 14 +++++++-
.../ozone/freon/ClosedContainerReplicator.java | 40 ++++++++++++++++------
2 files changed, 42 insertions(+), 12 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 9513cac84e..8374e45e17 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -232,14 +232,24 @@ private ReplicationSupervisor(StateContext context,
ExecutorService executor,
* Queue an asynchronous download of the given container.
*/
public void addTask(AbstractReplicationTask task) {
+ if (queueHasRoomFor(task)) {
+ initCounters(task);
+ addToQueue(task);
+ }
+ }
+
+ private boolean queueHasRoomFor(AbstractReplicationTask task) {
final int max = maxQueueSize;
if (getTotalInFlightReplications() >= max) {
LOG.warn("Ignored {} command for container {} in Replication Supervisor"
+ "as queue reached max size of {}.",
task.getClass(), task.getContainerId(), max);
- return;
+ return false;
}
+ return true;
+ }
+ public void initCounters(AbstractReplicationTask task) {
if (requestCounter.get(task.getMetricName()) == null) {
synchronized (this) {
if (requestCounter.get(task.getMetricName()) == null) {
@@ -255,7 +265,9 @@ public void addTask(AbstractReplicationTask task) {
}
}
}
+ }
+ private void addToQueue(AbstractReplicationTask task) {
if (inFlight.add(task)) {
if (task.getPriority() != ReplicationCommandPriority.LOW) {
// Low priority tasks are not included in the replication queue sizes
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1c4f3601b3..f641d4384b 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -25,13 +25,15 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import
org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStore;
@@ -44,7 +46,10 @@
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.replication.ReplicationTask;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.kohsuke.MetaInfServices;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@@ -90,6 +95,9 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
private List<ReplicationTask> replicationTasks;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ClosedContainerReplicator.class);
+
@Override
public Void call() throws Exception {
try {
@@ -124,16 +132,9 @@ public Void replicate() throws Exception {
replicationTasks = new ArrayList<>();
for (ContainerInfo container : containerInfos) {
-
- final ContainerWithPipeline containerWithPipeline =
- containerOperationClient
- .getContainerWithPipeline(container.getContainerID());
-
if (container.getState() == LifeCycleState.CLOSED) {
-
- final List<DatanodeDetails> datanodesWithContainer =
- containerWithPipeline.getPipeline().getNodes();
-
+ final Pipeline pipeline =
containerOperationClient.getPipeline(container.getPipelineID().getProtobuf());
+ final List<DatanodeDetails> datanodesWithContainer =
pipeline.getNodes();
final List<String> datanodeUUIDs =
datanodesWithContainer
.stream().map(DatanodeDetails::getUuidString)
@@ -183,6 +184,8 @@ private void checkDestinationDirectory(String dirUrl)
throws IOException {
private void initializeReplicationSupervisor(
ConfigurationSource conf, int queueSize) throws IOException {
+ String scmID = UUID.randomUUID().toString();
+ String clusterID = UUID.randomUUID().toString();
String fakeDatanodeUuid = datanode;
if (fakeDatanodeUuid.isEmpty()) {
@@ -198,6 +201,20 @@ private void initializeReplicationSupervisor(
MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ MutableVolumeSet dbVolumeSet =
+ HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
+ new MutableVolumeSet(fakeDatanodeUuid, conf, null,
+ StorageVolume.VolumeType.DB_VOLUME, null);
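+ // Load the per-volume RocksDB stores. NOTE(review): the flag argument below is false, i.e. NOT read-only mode — presumably because this tool writes replicated containers into these volumes; confirm.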
+ HddsVolumeUtil.loadAllHddsVolumeDbStore(
+ volumeSet, dbVolumeSet, false, LOG);
+
+ for (StorageVolume volume : volumeSet.getVolumesList()) {
+ StorageVolumeUtil.checkVolume(volume, scmID, clusterID, conf, LOG,
dbVolumeSet);
+ }
+ }
+
Map<ContainerType, Handler> handlers = new HashMap<>();
for (ContainerType containerType : ContainerType.values()) {
@@ -211,7 +228,7 @@ private void initializeReplicationSupervisor(
metrics,
containerReplicaProto -> {
});
- handler.setClusterID(UUID.randomUUID().toString());
+ handler.setClusterID(clusterID);
handlers.put(containerType, handler);
}
@@ -238,6 +255,7 @@ private void replicateContainer(long counter) throws
Exception {
timer.time(() -> {
final ReplicationTask replicationTask =
replicationTasks.get((int) counter);
+ supervisor.initCounters(replicationTask);
supervisor.new TaskRunner(replicationTask).run();
return null;
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]