This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c44dcab2 CASSSIDECAR-409: Add safety check to Live Migration data
copy endpoint (#321)
8c44dcab2 is described below
commit 8c44dcab2440ed3972e1b0f74cc3a2457efc7316
Author: N V Harikrishna <[email protected]>
AuthorDate: Fri Mar 27 04:02:28 2026 +0530
CASSSIDECAR-409: Add safety check to Live Migration data copy endpoint
(#321)
Patch by N V Harikrishna; reviewed by Francisco Guerrero, Yifan Cai for
CASSSIDECAR-409
---
CHANGES.txt | 1 +
.../request/LiveMigrationDataCopyRequest.java | 2 +-
.../sidecar/routes/RepairIntegrationTest.java | 2 +-
.../LiveMigrationCreateDataCopyTaskHandler.java | 2 +-
.../sidecar/livemigration/DataCopyTaskManager.java | 93 ++++++++++++------
.../LiveMigrationFileDownloadPreCheck.java | 105 +++++++++++++++++++++
.../livemigration/LiveMigrationFileDownloader.java | 73 +++++++++++++-
.../LiveMigrationTaskFactoryImpl.java | 7 +-
.../livemigration/LiveMigrationTaskImpl.java | 5 +-
.../sidecar/modules/LiveMigrationModule.java | 2 +
.../cassandra/sidecar/HelperTestModules.java | 1 +
.../livemigration/DataCopyTaskManagerTest.java | 75 ++++++++++++++-
.../LiveMigrationFileDownloaderTest.java | 96 +++++++++++++++++++
.../livemigration/LiveMigrationTaskImplTest.java | 2 +-
14 files changed, 430 insertions(+), 36 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 538baacb3..3b382f539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
* Define common operational job tracking interface and refactor current
operational job tracker (CASSSIDECAR-372)
0.3.0
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
index ebb447d69..389fa3941 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
@@ -45,7 +45,7 @@ public class LiveMigrationDataCopyRequest
public final int maxConcurrency;
/**
- * Creates a new request with auto-generated ID.
+ * Creates a new live migration data copy request.
*/
@JsonCreator
public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int
maxIterations,
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
index 7c6b1776f..71495a414 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
@@ -214,7 +214,7 @@ class RepairIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
private void pollStatusForState(String uuid)
{
String status = "/api/v1/cassandra/operational-jobs/" + uuid;
- loopAssert(30, 500, () -> {
+ loopAssert(60, 500, () -> {
HttpResponse<Buffer> resp =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", status)
.send());
logger.info("Success Status Response code: {}", resp.statusCode());
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
index b6875d982..14fe60b6e 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
@@ -103,7 +103,7 @@ public class LiveMigrationCreateDataCopyTaskHandler extends
AbstractHandler<Live
.onFailure(throwable -> {
if (throwable instanceof LiveMigrationInvalidRequestException)
{
- LOGGER.error("Input payload is not valid.", throwable);
+ LOGGER.error("Invalid live migration request.", throwable);
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
throwable.getMessage(), throwable));
}
else if (throwable instanceof
LiveMigrationDataCopyInProgressException)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
index 719b93d35..faf87080b 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
@@ -31,10 +31,12 @@ import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
@@ -101,39 +103,76 @@ public class DataCopyTaskManager
String source,
InstanceMetadata
localInstanceMetadata)
{
- LiveMigrationTask newTask = createTask(request,
- source,
-
sidecarConfiguration.serviceConfiguration().port(),
- localInstanceMetadata);
-
- // It is possible to serve only one live migration data copy request
per instance at a time.
- // Checking if there is another migration is in progress before
accepting new one.
- boolean accepted = currentTasks.compute(localInstanceMetadata.id(),
(integer, taskInMap) -> {
- if (taskInMap == null)
- {
- return newTask;
- }
+ // Fast local JMX/native check before creating the task - prevents task creation
if Cassandra is running
+ return verifyCassandraNotRunning(localInstanceMetadata)
+ .compose(v -> {
+ LiveMigrationTask newTask = createTask(request,
+ source,
+
sidecarConfiguration.serviceConfiguration().port(),
+
localInstanceMetadata);
+
+ // It is possible to serve only one live migration data
copy request per instance at a time.
+ // Check whether another migration is in progress
before accepting a new one.
+ boolean accepted = newTask ==
currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+ if (taskInMap == null)
+ {
+ return newTask;
+ }
+
+ // Accept new task if and only if the existing task has
completed.
+ return taskInMap.isCompleted() ? newTask : taskInMap;
+ });
+
+ if (!accepted)
+ {
+ return Future.failedFuture(
+ new LiveMigrationDataCopyInProgressException("Another
task is already under progress. Cannot accept new task."));
+ }
+ LOGGER.info("Starting data copy task with id={}, source={},
destination={}",
+ newTask.id(), source,
localInstanceMetadata.host());
+ newTask.start();
+ return Future.succeededFuture(newTask);
+ });
+ }
- if (!taskInMap.isCompleted())
- {
- // Accept new task if and only if the existing task has
completed.
- return taskInMap;
- }
- else
+ /**
+ * Data copy must not be initiated while Cassandra is running on the
destination instance. This method checks whether
+ * Cassandra is currently running on the destination instance by asking
its delegate whether
+ * JMX or native (CQL) connectivity is up. It returns a failed future if
either one is up; a missing
+ * delegate is treated as Cassandra not running.
+ *
+ * @param localInstance metadata for the local Cassandra instance
+ * @return Future that succeeds if Cassandra is not running, fails if it
is running
+ */
+ private Future<Void> verifyCassandraNotRunning(InstanceMetadata
localInstance)
+ {
+ try
+ {
+ CassandraAdapterDelegate delegate = localInstance.delegate();
+
+ if (delegate.isJmxUp() || delegate.isNativeUp())
{
- return newTask;
+ return Future.failedFuture(new
LiveMigrationInvalidRequestException(
+ "Cannot start data copy: Cassandra is currently running on
this instance " +
+ "(JMX or native connectivity established). Data copy cannot
proceed while Cassandra is active."));
}
- }) == newTask;
- if (!accepted)
+ // JMX and native are down - Cassandra is not running (or at least
wasn't during last health check)
+ LOGGER.debug("Local JMX and native check passed: Cassandra not
detected as running on {}", localInstance.host());
+ return Future.succeededFuture();
+ }
+ catch (CassandraUnavailableException e)
{
- return Future.failedFuture(
- new LiveMigrationDataCopyInProgressException("Another task is
already under progress. Cannot accept new task."));
+ // No delegate available - Cassandra is not running
+ LOGGER.debug("No Cassandra delegate available for {} (Cassandra
not running)", localInstance.host());
+ return Future.succeededFuture();
+ }
+ catch (Exception e)
+ {
+ // Unexpected error - be conservative and reject for safety
+ LOGGER.warn("Unable to verify Cassandra status on {}, rejecting
for safety", localInstance.host(), e);
+ return Future.failedFuture(e);
}
- LOGGER.info("Starting data copy task with id={}, source={},
destination={}",
- newTask.id(), source, localInstanceMetadata.host());
- newTask.start();
- return Future.succeededFuture(newTask);
}
LiveMigrationTask createTask(LiveMigrationDataCopyRequest request,
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
new file mode 100644
index 000000000..71abb8a8e
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+
+/**
+ * A pluggable pre-check hook that runs before each file download iteration in
the live migration
+ * data copy process. Since the data copy involves deleting local files and
downloading files from
+ * the source, implementations can perform safety validations (e.g., verifying
cluster state via gossip,
+ * checking instance readiness) to prevent data corruption or unsafe
operations.
+ *
+ * <p>The pre-check is invoked at the beginning of every download iteration
(not just the first one),
+ * allowing implementations to continuously validate that conditions remain
safe throughout the
+ * multi-iteration copy process.</p>
+ *
+ * <p>A {@link #DEFAULT} no-op implementation is provided for cases where no
pre-check is needed.
+ * Custom implementations can be bound via Guice to override the default
behavior.</p>
+ *
+ * <p>Example use cases:</p>
+ * <ul>
+ * <li>Gossip-based validation: verify the source node is present and the
destination node
+ * is absent from cluster gossip, preventing data copy to a node that
has already joined</li>
+ * <li>Instance state checks: verify the local Cassandra process is not
running</li>
+ * <li>Disk space validation: ensure sufficient space before downloading</li>
+ * </ul>
+ *
+ * @see LiveMigrationFileDownloader#downloadFiles()
+ */
+public interface LiveMigrationFileDownloadPreCheck
+{
+ /**
+ * Default no-op implementation that always succeeds. Used when no
pre-check validation is required.
+ */
+ LiveMigrationFileDownloadPreCheck DEFAULT = context ->
Future.succeededFuture();
+
+ /**
+ * Performs a safety check before a file download iteration begins.
+ *
+ * <p>Implementations should return a succeeded future if the check passes
(it is safe to proceed),
+ * or a failed future with a descriptive exception if the check fails
(download should be aborted).</p>
+ *
+ * @param context provides access to the source host, destination instance
metadata,
+ * sidecar port, and the data copy request parameters
needed for validation
+ * @return a succeeded {@link Future} if the pre-check passes, a failed
{@link Future} otherwise
+ */
+ Future<Void> doCheck(PreCheckContext context);
+
+ /**
+ * Encapsulates the contextual information available to a {@link
LiveMigrationFileDownloadPreCheck}
+ * implementation. This context is populated by the {@link
LiveMigrationFileDownloader} before each
+ * download iteration.
+ *
+ * <p>Provides access to:</p>
+ * <ul>
+ * <li>{@link #source()} - hostname of the source instance being copied
from</li>
+ * <li>{@link #destinationInstanceMetadata()} - metadata of the
local/destination instance,
+ * including host, data directories, and the Cassandra adapter
delegate</li>
+ * <li>{@link #sidecarPort()} - port on which Sidecar is running, useful
for making
+ * Sidecar API calls to other cluster instances</li>
+ * <li>{@link #request()} - the original data copy request containing
task parameters</li>
+ * </ul>
+ */
+ interface PreCheckContext
+ {
+ /**
+ * @return the hostname of the source instance from which files are
being copied
+ */
+ String source();
+
+ /**
+ * @return the metadata for the destination (local) instance where
files will be written
+ */
+ InstanceMetadata destinationInstanceMetadata();
+
+ /**
+ * @return the Sidecar service port, useful for contacting other
Sidecar instances in the cluster
+ */
+ int sidecarPort();
+
+ /**
+ * @return the original data copy request containing task parameters
such as max iterations,
+ * success threshold, and max concurrency
+ */
+ LiveMigrationDataCopyRequest request();
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
index 4f918808f..5e0142d98 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
@@ -81,6 +81,7 @@ class LiveMigrationFileDownloader
private final int port;
private final String logPrefix;
private final ExecutorPools executorPools;
+ private final LiveMigrationFileDownloadPreCheck preCheck;
private OperationStatus operationStatus;
private AsyncConcurrentTaskExecutor<Void> concurrentTaskExecutor;
@@ -97,6 +98,7 @@ class LiveMigrationFileDownloader
this.source = builder.source;
this.port = builder.port;
this.executorPools = builder.executorPools;
+ this.preCheck = builder.preCheck;
this.operationStatus = OperationStatus.startingState();
this.logPrefix = String.format("liveMigrationRequest=%s iteration=%s
", id, iteration);
@@ -115,7 +117,8 @@ class LiveMigrationFileDownloader
*/
public Future<OperationStatus> downloadFiles()
{
- return checkLiveMigrationStatusOfSource()
+ return runPreCheck()
+ .compose(v -> checkLiveMigrationStatusOfSource())
.compose(v -> fetchSourceFileList())
.compose(this::cleanupUnnecessaryFiles)
.compose(this::prepareDownloadList)
@@ -124,6 +127,14 @@ class LiveMigrationFileDownloader
.otherwise(this::handleDownloadFailure);
}
+ private Future<Void> runPreCheck()
+ {
+ LiveMigrationFileDownloadPreCheck.PreCheckContext context
+ = new PreCheckContextImpl(source, instanceMetadata, port, request);
+ return preCheck.doCheck(context)
+ .onSuccess(v -> LOGGER.debug("{} Pre-check completed
successfully. Proceeding with data copy.", logPrefix))
+ .onFailure(throwable -> LOGGER.error("{} Pre-check failed.",
logPrefix, throwable));
+ }
/**
* Checks whether the live migration status at the source is NOT_COMPLETED
or COMPLETED.
@@ -572,6 +583,7 @@ class LiveMigrationFileDownloader
private String id;
private String source;
private int port;
+ private LiveMigrationFileDownloadPreCheck preCheck;
protected Builder()
{
@@ -704,6 +716,17 @@ class LiveMigrationFileDownloader
return update(b -> b.executorPools = executorPools);
}
+ /**
+ * Sets the {@code preCheck} instance and returns a reference to this
Builder enabling method chaining.
+ *
+ * @param preCheck the {@code preCheck} to set
+ * @return a reference to this Builder
+ */
+ public Builder preCheck(LiveMigrationFileDownloadPreCheck preCheck)
+ {
+ return update(b -> b.preCheck = preCheck);
+ }
+
/**
* Returns a {@code LiveMigrationFileDownloader} built from the
parameters previously set.
*
@@ -721,6 +744,7 @@ class LiveMigrationFileDownloader
Objects.requireNonNull(request);
Objects.requireNonNull(source);
Objects.requireNonNull(executorPools);
+ Objects.requireNonNull(preCheck);
return new LiveMigrationFileDownloader(this);
}
@@ -744,4 +768,51 @@ class LiveMigrationFileDownloader
this.lastModifiedTime = lastModifiedTime;
}
}
+
+ /**
+ * Implementation of {@link
LiveMigrationFileDownloadPreCheck.PreCheckContext} that provides
+ * the downloader's context to pre-check implementations.
+ */
+ private static class PreCheckContextImpl implements
LiveMigrationFileDownloadPreCheck.PreCheckContext
+ {
+ private final String source;
+ private final InstanceMetadata destinationInstanceMetadata;
+ private final int sidecarPort;
+ private final LiveMigrationDataCopyRequest request;
+
+ PreCheckContextImpl(String source,
+ InstanceMetadata destinationInstanceMetadata,
+ int sidecarPort,
+ LiveMigrationDataCopyRequest request)
+ {
+ this.source = source;
+ this.destinationInstanceMetadata = destinationInstanceMetadata;
+ this.sidecarPort = sidecarPort;
+ this.request = request;
+ }
+
+ @Override
+ public String source()
+ {
+ return source;
+ }
+
+ @Override
+ public InstanceMetadata destinationInstanceMetadata()
+ {
+ return destinationInstanceMetadata;
+ }
+
+ @Override
+ public int sidecarPort()
+ {
+ return sidecarPort;
+ }
+
+ @Override
+ public LiveMigrationDataCopyRequest request()
+ {
+ return request;
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
index bc98c2cc9..4c254f20c 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
@@ -39,17 +39,20 @@ public class LiveMigrationTaskFactoryImpl implements
LiveMigrationTaskFactory
private final SidecarClientProvider sidecarClientProvider;
private final LiveMigrationConfiguration liveMigrationConfiguration;
private final ExecutorPools executorPools;
+ private final LiveMigrationFileDownloadPreCheck preCheck;
@Inject
public LiveMigrationTaskFactoryImpl(Vertx vertx,
ExecutorPools executorPools,
SidecarClientProvider
sidecarClientProvider,
- SidecarConfiguration
sidecarConfiguration)
+ SidecarConfiguration
sidecarConfiguration,
+ LiveMigrationFileDownloadPreCheck
preCheck)
{
this.vertx = vertx;
this.executorPools = executorPools;
this.sidecarClientProvider = sidecarClientProvider;
this.liveMigrationConfiguration =
sidecarConfiguration.liveMigrationConfiguration();
+ this.preCheck = preCheck;
}
/**
@@ -63,6 +66,6 @@ public class LiveMigrationTaskFactoryImpl implements
LiveMigrationTaskFactory
InstanceMetadata instanceMetadata)
{
return new LiveMigrationTaskImpl(vertx, executorPools,
sidecarClientProvider, liveMigrationConfiguration,
- id, request, source, port,
instanceMetadata);
+ id, request, source, port,
instanceMetadata, preCheck);
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
index 723f35de7..c64b14be4 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
@@ -52,6 +52,7 @@ public class LiveMigrationTaskImpl implements
LiveMigrationTask
private final ExecutorPools executorPools;
private final SidecarClientProvider sidecarClientProvider;
private final LiveMigrationConfiguration liveMigrationConfiguration;
+ private final LiveMigrationFileDownloadPreCheck preCheck;
// Indicates overall status of the operation (succeeded or failed).
// Future returned by downloader changes on next iteration. Using a
separate future to track overall operation.
@@ -69,7 +70,7 @@ public class LiveMigrationTaskImpl implements
LiveMigrationTask
LiveMigrationDataCopyRequest request,
String source,
int port,
- InstanceMetadata instanceMetadata)
+ InstanceMetadata instanceMetadata,
LiveMigrationFileDownloadPreCheck preCheck)
{
this.vertx = vertx;
this.executorPools = executorPools;
@@ -80,6 +81,7 @@ public class LiveMigrationTaskImpl implements
LiveMigrationTask
this.instanceMetadata = instanceMetadata;
this.source = source;
this.port = port;
+ this.preCheck = preCheck;
}
/**
@@ -130,6 +132,7 @@ public class LiveMigrationTaskImpl implements
LiveMigrationTask
.source(source)
.port(port)
.executorPools(executorPools)
+ .preCheck(preCheck)
.build();
return downloader.downloadFiles();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
index 9cb95c836..4c10c5180 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
@@ -42,6 +42,7 @@ import
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationMapSidec
import
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusClearHandler;
import
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusCompleteHandler;
import
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusGetHandler;
+import
org.apache.cassandra.sidecar.livemigration.LiveMigrationFileDownloadPreCheck;
import org.apache.cassandra.sidecar.livemigration.LiveMigrationStatusTracker;
import
org.apache.cassandra.sidecar.livemigration.LiveMigrationStatusTrackerImpl;
import org.apache.cassandra.sidecar.livemigration.LiveMigrationTaskFactory;
@@ -69,6 +70,7 @@ public class LiveMigrationModule extends AbstractModule
bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
bind(LiveMigrationStatusTracker.class).to(LiveMigrationStatusTrackerImpl.class);
+
bind(LiveMigrationFileDownloadPreCheck.class).toInstance(LiveMigrationFileDownloadPreCheck.DEFAULT);
}
@GET
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
b/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
index eb671308b..87674d82e 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
@@ -80,6 +80,7 @@ public class HelperTestModules
this.instanceMetadataList = instanceMetadataList;
}
+ @Override
protected void configure()
{
InstancesMetadata mockInstancesMetadata =
mock(InstancesMetadata.class);
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
index 996663753..52bafd6e8 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
@@ -33,6 +33,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
@@ -40,6 +41,7 @@ import
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskResponse;
import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
import
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
@@ -47,6 +49,7 @@ import
org.apache.cassandra.sidecar.handlers.livemigration.FakeLiveMigrationTask
import org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationMap;
import org.jetbrains.annotations.NotNull;
+import static
org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -161,6 +164,72 @@ public class DataCopyTaskManagerTest
assertThat(future.result().id()).isNotEqualTo("completed-task");
}
+ @Test
+ public void testCreateTaskShouldFailWhenCassandraInstanceJMXIsUp() throws
InterruptedException
+ {
+ Injector injector = getInjector();
+ DataCopyTaskManager dataCopyTaskManager =
getDataCopyTaskManager(injector);
+ InstancesMetadata instancesMetadata =
injector.getInstance(InstancesMetadata.class);
+ InstanceMetadata destinationMetadata =
instancesMetadata.instanceFromHost(dest1Name);
+
+ // Mocking JMX as up
+ when(destinationMetadata.delegate().isJmxUp()).thenReturn(true);
+
+ LiveMigrationDataCopyRequest request = new
LiveMigrationDataCopyRequest(1, 1.0, 2);
+ Future<LiveMigrationTask> future =
dataCopyTaskManager.createTask(request, dest1Name);
+ awaitForFuture(future);
+
+ assertThat(future.succeeded()).isFalse();
+ assertThat(future.failed()).isTrue();
+ assertThat(future.result()).isNull();
+ assertThat(future.cause()).isNotNull();
+
assertThat(future.cause()).isInstanceOf(LiveMigrationInvalidRequestException.class);
+ }
+
+ @Test
+ public void testCreateTaskShouldFailWhenCassandraInstanceNativeIsUp()
throws InterruptedException
+ {
+ Injector injector = getInjector();
+ DataCopyTaskManager dataCopyTaskManager =
getDataCopyTaskManager(injector);
+ InstancesMetadata instancesMetadata =
injector.getInstance(InstancesMetadata.class);
+ InstanceMetadata destinationMetadata =
instancesMetadata.instanceFromHost(dest1Name);
+
+ // Mocking native (CQL) as up but JMX as down
+ when(destinationMetadata.delegate().isJmxUp()).thenReturn(false);
+ when(destinationMetadata.delegate().isNativeUp()).thenReturn(true);
+
+ LiveMigrationDataCopyRequest request = new
LiveMigrationDataCopyRequest(1, 1.0, 2);
+ Future<LiveMigrationTask> future =
dataCopyTaskManager.createTask(request, dest1Name);
+ awaitForFuture(future);
+
+ assertThat(future.succeeded()).isFalse();
+ assertThat(future.failed()).isTrue();
+ assertThat(future.result()).isNull();
+ assertThat(future.cause()).isNotNull();
+
assertThat(future.cause()).isInstanceOf(LiveMigrationInvalidRequestException.class);
+ }
+
+ @Test
+ public void
testCreateTaskShouldSucceedWhenCassandraAdapterIsNotAvailable() throws
InterruptedException
+ {
+ Injector injector = getInjector();
+ DataCopyTaskManager dataCopyTaskManager =
getDataCopyTaskManager(injector);
+ InstancesMetadata instancesMetadata =
injector.getInstance(InstancesMetadata.class);
+ InstanceMetadata destinationMetadata =
instancesMetadata.instanceFromHost(dest1Name);
+ when(destinationMetadata.delegate())
+ .thenThrow(new CassandraUnavailableException(CQL_AND_JMX,
"CassandraAdapterDelegate is not available"));
+
+ LiveMigrationDataCopyRequest request = new
LiveMigrationDataCopyRequest(1, 1.0, 2);
+ Future<LiveMigrationTask> future =
dataCopyTaskManager.createTask(request, dest1Name);
+ awaitForFuture(future);
+
+ assertThat(future.succeeded()).isTrue();
+ assertThat(future.failed()).isFalse();
+ assertThat(future.result()).isNotNull();
+ assertThat(future.result().id()).isNotNull();
+ assertThat(future.cause()).isNull();
+ }
+
@Test
public void testGetTaskSuccess()
{
@@ -324,7 +393,7 @@ public class DataCopyTaskManagerTest
CountDownLatch latch = new CountDownLatch(1);
future.onComplete(res -> latch.countDown());
- latch.await(100, TimeUnit.MILLISECONDS);
+ latch.await(2, TimeUnit.SECONDS);
}
private DataCopyTaskManager getDataCopyTaskManager(Injector injector)
@@ -388,17 +457,21 @@ public class DataCopyTaskManagerTest
when(mockInstancesMetadata.instanceFromHost(dest1Name)).thenReturn(mockDest1InstanceMeta);
when(mockDest1InstanceMeta.id()).thenReturn(dest1Id);
when(mockDest1InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+
when(mockDest1InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
when(mockInstancesMetadata.instanceFromHost(dest2Name)).thenReturn(mockDest2InstanceMeta);
when(mockDest2InstanceMeta.id()).thenReturn(dest2Id);
when(mockDest2InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+
when(mockDest2InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
when(mockInstancesMetadata.instanceFromHost(dest3Name)).thenReturn(mockDest3InstanceMeta);
when(mockDest3InstanceMeta.id()).thenReturn(dest3Id);
when(mockDest3InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+
when(mockDest3InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
when(mockInstancesMetadata.instanceFromHost(source1Name)).thenReturn(mockSourceInstanceMeta);
when(mockSourceInstanceMeta.dataDirs()).thenReturn(List.of("/data1"));
+
when(mockSourceInstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
// Configure LiveMigrationTaskFactory to return fake tasks
when(mockLiveMigrationTaskFactory.create(anyString(),
any(LiveMigrationDataCopyRequest.class), anyString(), anyInt(),
any(InstanceMetadata.class))).thenAnswer(invocation -> {
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
index 194683db0..7606d2088 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
@@ -1144,6 +1144,68 @@ class LiveMigrationFileDownloaderTest
assertThat(attrs.lastModifiedTime).isEqualTo(lastModified);
}
+ @Test
+ void testDownloadFailsWhenPreCheckFails(@TempDir Path tmpDir) throws
InterruptedException
+ {
+ String storageDir =
tmpDir.resolve("testDownloadFailsWhenPreCheckFails").toAbsolutePath().toString();
+ List<String> dataDirs = getDataDirList(storageDir);
+ final Consumer<OperationStatus> statusUpdater = mock(Consumer.class);
+
+ LiveMigrationFileDownloadPreCheck failingPreCheck =
+ context -> Future.failedFuture(new RuntimeException("Pre-check failed: node already joined cluster"));
+
+ Injector injector = getInjector();
+ LiveMigrationFileDownloader downloader =
+ getDownloader(injector, dummyRequest100pThreshold, 0, statusUpdater,
storageDir, dataDirs, failingPreCheck);
+
+ Future<OperationStatus> statusFuture = downloader.downloadFiles();
+ awaitForFuture(statusFuture);
+
+ assertThat(statusFuture.isComplete()).isTrue();
+
assertThat(statusFuture.result().state()).isEqualTo(OperationStatus.State.FAILED);
+ }
+
+ @Test
+ void testPreCheckReceivesCorrectContext(@TempDir Path tmpDir) throws
InterruptedException
+ {
+ String storageDir =
tmpDir.resolve("testPreCheckReceivesCorrectContext").toAbsolutePath().toString();
+ List<String> dataDirs = getDataDirList(storageDir);
+ final Consumer<OperationStatus> statusUpdater = mock(Consumer.class);
+
+ // Pre-check that captures the context and then fails to stop the pipeline early
+ LiveMigrationFileDownloadPreCheck.PreCheckContext[] capturedContext =
+ new LiveMigrationFileDownloadPreCheck.PreCheckContext[1];
+ LiveMigrationFileDownloadPreCheck capturingPreCheck = context -> {
+ capturedContext[0] = context;
+ return Future.failedFuture(new RuntimeException("stop here"));
+ };
+
+ Injector injector = getInjector();
+ LiveMigrationFileDownloader downloader =
+ getDownloader(injector, dummyRequest100pThreshold, 0, statusUpdater,
storageDir, dataDirs, capturingPreCheck);
+
+ Future<OperationStatus> statusFuture = downloader.downloadFiles();
+ awaitForFuture(statusFuture);
+
+ assertThat(capturedContext[0]).isNotNull();
+ assertThat(capturedContext[0].source()).isEqualTo(SOURCE);
+ assertThat(capturedContext[0].sidecarPort()).isEqualTo(PORT);
+
assertThat(capturedContext[0].request()).isSameAs(dummyRequest100pThreshold);
+
assertThat(capturedContext[0].destinationInstanceMetadata()).isNotNull();
+ }
+
+ @Test
+ void testDefaultPreCheckAlwaysSucceeds()
+ {
+ LiveMigrationFileDownloadPreCheck defaultPreCheck =
LiveMigrationFileDownloadPreCheck.DEFAULT;
+ LiveMigrationFileDownloadPreCheck.PreCheckContext mockContext =
+ mock(LiveMigrationFileDownloadPreCheck.PreCheckContext.class);
+
+ Future<Void> result = defaultPreCheck.doCheck(mockContext);
+
+ assertThat(result.succeeded()).isTrue();
+ }
+
@Test
void testCanDeleteWithIOException(@TempDir Path tempDir) throws IOException
{
@@ -1372,6 +1434,40 @@ class LiveMigrationFileDownloaderTest
.source(SOURCE)
.port(PORT)
.executorPools(ExecutorPoolsHelper.createdSharedTestPool(vertx))
+
.preCheck(LiveMigrationFileDownloadPreCheck.DEFAULT)
+ .build();
+ }
+
+ LiveMigrationFileDownloader getDownloader(Injector injector,
+ LiveMigrationDataCopyRequest
request,
+ int currentIteration,
+ Consumer<OperationStatus>
mockStatusUpdater,
+ String storageDir,
+ List<String> dataDirs,
+
LiveMigrationFileDownloadPreCheck preCheck)
+ {
+ SidecarClientProvider sidecarClientProvider =
injector.getInstance(SidecarClientProvider.class);
+ LiveMigrationConfiguration liveMigrationConfig =
injector.getInstance(SidecarConfiguration.class)
+
.liveMigrationConfiguration();
+ return LiveMigrationFileDownloader.builder()
+ .id(UUID.randomUUID().toString())
+ .vertx(vertx)
+
.sidecarClient(sidecarClientProvider.get())
+ .request(request)
+ .iteration(currentIteration)
+ .statusUpdater(mockStatusUpdater)
+
.instanceMetadata(InstanceMetadataImpl.builder()
+
.dataDirs(dataDirs)
+
.storageDir(storageDir)
+
.metricRegistry(new MetricRegistry())
+
.id(1)
+
.storagePort(7000)
+
.build())
+
.liveMigrationConfiguration(liveMigrationConfig)
+ .source(SOURCE)
+ .port(PORT)
+
.executorPools(ExecutorPoolsHelper.createdSharedTestPool(vertx))
+ .preCheck(preCheck)
.build();
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
index 4976867c7..1ebe83ed2 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
@@ -62,7 +62,7 @@ class LiveMigrationTaskImplTest
ExecutorPools executorPools =
ExecutorPoolsHelper.createdSharedTestPool(vertx);
return new LiveMigrationTaskImpl(vertx, executorPools,
sidecarClientProvider, liveMigrationConfiguration,
- id, request, SOURCE, PORT,
instanceMetadata);
+ id, request, SOURCE, PORT,
instanceMetadata, LiveMigrationFileDownloadPreCheck.DEFAULT);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]