This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bce893  Create page in Monitor for external compactions (#2358)
2bce893 is described below

commit 2bce8939145b49a765f998f5dbe2b6242a64e3e6
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Fri Nov 19 10:42:48 2021 -0500

    Create page in Monitor for external compactions (#2358)
    
    * Create multiple new classes for displaying 3 different tables of data
    in the new external compaction page in the monitor
    * Create 3 new ajax endpoints in ECResource
    * Modify Compactor and ExternalCompactionUtil to return Optional for the
    compaction coordinator instead of null
    * Add check for compaction coordinator to Monitor.fetchData()
    * New ExternalCompactionProgressIT for testing progress
    * Use new bootstrap panel and badges for coordinator info
    * Closes #2290
    
    Co-authored-by: Dom G. <47725857+domgargu...@users.noreply.github.com>
---
 .../util/compaction/ExternalCompactionUtil.java    |  10 +-
 .../org/apache/accumulo/compactor/Compactor.java   |   8 +-
 .../java/org/apache/accumulo/monitor/Monitor.java  |  89 +++++++++
 .../compactions/external/CompactionInputFile.java  |  38 ++++
 .../rest/compactions/external/CompactorInfo.java   |  33 ++++
 .../rest/compactions/external/Compactors.java      |  40 ++++
 .../rest/compactions/external/CoordinatorInfo.java |  41 +++++
 .../rest/compactions/external/ECResource.java      |  62 +++++++
 .../external/ExternalCompactionInfo.java           |  60 ++++++
 .../compactions/external/RunningCompactions.java   |  39 ++++
 .../compactions/external/RunningCompactorInfo.java | 133 ++++++++++++++
 .../org/apache/accumulo/monitor/view/WebViews.java |  25 +++
 .../org/apache/accumulo/monitor/resources/js/ec.js | 201 +++++++++++++++++++++
 .../org/apache/accumulo/monitor/templates/ec.ftl   |  80 ++++++++
 .../apache/accumulo/monitor/templates/navbar.ftl   |   1 +
 .../compaction/ExternalCompactionProgressIT.java   | 166 +++++++++++++++++
 .../compaction/ExternalCompactionTestUtils.java    |  42 +++--
 17 files changed, 1044 insertions(+), 24 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index a5538b6..24dff44 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -93,19 +94,18 @@ public class ExternalCompactionUtil {
 
   /**
    *
-   * @return null if Coordinator node not found, else HostAndPort
+   * @return Optional HostAndPort of Coordinator node if found
    */
-  public static HostAndPort findCompactionCoordinator(ClientContext context) {
+  public static Optional<HostAndPort> findCompactionCoordinator(ClientContext 
context) {
     final String lockPath = context.getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
     try {
       var zk = ZooSession.getAnonymousSession(context.getZooKeepers(),
           context.getZooKeepersSessionTimeOut());
       byte[] address = ServiceLock.getLockData(zk, ServiceLock.path(lockPath));
       if (null == address) {
-        return null;
+        return Optional.empty();
       }
-      String coordinatorAddress = new String(address);
-      return HostAndPort.fromString(coordinatorAddress);
+      return Optional.of(HostAndPort.fromString(new String(address)));
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 898177e..6565280 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -474,12 +474,12 @@ public class Compactor extends AbstractServer implements 
CompactorService.Iface
    *           when unable to get client
    */
   protected CompactionCoordinatorService.Client getCoordinatorClient() throws 
TTransportException {
-    HostAndPort coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(getContext());
-    if (null == coordinatorHost) {
+    var coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(getContext());
+    if (coordinatorHost.isEmpty()) {
       throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
     }
-    LOG.trace("CompactionCoordinator address is: {}", coordinatorHost);
-    return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost, 
getContext());
+    LOG.trace("CompactionCoordinator address is: {}", coordinatorHost.get());
+    return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, 
coordinatorHost.get(), getContext());
   }
 
   /**
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 3fca610..dcd7b37 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,9 @@ import jakarta.inject.Singleton;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.ManagerClient;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService;
@@ -62,12 +66,14 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import 
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
 import org.apache.accumulo.monitor.util.logging.RecentLogs;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.HighlyAvailableService;
@@ -163,6 +169,12 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
   private Map<TableId,Map<ProblemType,Integer>> problemSummary = 
Collections.emptyMap();
   private Exception problemException;
   private GCStatus gcStatus;
+  private Optional<HostAndPort> coordinatorHost = Optional.empty();
+  private CompactionCoordinatorService.Client coordinatorClient;
+  private final String coordinatorMissingMsg =
+      "Error getting the compaction coordinator. Check that it is running. It 
is not "
+          + "started automatically with other cluster processes so must be 
started by running "
+          + "'accumulo compaction-coordinator'.";
 
   private EmbeddedWebServer server;
 
@@ -365,7 +377,17 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
         this.problemException = e;
       }
 
+      if (coordinatorHost.isEmpty()) {
+        coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(context);
+      } else {
+        log.info("External Compaction Coordinator found at {}", 
coordinatorHost.get());
+      }
+
     } finally {
+      if (coordinatorClient != null) {
+        ThriftUtil.returnClient(coordinatorClient, context);
+        coordinatorClient = null;
+      }
       lastRecalc.set(currentTime);
       // stop fetching; log an error if this thread wasn't already fetching
       if (!fetching.compareAndSet(true, false)) {
@@ -564,8 +586,11 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
   private final Map<HostAndPort,ScanStats> allScans = new HashMap<>();
   private final Map<HostAndPort,CompactionStats> allCompactions = new 
HashMap<>();
   private final RecentLogs recentLogs = new RecentLogs();
+  private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+  private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>();
   private long scansFetchedNanos = 0L;
   private long compactsFetchedNanos = 0L;
+  private long ecInfoFetchedNanos = 0L;
   private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
   private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
 
@@ -591,6 +616,67 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
     return Map.copyOf(allCompactions);
   }
 
+  public synchronized ExternalCompactionInfo getCompactorsInfo() {
+    if (coordinatorHost.isEmpty()) {
+      throw new IllegalStateException("Tried fetching from compaction 
coordinator that's missing");
+    }
+    if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
+      log.info("User initiated fetch of External Compaction info");
+      Map<String,List<HostAndPort>> compactors =
+          ExternalCompactionUtil.getCompactorAddrs(getContext());
+      log.debug("Found compactors: " + compactors);
+      ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+      ecInfo.setCompactors(compactors);
+      ecInfo.setCoordinatorHost(coordinatorHost);
+
+      ecInfoFetchedNanos = System.nanoTime();
+    }
+    return ecInfo;
+  }
+
+  /**
+   * Fetch running compactions from Compaction Coordinator. Chose not to 
restrict the frequency of
+   * user fetches since RPC calls are going to the coordinator. This allows 
for fine grain updates
+   * of external compaction progress.
+   */
+  public synchronized Map<String,TExternalCompaction> getRunningInfo() {
+    if (coordinatorHost.isEmpty()) {
+      throw new IllegalStateException(coordinatorMissingMsg);
+    }
+    var ccHost = coordinatorHost.get();
+    log.info("User initiated fetch of running External Compactions from " + 
ccHost);
+    var client = getCoordinator(ccHost);
+    TExternalCompactionList running;
+    try {
+      running = client.getRunningCompactions(TraceUtil.traceInfo(), 
getContext().rpcCreds());
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to get running compactions from 
" + ccHost, e);
+    }
+
+    ecRunningMap.clear();
+    if (running.getCompactions() != null) {
+      running.getCompactions().forEach((queue, ec) -> {
+        log.trace("Found Compactions running on queue {} -> {}", queue, ec);
+        ecRunningMap.put(queue, ec);
+      });
+    }
+
+    return ecRunningMap;
+  }
+
+  private CompactionCoordinatorService.Client getCoordinator(HostAndPort 
address) {
+    if (coordinatorClient == null) {
+      try {
+        coordinatorClient = ThriftUtil.getClient(new 
CompactionCoordinatorService.Client.Factory(),
+            address, getContext());
+      } catch (Exception e) {
+        log.error("Unable to get Compaction coordinator at {}", address);
+        throw new IllegalStateException(coordinatorMissingMsg, e);
+      }
+    }
+    return coordinatorClient;
+  }
+
   private void fetchScans() {
     ServerContext context = getContext();
     for (String server : context.instanceOperations().getTabletServers()) {
@@ -869,4 +955,7 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
     return recentLogs;
   }
 
+  public Optional<HostAndPort> getCoordinatorHost() {
+    return coordinatorHost;
+  }
 }
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
new file mode 100644
index 0000000..4ae5e23
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+/**
+ * Class for displaying input files
+ */
+public class CompactionInputFile {
+
+  // Variable names become JSON keys
+  public String metadataFileEntry;
+  public long size;
+  public long entries;
+  public long timestamp;
+
+  public CompactionInputFile(String metadataFileEntry, long size, long 
entries, long timestamp) {
+    this.metadataFileEntry = metadataFileEntry;
+    this.size = size;
+    this.entries = entries;
+    this.timestamp = timestamp;
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
new file mode 100644
index 0000000..1363ece
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+public class CompactorInfo {
+
+  // Variable names become JSON keys
+  public long lastContact;
+  public String server;
+  public String queueName;
+
+  public CompactorInfo(long fetchedTimeMillis, String queue, String 
hostAndPort) {
+    lastContact = System.currentTimeMillis() - fetchedTimeMillis;
+    queueName = queue;
+    server = hostAndPort;
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
new file mode 100644
index 0000000..089368e
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JSON Object for displaying External Compactions. Variable names become JSON 
Keys.
+ */
+public class Compactors {
+
+  // Variable names become JSON keys
+  public final int numCompactors;
+  public final List<CompactorInfo> compactors = new ArrayList<>();
+
+  public Compactors(ExternalCompactionInfo ecInfo) {
+    ecInfo.getCompactors().forEach((q, c) -> {
+      var fetchedTime = ecInfo.getFetchedTimeMillis();
+      c.forEach(hp -> compactors.add(new CompactorInfo(fetchedTime, q, 
hp.toString())));
+    });
+    numCompactors = compactors.size();
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
new file mode 100644
index 0000000..45d10d8
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.accumulo.core.util.HostAndPort;
+
+public class CoordinatorInfo {
+
+  // Variable names become JSON keys
+  public long lastContact;
+  public String server;
+  public int numQueues;
+  public int numCompactors;
+
+  public CoordinatorInfo(Optional<HostAndPort> serverOpt, 
ExternalCompactionInfo ecInfo) {
+    server = serverOpt.map(HostAndPort::toString).orElse("none");
+    var queueToCompactors = ecInfo.getCompactors();
+    numQueues = queueToCompactors.size();
+    numCompactors = 
queueToCompactors.values().stream().mapToInt(List::size).sum();
+    lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis();
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
new file mode 100644
index 0000000..6c1a050
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+
+import org.apache.accumulo.monitor.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generate a new External compactions resource
+ *
+ * @since 2.1.0
+ */
+@Path("/ec")
+@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+public class ECResource {
+  private static Logger log = LoggerFactory.getLogger(ECResource.class);
+
+  @Inject
+  private Monitor monitor;
+
+  @GET
+  public CoordinatorInfo getCoordinator() {
+    var cc = monitor.getCompactorsInfo();
+    log.info("Got coordinator from monitor = {}", cc);
+    return new CoordinatorInfo(cc.getCoordinatorHost(), cc);
+  }
+
+  @Path("compactors")
+  @GET
+  public Compactors getCompactors() {
+    return new Compactors(monitor.getCompactorsInfo());
+  }
+
+  @Path("running")
+  @GET
+  public RunningCompactions getRunning() {
+    return new RunningCompactions(monitor.getRunningInfo());
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
new file mode 100644
index 0000000..3706f13
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.util.HostAndPort;
+
+/**
+ * Bag of everything going on with external compactions.
+ */
+public class ExternalCompactionInfo {
+
+  private Optional<HostAndPort> coordinatorHost;
+  private Map<String,List<HostAndPort>> compactors = new HashMap<>();
+  private long fetchedTimeMillis;
+
+  public void setCoordinatorHost(Optional<HostAndPort> coordinatorHost) {
+    this.coordinatorHost = coordinatorHost;
+  }
+
+  public Optional<HostAndPort> getCoordinatorHost() {
+    return coordinatorHost;
+  }
+
+  public Map<String,List<HostAndPort>> getCompactors() {
+    return compactors;
+  }
+
+  public void setCompactors(Map<String,List<HostAndPort>> compactors) {
+    this.compactors = compactors;
+  }
+
+  public long getFetchedTimeMillis() {
+    return fetchedTimeMillis;
+  }
+
+  public void setFetchedTimeMillis(long fetchedTimeMillis) {
+    this.fetchedTimeMillis = fetchedTimeMillis;
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
new file mode 100644
index 0000000..294b91c
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+
+public class RunningCompactions {
+
+  public final List<RunningCompactorInfo> running = new ArrayList<>();
+
+  public RunningCompactions(Map<String,TExternalCompaction> rMap) {
+    if (rMap != null) {
+      var fetchedTime = System.currentTimeMillis();
+      for (var entry : rMap.entrySet()) {
+        running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(), 
entry.getValue()));
+      }
+    }
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
new file mode 100644
index 0000000..711c135
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunningCompactorInfo extends CompactorInfo {
+  private static Logger log = 
LoggerFactory.getLogger(RunningCompactorInfo.class);
+
+  // Variable names become JSON keys
+  public String ecid;
+  public String kind;
+  public String tableId;
+  public List<CompactionInputFile> inputFiles;
+  public int numFiles;
+  public String outputFile;
+  public float progress = 0f;
+  public long duration;
+  public String status;
+  public long lastUpdate;
+
+  public RunningCompactorInfo(long fetchedTime, String ecid, 
TExternalCompaction ec) {
+    super(fetchedTime, ec.getQueueName(), ec.getCompactor());
+    this.ecid = ecid;
+    var updates = ec.getUpdates();
+    var job = ec.getJob();
+    kind = job.getKind().name();
+    tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
+    inputFiles = convertInputFiles(job.files);
+    numFiles = inputFiles.size();
+    outputFile = job.outputFile;
+    updateProgress(updates);
+    log.debug("Parsed running compaction {} for {} with progress = {}%", 
status, ecid, progress);
+  }
+
+  private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
+    List<CompactionInputFile> list = new ArrayList<>();
+    files.forEach(f -> list
+        .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, 
f.timestamp)));
+    // sorted largest to smallest
+    list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
+    return list;
+  }
+
+  /**
+   * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted 
of the last update.
+   * Also update the status.
+   */
+  private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) {
+    if (updates.isEmpty()) {
+      progress = 0f;
+      status = "na";
+    }
+    long nowMillis = System.currentTimeMillis();
+    long startedMillis = nowMillis;
+    long updateMillis;
+    TCompactionStatusUpdate last;
+
+    // sort updates by key, which is a timestamp
+    TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates);
+    var firstEntry = sorted.firstEntry();
+    var lastEntry = sorted.lastEntry();
+    if (firstEntry != null) {
+      startedMillis = firstEntry.getKey();
+    }
+    duration = nowMillis - startedMillis;
+    long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(duration);
+    if (durationMinutes > 15) {
+      log.warn("Compaction {} has been running for {} minutes", ecid, 
durationMinutes);
+    }
+
+    // last entry is all we care about so bail if null
+    if (lastEntry != null) {
+      last = lastEntry.getValue();
+      updateMillis = lastEntry.getKey();
+    } else {
+      log.debug("No updates found for {}", ecid);
+      return;
+    }
+
+    long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - 
updateMillis);
+    log.debug("Time since Last update {} - {} = {} seconds", nowMillis, 
updateMillis,
+        sinceLastUpdateSeconds);
+    if (sinceLastUpdateSeconds > 30) {
+      log.debug("Compaction hasn't progressed from {} in {} seconds.", 
progress,
+          sinceLastUpdateSeconds);
+    }
+
+    float percent;
+    var total = last.getEntriesToBeCompacted();
+    if (total <= 0) {
+      percent = 0f;
+    } else {
+      percent = (last.getEntriesRead() / (float) total) * 100;
+    }
+
+    lastUpdate = nowMillis - updateMillis;
+    status = last.state.name();
+    progress = percent;
+  }
+
+  @Override
+  public String toString() {
+    return ecid + ": " + status + " progress: " + progress;
+  }
+}
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
index dbbb6dc..c545da2 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
@@ -210,6 +210,31 @@ public class WebViews {
   }
 
   /**
+   * Returns the compactions template
+   *
+   * @return Scans model
+   */
+  @GET
+  @Path("ec")
+  @Template(name = "/default.ftl")
+  public Map<String,Object> getExternalCompactions() {
+    var ccHost = monitor.getCoordinatorHost();
+
+    Map<String,Object> model = getModel();
+    model.put("title", "External Compactions");
+    model.put("template", "ec.ftl");
+
+    if (ccHost.isPresent()) {
+      model.put("coordinatorRunning", true);
+      model.put("js", "ec.js");
+    } else {
+      model.put("coordinatorRunning", false);
+    }
+
+    return model;
+  }
+
+  /**
    * Returns the bulk import template
    *
    * @return Bulk Import model
diff --git 
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
new file mode 100644
index 0000000..201edbd
--- /dev/null
+++ 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ var coordinatorTable;
+ var compactorsTable;
+ var compactorsTableData;
+ var runningTable;
+ var runningTableData;
+
+ /**
+  * Creates active compactions table
+  */
+ $(document).ready(function() {
+    compactorsTable = $('#compactorsTable').DataTable({
+           "ajax": {
+               "url": '/rest/ec/compactors',
+               "dataSrc": "compactors"
+           },
+           "stateSave": true,
+           "dom": 't<"align-left"l>p',
+           "columnDefs": [
+              { "targets": "duration",
+                "render": function ( data, type, row ) {
+                  if(type === 'display') data = timeDuration(data);
+                  return data;
+                }
+              },
+              { "targets": "date",
+                  "render": function ( data, type, row ) {
+                    if(type === 'display') data = dateFormat(data);
+                    return data;
+                  }
+                }
+            ],
+           "columns": [
+             { "data": "server" },
+             { "data": "queueName"},
+             { "data": "lastContact"}
+           ]
+         });
+
+     // Create a table for compactors
+     runningTable = $('#runningTable').DataTable({
+       "ajax": {
+            "url": '/rest/ec/running',
+            "dataSrc": "running"
+       },
+       "stateSave": true,
+       "dom": 't<"align-left"l>p',
+       "columnDefs": [
+           { "targets": "duration",
+             "render": function ( data, type, row ) {
+               if(type === 'display') data = timeDuration(data);
+               return data;
+             }
+           },
+           { "targets": "date",
+               "render": function ( data, type, row ) {
+                 if(type === 'display') data = dateFormat(data);
+                 return data;
+               }
+             }
+         ],
+       "columns": [
+         { "data": "server" },
+         { "data": "kind" },
+         { "data": "status" },
+         { "data": "queueName" },
+         { "data": "tableId" },
+         { "data": "numFiles" },
+         { "data": "progress",
+           "type": "html",
+           "render": function ( data, type, row, meta ) {
+              if(type === 'display') {
+                  if (row.progress < 0) {
+                    data = '--';
+                  } else {
+                    var p = Math.round(Number(row.progress));
+                    console.log("Compaction progress = %" + p);
+                    data = '<div class="progress"><div class="progress-bar" 
role="progressbar" style="min-width: 2em; width:' +
+                         p + '%;">' + p + '%</div></div>';
+                  }
+              }
+              return data;
+            }
+         },
+         { "data": "lastUpdate"},
+         { "data": "duration"},
+         { // more column settings
+            "class":          "details-control",
+            "orderable":      false,
+            "data":           null,
+            "defaultContent": ""
+         }
+       ]
+     });
+
+     // Array to track the ids of the details displayed rows
+       var detailRows = [];
+       $("#runningTable tbody").on( 'click', 'tr td.details-control', function 
() {
+         var tr = $(this).closest('tr');
+         var row = runningTable.row( tr );
+         var idx = $.inArray( tr.attr('id'), detailRows );
+
+         if ( row.child.isShown() ) {
+             tr.removeClass( 'details' );
+             row.child.hide();
+
+             // Remove from the 'open' array
+             detailRows.splice( idx, 1 );
+         }
+         else {
+             var rci = row.data();
+             tr.addClass( 'details' );
+             // put all the information into html for a single row
+             var htmlRow = "<table class='table table-bordered table-striped 
table-condensed'>"
+             htmlRow += "<thead><tr><th>#</th><th>Input 
Files</th><th>Size</th><th>Entries</th></tr></thead>";
+             $.each( rci.inputFiles, function( key, value ) {
+               htmlRow += "<tr><td>" + key + "</td>";
+               htmlRow += "<td>" + value.metadataFileEntry + "</td>";
+               htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>";
+               htmlRow += "<td>" + bigNumberForQuantity(value.entries) + 
"</td></tr>";
+             });
+             htmlRow += "</table>";
+             htmlRow += "Output File: " + rci.outputFile + "<br>";
+             htmlRow += rci.ecid;
+             row.child(htmlRow).show();
+
+             // Add to the 'open' array
+             if ( idx === -1 ) {
+                 detailRows.push( tr.attr('id') );
+             }
+         }
+       });
+     refreshECTables();
+ });
+
+ /**
+  * Used to redraw the page
+  */
+ function refresh() {
+   refreshECTables();
+ }
+
+ /**
+  * Generates the compactions table
+  */
+ function refreshECTables() {
+   getCompactionCoordinator();
+   var ecInfo = JSON.parse(sessionStorage.ecInfo);
+   var ccAddress = ecInfo.server;
+   var numCompactors = ecInfo.numCompactors;
+   var lastContactTime = timeDuration(ecInfo.lastContact);
+   console.log("compaction coordinator = " + ccAddress);
+   console.log("numCompactors = " + numCompactors);
+   $('#ccHostname').text(ccAddress);
+   $('#ccNumQueues').text(ecInfo.numQueues);
+   $('#ccNumCompactors').text(numCompactors);
+   $('#ccLastContact').html(lastContactTime);
+
+   // user paging is not reset on reload
+   if(compactorsTable) compactorsTable.ajax.reload(null, false );
+   if(runningTable) runningTable.ajax.reload(null, false );
+ }
+
+ /**
+  * Get address of the compaction coordinator info
+  */
+ function getCompactionCoordinator() {
+   $.getJSON('/rest/ec', function(data) {
+        sessionStorage.ecInfo = JSON.stringify(data);
+   });
+ }
+
+ function refreshCompactors() {
+   console.log("Refresh compactors table.");
+   // user paging is not reset on reload
+   if(compactorsTable) compactorsTable.ajax.reload(null, false );
+ }
+
+ function refreshRunning() {
+   console.log("Refresh running compactions table.");
+   // user paging is not reset on reload
+   if(runningTable) runningTable.ajax.reload(null, false );
+ }
diff --git 
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
new file mode 100644
index 0000000..ec95a68
--- /dev/null
+++ 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
@@ -0,0 +1,80 @@
+<#--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+      <div class="row">
+        <div class="col-xs-12">
+          <h3>${title}</h3>
+        </div>
+      </div>
+    <#if coordinatorRunning == true>
+    <div id="ecDiv">
+      <div class="row">
+        <div class="col-xs-12">
+          <div class="panel panel-primary">
+            <div 
class="panel-heading">Compaction&nbsp;Coordinator&nbsp;running&nbsp;on:&nbsp;<span
 id="ccHostname" title="The hostname of the compaction coordinator 
server"></span></div>
+            <div class="panel-body">
+                Queues&nbsp;<span id="ccNumQueues" class="badge" title="Number 
of queues configured">0</span></span>&nbsp;&nbsp;&nbsp;&nbsp;
+                Compactors&nbsp;<span id="ccNumCompactors" class="badge" 
title="Number of compactors running">0</span>&nbsp;&nbsp;&nbsp;&nbsp;
+                Last&nbsp;Contact&nbsp;<span id="ccLastContact" class="badge" 
title="Last time data was fetched. Server fetches on refresh, at most every 
minute."></span>
+            </div>
+          </div>
+        </div>
+      </div>
+      <div class="row">
+      <div class="col-xs-12">
+        <table id="compactorsTable" class="table table-bordered table-striped 
table-condensed">
+          <caption><span class="table-caption">Compactors</span>
+          <a href="javascript:refreshCompactors();"><span class="glyphicon 
glyphicon-refresh"/></a></caption>
+          <thead>
+            <tr>
+              <th class="firstcell" title="The hostname the compactor is 
running on.">Server</th>
+              <th title="The name of the queue this compactor is 
assigned.">Queue</th>
+              <th class="duration" title="Last time data was fetched. Server 
fetches on refresh, at most every minute.">Last Contact</th>
+            </tr>
+          </thead>
+        </table>
+      </div>
+      <div class="row">
+          <div class="col-xs-12">
+            <table id="runningTable" class="table table-bordered table-striped 
table-condensed">
+              <caption><span class="table-caption">Running Compactions</span>
+              <a href="javascript:refreshRunning();"><span class="glyphicon 
glyphicon-refresh"/></a></caption>
+              <thead>
+                <tr>
+                  <th class="firstcell" title="The hostname the compactor is 
running on.">Server Hostname</th>
+                  <th title="The type of compaction.">Kind</th>
+                  <th title="The status returned by the last 
update.">Status</th>
+                  <th title="The name of the queue this compactor is 
assigned.">Queue</th>
+                  <th title="The ID of the table being compacted.">Table 
ID</th>
+                  <th title="The number of files being compacted."># of 
Files</th>
+                  <th title="The progress of the compaction." 
class="progBar">Progress</th>
+                  <th class="duration" title="The time of the last update for 
the compaction">Last Update</th>
+                  <th class="duration" title="How long compaction has been 
running">Duration</th>
+                  <th class="details-control">More</th>
+                </tr>
+              </thead>
+              <tbody></tbody>
+            </table>
+          </div>
+      </div>
+    </div>
+   <#else>
+    <div id="ccBanner"><div class="alert alert-danger" role="alert">Compaction 
Coordinator Not Running</div></div>
+  </#if>
diff --git 
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
index f180b6d..a9cca59 100644
--- 
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
+++ 
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
@@ -54,6 +54,7 @@
                 <li><a href="/compactions">Active&nbsp;Compactions</a></li>
                 <li><a href="/scans">Active&nbsp;Scans</a></li>
                 <li><a href="/bulkImports">Bulk&nbsp;Imports</a></li>
+                <li><a href="/ec">External&nbsp;Compactions</a></li>
                 <li><a href="/replication">Replication</a></li>
               </ul>
             </li>
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
new file mode 100644
index 0000000..305344c
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import 
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that external compactions report progress from start to finish. To 
prevent flaky test
+ * failures, we only measure progress in quarter segments: STARTED, QUARTER, 
HALF, THREE_QUARTERS.
+ * We can detect if the compaction finished without errors but the coordinator 
will never report
+ * 100% progress since it will remove the ECID upon completion. The {@link 
SlowIterator} is used to
+ * control the length of time it takes to complete the compaction.
+ */
+public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
+  private static final Logger log = 
LoggerFactory.getLogger(ExternalCompactionProgressIT.class);
+  private static final int ROWS = 10_000;
+
+  enum EC_PROGRESS {
+    STARTED, QUARTER, HALF, THREE_QUARTERS
+  }
+
+  Map<String,RunningCompactorInfo> runningMap = new HashMap<>();
+  List<EC_PROGRESS> progressList = new ArrayList<>();
+
+  private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+    ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+  }
+
+  @Test
+  public void testProgress() throws Exception {
+    MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
+    String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client =
+        Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
+      ExternalCompactionTestUtils.createTable(client, table1, "cs1");
+      ExternalCompactionTestUtils.writeData(client, table1, ROWS);
+      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, 
"-q", "DCQ1");
+      coord = 
ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) 
getCluster()),
+          CompactionCoordinator.class, getCluster().getServerContext());
+
+      Thread checkerThread = startChecker();
+      checkerThread.start();
+
+      IteratorSetting setting = new IteratorSetting(50, "Slow", 
SlowIterator.class);
+      SlowIterator.setSleepTime(setting, 1);
+      client.tableOperations().attachIterator(table1, setting,
+          EnumSet.of(IteratorUtil.IteratorScope.majc));
+      log.info("Compacting table");
+      ExternalCompactionTestUtils.compact(client, table1, 2, "DCQ1", true);
+      ExternalCompactionTestUtils.verify(client, table1, 2, ROWS);
+
+      log.info("Done Compacting table");
+      compactionFinished.set(true);
+      checkerThread.join();
+
+      verifyProgress();
+    } finally {
+      ExternalCompactionTestUtils.stopProcesses(c1, coord);
+    }
+  }
+
+  public Thread startChecker() {
+    return Threads.createThread("RC checker", () -> {
+      try {
+        while (!compactionFinished.get()) {
+          checkRunning();
+          sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+        }
+      } catch (TException e) {
+        log.warn("{}", e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Check running compaction progress.
+   */
+  private void checkRunning() throws TException {
+    var ecList = 
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+    var ecMap = ecList.getCompactions();
+    if (ecMap != null) {
+      ecMap.forEach((ecid, ec) -> {
+        // returns null if it's a new mapping
+        RunningCompactorInfo rci = new 
RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
+        RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
+        if (previousRci == null) {
+          log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles);
+        } else {
+          if (rci.progress <= previousRci.progress) {
+            log.warn("{} did not progress. It went from {} to {}", ecid, 
previousRci.progress,
+                rci.progress);
+          } else {
+            log.debug("{} progressed from {} to {}", ecid, 
previousRci.progress, rci.progress);
+            if (rci.progress > 0 && rci.progress <= 25)
+              progressList.add(EC_PROGRESS.STARTED);
+            else if (rci.progress > 25 && rci.progress <= 50)
+              progressList.add(EC_PROGRESS.QUARTER);
+            else if (rci.progress > 50 && rci.progress <= 75)
+              progressList.add(EC_PROGRESS.HALF);
+            else if (rci.progress > 75 && rci.progress <= 100)
+              progressList.add(EC_PROGRESS.THREE_QUARTERS);
+          }
+          if (!rci.status.equals(TCompactionState.IN_PROGRESS.name())) {
+            log.debug("Saw status other than IN_PROGRESS: {}", rci.status);
+          }
+        }
+      });
+    }
+  }
+
+  private void verifyProgress() {
+    log.info("Verify Progress.");
+    assertTrue("Missing start of progress", 
progressList.contains(EC_PROGRESS.STARTED));
+    assertTrue("Missing quarter progress", 
progressList.contains(EC_PROGRESS.QUARTER));
+    assertTrue("Missing half progress", 
progressList.contains(EC_PROGRESS.HALF));
+    assertTrue("Missing three quarters progress",
+        progressList.contains(EC_PROGRESS.THREE_QUARTERS));
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 7f4199a..509704e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -41,7 +42,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
@@ -76,6 +76,7 @@ import 
org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,11 +145,10 @@ public class ExternalCompactionTestUtils {
 
   }
 
-  public static void writeData(AccumuloClient client, String table1)
-      throws MutationsRejectedException, TableNotFoundException, 
AccumuloException,
-      AccumuloSecurityException {
+  public static void writeData(AccumuloClient client, String table1, int rows)
+      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
     try (BatchWriter bw = client.createBatchWriter(table1)) {
-      for (int i = 0; i < MAX_DATA; i++) {
+      for (int i = 0; i < rows; i++) {
         Mutation m = new Mutation(row(i));
         m.put("", "", "" + i);
         bw.addMutation(m);
@@ -158,8 +158,18 @@ public class ExternalCompactionTestUtils {
     client.tableOperations().flush(table1);
   }
 
+  public static void writeData(AccumuloClient client, String table1)
+      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
+    writeData(client, table1, MAX_DATA);
+  }
+
   public static void verify(AccumuloClient client, String table1, int modulus)
       throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+    verify(client, table1, modulus, MAX_DATA);
+  }
+
+  public static void verify(AccumuloClient client, String table1, int modulus, 
int rows)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
     try (Scanner scanner = client.createScanner(table1)) {
       int count = 0;
       for (Entry<Key,Value> entry : scanner) {
@@ -169,7 +179,7 @@ public class ExternalCompactionTestUtils {
       }
 
       int expectedCount = 0;
-      for (int i = 0; i < MAX_DATA; i++) {
+      for (int i = 0; i < rows; i++) {
         if (i % modulus == 0)
           expectedCount++;
       }
@@ -237,13 +247,14 @@ public class ExternalCompactionTestUtils {
   }
 
   public static TExternalCompactionList getRunningCompactions(ClientContext 
context)
-      throws Exception {
-    HostAndPort coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(context);
-    if (null == coordinatorHost) {
+      throws TException {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
       throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
     }
-    CompactionCoordinatorService.Client client = ThriftUtil
-        .getClient(new CompactionCoordinatorService.Client.Factory(), 
coordinatorHost, context);
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), 
coordinatorHost.get(), context);
     try {
       TExternalCompactionList running =
           client.getRunningCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());
@@ -255,12 +266,13 @@ public class ExternalCompactionTestUtils {
 
   private static TExternalCompactionList getCompletedCompactions(ClientContext 
context)
       throws Exception {
-    HostAndPort coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(context);
-    if (null == coordinatorHost) {
+    Optional<HostAndPort> coordinatorHost =
+        ExternalCompactionUtil.findCompactionCoordinator(context);
+    if (coordinatorHost.isEmpty()) {
       throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
     }
-    CompactionCoordinatorService.Client client = ThriftUtil
-        .getClient(new CompactionCoordinatorService.Client.Factory(), 
coordinatorHost, context);
+    CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+        new CompactionCoordinatorService.Client.Factory(), 
coordinatorHost.get(), context);
     try {
       TExternalCompactionList completed =
           client.getCompletedCompactions(TraceUtil.traceInfo(), 
context.rpcCreds());

Reply via email to