This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 4154b309494fd504ecbd93a5d4edb507b7cca2c5
Merge: 9438cc6201 3912b36d0b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 21 18:10:23 2025 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/server/util/ECAdmin.java   | 181 +++++++++++++++++----
 .../java/org/apache/accumulo/test/ECAdminIT.java   | 156 ++++++++++++++++++
 2 files changed, 305 insertions(+), 32 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
index 710568ff29,4b47e87b70..db341f48d1
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
@@@ -21,17 -22,19 +22,19 @@@ import java.io.PrintStream
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
 +import java.util.Set;
 +import java.util.TreeMap;
  
 +import org.apache.accumulo.core.client.admin.servers.ServerId;
  import 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+ import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 -import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.singletons.SingletonManager;
 -import org.apache.accumulo.core.singletons.SingletonManager.Mode;
+ import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.compaction.RunningCompaction;
  import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
@@@ -45,7 -48,8 +48,9 @@@ import com.beust.jcommander.JCommander
  import com.beust.jcommander.Parameter;
  import com.beust.jcommander.Parameters;
  import com.google.auto.service.AutoService;
 +import com.google.common.net.HostAndPort;
+ import com.google.gson.Gson;
+ import com.google.gson.GsonBuilder;
  
  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  
@@@ -54,6 -58,111 +59,111 @@@
   */
  @AutoService(KeywordExecutable.class)
  public class ECAdmin implements KeywordExecutable {
+ 
+   public static class RunningCompactionSummary {
+     private final String ecid;
+     private final String addr;
+     private final TCompactionKind kind;
 -    private final String queueName;
++    private final String groupName;
+     private final String ke;
+     private final String tableId;
+     private String status = "";
+     private long lastUpdate = 0;
+     private long duration = 0;
+     private int numFiles = 0;
+     private double progress = 0.0;
+ 
+     public RunningCompactionSummary(RunningCompaction runningCompaction,
+         RunningCompactionInfo runningCompactionInfo) {
+       super();
+       ecid = runningCompaction.getJob().getExternalCompactionId();
+       addr = runningCompaction.getCompactorAddress();
+       kind = runningCompaction.getJob().kind;
 -      queueName = runningCompaction.getQueueName();
++      groupName = runningCompaction.getGroupName();
+       KeyExtent extent = 
KeyExtent.fromThrift(runningCompaction.getJob().extent);
+       ke = extent.obscured();
+       tableId = extent.tableId().canonical();
+       if (runningCompactionInfo != null) {
+         status = runningCompactionInfo.status;
+         lastUpdate = runningCompactionInfo.lastUpdate;
+         duration = runningCompactionInfo.duration;
+         numFiles = runningCompactionInfo.numFiles;
+         progress = runningCompactionInfo.progress;
+       }
+ 
+     }
+ 
+     public String getStatus() {
+       return status;
+     }
+ 
+     public void setStatus(String status) {
+       this.status = status;
+     }
+ 
+     public long getLastUpdate() {
+       return lastUpdate;
+     }
+ 
+     public void setLastUpdate(long lastUpdate) {
+       this.lastUpdate = lastUpdate;
+     }
+ 
+     public long getDuration() {
+       return duration;
+     }
+ 
+     public void setDuration(long duration) {
+       this.duration = duration;
+     }
+ 
+     public int getNumFiles() {
+       return numFiles;
+     }
+ 
+     public void setNumFiles(int numFiles) {
+       this.numFiles = numFiles;
+     }
+ 
+     public double getProgress() {
+       return progress;
+     }
+ 
+     public void setProgress(double progress) {
+       this.progress = progress;
+     }
+ 
+     public String getEcid() {
+       return ecid;
+     }
+ 
+     public String getAddr() {
+       return addr;
+     }
+ 
+     public TCompactionKind getKind() {
+       return kind;
+     }
+ 
 -    public String getQueueName() {
 -      return queueName;
++    public String getGroupName() {
++      return groupName;
+     }
+ 
+     public String getKe() {
+       return ke;
+     }
+ 
+     public String getTableId() {
+       return tableId;
+     }
+ 
+     public void print(PrintStream out) {
 -      out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName, 
tableId);
++      out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, groupName, 
tableId);
+       out.format("  %s Last Update: %dms Duration: %dms Files: %d Progress: 
%.2f%%\n", status,
+           lastUpdate, duration, numFiles, progress);
+     }
+   }
+ 
    private static final Logger log = LoggerFactory.getLogger(ECAdmin.class);
  
    @Parameters(commandDescription = "cancel the external compaction with given 
ECID")
@@@ -147,54 -272,44 +271,47 @@@
      }
    }
  
-   private void listCompactorsByQueue(ServerContext context) {
+   protected void listCompactorsByQueue(ServerContext context) {
 -    var queueToCompactorsMap = 
ExternalCompactionUtil.getCompactorAddrs(context);
 -    if (queueToCompactorsMap.isEmpty()) {
 +    Set<ServerId> compactors = 
context.instanceOperations().getServers(ServerId.Type.COMPACTOR);
 +    if (compactors.isEmpty()) {
        System.out.println("No Compactors found.");
      } else {
 -      queueToCompactorsMap.forEach((q, compactors) -> System.out.println(q + 
": " + compactors));
 +      Map<String,List<ServerId>> m = new TreeMap<>();
 +      compactors.forEach(csi -> {
 +        m.putIfAbsent(csi.getResourceGroup(), new ArrayList<>()).add(csi);
 +      });
 +      m.forEach((q, c) -> System.out.println(q + ": " + c));
      }
    }
  
-   private void runningCompactions(ServerContext context, boolean details) {
+   protected List<RunningCompactionSummary> runningCompactions(ServerContext 
context,
+       boolean details) {
      CompactionCoordinatorService.Client coordinatorClient = null;
-     TExternalCompactionMap running;
 -
      try {
        coordinatorClient = getCoordinatorClient(context);
-       running = 
coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());
-       if (running == null) {
-         System.out.println("No running compactions found.");
-         return;
-       }
-       var ecidMap = running.getCompactions();
-       if (ecidMap == null) {
+ 
+       // Fetch running compactions as a list and convert to a map
 -      TExternalCompactionList running =
++      TExternalCompactionMap running =
+           coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());
+ 
+       List<RunningCompactionSummary> results = new ArrayList<>();
+ 
+       if (running == null || running.getCompactions() == null
+           || running.getCompactions().isEmpty()) {
          System.out.println("No running compactions found.");
-         return;
+         return results;
        }
-       ecidMap.forEach((ecid, ec) -> {
-         if (ec != null) {
-           var runningCompaction = new RunningCompaction(ec);
-           var addr = runningCompaction.getCompactorAddress();
-           var kind = runningCompaction.getJob().kind;
-           var group = runningCompaction.getGroupName();
-           var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
-           System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, 
group, ke.tableId());
-           if (details) {
-             var runningCompactionInfo = new RunningCompactionInfo(ec);
-             var status = runningCompactionInfo.status;
-             var last = runningCompactionInfo.lastUpdate;
-             var duration = runningCompactionInfo.duration;
-             var numFiles = runningCompactionInfo.numFiles;
-             var progress = runningCompactionInfo.progress;
-             System.out.format("  %s Last Update: %dms Duration: %dms Files: 
%d Progress: %.2f%%\n",
-                 status, last, duration, numFiles, progress);
-           }
+ 
+       for (Map.Entry<String,TExternalCompaction> entry : 
running.getCompactions().entrySet()) {
+         TExternalCompaction ec = entry.getValue();
+         if (ec == null) {
+           continue;
          }
-       });
+         var summary = new RunningCompactionSummary(new RunningCompaction(ec),
+             details ? new RunningCompactionInfo(ec) : null);
+         results.add(summary);
+       }
+       return results;
      } catch (Exception e) {
        throw new IllegalStateException("Unable to get running compactions.", 
e);
      } finally {
diff --cc test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
index 0000000000,ee584ed7be..4a8a5eafb9
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ECAdminIT.java
@@@ -1,0 -1,153 +1,156 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test;
+ 
 -import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7;
++import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP7;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ 
+ import java.lang.reflect.Type;
+ import java.util.ArrayList;
+ import java.util.EnumSet;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Optional;
+ 
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.IteratorSetting;
 -import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
++import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap;
+ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
++import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+ import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.util.ECAdmin;
+ import org.apache.accumulo.server.util.ECAdmin.RunningCompactionSummary;
+ import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+ import org.apache.accumulo.test.functional.SlowIterator;
+ import org.apache.hadoop.conf.Configuration;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.Test;
+ 
++import com.google.common.net.HostAndPort;
+ import com.google.gson.Gson;
+ import com.google.gson.GsonBuilder;
+ import com.google.gson.reflect.TypeToken;
+ 
+ public class ECAdminIT extends SharedMiniClusterBase {
+ 
+   // Class exists for access to protected methods
+   private static class TestECAdmin extends ECAdmin {
+ 
+     @Override
+     protected void cancelCompaction(ServerContext context, String ecid) {
+       super.cancelCompaction(context, ecid);
+     }
+ 
+     @Override
+     protected void listCompactorsByQueue(ServerContext context) {
+       super.listCompactorsByQueue(context);
+     }
+ 
+     @Override
+     protected List<RunningCompactionSummary> runningCompactions(ServerContext 
context,
+         boolean details) {
+       return super.runningCompactions(context, details);
+     }
+   }
+ 
+   private static final class ECAdminITConfig implements 
MiniClusterConfigurationCallback {
+ 
+     @Override
+     public void configureMiniCluster(MiniAccumuloConfigImpl cfg, 
Configuration coreSite) {
+       ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
++      cfg.getClusterServerConfiguration().addCompactorResourceGroup(GROUP7, 
1);
+     }
+ 
+   }
+ 
+   @BeforeAll
+   public static void beforeAll() throws Exception {
+     SharedMiniClusterBase.startMiniClusterWithConfig(new ECAdminITConfig());
 -    
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
 -    getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE7);
+   }
+ 
+   @AfterAll
+   public static void afterAll() throws Exception {
+     SharedMiniClusterBase.stopMiniCluster();
+   }
+ 
+   private final TestECAdmin eca = new TestECAdmin();
+ 
+   @Test
+   public void testListRunningCompactions() throws Exception {
+ 
+     final String tableName = this.getUniqueNames(1)[0];
+ 
+     try (final AccumuloClient client =
+         
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+ 
+       createTable(client, tableName, "cs7");
+       IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+       setting.addOption("sleepTime", "3000");
+       setting.addOption("seekSleepTime", "3000");
+       client.tableOperations().attachIterator(tableName, setting, 
EnumSet.of(IteratorScope.majc));
+       writeData(client, tableName);
 -      compact(client, tableName, 2, QUEUE7, false);
++      compact(client, tableName, 2, GROUP7, false);
++
++      Optional<HostAndPort> coordinatorHost =
++          
ExternalCompactionUtil.findCompactionCoordinator(getCluster().getServerContext());
+ 
+       // wait for the compaction to start
 -      TExternalCompactionList expected =
 -          
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
++      TExternalCompactionMap expected = ExternalCompactionTestUtils
++          .getRunningCompactions(getCluster().getServerContext(), 
coordinatorHost);
+       while (expected == null || expected.getCompactionsSize() == 0) {
+         Thread.sleep(1000);
 -        expected =
 -            
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
++        expected = ExternalCompactionTestUtils
++            .getRunningCompactions(getCluster().getServerContext(), 
coordinatorHost);
+       }
+ 
+       final List<RunningCompactionSummary> running =
+           eca.runningCompactions(getCluster().getServerContext(), true);
+       final Map<String,RunningCompactionSummary> compactionsByEcid = new 
HashMap<>();
+       running.forEach(rcs -> compactionsByEcid.put(rcs.getEcid(), rcs));
+ 
+       assertEquals(expected.getCompactionsSize(), compactionsByEcid.size());
+       expected.getCompactions().values().forEach(tec -> {
+         RunningCompactionSummary rcs = 
compactionsByEcid.get(tec.job.getExternalCompactionId());
+         assertNotNull(rcs);
+         assertEquals(tec.getJob().getExternalCompactionId(), rcs.getEcid());
 -        assertEquals(tec.queueName, rcs.getQueueName());
++        assertEquals(tec.groupName, rcs.getGroupName());
+         assertEquals(tec.getCompactor(), rcs.getAddr());
+       });
+ 
+       // Confirm JSON output works
+       Gson gson = new GsonBuilder().setPrettyPrinting().create();
+       String json = gson.toJson(running);
+       System.out.println(json);
+       Type listType = new TypeToken<ArrayList<RunningCompactionSummary>>() 
{}.getType();
+       @SuppressWarnings("unused")
+       var unused = new GsonBuilder().create().fromJson(json, listType);
+     }
+   }
+ 
+ }

Reply via email to