This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 3912b36d0b Added `--json` flag to `ec-admin running` command (#5332) 3912b36d0b is described below commit 3912b36d0b0ca103b7068d7c7575c99d32a11a97 Author: Suvrat Acharya <140749446+suvrat1...@users.noreply.github.com> AuthorDate: Mon Apr 21 23:07:03 2025 +0530 Added `--json` flag to `ec-admin running` command (#5332) Added option to print the running compactions as json. Co-authored-by: Kevin Rathbun <kevinrr...@gmail.com> Co-authored-by: Dave Marion <dlmar...@apache.org> --- .../org/apache/accumulo/server/util/ECAdmin.java | 188 +++++++++++++++++---- .../java/org/apache/accumulo/test/ECAdminIT.java | 153 +++++++++++++++++ 2 files changed, 308 insertions(+), 33 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java index 3f0bcc1e65..4b47e87b70 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java @@ -18,7 +18,13 @@ */ package org.apache.accumulo.server.util; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -26,6 +32,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonManager.Mode; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -41,6 +48,8 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.google.auto.service.AutoService; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -49,6 +58,111 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; */ @AutoService(KeywordExecutable.class) public class ECAdmin implements KeywordExecutable { + + public static class RunningCompactionSummary { + private final String ecid; + private final String addr; + private final TCompactionKind kind; + private final String queueName; + private final String ke; + private final String tableId; + private String status = ""; + private long lastUpdate = 0; + private long duration = 0; + private int numFiles = 0; + private double progress = 0.0; + + public RunningCompactionSummary(RunningCompaction runningCompaction, + RunningCompactionInfo runningCompactionInfo) { + super(); + ecid = runningCompaction.getJob().getExternalCompactionId(); + addr = runningCompaction.getCompactorAddress(); + kind = runningCompaction.getJob().kind; + queueName = runningCompaction.getQueueName(); + KeyExtent extent = KeyExtent.fromThrift(runningCompaction.getJob().extent); + ke = extent.obscured(); + tableId = extent.tableId().canonical(); + if (runningCompactionInfo != null) { + status = runningCompactionInfo.status; + lastUpdate = runningCompactionInfo.lastUpdate; + duration = runningCompactionInfo.duration; + numFiles = runningCompactionInfo.numFiles; + progress = runningCompactionInfo.progress; + } + + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public long getLastUpdate() { + return lastUpdate; + } + + public void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + public int getNumFiles() { + return numFiles; + } + + public void setNumFiles(int numFiles) { + this.numFiles = numFiles; + } + + public double getProgress() { + return progress; + } + + public void setProgress(double progress) { + this.progress = progress; + } + + public String getEcid() { + return ecid; + } + + public String getAddr() { + return addr; + } + + public TCompactionKind getKind() { + return kind; + } + + public String getQueueName() { + return queueName; + } + + public String getKe() { + return ke; + } + + public String getTableId() { + return tableId; + } + + public void print(PrintStream out) { + out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName, tableId); + out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n", status, + lastUpdate, duration, numFiles, progress); + } + } + private static final Logger log = LoggerFactory.getLogger(ECAdmin.class); @Parameters(commandDescription = "cancel the external compaction with given ECID") @@ -62,6 +176,9 @@ public class ECAdmin implements KeywordExecutable { @Parameter(names = {"-d", "--details"}, description = "display details about the running compactions") boolean details = false; + + @Parameter(names = {"-j", "--json"}, description = "format the output as json") + boolean jsonOutput = false; } @Parameters(commandDescription = "list all compactors in zookeeper") @@ -116,7 +233,18 @@ public class ECAdmin implements KeywordExecutable { } else if (cl.getParsedCommand().equals("cancel")) { cancelCompaction(context, cancelOps.ecid); } else if (cl.getParsedCommand().equals("running")) { - runningCompactions(context, runningOpts.details); + List<RunningCompactionSummary> compactions = + runningCompactions(context, runningOpts.details); + if (runningOpts.jsonOutput) { + try { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(compactions)); + } catch (Exception e) { + log.error("Error generating JSON output", e); + } + } else { + compactions.forEach(c -> c.print(System.out)); + } } else { log.error("Unknown command {}", cl.getParsedCommand()); cl.usage(); @@ -130,7 +258,7 @@ public class ECAdmin implements KeywordExecutable { } } - private void cancelCompaction(ServerContext context, String ecid) { + protected void cancelCompaction(ServerContext context, String ecid) { CompactionCoordinatorService.Client coordinatorClient = null; ecid = ExternalCompactionId.from(ecid).canonical(); try { @@ -144,7 +272,7 @@ public class ECAdmin implements KeywordExecutable { } } - private void listCompactorsByQueue(ServerContext context) { + protected void listCompactorsByQueue(ServerContext context) { var queueToCompactorsMap = ExternalCompactionUtil.getCompactorAddrs(context); if (queueToCompactorsMap.isEmpty()) { System.out.println("No Compactors found."); @@ -153,43 +281,37 @@ public class ECAdmin implements KeywordExecutable { } } - private void runningCompactions(ServerContext context, boolean details) { + protected List<RunningCompactionSummary> runningCompactions(ServerContext context, + boolean details) { CompactionCoordinatorService.Client coordinatorClient = null; - TExternalCompactionList running; + try { coordinatorClient = getCoordinatorClient(context); - running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds()); - if (running == null) { - System.out.println("No running compactions found."); - return; - } - var ecidMap = running.getCompactions(); - if (ecidMap == null) { + + // Fetch running compactions as a list and convert to a map + TExternalCompactionList running = + coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds()); + + List<RunningCompactionSummary> results = new ArrayList<>(); + + if (running == null || running.getCompactions() == null + || running.getCompactions().isEmpty()) { System.out.println("No running compactions found."); - return; + return results; } - ecidMap.forEach((ecid, ec) -> { - if (ec != null) { - var runningCompaction = new RunningCompaction(ec); - var addr = runningCompaction.getCompactorAddress(); - var kind = runningCompaction.getJob().kind; - var queue = runningCompaction.getQueueName(); - var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent); - System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queue, ke.tableId()); - if (details) { - var runningCompactionInfo = new RunningCompactionInfo(ec); - var status = runningCompactionInfo.status; - var last = runningCompactionInfo.lastUpdate; - var duration = runningCompactionInfo.duration; - var numFiles = runningCompactionInfo.numFiles; - var progress = runningCompactionInfo.progress; - System.out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n", - status, last, duration, numFiles, progress); - } + + for (Map.Entry<String,TExternalCompaction> entry : running.getCompactions().entrySet()) { + TExternalCompaction ec = entry.getValue(); + if (ec == null) { + continue; } - }); + var summary = new RunningCompactionSummary(new RunningCompaction(ec), + details ? new RunningCompactionInfo(ec) : null); + results.add(summary); + } + return results; } catch (Exception e) { - throw new RuntimeException("Unable to get running compactions.", e); + throw new IllegalStateException("Unable to get running compactions.", e); } finally { ThriftUtil.returnClient(coordinatorClient, context); } diff --git a/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java new file mode 100644 index 0000000000..ee584ed7be --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.util.ECAdmin; +import org.apache.accumulo.server.util.ECAdmin.RunningCompactionSummary; +import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +public class ECAdminIT extends SharedMiniClusterBase { + + // Class exists for access to protected methods + private static class TestECAdmin extends ECAdmin { + + @Override + protected void cancelCompaction(ServerContext context, String ecid) { + super.cancelCompaction(context, ecid); + } + + @Override + protected void listCompactorsByQueue(ServerContext context) { + super.listCompactorsByQueue(context); + } + + @Override + protected List<RunningCompactionSummary> runningCompactions(ServerContext context, + boolean details) { + return super.runningCompactions(context, details); + } + } + + private static final class ECAdminITConfig implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + } + + } + + @BeforeAll + public static void beforeAll() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new ECAdminITConfig()); + getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); + getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE7); + } + + @AfterAll + public static void afterAll() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + private final TestECAdmin eca = new TestECAdmin(); + + @Test + public void testListRunningCompactions() throws Exception { + + final String tableName = this.getUniqueNames(1)[0]; + + try (final AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + createTable(client, tableName, "cs7"); + IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); + setting.addOption("sleepTime", "3000"); + setting.addOption("seekSleepTime", "3000"); + client.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.majc)); + writeData(client, tableName); + compact(client, tableName, 2, QUEUE7, false); + + // wait for the compaction to start + TExternalCompactionList expected = + ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext()); + while (expected == null || expected.getCompactionsSize() == 0) { + Thread.sleep(1000); + expected = + ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext()); + } + + final List<RunningCompactionSummary> running = + eca.runningCompactions(getCluster().getServerContext(), true); + final Map<String,RunningCompactionSummary> compactionsByEcid = new HashMap<>(); + running.forEach(rcs -> compactionsByEcid.put(rcs.getEcid(), rcs)); + + assertEquals(expected.getCompactionsSize(), compactionsByEcid.size()); + expected.getCompactions().values().forEach(tec -> { + RunningCompactionSummary rcs = compactionsByEcid.get(tec.job.getExternalCompactionId()); + assertNotNull(rcs); + assertEquals(tec.getJob().getExternalCompactionId(), rcs.getEcid()); + assertEquals(tec.queueName, rcs.getQueueName()); + assertEquals(tec.getCompactor(), rcs.getAddr()); + }); + + // Confirm JSON output works + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + String json = gson.toJson(running); + System.out.println(json); + Type listType = new TypeToken<ArrayList<RunningCompactionSummary>>() {}.getType(); + @SuppressWarnings("unused") + var unused = new GsonBuilder().create().fromJson(json, listType); + } + } + +}