This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88b373a631 Add tool to offline dump cluster metadata and the log
88b373a631 is described below

commit 88b373a63122b3fb64d2e8d792893d880d1896f3
Author: Abhijeet Dubey <[email protected]>
AuthorDate: Tue Jan 20 17:23:59 2026 +0530

    Add tool to offline dump cluster metadata and the log
    
    Patch by Abhijeet Dubey; reviewed by Sam Tunnicliffe and marcuse for 
CASSANDRA-21129
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/tcm/log/LogReader.java    |   2 +-
 .../tools/OfflineClusterMetadataDump.java          | 622 +++++++++++++++++++++
 .../OfflineClusterMetadataDumpIntegrationTest.java | 257 +++++++++
 .../tools/OfflineClusterMetadataDumpTest.java      | 184 ++++++
 tools/bin/offlineclustermetadatadump               |  49 ++
 6 files changed, 1114 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e9de27af5c..41d379fbde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
  * Send client warnings when writing to a large partition (CASSANDRA-17258)
  * Harden the possible range of values for max dictionary size and max total 
sample size for dictionary training (CASSANDRA-21194)
  * Implement a guardrail ensuring that minimum training frequency parameter is 
provided in ZstdDictionaryCompressor (CASSANDRA-21192)
diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java 
b/src/java/org/apache/cassandra/tcm/log/LogReader.java
index fb352dcd22..e7fcd2b308 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogReader.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.tcm.MetadataSnapshots;
 public interface LogReader
 {
     /**
-     * Gets all entries where epoch >= since - could be empty if since is a 
later epoch than the current highest seen
+     * Gets all entries where epoch > since - could be empty if since is a 
later epoch than the current highest seen
      */
     EntryHolder getEntries(Epoch since) throws IOException;
     EntryHolder getEntries(Epoch since, Epoch until) throws IOException;
diff --git 
a/src/java/org/apache/cassandra/tools/OfflineClusterMetadataDump.java 
b/src/java/org/apache/cassandra/tools/OfflineClusterMetadataDump.java
new file mode 100644
index 0000000000..d0b9b54fea
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/OfflineClusterMetadataDump.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.log.LogReader;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.cassandra.tcm.Epoch.EMPTY;
+import static org.apache.cassandra.tcm.Epoch.FIRST;
+import static org.apache.cassandra.tcm.Epoch.UPGRADE_GOSSIP;
+import static org.apache.cassandra.tcm.Epoch.UPGRADE_STARTUP;
+
+/**
+ * Offline tool to dump cluster metadata from local SSTables.
+ * <p>
+ * This is an emergency recovery tool for debugging when a Cassandra instance 
cannot
+ * start due to cluster metadata issues. It reads the local_metadata_log and 
metadata_snapshots
+ * tables from the system keyspace to reconstruct and display the cluster 
metadata state.
+ * <p>
+ * <b>NOTE: This tool is for offline use only. Do not run on a live 
cluster.</b>
+ * <p>
+ * Usage:
+ * <pre>
+ * # Dump cluster metadata as binary (default)
+ * offlineclustermetadatadump metadata --data-dir /path/to/data
+ *
+ * # Dump cluster metadata as toString output
+ * offlineclustermetadatadump metadata --data-dir /path/to/data --to-string
+ *
+ * # Dump local log entries
+ * offlineclustermetadatadump log --data-dir /path/to/data --from-epoch 1 
--to-epoch 50
+ *
+ * # Dump distributed log (CMS nodes)
+ * offlineclustermetadatadump distributed-log --data-dir /path/to/data
+ * </pre>
+ */
+@Command(name = "offlineclustermetadatadump",
+mixinStandardHelpOptions = true,
+description = "Offline tool to dump cluster metadata from local SSTables. 
NOTE: For offline use only.",
+subcommands = { OfflineClusterMetadataDump.MetadataCommand.class, 
OfflineClusterMetadataDump.LogCommand.class, 
OfflineClusterMetadataDump.DistributedLogCommand.class })
+public class OfflineClusterMetadataDump implements Runnable
+{
+    private static final Output output = Output.CONSOLE;
+
+    public static void main(String... args)
+    {
+        Util.initDatabaseDescriptor();
+
+        CommandLine cli = new 
CommandLine(OfflineClusterMetadataDump.class).setExecutionExceptionHandler((ex, 
cmd, parseResult) -> {
+            err(ex);
+            return 2;
+        });
+        int status = cli.execute(args);
+        System.exit(status);
+    }
+
+    protected static void err(Throwable e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+    @Override
+    public void run()
+    {
+        CommandLine.usage(this, output.out);
+    }
+
+    /**
+     * Base class with common options and methods shared by all subcommands.
+     */
+    @Command(mixinStandardHelpOptions = true)
+    static abstract class BaseCommand implements Runnable
+    {
+        @Option(names = { "-d", "--data-dir" }, description = "Data directory 
containing system keyspace (can be specified multiple times)", arity = "1..*")
+        public List<String> dataDirs;
+
+        @Option(names = { "-s", "--sstables" }, description = "Path to SSTable 
directory for metadata tables (can be specified multiple times)", arity = 
"1..*")
+        public List<String> sstableDirectories;
+
+        @Option(names = { "-p", "--partitioner" }, description = "Partitioner 
class name",
+        defaultValue = "org.apache.cassandra.dht.Murmur3Partitioner")
+        public String partitioner;
+
+        @Option(names = { "-v", "--verbose" }, description = "Verbose output")
+        public boolean verbose;
+
+        @Option(names = { "--debug" }, description = "Show stack traces on 
errors")
+        public boolean debug;
+
+        protected Path tempDir;
+
+        /**
+         * Template method for subcommands.
+         */
+        protected abstract void execute() throws Exception;
+
+        /**
+         * Whether this command needs the distributed metadata keyspace schema.
+         */
+        protected boolean needsDistributedMetadataKeyspace()
+        {
+            return false;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                setupTempDirectory();
+
+                // Set the cluster's partitioner - needed when reconstructing 
ClusterMetadata
+                // from log entries when no snapshot is available.
+                DatabaseDescriptor.setPartitioner(partitioner);
+
+                if (needsDistributedMetadataKeyspace())
+                {
+                    // Set up schema for distributed metadata keyspace
+                    
ClusterMetadataService.empty(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata("dc1")));
+                }
+                else
+                {
+                    // No distributed keyspaces needed for local-only commands
+                    ClusterMetadataService.empty(Keyspaces.none());
+                }
+                Keyspace.setInitialized();
+
+                // This is needed for non-local system keyspaces (e.g., 
system_cluster_metadata).
+                // Using loadSSTables=false ensures no disk I/O during 
initialization - SSTables are imported separately.
+                
ClusterMetadata.current().schema.initializeKeyspaceInstances(DistributedSchema.empty(),
 false);
+
+                execute();
+            }
+            catch (Exception e)
+            {
+                if (debug)
+                {
+                    e.printStackTrace(output.err);
+                }
+                else
+                {
+                    output.err.println("Error: " + e.getMessage());
+                }
+                System.exit(1);
+            }
+            finally
+            {
+                cleanupTempDirectory();
+            }
+        }
+
+        /**
+         * Creates a temporary directory and configures DatabaseDescriptor to 
use it.
+         */
+        protected void setupTempDirectory() throws IOException
+        {
+            tempDir = Files.createTempDirectory("offlinedump");
+            DatabaseDescriptor.getRawConfig().data_file_directories = new 
String[]{ tempDir.resolve("data").toString() };
+            DatabaseDescriptor.getRawConfig().commitlog_directory = 
tempDir.resolve("commitlog").toString();
+            DatabaseDescriptor.getRawConfig().hints_directory = 
tempDir.resolve("hints").toString();
+            DatabaseDescriptor.getRawConfig().saved_caches_directory = 
tempDir.resolve("saved_caches").toString();
+            DatabaseDescriptor.getRawConfig().accord.journal_directory = 
tempDir.resolve("accord_journal").toString();
+
+            if (verbose)
+            {
+                output.out.println("Using temporary directory: " + tempDir);
+            }
+        }
+
+        /**
+         * Cleans up the temporary directory.
+         */
+        protected void cleanupTempDirectory()
+        {
+            if (tempDir != null)
+            {
+                try
+                {
+                    Files.walkFileTree(tempDir, new SimpleFileVisitor<>()
+                    {
+                        @Override
+                        public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs)
+                        throws IOException
+                        {
+                            Files.deleteIfExists(file);
+                            return FileVisitResult.CONTINUE;
+                        }
+
+                        @Override
+                        public FileVisitResult postVisitDirectory(Path dir, 
IOException exc)
+                        throws IOException
+                        {
+                            Files.deleteIfExists(dir);
+                            return FileVisitResult.CONTINUE;
+                        }
+                    });
+                }
+                catch (IOException e)
+                {
+                    if (verbose)
+                    {
+                        output.err.println("Warning: Failed to fully cleanup 
temp directory: " + tempDir + " (" + e.getMessage() + ")");
+                    }
+                }
+                finally
+                {
+                    tempDir = null;
+                }
+            }
+        }
+
+        protected void importSystemKeyspaceSSTables() throws IOException
+        {
+            Keyspace ks = 
Schema.instance.getKeyspaceInstance(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+
+            // Find and import SSTables for local_metadata_log
+            Set<String> logTablePaths = 
findTablePaths(SystemKeyspace.METADATA_LOG, 
SchemaConstants.SYSTEM_KEYSPACE_NAME);
+            if (!logTablePaths.isEmpty())
+            {
+                ColumnFamilyStore logCfs = 
ks.getColumnFamilyStore(SystemKeyspace.METADATA_LOG);
+                logCfs.importNewSSTables(logTablePaths, false, false, false, 
false, false, false, true);
+                if (verbose)
+                {
+                    output.out.println("Imported SSTables from: " + 
logTablePaths);
+                }
+            }
+
+            // Find and import SSTables for metadata_snapshots
+            Set<String> snapshotTablePaths = 
findTablePaths(SystemKeyspace.SNAPSHOT_TABLE_NAME, 
SchemaConstants.SYSTEM_KEYSPACE_NAME);
+            if (!snapshotTablePaths.isEmpty())
+            {
+                ColumnFamilyStore snapshotCfs = 
ks.getColumnFamilyStore(SystemKeyspace.SNAPSHOT_TABLE_NAME);
+                snapshotCfs.importNewSSTables(snapshotTablePaths, false, 
false, false, false, false, false, true);
+                if (verbose)
+                {
+                    output.out.println("Imported SSTables from: " + 
snapshotTablePaths);
+                }
+            }
+        }
+
+        protected void importDistributedLogSSTables() throws IOException
+        {
+            Keyspace ks = 
Schema.instance.getKeyspaceInstance(SchemaConstants.METADATA_KEYSPACE_NAME);
+
+            // Find and import SSTables for distributed_metadata_log
+            Set<String> logTablePaths = 
findTablePaths(DistributedMetadataLogKeyspace.TABLE_NAME, 
SchemaConstants.METADATA_KEYSPACE_NAME);
+            if (!logTablePaths.isEmpty())
+            {
+                ColumnFamilyStore logCfs = 
ks.getColumnFamilyStore(DistributedMetadataLogKeyspace.TABLE_NAME);
+                logCfs.importNewSSTables(logTablePaths, false, false, false, 
false, false, false, true);
+                if (verbose)
+                {
+                    output.out.println("Imported SSTables from: " + 
logTablePaths);
+                }
+            }
+        }
+
+        protected Set<String> findTablePaths(String tableName, String 
keyspaceName) throws IOException
+        {
+            Set<String> paths = new LinkedHashSet<>();
+
+            if (sstableDirectories != null && !sstableDirectories.isEmpty())
+            {
+                for (String sstablePath : sstableDirectories)
+                {
+                    if (sstablePath.contains(tableName))
+                    {
+                        paths.add(sstablePath);
+                        continue;
+                    }
+                    Path tableDir = Path.of(sstablePath, tableName);
+                    if (Files.exists(tableDir))
+                    {
+                        paths.add(tableDir.toString());
+                        continue;
+                    }
+                    String match = findTablePathInDir(tableName, keyspaceName, 
sstablePath);
+                    if (match != null)
+                        paths.add(match);
+                }
+                return paths;
+            }
+
+            if (dataDirs != null && !dataDirs.isEmpty())
+            {
+                for (String dataDir : dataDirs)
+                {
+                    String match = findTablePathInDir(tableName, keyspaceName, 
dataDir);
+                    if (match != null)
+                        paths.add(match);
+                }
+                return paths;
+            }
+
+            // Fall back to DatabaseDescriptor locations
+            String[] defaultDirs = 
DatabaseDescriptor.getAllDataFileLocations();
+            for (String dir : defaultDirs)
+            {
+                String match = findTablePathInDir(tableName, keyspaceName, 
dir);
+                if (match != null)
+                    paths.add(match);
+            }
+
+            return paths;
+        }
+
+        private String findTablePathInDir(String tableName, String 
keyspaceName, String dataDir) throws IOException
+        {
+            Path ksDir = Path.of(dataDir, keyspaceName);
+            if (Files.exists(ksDir))
+            {
+                try (Stream<Path> paths = Files.list(ksDir))
+                {
+                    List<Path> matches = paths.filter(p -> 
p.getFileName().toString().startsWith(tableName + "-"))
+                                              .collect(Collectors.toList());
+                    if (!matches.isEmpty())
+                        return matches.get(0).toString();
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Gets log state from the given reader, detecting and logging gaps in 
epochs.
+         *
+         * @param reader the log reader to read entries from
+         * @param snapshotManager snapshot manager (use NO_OP for log listing 
commands)
+         * @param startEpoch if provided, start reading from this epoch (for 
--from-epoch)
+         * @param targetEpoch if provided, stop reading at this epoch (for 
--to-epoch or --epoch)
+         * @param out output for warnings
+         */
+        @VisibleForTesting
+        static LogState getLogState(LogReader reader,
+                                    MetadataSnapshots snapshotManager,
+                                    Long startEpoch,
+                                    Long targetEpoch,
+                                    Output out)
+        {
+            Epoch endEpoch = targetEpoch != null ? Epoch.create(targetEpoch) : 
Epoch.create(Long.MAX_VALUE);
+            ClusterMetadata base = snapshotManager.getSnapshotBefore(endEpoch);
+
+            Epoch baseEpoch = base != null
+                              ? base.epoch
+                              : startEpoch != null ? 
previousEpoch(Epoch.create(startEpoch)) : Epoch.EMPTY;
+            try
+            {
+                LogReader.EntryHolder entryHolder = 
reader.getEntries(baseEpoch, endEpoch);
+                ImmutableList<Entry> entryList = 
processEntriesWithGapDetection(entryHolder, baseEpoch, out);
+
+                // Warn if requested targetEpoch > available epochs
+                if (targetEpoch != null && !entryList.isEmpty())
+                {
+                    long maxAvailableEpoch = entryList.get(entryList.size() - 
1).epoch.getEpoch();
+                    if (targetEpoch > maxAvailableEpoch)
+                    {
+                        out.err.println("WARNING: Requested epoch " + 
targetEpoch +
+                                        " exceeds max available epoch " + 
maxAvailableEpoch);
+                    }
+                }
+
+                // Warn if there's a gap between snapshot and first entry
+                ClusterMetadata effectiveBase = base;
+                if (effectiveBase != null && !entryList.isEmpty() && 
!entryList.get(0).epoch.isDirectlyAfter(effectiveBase.epoch))
+                {
+                    out.err.println("WARNING: Gap between snapshot (epoch " + 
effectiveBase.epoch.getEpoch() +
+                                    ") and first log entry (epoch " + 
entryList.get(0).epoch.getEpoch() +
+                                    "). Proceeding without base snapshot.");
+                    effectiveBase = null;
+                }
+
+                return new LogState(effectiveBase, entryList);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Failed to read log entries", e);
+            }
+        }
+
+        static private Epoch previousEpoch(Epoch epoch)
+        {
+            if (UPGRADE_GOSSIP.equals(epoch) || UPGRADE_STARTUP.equals(epoch))
+                return epoch;
+            if (EMPTY.equals(epoch) || FIRST.equals(epoch))
+                return EMPTY;
+            return Epoch.create(epoch.getEpoch() - 1);
+        }
+
+        /**
+         * Validates that the from-epoch is not greater than to-epoch.
+         */
+        protected void validateEpochRange(Long fromEpoch, Long toEpoch)
+        {
+            if (fromEpoch != null && toEpoch != null && fromEpoch > toEpoch)
+            {
+                throw new IllegalArgumentException(
+                    String.format("--from-epoch (%d) must be less than or 
equal to --to-epoch (%d)",
+                                  fromEpoch, toEpoch));
+            }
+        }
+
+        /**
+         * Processes entries from an EntryHolder, detecting and reporting gaps 
in epochs.
+         */
+        @VisibleForTesting
+        static ImmutableList<Entry> 
processEntriesWithGapDetection(LogReader.EntryHolder entryHolder,
+                                                                   Epoch 
startEpoch,
+                                                                   Output out)
+        {
+            ImmutableList.Builder<Entry> entries = ImmutableList.builder();
+            Epoch prevEpoch = startEpoch;
+            List<String> gaps = new ArrayList<>();
+
+            for (Entry e : (Iterable<Entry>) entryHolder::iterator)
+            {
+                if (!prevEpoch.nextEpoch().is(e.epoch))
+                {
+                    gaps.add(String.format("Gap detected: expected epoch %d 
but found %d",
+                                           prevEpoch.getEpoch() + 1, 
e.epoch.getEpoch()));
+                }
+                prevEpoch = e.epoch;
+                entries.add(e);
+            }
+
+            if (!gaps.isEmpty())
+            {
+                out.err.println("WARNING: Found " + gaps.size() + " gap(s) in 
the epoch sequence:");
+                for (String gap : gaps)
+                {
+                    out.err.println("  " + gap);
+                }
+                out.err.println("Proceeding with available epochs...");
+            }
+
+            return entries.build();
+        }
+
+        protected void dumpLogEntries(LogState logState)
+        {
+            for (Entry entry : logState.entries)
+            {
+                output.out.println(entry.toString());
+            }
+        }
+    }
+
+    /**
+     * Dumps cluster metadata state (binary or toString format).
+     */
+    @Command(name = "metadata", description = "Dump cluster metadata state 
from SSTables. For offline use only.")
+    public static class MetadataCommand extends BaseCommand
+    {
+        @Option(names = { "-o", "--output" }, description = "Output file path 
for binary dump (default: temp file)")
+        public String outputFile;
+
+        @Option(names = { "--serialization-version" }, description = 
"Serialization version for binary output (0-8, default: current version)")
+        public Integer serializationVersion;
+
+        @Option(names = { "--to-string" }, description = "Print 
ClusterMetadata.toString() to stdout instead of binary dump")
+        public boolean toStringOutput;
+
+        @Option(names = { "--epoch" }, description = "Show state at specific 
epoch")
+        public Long targetEpoch;
+
+        @Override
+        protected void execute() throws Exception
+        {
+            importSystemKeyspaceSSTables();
+
+            MetadataSnapshots snapshotManager = new 
MetadataSnapshots.SystemKeyspaceMetadataSnapshots();
+            SystemKeyspaceStorage storage = new SystemKeyspaceStorage(() -> 
snapshotManager);
+            LogState logState = getLogState(storage, snapshotManager, null, 
targetEpoch, output);
+
+            if (logState.isEmpty())
+            {
+                output.out.println("No metadata available");
+                return;
+            }
+
+            ClusterMetadata metadata = logState.flatten().baseState;
+
+            if (toStringOutput)
+            {
+                output.out.println(metadata.toString());
+            }
+            else
+            {
+                dumpBinary(metadata);
+            }
+        }
+
+        private void dumpBinary(ClusterMetadata metadata) throws IOException
+        {
+            Path outputPath = outputFile != null ? Path.of(outputFile) : 
Files.createTempFile("clustermetadata", ".dump");
+            // NOTE: Serializing to an older version may lose data if the 
metadata contains features added in newer versions.
+            Version version = serializationVersion != null
+                              ? Version.fromInt(serializationVersion)
+                              : NodeVersion.CURRENT.serializationVersion();
+            try (FileOutputStreamPlus out = new 
FileOutputStreamPlus(outputPath))
+            {
+                
VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, metadata, out, 
version);
+            }
+            output.out.println("Dumped cluster metadata to " + outputPath);
+        }
+    }
+
+    /**
+     * Dumps local log entries from system.local_metadata_log.
+     */
+    @Command(name = "log", description = "Dump local log entries from 
system.local_metadata_log. For offline use only.")
+    public static class LogCommand extends BaseCommand
+    {
+        @Option(names = { "--from-epoch" }, description = "Filter log entries 
from this epoch")
+        public Long fromEpoch;
+
+        @Option(names = { "--to-epoch" }, description = "Filter log entries to 
this epoch")
+        public Long toEpoch;
+
+        @Override
+        protected void execute() throws Exception
+        {
+            importSystemKeyspaceSSTables();
+
+            validateEpochRange(fromEpoch, toEpoch);
+            SystemKeyspaceStorage storage = new SystemKeyspaceStorage(() -> 
MetadataSnapshots.NO_OP);
+            LogState logState = getLogState(storage, MetadataSnapshots.NO_OP, 
fromEpoch, toEpoch, output);
+
+            dumpLogEntries(logState);
+        }
+    }
+
+    /**
+     * Dumps distributed log entries from 
system_cluster_metadata.distributed_metadata_log.
+     */
+    @Command(name = "distributed-log", description = "Dump distributed log 
entries from system_cluster_metadata.distributed_metadata_log. For CMS nodes. 
For offline use only.")
+    public static class DistributedLogCommand extends BaseCommand
+    {
+        @Option(names = { "--from-epoch" }, description = "Filter log entries 
from this epoch")
+        public Long fromEpoch;
+
+        @Option(names = { "--to-epoch" }, description = "Filter log entries to 
this epoch")
+        public Long toEpoch;
+
+        @Override
+        protected boolean needsDistributedMetadataKeyspace()
+        {
+            return true;
+        }
+
+        @Override
+        protected void execute() throws Exception
+        {
+            importDistributedLogSSTables();
+
+            validateEpochRange(fromEpoch, toEpoch);
+            DistributedMetadataLogKeyspace.DistributedTableLogReader reader =
+                new 
DistributedMetadataLogKeyspace.DistributedTableLogReader(ConsistencyLevel.NODE_LOCAL,
+                                                                             
() -> MetadataSnapshots.NO_OP);
+            LogState logState = getLogState(reader, MetadataSnapshots.NO_OP, 
fromEpoch, toEpoch, output);
+
+            dumpLogEntries(logState);
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpIntegrationTest.java
 
b/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpIntegrationTest.java
new file mode 100644
index 0000000000..1f2834e3b5
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpIntegrationTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+
+import static org.apache.cassandra.db.SystemKeyspace.METADATA_LOG;
+import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for OfflineClusterMetadataDump tool that verify gap 
detection and metadata output.
+ * <p>
+ * These tests write entries directly to the system keyspace storage and then
+ * call OfflineClusterMetadataDump.BaseCommand.getLogState() directly to 
verify gap detection behavior.
+ */
+public class OfflineClusterMetadataDumpIntegrationTest extends OfflineToolUtils
+{
+    private SystemKeyspaceStorage storage;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        DatabaseDescriptor.daemonInitialization();
+        
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        ServerTestUtils.prepareServerNoRegister();
+        CommitLog.instance.start();
+    }
+
+    @Before
+    public void setup()
+    {
+        ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(SYSTEM_KEYSPACE_NAME, METADATA_LOG);
+        if (cfs != null)
+            cfs.truncateBlockingWithoutSnapshot();
+
+        storage = new SystemKeyspaceStorage(() -> MetadataSnapshots.NO_OP);
+    }
+
+    private Entry entry(long epoch)
+    {
+        return new Entry(new Entry.Id(epoch),
+                         Epoch.create(epoch),
+                         CustomTransformation.make((int) epoch));
+    }
+
+    @Test
+    public void testGapDetectionInEpochs()
+    {
+        // Write entries with gap at epoch 3
+        storage.append(entry(1));
+        storage.append(entry(2));
+        storage.append(entry(4));  // Gap: skipping 3
+        storage.append(entry(5));
+
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // Verify gap is detected and reported
+        assertThat(stderr).contains("Gap detected");
+        assertThat(stderr).contains("expected epoch 3 but found 4");
+
+        // All epochs should still be in the log state
+        assertThat(logState.entries).hasSize(4);
+        assertThat(logState.entries.get(0).epoch.getEpoch()).isEqualTo(1);
+        assertThat(logState.entries.get(1).epoch.getEpoch()).isEqualTo(2);
+        assertThat(logState.entries.get(2).epoch.getEpoch()).isEqualTo(4);
+        assertThat(logState.entries.get(3).epoch.getEpoch()).isEqualTo(5);
+    }
+
+    @Test
+    public void testMultipleGapsDetection()
+    {
+        // Write entries with multiple gaps: missing 2, 4, 6, 7
+        storage.append(entry(1));
+        storage.append(entry(3));  // Gap: skipping 2
+        storage.append(entry(5));  // Gap: skipping 4
+        storage.append(entry(8));  // Gap: skipping 6, 7
+
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // Verify multiple gaps are detected
+        assertThat(stderr).contains("Gap detected");
+        assertThat(stderr).contains("expected epoch 2 but found 3");
+        assertThat(stderr).contains("expected epoch 4 but found 5");
+        assertThat(stderr).contains("expected epoch 6 but found 8");
+
+        // All available epochs should still be in the log state
+        assertThat(logState.entries).hasSize(4);
+        assertThat(logState.entries.get(0).epoch.getEpoch()).isEqualTo(1);
+        assertThat(logState.entries.get(1).epoch.getEpoch()).isEqualTo(3);
+        assertThat(logState.entries.get(2).epoch.getEpoch()).isEqualTo(5);
+        assertThat(logState.entries.get(3).epoch.getEpoch()).isEqualTo(8);
+    }
+
+    @Test
+    public void testNoGapsNoWarnings()
+    {
+        // No gaps
+        storage.append(entry(1));
+        storage.append(entry(2));
+        storage.append(entry(3));
+        storage.append(entry(4));
+        storage.append(entry(5));
+
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // Gap warnings should not appear
+        assertThat(stderr).doesNotContain("Gap detected");
+        assertThat(stderr).doesNotContain("WARNING");
+
+        // All entries should be in the log state
+        assertThat(logState.entries).hasSize(5);
+        for (int i = 0; i < 5; i++)
+        {
+            assertThat(logState.entries.get(i).epoch.getEpoch()).isEqualTo(i + 
1);
+        }
+    }
+
+    @Test
+    public void testEmptyLogReturnsEmptyState()
+    {
+        // Don't write any entries - log is empty
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+
+        // Should return empty log state
+        assertThat(logState.isEmpty()).isTrue();
+        assertThat(logState.entries).isEmpty();
+    }
+
+    @Test
+    public void testSingleEntryNoGap()
+    {
+        // Single entry at epoch 1
+        storage.append(entry(1));
+
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // No gap warnings
+        assertThat(stderr).doesNotContain("Gap detected");
+
+        // Single entry should be present
+        assertThat(logState.entries).hasSize(1);
+        assertThat(logState.entries.get(0).epoch.getEpoch()).isEqualTo(1);
+    }
+
+    @Test
+    public void testGapAtBeginning()
+    {
+        // Start with epoch 3 instead of 1 - gap at the beginning
+        storage.append(entry(3));
+        storage.append(entry(4));
+        storage.append(entry(5));
+
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, null, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // Should detect gap at beginning (expected 1 but found 3)
+        assertThat(stderr).contains("Gap detected");
+        assertThat(stderr).contains("expected epoch 1 but found 3");
+
+        // All entries should still be present
+        assertThat(logState.entries).hasSize(3);
+        assertThat(logState.entries.get(0).epoch.getEpoch()).isEqualTo(3);
+        assertThat(logState.entries.get(1).epoch.getEpoch()).isEqualTo(4);
+        assertThat(logState.entries.get(2).epoch.getEpoch()).isEqualTo(5);
+    }
+
+    @Test
+    public void testTargetEpochFilter()
+    {
+        // Write entries 1-10
+        for (int i = 1; i <= 10; i++)
+        {
+            storage.append(entry(i));
+        }
+
+        // Get log state up to epoch 5
+        TestOutput testOutput = new TestOutput();
+        LogState logState = 
OfflineClusterMetadataDump.BaseCommand.getLogState(storage, 
MetadataSnapshots.NO_OP, null, 5L, testOutput.getOutput());
+        String stderr = testOutput.getStderr();
+
+        // No gaps
+        assertThat(stderr).doesNotContain("Gap detected");
+
+        // Should only have epochs up to 5
+        assertThat(logState.entries).hasSizeLessThanOrEqualTo(5);
+        for (Entry e : logState.entries)
+        {
+            assertThat(e.epoch.getEpoch()).isLessThanOrEqualTo(5);
+        }
+    }
+
+    /**
+     * Helper class to capture output from the gap detection logic.
+     */
+    private static class TestOutput
+    {
+        private final ByteArrayOutputStream outStream = new 
ByteArrayOutputStream();
+        private final ByteArrayOutputStream errStream = new 
ByteArrayOutputStream();
+        private final Output output = new Output(new PrintStream(outStream), 
new PrintStream(errStream));
+
+        public Output getOutput()
+        {
+            return output;
+        }
+
+        public String getStderr()
+        {
+            return errStream.toString();
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpTest.java 
b/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpTest.java
new file mode 100644
index 0000000000..072268ac64
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/OfflineClusterMetadataDumpTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import org.assertj.core.api.Assertions;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import org.apache.cassandra.tools.ToolRunner.ToolResult;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for OfflineClusterMetadataDump tool.
+ * <p>
+ * Note: This tool requires some initialization (DatabaseDescriptor, Schema) 
even for help,
+ * similar to StandaloneJournalUtil and other cluster metadata-related tools.
+ */
+public class OfflineClusterMetadataDumpTest extends OfflineToolUtils
+{
+    @Test
+    public void testMainHelpOption()
+    {
+        // Main command help shows subcommands
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "-h");
+        String output = tool.getStdout() + tool.getStderr();
+        assertThat("Help should show usage", output, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+        assertThat("Help should mention metadata subcommand", output, 
CoreMatchers.containsStringIgnoringCase("metadata"));
+        assertThat("Help should mention log subcommand", output, 
CoreMatchers.containsStringIgnoringCase("log"));
+        assertThat("Help should mention distributed-log subcommand", output, 
CoreMatchers.containsStringIgnoringCase("distributed-log"));
+    }
+
+    @Test
+    public void testMetadataSubcommandHelpOption()
+    {
+        // Metadata subcommand help shows all the options
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", "-h");
+        String output = tool.getStdout() + tool.getStderr();
+
+        assertThat("Help should show usage", output, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+        Assertions.assertThat(output).containsIgnoringCase("--data-dir");
+        Assertions.assertThat(output).containsIgnoringCase("--to-string");
+        Assertions.assertThat(output).containsIgnoringCase("--output");
+        Assertions.assertThat(output).containsIgnoringCase("--epoch");
+    }
+
+    @Test
+    public void testLogSubcommandHelpOption()
+    {
+        // Log subcommand help shows all the options
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "log", "-h");
+        String output = tool.getStdout() + tool.getStderr();
+
+        assertThat("Help should show usage", output, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+        Assertions.assertThat(output).containsIgnoringCase("--data-dir");
+        Assertions.assertThat(output).containsIgnoringCase("--from-epoch");
+        Assertions.assertThat(output).containsIgnoringCase("--to-epoch");
+    }
+
+    @Test
+    public void testDistributedLogSubcommandHelpOption()
+    {
+        // Distributed-log subcommand help shows all the options
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "distributed-log", 
"-h");
+        String output = tool.getStdout() + tool.getStderr();
+
+        assertThat("Help should show usage", output, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+        Assertions.assertThat(output).containsIgnoringCase("--data-dir");
+        Assertions.assertThat(output).containsIgnoringCase("--from-epoch");
+        Assertions.assertThat(output).containsIgnoringCase("--to-epoch");
+    }
+
+    @Test
+    public void testWrongArgFailsAndPrintsHelp()
+    {
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", 
"--invalid-option");
+        String output = tool.getStdout() + tool.getStderr();
+        assertThat("Should mention unknown option", output, 
CoreMatchers.containsStringIgnoringCase("Unknown"));
+        assertTrue("Expected non-zero exit code", tool.getExitCode() != 0);
+    }
+
+    @Test
+    public void testNonExistentDataDirectory()
+    {
+        // When running with a non-existent directory, should fail gracefully
+        ToolResult tool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata",
+                                                 "--data-dir", 
"/nonexistent/path/to/data",
+                                                 "--to-string");
+        String output = tool.getStdout() + tool.getStderr();
+        // Tool should fail gracefully when directory doesn't exist or no 
SSTables found
+        assertTrue("Expected error or no metadata message",
+                   tool.getExitCode() != 0 ||
+                   output.toLowerCase().contains("no metadata") ||
+                   output.toLowerCase().contains("not found") ||
+                   output.toLowerCase().contains("does not exist") ||
+                   output.toLowerCase().contains("error"));
+    }
+
+    @Test
+    public void testMetadataSubcommandFlags()
+    {
+        // Test that --to-string flag is recognized
+        ToolResult toStringFlag = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", 
"--to-string", "-h");
+        String toStringOutput = toStringFlag.getStdout() + 
toStringFlag.getStderr();
+        assertThat("Should show help with --to-string", toStringOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        // Test that -o/--output flag is recognized
+        ToolResult outputFlag = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", "-o", 
"/tmp/test.dump", "-h");
+        String outputOutput = outputFlag.getStdout() + outputFlag.getStderr();
+        assertThat("Should show help with -o", outputOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        ToolResult outputLongFlag = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", 
"--output", "/tmp/test.dump", "-h");
+        String outputLongOutput = outputLongFlag.getStdout() + 
outputLongFlag.getStderr();
+        assertThat("Should show help with --output", outputLongOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        // Test --epoch flag
+        ToolResult epochTool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", "--epoch", 
"100", "-h");
+        String epochOutput = epochTool.getStdout() + epochTool.getStderr();
+        assertThat("--epoch flag should be recognized", epochOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+    }
+
+    @Test
+    public void testLogSubcommandEpochFilterFlags()
+    {
+        // Test that epoch filter flags are recognized on log subcommand
+        ToolResult fromTool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "log", "--from-epoch", 
"50", "-h");
+        String fromOutput = fromTool.getStdout() + fromTool.getStderr();
+        assertThat("--from-epoch flag should be recognized", fromOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        ToolResult toTool = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "log", "--to-epoch", 
"150", "-h");
+        String toOutput = toTool.getStdout() + toTool.getStderr();
+        assertThat("--to-epoch flag should be recognized", toOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+    }
+
+    @Test
+    public void testVerboseAndDebugFlags()
+    {
+        // Test verbose flags on metadata subcommand
+        ToolResult verboseShort = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", "-v", 
"-h");
+        String verboseShortOutput = verboseShort.getStdout() + 
verboseShort.getStderr();
+        assertThat("-v flag should be recognized", verboseShortOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        ToolResult verboseLong = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", 
"--verbose", "-h");
+        String verboseLongOutput = verboseLong.getStdout() + 
verboseLong.getStderr();
+        assertThat("--verbose flag should be recognized", verboseLongOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        // Test debug flag
+        ToolResult debug = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata", "--debug", 
"-h");
+        String debugOutput = debug.getStdout() + debug.getStderr();
+        assertThat("--debug flag should be recognized", debugOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+    }
+
+    @Test
+    public void testPartitionerFlag()
+    {
+        // Test partitioner flags on metadata subcommand
+        ToolResult shortFlag = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata",
+                                                      "-p", 
"org.apache.cassandra.dht.Murmur3Partitioner", "-h");
+        String shortOutput = shortFlag.getStdout() + shortFlag.getStderr();
+        assertThat("-p flag should be recognized", shortOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+
+        ToolResult longFlag = 
ToolRunner.invokeClass(OfflineClusterMetadataDump.class, "metadata",
+                                                     "--partitioner", 
"org.apache.cassandra.dht.Murmur3Partitioner", "-h");
+        String longOutput = longFlag.getStdout() + longFlag.getStderr();
+        assertThat("--partitioner flag should be recognized", longOutput, 
CoreMatchers.containsStringIgnoringCase("Usage:"));
+    }
+}
diff --git a/tools/bin/offlineclustermetadatadump 
b/tools/bin/offlineclustermetadatadump
new file mode 100755
index 0000000000..175b126d75
--- /dev/null
+++ b/tools/bin/offlineclustermetadatadump
@@ -0,0 +1,49 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    # Locations (in order) to use when searching for an include file.
+    for include in "`dirname "$0"`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.tools.OfflineClusterMetadataDump "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to