This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 4154b309494fd504ecbd93a5d4edb507b7cca2c5 Merge: 9438cc6201 3912b36d0b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 21 18:10:23 2025 +0000 Merge branch '2.1' .../org/apache/accumulo/server/util/ECAdmin.java | 181 +++++++++++++++++---- .../java/org/apache/accumulo/test/ECAdminIT.java | 156 ++++++++++++++++++ 2 files changed, 305 insertions(+), 32 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java index 710568ff29,4b47e87b70..db341f48d1 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java @@@ -21,17 -22,19 +22,19 @@@ import java.io.PrintStream import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; + import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; -import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.singletons.SingletonManager; -import org.apache.accumulo.core.singletons.SingletonManager.Mode; + import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; @@@ -45,7 -48,8 +48,9 @@@ import com.beust.jcommander.JCommander import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.google.auto.service.AutoService; +import com.google.common.net.HostAndPort; + import com.google.gson.Gson; + import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@@ -54,6 -58,111 +59,111 @@@ */ @AutoService(KeywordExecutable.class) public class ECAdmin implements KeywordExecutable { + + public static class RunningCompactionSummary { + private final String ecid; + private final String addr; + private final TCompactionKind kind; - private final String queueName; ++ private final String groupName; + private final String ke; + private final String tableId; + private String status = ""; + private long lastUpdate = 0; + private long duration = 0; + private int numFiles = 0; + private double progress = 0.0; + + public RunningCompactionSummary(RunningCompaction runningCompaction, + RunningCompactionInfo runningCompactionInfo) { + super(); + ecid = runningCompaction.getJob().getExternalCompactionId(); + addr = runningCompaction.getCompactorAddress(); + kind = runningCompaction.getJob().kind; - queueName = runningCompaction.getQueueName(); ++ groupName = runningCompaction.getGroupName(); + KeyExtent extent = KeyExtent.fromThrift(runningCompaction.getJob().extent); + ke = extent.obscured(); + tableId = extent.tableId().canonical(); + if (runningCompactionInfo != null) { + status = runningCompactionInfo.status; + lastUpdate = runningCompactionInfo.lastUpdate; + duration = runningCompactionInfo.duration; + numFiles = runningCompactionInfo.numFiles; + progress = runningCompactionInfo.progress; + } + + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public long getLastUpdate() { + return lastUpdate; + } + + public void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + public int getNumFiles() { + return numFiles; + } + + public void setNumFiles(int numFiles) { + this.numFiles = numFiles; + } + + public double getProgress() { + return progress; + } + + public void setProgress(double progress) { + this.progress = progress; + } + + public String getEcid() { + return ecid; + } + + public String getAddr() { + return addr; + } + + public TCompactionKind getKind() { + return kind; + } + - public String getQueueName() { - return queueName; ++ public String getGroupName() { ++ return groupName; + } + + public String getKe() { + return ke; + } + + public String getTableId() { + return tableId; + } + + public void print(PrintStream out) { - out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName, tableId); ++ out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, groupName, tableId); + out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n", status, + lastUpdate, duration, numFiles, progress); + } + } + private static final Logger log = LoggerFactory.getLogger(ECAdmin.class); @Parameters(commandDescription = "cancel the external compaction with given ECID") @@@ -147,54 -272,44 +271,47 @@@ } } - private void listCompactorsByQueue(ServerContext context) { + protected void listCompactorsByQueue(ServerContext context) { - var queueToCompactorsMap = ExternalCompactionUtil.getCompactorAddrs(context); - if (queueToCompactorsMap.isEmpty()) { + Set<ServerId> compactors = context.instanceOperations().getServers(ServerId.Type.COMPACTOR); + if (compactors.isEmpty()) { System.out.println("No Compactors found."); } else { - queueToCompactorsMap.forEach((q, compactors) -> System.out.println(q + ": " + compactors)); + Map<String,List<ServerId>> m = new TreeMap<>(); + compactors.forEach(csi -> { + m.putIfAbsent(csi.getResourceGroup(), new ArrayList<>()).add(csi); + }); + m.forEach((q, c) -> System.out.println(q + ": " + c)); } } - private void runningCompactions(ServerContext context, boolean details) { + protected List<RunningCompactionSummary> runningCompactions(ServerContext context, + boolean details) { CompactionCoordinatorService.Client coordinatorClient = null; - TExternalCompactionMap running; - try { coordinatorClient = getCoordinatorClient(context); - running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds()); - if (running == null) { - System.out.println("No running compactions found."); - return; - } - var ecidMap = running.getCompactions(); - if (ecidMap == null) { + + // Fetch running compactions as a list and convert to a map - TExternalCompactionList running = ++ TExternalCompactionMap running = + coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds()); + + List<RunningCompactionSummary> results = new ArrayList<>(); + + if (running == null || running.getCompactions() == null + || running.getCompactions().isEmpty()) { System.out.println("No running compactions found."); - return; + return results; } - ecidMap.forEach((ecid, ec) -> { - if (ec != null) { - var runningCompaction = new RunningCompaction(ec); - var addr = runningCompaction.getCompactorAddress(); - var kind = runningCompaction.getJob().kind; - var group = runningCompaction.getGroupName(); - var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent); - System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, group, ke.tableId()); - if (details) { - var runningCompactionInfo = new RunningCompactionInfo(ec); - var status = runningCompactionInfo.status; - var last = runningCompactionInfo.lastUpdate; - var duration = runningCompactionInfo.duration; - var numFiles = runningCompactionInfo.numFiles; - var progress = runningCompactionInfo.progress; - System.out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n", - status, last, duration, numFiles, progress); - } + + for (Map.Entry<String,TExternalCompaction> entry : running.getCompactions().entrySet()) { + TExternalCompaction ec = entry.getValue(); + if (ec == null) { + continue; } - }); + var summary = new RunningCompactionSummary(new RunningCompaction(ec), + details ? new RunningCompactionInfo(ec) : null); + results.add(summary); + } + return results; } catch (Exception e) { throw new IllegalStateException("Unable to get running compactions.", e); } finally { diff --cc test/src/main/java/org/apache/accumulo/test/ECAdminIT.java index 0000000000,ee584ed7be..4a8a5eafb9 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java @@@ -1,0 -1,153 +1,156 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test; + -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7; ++import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP7; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + + import java.lang.reflect.Type; + import java.util.ArrayList; + import java.util.EnumSet; + import java.util.HashMap; + import java.util.List; + import java.util.Map; ++import java.util.Optional; + -import org.apache.accumulo.compactor.Compactor; -import org.apache.accumulo.coordinator.CompactionCoordinator; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; ++import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap; + import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; ++import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; + import org.apache.accumulo.harness.MiniClusterConfigurationCallback; + import org.apache.accumulo.harness.SharedMiniClusterBase; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.server.util.ECAdmin; + import org.apache.accumulo.server.util.ECAdmin.RunningCompactionSummary; + import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; + import org.apache.accumulo.test.functional.SlowIterator; + import org.apache.hadoop.conf.Configuration; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.api.Test; + ++import com.google.common.net.HostAndPort; + import com.google.gson.Gson; + import com.google.gson.GsonBuilder; + import com.google.gson.reflect.TypeToken; + + public class ECAdminIT extends SharedMiniClusterBase { + + // Class exists for access to protected methods + private static class TestECAdmin extends ECAdmin { + + @Override + protected void cancelCompaction(ServerContext context, String ecid) { + super.cancelCompaction(context, ecid); + } + + @Override + protected void listCompactorsByQueue(ServerContext context) { + super.listCompactorsByQueue(context); + } + + @Override + protected List<RunningCompactionSummary> runningCompactions(ServerContext context, + boolean details) { + return super.runningCompactions(context, details); + } + } + + private static final class ECAdminITConfig implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); ++ cfg.getClusterServerConfiguration().addCompactorResourceGroup(GROUP7, 1); + } + + } + + @BeforeAll + public static void beforeAll() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new ECAdminITConfig()); - getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); - getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE7); + } + + @AfterAll + public static void afterAll() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + private final TestECAdmin eca = new TestECAdmin(); + + @Test + public void testListRunningCompactions() throws Exception { + + final String tableName = this.getUniqueNames(1)[0]; + + try (final AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + createTable(client, tableName, "cs7"); + IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); + setting.addOption("sleepTime", "3000"); + setting.addOption("seekSleepTime", "3000"); + client.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.majc)); + writeData(client, tableName); - compact(client, tableName, 2, QUEUE7, false); ++ compact(client, tableName, 2, GROUP7, false); ++ ++ Optional<HostAndPort> coordinatorHost = ++ ExternalCompactionUtil.findCompactionCoordinator(getCluster().getServerContext()); + + // wait for the compaction to start - TExternalCompactionList expected = - ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext()); ++ TExternalCompactionMap expected = ExternalCompactionTestUtils ++ .getRunningCompactions(getCluster().getServerContext(), coordinatorHost); + while (expected == null || expected.getCompactionsSize() == 0) { + Thread.sleep(1000); - expected = - ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext()); ++ expected = ExternalCompactionTestUtils ++ .getRunningCompactions(getCluster().getServerContext(), coordinatorHost); + } + + final List<RunningCompactionSummary> running = + eca.runningCompactions(getCluster().getServerContext(), true); + final Map<String,RunningCompactionSummary> compactionsByEcid = new HashMap<>(); + running.forEach(rcs -> compactionsByEcid.put(rcs.getEcid(), rcs)); + + assertEquals(expected.getCompactionsSize(), compactionsByEcid.size()); + expected.getCompactions().values().forEach(tec -> { + RunningCompactionSummary rcs = compactionsByEcid.get(tec.job.getExternalCompactionId()); + assertNotNull(rcs); + assertEquals(tec.getJob().getExternalCompactionId(), rcs.getEcid()); - assertEquals(tec.queueName, rcs.getQueueName()); ++ assertEquals(tec.groupName, rcs.getGroupName()); + assertEquals(tec.getCompactor(), rcs.getAddr()); + }); + + // Confirm JSON output works + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + String json = gson.toJson(running); + System.out.println(json); + Type listType = new TypeToken<ArrayList<RunningCompactionSummary>>() {}.getType(); + @SuppressWarnings("unused") + var unused = new GsonBuilder().create().fromJson(json, listType); + } + } + + }