This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3912b36d0b Added `--json` flag to `ec-admin running` command (#5332)
3912b36d0b is described below

commit 3912b36d0b0ca103b7068d7c7575c99d32a11a97
Author: Suvrat Acharya <140749446+suvrat1...@users.noreply.github.com>
AuthorDate: Mon Apr 21 23:07:03 2025 +0530

    Added `--json` flag to `ec-admin running` command (#5332)
    
    Added option to print the running compactions as json.
    
    
    Co-authored-by: Kevin Rathbun <kevinrr...@gmail.com>
    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 .../org/apache/accumulo/server/util/ECAdmin.java   | 188 +++++++++++++++++----
 .../java/org/apache/accumulo/test/ECAdminIT.java   | 153 +++++++++++++++++
 2 files changed, 308 insertions(+), 33 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
index 3f0bcc1e65..4b47e87b70 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
@@ -18,7 +18,13 @@
  */
 package org.apache.accumulo.server.util;
 
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -26,6 +32,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.singletons.SingletonManager;
 import org.apache.accumulo.core.singletons.SingletonManager.Mode;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -41,6 +48,8 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.auto.service.AutoService;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
@@ -49,6 +58,111 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  */
 @AutoService(KeywordExecutable.class)
 public class ECAdmin implements KeywordExecutable {
+
+  public static class RunningCompactionSummary {
+    private final String ecid;
+    private final String addr;
+    private final TCompactionKind kind;
+    private final String queueName;
+    private final String ke;
+    private final String tableId;
+    private String status = "";
+    private long lastUpdate = 0;
+    private long duration = 0;
+    private int numFiles = 0;
+    private double progress = 0.0;
+
+    public RunningCompactionSummary(RunningCompaction runningCompaction,
+        RunningCompactionInfo runningCompactionInfo) {
+      super();
+      ecid = runningCompaction.getJob().getExternalCompactionId();
+      addr = runningCompaction.getCompactorAddress();
+      kind = runningCompaction.getJob().kind;
+      queueName = runningCompaction.getQueueName();
+      KeyExtent extent = 
KeyExtent.fromThrift(runningCompaction.getJob().extent);
+      ke = extent.obscured();
+      tableId = extent.tableId().canonical();
+      if (runningCompactionInfo != null) {
+        status = runningCompactionInfo.status;
+        lastUpdate = runningCompactionInfo.lastUpdate;
+        duration = runningCompactionInfo.duration;
+        numFiles = runningCompactionInfo.numFiles;
+        progress = runningCompactionInfo.progress;
+      }
+
+    }
+
+    public String getStatus() {
+      return status;
+    }
+
+    public void setStatus(String status) {
+      this.status = status;
+    }
+
+    public long getLastUpdate() {
+      return lastUpdate;
+    }
+
+    public void setLastUpdate(long lastUpdate) {
+      this.lastUpdate = lastUpdate;
+    }
+
+    public long getDuration() {
+      return duration;
+    }
+
+    public void setDuration(long duration) {
+      this.duration = duration;
+    }
+
+    public int getNumFiles() {
+      return numFiles;
+    }
+
+    public void setNumFiles(int numFiles) {
+      this.numFiles = numFiles;
+    }
+
+    public double getProgress() {
+      return progress;
+    }
+
+    public void setProgress(double progress) {
+      this.progress = progress;
+    }
+
+    public String getEcid() {
+      return ecid;
+    }
+
+    public String getAddr() {
+      return addr;
+    }
+
+    public TCompactionKind getKind() {
+      return kind;
+    }
+
+    public String getQueueName() {
+      return queueName;
+    }
+
+    public String getKe() {
+      return ke;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    public void print(PrintStream out) {
+      out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName, 
tableId);
+      out.format("  %s Last Update: %dms Duration: %dms Files: %d Progress: 
%.2f%%\n", status,
+          lastUpdate, duration, numFiles, progress);
+    }
+  }
+
   private static final Logger log = LoggerFactory.getLogger(ECAdmin.class);
 
   @Parameters(commandDescription = "cancel the external compaction with given 
ECID")
@@ -62,6 +176,9 @@ public class ECAdmin implements KeywordExecutable {
     @Parameter(names = {"-d", "--details"},
         description = "display details about the running compactions")
     boolean details = false;
+
+    @Parameter(names = {"-j", "--json"}, description = "format the output as 
json")
+    boolean jsonOutput = false;
   }
 
   @Parameters(commandDescription = "list all compactors in zookeeper")
@@ -116,7 +233,18 @@ public class ECAdmin implements KeywordExecutable {
       } else if (cl.getParsedCommand().equals("cancel")) {
         cancelCompaction(context, cancelOps.ecid);
       } else if (cl.getParsedCommand().equals("running")) {
-        runningCompactions(context, runningOpts.details);
+        List<RunningCompactionSummary> compactions =
+            runningCompactions(context, runningOpts.details);
+        if (runningOpts.jsonOutput) {
+          try {
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(compactions));
+          } catch (Exception e) {
+            log.error("Error generating JSON output", e);
+          }
+        } else {
+          compactions.forEach(c -> c.print(System.out));
+        }
       } else {
         log.error("Unknown command {}", cl.getParsedCommand());
         cl.usage();
@@ -130,7 +258,7 @@ public class ECAdmin implements KeywordExecutable {
     }
   }
 
-  private void cancelCompaction(ServerContext context, String ecid) {
+  protected void cancelCompaction(ServerContext context, String ecid) {
     CompactionCoordinatorService.Client coordinatorClient = null;
     ecid = ExternalCompactionId.from(ecid).canonical();
     try {
@@ -144,7 +272,7 @@ public class ECAdmin implements KeywordExecutable {
     }
   }
 
-  private void listCompactorsByQueue(ServerContext context) {
+  protected void listCompactorsByQueue(ServerContext context) {
     var queueToCompactorsMap = 
ExternalCompactionUtil.getCompactorAddrs(context);
     if (queueToCompactorsMap.isEmpty()) {
       System.out.println("No Compactors found.");
@@ -153,43 +281,37 @@ public class ECAdmin implements KeywordExecutable {
     }
   }
 
-  private void runningCompactions(ServerContext context, boolean details) {
+  protected List<RunningCompactionSummary> runningCompactions(ServerContext 
context,
+      boolean details) {
     CompactionCoordinatorService.Client coordinatorClient = null;
-    TExternalCompactionList running;
+
     try {
       coordinatorClient = getCoordinatorClient(context);
-      running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());
-      if (running == null) {
-        System.out.println("No running compactions found.");
-        return;
-      }
-      var ecidMap = running.getCompactions();
-      if (ecidMap == null) {
+
+      // Fetch running compactions as a list and convert to a map
+      TExternalCompactionList running =
+          coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());
+
+      List<RunningCompactionSummary> results = new ArrayList<>();
+
+      if (running == null || running.getCompactions() == null
+          || running.getCompactions().isEmpty()) {
         System.out.println("No running compactions found.");
-        return;
+        return results;
       }
-      ecidMap.forEach((ecid, ec) -> {
-        if (ec != null) {
-          var runningCompaction = new RunningCompaction(ec);
-          var addr = runningCompaction.getCompactorAddress();
-          var kind = runningCompaction.getJob().kind;
-          var queue = runningCompaction.getQueueName();
-          var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
-          System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, 
queue, ke.tableId());
-          if (details) {
-            var runningCompactionInfo = new RunningCompactionInfo(ec);
-            var status = runningCompactionInfo.status;
-            var last = runningCompactionInfo.lastUpdate;
-            var duration = runningCompactionInfo.duration;
-            var numFiles = runningCompactionInfo.numFiles;
-            var progress = runningCompactionInfo.progress;
-            System.out.format("  %s Last Update: %dms Duration: %dms Files: %d 
Progress: %.2f%%\n",
-                status, last, duration, numFiles, progress);
-          }
+
+      for (Map.Entry<String,TExternalCompaction> entry : 
running.getCompactions().entrySet()) {
+        TExternalCompaction ec = entry.getValue();
+        if (ec == null) {
+          continue;
         }
-      });
+        var summary = new RunningCompactionSummary(new RunningCompaction(ec),
+            details ? new RunningCompactionInfo(ec) : null);
+        results.add(summary);
+      }
+      return results;
     } catch (Exception e) {
-      throw new RuntimeException("Unable to get running compactions.", e);
+      throw new IllegalStateException("Unable to get running compactions.", e);
     } finally {
       ThriftUtil.returnClient(coordinatorClient, context);
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java 
b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
new file mode 100644
index 0000000000..ee584ed7be
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.ECAdmin;
+import org.apache.accumulo.server.util.ECAdmin.RunningCompactionSummary;
+import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+public class ECAdminIT extends SharedMiniClusterBase {
+
+  // Class exists for access to protected methods
+  private static class TestECAdmin extends ECAdmin {
+
+    @Override
+    protected void cancelCompaction(ServerContext context, String ecid) {
+      super.cancelCompaction(context, ecid);
+    }
+
+    @Override
+    protected void listCompactorsByQueue(ServerContext context) {
+      super.listCompactorsByQueue(context);
+    }
+
+    @Override
+    protected List<RunningCompactionSummary> runningCompactions(ServerContext 
context,
+        boolean details) {
+      return super.runningCompactions(context, details);
+    }
+  }
+
+  private static final class ECAdminITConfig implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+    }
+
+  }
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new ECAdminITConfig());
+    
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+    getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE7);
+  }
+
+  @AfterAll
+  public static void afterAll() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  private final TestECAdmin eca = new TestECAdmin();
+
+  @Test
+  public void testListRunningCompactions() throws Exception {
+
+    final String tableName = this.getUniqueNames(1)[0];
+
+    try (final AccumuloClient client =
+        Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
+
+      createTable(client, tableName, "cs7");
+      IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+      setting.addOption("sleepTime", "3000");
+      setting.addOption("seekSleepTime", "3000");
+      client.tableOperations().attachIterator(tableName, setting, 
EnumSet.of(IteratorScope.majc));
+      writeData(client, tableName);
+      compact(client, tableName, 2, QUEUE7, false);
+
+      // wait for the compaction to start
+      TExternalCompactionList expected =
+          
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+      while (expected == null || expected.getCompactionsSize() == 0) {
+        Thread.sleep(1000);
+        expected =
+            
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+      }
+
+      final List<RunningCompactionSummary> running =
+          eca.runningCompactions(getCluster().getServerContext(), true);
+      final Map<String,RunningCompactionSummary> compactionsByEcid = new 
HashMap<>();
+      running.forEach(rcs -> compactionsByEcid.put(rcs.getEcid(), rcs));
+
+      assertEquals(expected.getCompactionsSize(), compactionsByEcid.size());
+      expected.getCompactions().values().forEach(tec -> {
+        RunningCompactionSummary rcs = 
compactionsByEcid.get(tec.job.getExternalCompactionId());
+        assertNotNull(rcs);
+        assertEquals(tec.getJob().getExternalCompactionId(), rcs.getEcid());
+        assertEquals(tec.queueName, rcs.getQueueName());
+        assertEquals(tec.getCompactor(), rcs.getAddr());
+      });
+
+      // Confirm JSON output works
+      Gson gson = new GsonBuilder().setPrettyPrinting().create();
+      String json = gson.toJson(running);
+      System.out.println(json);
+      Type listType = new TypeToken<ArrayList<RunningCompactionSummary>>() 
{}.getType();
+      @SuppressWarnings("unused")
+      var unused = new GsonBuilder().create().fromJson(json, listType);
+    }
+  }
+
+}

Reply via email to