This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 3912b36d0b Added `--json` flag to `ec-admin running` command (#5332)
3912b36d0b is described below
commit 3912b36d0b0ca103b7068d7c7575c99d32a11a97
Author: Suvrat Acharya <[email protected]>
AuthorDate: Mon Apr 21 23:07:03 2025 +0530
Added `--json` flag to `ec-admin running` command (#5332)
Added option to print the running compactions as json.
Co-authored-by: Kevin Rathbun <[email protected]>
Co-authored-by: Dave Marion <[email protected]>
---
.../org/apache/accumulo/server/util/ECAdmin.java | 188 +++++++++++++++++----
.../java/org/apache/accumulo/test/ECAdminIT.java | 153 +++++++++++++++++
2 files changed, 308 insertions(+), 33 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
index 3f0bcc1e65..4b47e87b70 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
@@ -18,7 +18,13 @@
*/
package org.apache.accumulo.server.util;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -26,6 +32,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -41,6 +48,8 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.auto.service.AutoService;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -49,6 +58,111 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
*/
@AutoService(KeywordExecutable.class)
public class ECAdmin implements KeywordExecutable {
+
+ public static class RunningCompactionSummary {
+ private final String ecid;
+ private final String addr;
+ private final TCompactionKind kind;
+ private final String queueName;
+ private final String ke;
+ private final String tableId;
+ private String status = "";
+ private long lastUpdate = 0;
+ private long duration = 0;
+ private int numFiles = 0;
+ private double progress = 0.0;
+
+ public RunningCompactionSummary(RunningCompaction runningCompaction,
+ RunningCompactionInfo runningCompactionInfo) {
+ super();
+ ecid = runningCompaction.getJob().getExternalCompactionId();
+ addr = runningCompaction.getCompactorAddress();
+ kind = runningCompaction.getJob().kind;
+ queueName = runningCompaction.getQueueName();
+ KeyExtent extent =
KeyExtent.fromThrift(runningCompaction.getJob().extent);
+ ke = extent.obscured();
+ tableId = extent.tableId().canonical();
+ if (runningCompactionInfo != null) {
+ status = runningCompactionInfo.status;
+ lastUpdate = runningCompactionInfo.lastUpdate;
+ duration = runningCompactionInfo.duration;
+ numFiles = runningCompactionInfo.numFiles;
+ progress = runningCompactionInfo.progress;
+ }
+
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public long getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public void setLastUpdate(long lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ public int getNumFiles() {
+ return numFiles;
+ }
+
+ public void setNumFiles(int numFiles) {
+ this.numFiles = numFiles;
+ }
+
+ public double getProgress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+
+ public String getEcid() {
+ return ecid;
+ }
+
+ public String getAddr() {
+ return addr;
+ }
+
+ public TCompactionKind getKind() {
+ return kind;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getKe() {
+ return ke;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ public void print(PrintStream out) {
+ out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName,
tableId);
+ out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress:
%.2f%%\n", status,
+ lastUpdate, duration, numFiles, progress);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ECAdmin.class);
@Parameters(commandDescription = "cancel the external compaction with given
ECID")
@@ -62,6 +176,9 @@ public class ECAdmin implements KeywordExecutable {
@Parameter(names = {"-d", "--details"},
description = "display details about the running compactions")
boolean details = false;
+
+ @Parameter(names = {"-j", "--json"}, description = "format the output as
json")
+ boolean jsonOutput = false;
}
@Parameters(commandDescription = "list all compactors in zookeeper")
@@ -116,7 +233,18 @@ public class ECAdmin implements KeywordExecutable {
} else if (cl.getParsedCommand().equals("cancel")) {
cancelCompaction(context, cancelOps.ecid);
} else if (cl.getParsedCommand().equals("running")) {
- runningCompactions(context, runningOpts.details);
+ List<RunningCompactionSummary> compactions =
+ runningCompactions(context, runningOpts.details);
+ if (runningOpts.jsonOutput) {
+ try {
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(compactions));
+ } catch (Exception e) {
+ log.error("Error generating JSON output", e);
+ }
+ } else {
+ compactions.forEach(c -> c.print(System.out));
+ }
} else {
log.error("Unknown command {}", cl.getParsedCommand());
cl.usage();
@@ -130,7 +258,7 @@ public class ECAdmin implements KeywordExecutable {
}
}
- private void cancelCompaction(ServerContext context, String ecid) {
+ protected void cancelCompaction(ServerContext context, String ecid) {
CompactionCoordinatorService.Client coordinatorClient = null;
ecid = ExternalCompactionId.from(ecid).canonical();
try {
@@ -144,7 +272,7 @@ public class ECAdmin implements KeywordExecutable {
}
}
- private void listCompactorsByQueue(ServerContext context) {
+ protected void listCompactorsByQueue(ServerContext context) {
var queueToCompactorsMap =
ExternalCompactionUtil.getCompactorAddrs(context);
if (queueToCompactorsMap.isEmpty()) {
System.out.println("No Compactors found.");
@@ -153,43 +281,37 @@ public class ECAdmin implements KeywordExecutable {
}
}
- private void runningCompactions(ServerContext context, boolean details) {
+ protected List<RunningCompactionSummary> runningCompactions(ServerContext
context,
+ boolean details) {
CompactionCoordinatorService.Client coordinatorClient = null;
- TExternalCompactionList running;
+
try {
coordinatorClient = getCoordinatorClient(context);
- running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(),
context.rpcCreds());
- if (running == null) {
- System.out.println("No running compactions found.");
- return;
- }
- var ecidMap = running.getCompactions();
- if (ecidMap == null) {
+
+ // Fetch running compactions as a list and convert to a map
+ TExternalCompactionList running =
+ coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(),
context.rpcCreds());
+
+ List<RunningCompactionSummary> results = new ArrayList<>();
+
+ if (running == null || running.getCompactions() == null
+ || running.getCompactions().isEmpty()) {
System.out.println("No running compactions found.");
- return;
+ return results;
}
- ecidMap.forEach((ecid, ec) -> {
- if (ec != null) {
- var runningCompaction = new RunningCompaction(ec);
- var addr = runningCompaction.getCompactorAddress();
- var kind = runningCompaction.getJob().kind;
- var queue = runningCompaction.getQueueName();
- var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
- System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind,
queue, ke.tableId());
- if (details) {
- var runningCompactionInfo = new RunningCompactionInfo(ec);
- var status = runningCompactionInfo.status;
- var last = runningCompactionInfo.lastUpdate;
- var duration = runningCompactionInfo.duration;
- var numFiles = runningCompactionInfo.numFiles;
- var progress = runningCompactionInfo.progress;
- System.out.format(" %s Last Update: %dms Duration: %dms Files: %d
Progress: %.2f%%\n",
- status, last, duration, numFiles, progress);
- }
+
+ for (Map.Entry<String,TExternalCompaction> entry :
running.getCompactions().entrySet()) {
+ TExternalCompaction ec = entry.getValue();
+ if (ec == null) {
+ continue;
}
- });
+ var summary = new RunningCompactionSummary(new RunningCompaction(ec),
+ details ? new RunningCompactionInfo(ec) : null);
+ results.add(summary);
+ }
+ return results;
} catch (Exception e) {
- throw new RuntimeException("Unable to get running compactions.", e);
+ throw new IllegalStateException("Unable to get running compactions.", e);
} finally {
ThriftUtil.returnClient(coordinatorClient, context);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
new file mode 100644
index 0000000000..ee584ed7be
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.ECAdmin;
+import org.apache.accumulo.server.util.ECAdmin.RunningCompactionSummary;
+import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+public class ECAdminIT extends SharedMiniClusterBase {
+
+ // Class exists for access to protected methods
+ private static class TestECAdmin extends ECAdmin {
+
+ @Override
+ protected void cancelCompaction(ServerContext context, String ecid) {
+ super.cancelCompaction(context, ecid);
+ }
+
+ @Override
+ protected void listCompactorsByQueue(ServerContext context) {
+ super.listCompactorsByQueue(context);
+ }
+
+ @Override
+ protected List<RunningCompactionSummary> runningCompactions(ServerContext
context,
+ boolean details) {
+ return super.runningCompactions(context, details);
+ }
+ }
+
+ private static final class ECAdminITConfig implements
MiniClusterConfigurationCallback {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
+ ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+ }
+
+ }
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ SharedMiniClusterBase.startMiniClusterWithConfig(new ECAdminITConfig());
+
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+ getCluster().getClusterControl().startCompactors(Compactor.class, 1,
QUEUE7);
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ private final TestECAdmin eca = new TestECAdmin();
+
+ @Test
+ public void testListRunningCompactions() throws Exception {
+
+ final String tableName = this.getUniqueNames(1)[0];
+
+ try (final AccumuloClient client =
+ Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
+
+ createTable(client, tableName, "cs7");
+ IteratorSetting setting = new IteratorSetting(50, "sleepy",
SlowIterator.class);
+ setting.addOption("sleepTime", "3000");
+ setting.addOption("seekSleepTime", "3000");
+ client.tableOperations().attachIterator(tableName, setting,
EnumSet.of(IteratorScope.majc));
+ writeData(client, tableName);
+ compact(client, tableName, 2, QUEUE7, false);
+
+ // wait for the compaction to start
+ TExternalCompactionList expected =
+
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+ while (expected == null || expected.getCompactionsSize() == 0) {
+ Thread.sleep(1000);
+ expected =
+
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+ }
+
+ final List<RunningCompactionSummary> running =
+ eca.runningCompactions(getCluster().getServerContext(), true);
+ final Map<String,RunningCompactionSummary> compactionsByEcid = new
HashMap<>();
+ running.forEach(rcs -> compactionsByEcid.put(rcs.getEcid(), rcs));
+
+ assertEquals(expected.getCompactionsSize(), compactionsByEcid.size());
+ expected.getCompactions().values().forEach(tec -> {
+ RunningCompactionSummary rcs =
compactionsByEcid.get(tec.job.getExternalCompactionId());
+ assertNotNull(rcs);
+ assertEquals(tec.getJob().getExternalCompactionId(), rcs.getEcid());
+ assertEquals(tec.queueName, rcs.getQueueName());
+ assertEquals(tec.getCompactor(), rcs.getAddr());
+ });
+
+ // Confirm JSON output works
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(running);
+ System.out.println(json);
+ Type listType = new TypeToken<ArrayList<RunningCompactionSummary>>()
{}.getType();
+ @SuppressWarnings("unused")
+ var unused = new GsonBuilder().create().fromJson(json, listType);
+ }
+ }
+
+}