This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 2bce893 Create page in Monitor for external compactions (#2358) 2bce893 is described below commit 2bce8939145b49a765f998f5dbe2b6242a64e3e6 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Nov 19 10:42:48 2021 -0500 Create page in Monitor for external compactions (#2358) * Create multiple new classes for displaying 3 different tables of data in the new external compaction page in the monitor * Create 3 new ajax endpoints in ECResource * Modify Compactor and ExternalCompactionUtil to return Optional for the compaction coordinator instead of null * Add check for compaction coordinator to Monitor.fetchData() * New ExternalCompactionProgressIT for testing progress * Use new bootstrap panel and badges for coordinator info * Closes #2290 Co-authored-by: Dom G. <47725857+domgargu...@users.noreply.github.com> --- .../util/compaction/ExternalCompactionUtil.java | 10 +- .../org/apache/accumulo/compactor/Compactor.java | 8 +- .../java/org/apache/accumulo/monitor/Monitor.java | 89 +++++++++ .../compactions/external/CompactionInputFile.java | 38 ++++ .../rest/compactions/external/CompactorInfo.java | 33 ++++ .../rest/compactions/external/Compactors.java | 40 ++++ .../rest/compactions/external/CoordinatorInfo.java | 41 +++++ .../rest/compactions/external/ECResource.java | 62 +++++++ .../external/ExternalCompactionInfo.java | 60 ++++++ .../compactions/external/RunningCompactions.java | 39 ++++ .../compactions/external/RunningCompactorInfo.java | 133 ++++++++++++++ .../org/apache/accumulo/monitor/view/WebViews.java | 25 +++ .../org/apache/accumulo/monitor/resources/js/ec.js | 201 +++++++++++++++++++++ .../org/apache/accumulo/monitor/templates/ec.ftl | 80 ++++++++ .../apache/accumulo/monitor/templates/navbar.ftl | 1 + .../compaction/ExternalCompactionProgressIT.java | 166 +++++++++++++++++ .../compaction/ExternalCompactionTestUtils.java | 42 +++-- 17 files changed, 1044 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index a5538b6..24dff44 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -93,19 +94,18 @@ public class ExternalCompactionUtil { /** * - * @return null if Coordinator node not found, else HostAndPort + * @return Optional HostAndPort of Coordinator node if found */ - public static HostAndPort findCompactionCoordinator(ClientContext context) { + public static Optional<HostAndPort> findCompactionCoordinator(ClientContext context) { final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; try { var zk = ZooSession.getAnonymousSession(context.getZooKeepers(), context.getZooKeepersSessionTimeOut()); byte[] address = ServiceLock.getLockData(zk, ServiceLock.path(lockPath)); if (null == address) { - return null; + return Optional.empty(); } - String coordinatorAddress = new String(address); - return HostAndPort.fromString(coordinatorAddress); + return Optional.of(HostAndPort.fromString(new String(address))); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 898177e..6565280 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -474,12 +474,12 @@ public class Compactor extends AbstractServer implements CompactorService.Iface * when unable to get client */ protected CompactionCoordinatorService.Client getCoordinatorClient() throws TTransportException { - HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); - if (null == coordinatorHost) { + var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); + if (coordinatorHost.isEmpty()) { throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); } - LOG.trace("CompactionCoordinator address is: {}", coordinatorHost); - return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost, getContext()); + LOG.trace("CompactionCoordinator address is: {}", coordinatorHost.get()); + return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost.get(), getContext()); } /** diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 3fca610..dcd7b37 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -34,6 +34,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -44,6 +45,9 @@ import jakarta.inject.Singleton; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ManagerClient; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.gc.thrift.GCMonitorService; @@ -62,12 +66,14 @@ import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.zookeeper.ServiceLock; import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; import org.apache.accumulo.monitor.util.logging.RecentLogs; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.HighlyAvailableService; @@ -163,6 +169,12 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { private Map<TableId,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap(); private Exception problemException; private GCStatus gcStatus; + private Optional<HostAndPort> coordinatorHost = Optional.empty(); + private CompactionCoordinatorService.Client coordinatorClient; + private final String coordinatorMissingMsg = + "Error getting the compaction coordinator. Check that it is running. It is not " + + "started automatically with other cluster processes so must be started by running " + + "'accumulo compaction-coordinator'."; private EmbeddedWebServer server; @@ -365,7 +377,17 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { this.problemException = e; } + if (coordinatorHost.isEmpty()) { + coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); + } else { + log.info("External Compaction Coordinator found at {}", coordinatorHost.get()); + } + } finally { + if (coordinatorClient != null) { + ThriftUtil.returnClient(coordinatorClient, context); + coordinatorClient = null; + } lastRecalc.set(currentTime); // stop fetching; log an error if this thread wasn't already fetching if (!fetching.compareAndSet(true, false)) { @@ -564,8 +586,11 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { private final Map<HostAndPort,ScanStats> allScans = new HashMap<>(); private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>(); private final RecentLogs recentLogs = new RecentLogs(); + private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>(); private long scansFetchedNanos = 0L; private long compactsFetchedNanos = 0L; + private long ecInfoFetchedNanos = 0L; private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); @@ -591,6 +616,67 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { return Map.copyOf(allCompactions); } + public synchronized ExternalCompactionInfo getCompactorsInfo() { + if (coordinatorHost.isEmpty()) { + throw new IllegalStateException("Tried fetching from compaction coordinator that's missing"); + } + if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { + log.info("User initiated fetch of External Compaction info"); + Map<String,List<HostAndPort>> compactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + log.debug("Found compactors: " + compactors); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); + + ecInfoFetchedNanos = System.nanoTime(); + } + return ecInfo; + } + + /** + * Fetch running compactions from Compaction Coordinator. Chose not to restrict the frequency of + * user fetches since RPC calls are going to the coordinator. This allows for fine grain updates + * of external compaction progress. + */ + public synchronized Map<String,TExternalCompaction> getRunningInfo() { + if (coordinatorHost.isEmpty()) { + throw new IllegalStateException(coordinatorMissingMsg); + } + var ccHost = coordinatorHost.get(); + log.info("User initiated fetch of running External Compactions from " + ccHost); + var client = getCoordinator(ccHost); + TExternalCompactionList running; + try { + running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); + } catch (Exception e) { + throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); + } + + ecRunningMap.clear(); + if (running.getCompactions() != null) { + running.getCompactions().forEach((queue, ec) -> { + log.trace("Found Compactions running on queue {} -> {}", queue, ec); + ecRunningMap.put(queue, ec); + }); + } + + return ecRunningMap; + } + + private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) { + if (coordinatorClient == null) { + try { + coordinatorClient = ThriftUtil.getClient(new CompactionCoordinatorService.Client.Factory(), + address, getContext()); + } catch (Exception e) { + log.error("Unable to get Compaction coordinator at {}", address); + throw new IllegalStateException(coordinatorMissingMsg, e); + } + } + return coordinatorClient; + } + private void fetchScans() { ServerContext context = getContext(); for (String server : context.instanceOperations().getTabletServers()) { @@ -869,4 +955,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { return recentLogs; } + public Optional<HostAndPort> getCoordinatorHost() { + return coordinatorHost; + } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java new file mode 100644 index 0000000..4ae5e23 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +/** + * Class for displaying input files + */ +public class CompactionInputFile { + + // Variable names become JSON keys + public String metadataFileEntry; + public long size; + public long entries; + public long timestamp; + + public CompactionInputFile(String metadataFileEntry, long size, long entries, long timestamp) { + this.metadataFileEntry = metadataFileEntry; + this.size = size; + this.entries = entries; + this.timestamp = timestamp; + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java new file mode 100644 index 0000000..1363ece --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +public class CompactorInfo { + + // Variable names become JSON keys + public long lastContact; + public String server; + public String queueName; + + public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) { + lastContact = System.currentTimeMillis() - fetchedTimeMillis; + queueName = queue; + server = hostAndPort; + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java new file mode 100644 index 0000000..089368e --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.ArrayList; +import java.util.List; + +/** + * JSON Object for displaying External Compactions. Variable names become JSON Keys. + */ +public class Compactors { + + // Variable names become JSON keys + public final int numCompactors; + public final List<CompactorInfo> compactors = new ArrayList<>(); + + public Compactors(ExternalCompactionInfo ecInfo) { + ecInfo.getCompactors().forEach((q, c) -> { + var fetchedTime = ecInfo.getFetchedTimeMillis(); + c.forEach(hp -> compactors.add(new CompactorInfo(fetchedTime, q, hp.toString()))); + }); + numCompactors = compactors.size(); + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java new file mode 100644 index 0000000..45d10d8 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.List; +import java.util.Optional; + +import org.apache.accumulo.core.util.HostAndPort; + +public class CoordinatorInfo { + + // Variable names become JSON keys + public long lastContact; + public String server; + public int numQueues; + public int numCompactors; + + public CoordinatorInfo(Optional<HostAndPort> serverOpt, ExternalCompactionInfo ecInfo) { + server = serverOpt.map(HostAndPort::toString).orElse("none"); + var queueToCompactors = ecInfo.getCompactors(); + numQueues = queueToCompactors.size(); + numCompactors = queueToCompactors.values().stream().mapToInt(List::size).sum(); + lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis(); + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java new file mode 100644 index 0000000..6c1a050 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import org.apache.accumulo.monitor.Monitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generate a new External compactions resource + * + * @since 2.1.0 + */ +@Path("/ec") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +public class ECResource { + private static Logger log = LoggerFactory.getLogger(ECResource.class); + + @Inject + private Monitor monitor; + + @GET + public CoordinatorInfo getCoordinator() { + var cc = monitor.getCompactorsInfo(); + log.info("Got coordinator from monitor = {}", cc); + return new CoordinatorInfo(cc.getCoordinatorHost(), cc); + } + + @Path("compactors") + @GET + public Compactors getCompactors() { + return new Compactors(monitor.getCompactorsInfo()); + } + + @Path("running") + @GET + public RunningCompactions getRunning() { + return new RunningCompactions(monitor.getRunningInfo()); + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java new file mode 100644 index 0000000..3706f13 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.util.HostAndPort; + +/** + * Bag of everything going on with external compactions. + */ +public class ExternalCompactionInfo { + + private Optional<HostAndPort> coordinatorHost; + private Map<String,List<HostAndPort>> compactors = new HashMap<>(); + private long fetchedTimeMillis; + + public void setCoordinatorHost(Optional<HostAndPort> coordinatorHost) { + this.coordinatorHost = coordinatorHost; + } + + public Optional<HostAndPort> getCoordinatorHost() { + return coordinatorHost; + } + + public Map<String,List<HostAndPort>> getCompactors() { + return compactors; + } + + public void setCompactors(Map<String,List<HostAndPort>> compactors) { + this.compactors = compactors; + } + + public long getFetchedTimeMillis() { + return fetchedTimeMillis; + } + + public void setFetchedTimeMillis(long fetchedTimeMillis) { + this.fetchedTimeMillis = fetchedTimeMillis; + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java new file mode 100644 index 0000000..294b91c --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; + +public class RunningCompactions { + + public final List<RunningCompactorInfo> running = new ArrayList<>(); + + public RunningCompactions(Map<String,TExternalCompaction> rMap) { + if (rMap != null) { + var fetchedTime = System.currentTimeMillis(); + for (var entry : rMap.entrySet()) { + running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(), entry.getValue())); + } + } + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java new file mode 100644 index 0000000..711c135 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RunningCompactorInfo extends CompactorInfo { + private static Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class); + + // Variable names become JSON keys + public String ecid; + public String kind; + public String tableId; + public List<CompactionInputFile> inputFiles; + public int numFiles; + public String outputFile; + public float progress = 0f; + public long duration; + public String status; + public long lastUpdate; + + public RunningCompactorInfo(long fetchedTime, String ecid, TExternalCompaction ec) { + super(fetchedTime, ec.getQueueName(), ec.getCompactor()); + this.ecid = ecid; + var updates = ec.getUpdates(); + var job = ec.getJob(); + kind = job.getKind().name(); + tableId = KeyExtent.fromThrift(job.extent).tableId().canonical(); + inputFiles = convertInputFiles(job.files); + numFiles = inputFiles.size(); + outputFile = job.outputFile; + updateProgress(updates); + log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress); + } + + private List<CompactionInputFile> convertInputFiles(List<InputFile> files) { + List<CompactionInputFile> list = new ArrayList<>(); + files.forEach(f -> list + .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp))); + // sorted largest to smallest + list.sort((o1, o2) -> Long.compare(o2.size, o1.size)); + return list; + } + + /** + * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update. + * Also update the status. + */ + private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) { + if (updates.isEmpty()) { + progress = 0f; + status = "na"; + } + long nowMillis = System.currentTimeMillis(); + long startedMillis = nowMillis; + long updateMillis; + TCompactionStatusUpdate last; + + // sort updates by key, which is a timestamp + TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates); + var firstEntry = sorted.firstEntry(); + var lastEntry = sorted.lastEntry(); + if (firstEntry != null) { + startedMillis = firstEntry.getKey(); + } + duration = nowMillis - startedMillis; + long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(duration); + if (durationMinutes > 15) { + log.warn("Compaction {} has been running for {} minutes", ecid, durationMinutes); + } + + // last entry is all we care about so bail if null + if (lastEntry != null) { + last = lastEntry.getValue(); + updateMillis = lastEntry.getKey(); + } else { + log.debug("No updates found for {}", ecid); + return; + } + + long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - updateMillis); + log.debug("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis, + sinceLastUpdateSeconds); + if (sinceLastUpdateSeconds > 30) { + log.debug("Compaction hasn't progressed from {} in {} seconds.", progress, + sinceLastUpdateSeconds); + } + + float percent; + var total = last.getEntriesToBeCompacted(); + if (total <= 0) { + percent = 0f; + } else { + percent = (last.getEntriesRead() / (float) total) * 100; + } + + lastUpdate = nowMillis - updateMillis; + status = last.state.name(); + progress = percent; + } + + @Override + public String toString() { + return ecid + ": " + status + " progress: " + progress; + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java index dbbb6dc..c545da2 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java @@ -210,6 +210,31 @@ public class WebViews { } /** + * Returns the compactions template + * + * @return Scans model + */ + @GET + @Path("ec") + @Template(name = "/default.ftl") + public Map<String,Object> getExternalCompactions() { + var ccHost = monitor.getCoordinatorHost(); + + Map<String,Object> model = getModel(); + model.put("title", "External Compactions"); + model.put("template", "ec.ftl"); + + if (ccHost.isPresent()) { + model.put("coordinatorRunning", true); + model.put("js", "ec.js"); + } else { + model.put("coordinatorRunning", false); + } + + return model; + } + + /** * Returns the bulk import template * * @return Bulk Import model diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js new file mode 100644 index 0000000..201edbd --- /dev/null +++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + var coordinatorTable; + var compactorsTable; + var compactorsTableData; + var runningTable; + var runningTableData; + + /** + * Creates active compactions table + */ + $(document).ready(function() { + compactorsTable = $('#compactorsTable').DataTable({ + "ajax": { + "url": '/rest/ec/compactors', + "dataSrc": "compactors" + }, + "stateSave": true, + "dom": 't<"align-left"l>p', + "columnDefs": [ + { "targets": "duration", + "render": function ( data, type, row ) { + if(type === 'display') data = timeDuration(data); + return data; + } + }, + { "targets": "date", + "render": function ( data, type, row ) { + if(type === 'display') data = dateFormat(data); + return data; + } + } + ], + "columns": [ + { "data": "server" }, + { "data": "queueName"}, + { "data": "lastContact"} + ] + }); + + // Create a table for compactors + runningTable = $('#runningTable').DataTable({ + "ajax": { + "url": '/rest/ec/running', + "dataSrc": "running" + }, + "stateSave": true, + "dom": 't<"align-left"l>p', + "columnDefs": [ + { "targets": "duration", + "render": function ( data, type, row ) { + if(type === 'display') data = timeDuration(data); + return data; + } + }, + { "targets": "date", + "render": function ( data, type, row ) { + if(type === 'display') data = dateFormat(data); + return data; + } + } + ], + "columns": [ + { "data": "server" }, + { "data": "kind" }, + { "data": "status" }, + { "data": "queueName" }, + { "data": "tableId" }, + { "data": "numFiles" }, + { "data": "progress", + "type": "html", + "render": function ( data, type, row, meta ) { + if(type === 'display') { + if (row.progress < 0) { + data = '--'; + } else { + var p = Math.round(Number(row.progress)); + console.log("Compaction progress = %" + p); + data = '<div class="progress"><div class="progress-bar" role="progressbar" style="min-width: 2em; width:' + + p + '%;">' + p + '%</div></div>'; + } + } + return data; + } + }, + { "data": "lastUpdate"}, + { "data": "duration"}, + { // more column settings + "class": "details-control", + "orderable": false, + "data": null, + "defaultContent": "" + } + ] + }); + + // Array to track the ids of the details displayed rows + var detailRows = []; + $("#runningTable tbody").on( 'click', 'tr td.details-control', function () { + var tr = $(this).closest('tr'); + var row = runningTable.row( tr ); + var idx = $.inArray( tr.attr('id'), detailRows ); + + if ( row.child.isShown() ) { + tr.removeClass( 'details' ); + row.child.hide(); + + // Remove from the 'open' array + detailRows.splice( idx, 1 ); + } + else { + var rci = row.data(); + tr.addClass( 'details' ); + // put all the information into html for a single row + var htmlRow = "<table class='table table-bordered table-striped table-condensed'>" + htmlRow += "<thead><tr><th>#</th><th>Input Files</th><th>Size</th><th>Entries</th></tr></thead>"; + $.each( rci.inputFiles, function( key, value ) { + htmlRow += "<tr><td>" + key + "</td>"; + htmlRow += "<td>" + value.metadataFileEntry + "</td>"; + htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>"; + htmlRow += "<td>" + bigNumberForQuantity(value.entries) + "</td></tr>"; + }); + htmlRow += "</table>"; + htmlRow += "Output File: " + rci.outputFile + "<br>"; + htmlRow += rci.ecid; + row.child(htmlRow).show(); + + // Add to the 'open' array + if ( idx === -1 ) { + detailRows.push( tr.attr('id') ); + } + } + }); + refreshECTables(); + }); + + /** + * Used to redraw the page + */ + function refresh() { + refreshECTables(); + } + + /** + * Generates the compactions table + */ + function refreshECTables() { + getCompactionCoordinator(); + var ecInfo = JSON.parse(sessionStorage.ecInfo); + var ccAddress = ecInfo.server; + var numCompactors = ecInfo.numCompactors; + var lastContactTime = timeDuration(ecInfo.lastContact); + console.log("compaction coordinator = " + ccAddress); + console.log("numCompactors = " + numCompactors); + $('#ccHostname').text(ccAddress); + $('#ccNumQueues').text(ecInfo.numQueues); + $('#ccNumCompactors').text(numCompactors); + $('#ccLastContact').html(lastContactTime); + + // user paging is not reset on reload + if(compactorsTable) compactorsTable.ajax.reload(null, false ); + if(runningTable) runningTable.ajax.reload(null, false ); + } + + /** + * Get address of the compaction coordinator info + */ + function getCompactionCoordinator() { + $.getJSON('/rest/ec', function(data) { + sessionStorage.ecInfo = JSON.stringify(data); + }); + } + + function refreshCompactors() { + console.log("Refresh compactors table."); + // user paging is not reset on reload + if(compactorsTable) compactorsTable.ajax.reload(null, false ); + } + + function refreshRunning() { + console.log("Refresh running compactions table."); + // user paging is not reset on reload + if(runningTable) runningTable.ajax.reload(null, false ); + } diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl new file mode 100644 index 0000000..ec95a68 --- /dev/null +++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl @@ -0,0 +1,80 @@ +<#-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + <div class="row"> + <div class="col-xs-12"> + <h3>${title}</h3> + </div> + </div> + <#if coordinatorRunning == true> + <div id="ecDiv"> + <div class="row"> + <div class="col-xs-12"> + <div class="panel panel-primary"> + <div class="panel-heading">Compaction Coordinator running on: <span id="ccHostname" title="The hostname of the compaction coordinator server"></span></div> + <div class="panel-body"> + Queues <span id="ccNumQueues" class="badge" title="Number of queues configured">0</span></span> + Compactors <span id="ccNumCompactors" class="badge" title="Number of compactors running">0</span> + Last Contact <span id="ccLastContact" class="badge" title="Last time data was fetched. Server fetches on refresh, at most every minute."></span> + </div> + </div> + </div> + </div> + <div class="row"> + <div class="col-xs-12"> + <table id="compactorsTable" class="table table-bordered table-striped table-condensed"> + <caption><span class="table-caption">Compactors</span> + <a href="javascript:refreshCompactors();"><span class="glyphicon glyphicon-refresh"/></a></caption> + <thead> + <tr> + <th class="firstcell" title="The hostname the compactor is running on.">Server</th> + <th title="The name of the queue this compactor is assigned.">Queue</th> + <th class="duration" title="Last time data was fetched. Server fetches on refresh, at most every minute.">Last Contact</th> + </tr> + </thead> + </table> + </div> + <div class="row"> + <div class="col-xs-12"> + <table id="runningTable" class="table table-bordered table-striped table-condensed"> + <caption><span class="table-caption">Running Compactions</span> + <a href="javascript:refreshRunning();"><span class="glyphicon glyphicon-refresh"/></a></caption> + <thead> + <tr> + <th class="firstcell" title="The hostname the compactor is running on.">Server Hostname</th> + <th title="The type of compaction.">Kind</th> + <th title="The status returned by the last update.">Status</th> + <th title="The name of the queue this compactor is assigned.">Queue</th> + <th title="The ID of the table being compacted.">Table ID</th> + <th title="The number of files being compacted."># of Files</th> + <th title="The progress of the compaction." class="progBar">Progress</th> + <th class="duration" title="The time of the last update for the compaction">Last Update</th> + <th class="duration" title="How long compaction has been running">Duration</th> + <th class="details-control">More</th> + </tr> + </thead> + <tbody></tbody> + </table> + </div> + </div> + </div> + <#else> + <div id="ccBanner"><div class="alert alert-danger" role="alert">Compaction Coordinator Not Running</div></div> + </#if> diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl index f180b6d..a9cca59 100644 --- a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl +++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl @@ -54,6 +54,7 @@ <li><a href="/compactions">Active Compactions</a></li> <li><a href="/scans">Active Scans</a></li> <li><a href="/bulkImports">Bulk Imports</a></li> + <li><a href="/ec">External Compactions</a></li> <li><a href="/replication">Replication</a></li> </ul> </li> diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java new file mode 100644 index 0000000..305344c --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests that external compactions report progress from start to finish. To prevent flaky test + * failures, we only measure progress in quarter segments: STARTED, QUARTER, HALF, THREE_QUARTERS. + * We can detect if the compaction finished without errors but the coordinator will never report + * 100% progress since it will remove the ECID upon completion. The {@link SlowIterator} is used to + * control the length of time it takes to complete the compaction. + */ +public class ExternalCompactionProgressIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(ExternalCompactionProgressIT.class); + private static final int ROWS = 10_000; + + enum EC_PROGRESS { + STARTED, QUARTER, HALF, THREE_QUARTERS + } + + Map<String,RunningCompactorInfo> runningMap = new HashMap<>(); + List<EC_PROGRESS> progressList = new ArrayList<>(); + + private final AtomicBoolean compactionFinished = new AtomicBoolean(false); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + } + + @Test + public void testProgress() throws Exception { + MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null; + String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + ExternalCompactionTestUtils.createTable(client, table1, "cs1"); + ExternalCompactionTestUtils.writeData(client, table1, ROWS); + c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1"); + coord = ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()), + CompactionCoordinator.class, getCluster().getServerContext()); + + Thread checkerThread = startChecker(); + checkerThread.start(); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 1); + client.tableOperations().attachIterator(table1, setting, + EnumSet.of(IteratorUtil.IteratorScope.majc)); + log.info("Compacting table"); + ExternalCompactionTestUtils.compact(client, table1, 2, "DCQ1", true); + ExternalCompactionTestUtils.verify(client, table1, 2, ROWS); + + log.info("Done Compacting table"); + compactionFinished.set(true); + checkerThread.join(); + + verifyProgress(); + } finally { + ExternalCompactionTestUtils.stopProcesses(c1, coord); + } + } + + public Thread startChecker() { + return Threads.createThread("RC checker", () -> { + try { + while (!compactionFinished.get()) { + checkRunning(); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + } + } catch (TException e) { + log.warn("{}", e.getMessage(), e); + } + }); + } + + /** + * Check running compaction progress. + */ + private void checkRunning() throws TException { + var ecList = ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext()); + var ecMap = ecList.getCompactions(); + if (ecMap != null) { + ecMap.forEach((ecid, ec) -> { + // returns null if it's a new mapping + RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec); + RunningCompactorInfo previousRci = runningMap.put(ecid, rci); + if (previousRci == null) { + log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles); + } else { + if (rci.progress <= previousRci.progress) { + log.warn("{} did not progress. It went from {} to {}", ecid, previousRci.progress, + rci.progress); + } else { + log.debug("{} progressed from {} to {}", ecid, previousRci.progress, rci.progress); + if (rci.progress > 0 && rci.progress <= 25) + progressList.add(EC_PROGRESS.STARTED); + else if (rci.progress > 25 && rci.progress <= 50) + progressList.add(EC_PROGRESS.QUARTER); + else if (rci.progress > 50 && rci.progress <= 75) + progressList.add(EC_PROGRESS.HALF); + else if (rci.progress > 75 && rci.progress <= 100) + progressList.add(EC_PROGRESS.THREE_QUARTERS); + } + if (!rci.status.equals(TCompactionState.IN_PROGRESS.name())) { + log.debug("Saw status other than IN_PROGRESS: {}", rci.status); + } + } + }); + } + } + + private void verifyProgress() { + log.info("Verify Progress."); + assertTrue("Missing start of progress", progressList.contains(EC_PROGRESS.STARTED)); + assertTrue("Missing quarter progress", progressList.contains(EC_PROGRESS.QUARTER)); + assertTrue("Missing half progress", progressList.contains(EC_PROGRESS.HALF)); + assertTrue("Missing three quarters progress", + progressList.contains(EC_PROGRESS.THREE_QUARTERS)); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index 7f4199a..509704e 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -41,7 +42,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; @@ -76,6 +76,7 @@ import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,11 +145,10 @@ public class ExternalCompactionTestUtils { } - public static void writeData(AccumuloClient client, String table1) - throws MutationsRejectedException, TableNotFoundException, AccumuloException, - AccumuloSecurityException { + public static void writeData(AccumuloClient client, String table1, int rows) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 0; i < MAX_DATA; i++) { + for (int i = 0; i < rows; i++) { Mutation m = new Mutation(row(i)); m.put("", "", "" + i); bw.addMutation(m); @@ -158,8 +158,18 @@ public class ExternalCompactionTestUtils { client.tableOperations().flush(table1); } + public static void writeData(AccumuloClient client, String table1) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + writeData(client, table1, MAX_DATA); + } + public static void verify(AccumuloClient client, String table1, int modulus) throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + verify(client, table1, modulus, MAX_DATA); + } + + public static void verify(AccumuloClient client, String table1, int modulus, int rows) + throws TableNotFoundException, AccumuloSecurityException, AccumuloException { try (Scanner scanner = client.createScanner(table1)) { int count = 0; for (Entry<Key,Value> entry : scanner) { @@ -169,7 +179,7 @@ public class ExternalCompactionTestUtils { } int expectedCount = 0; - for (int i = 0; i < MAX_DATA; i++) { + for (int i = 0; i < rows; i++) { if (i % modulus == 0) expectedCount++; } @@ -237,13 +247,14 @@ public class ExternalCompactionTestUtils { } public static TExternalCompactionList getRunningCompactions(ClientContext context) - throws Exception { - HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - if (null == coordinatorHost) { + throws TException { + Optional<HostAndPort> coordinatorHost = + ExternalCompactionUtil.findCompactionCoordinator(context); + if (coordinatorHost.isEmpty()) { throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); } - CompactionCoordinatorService.Client client = ThriftUtil - .getClient(new CompactionCoordinatorService.Client.Factory(), coordinatorHost, context); + CompactionCoordinatorService.Client client = ThriftUtil.getClient( + new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context); try { TExternalCompactionList running = client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds()); @@ -255,12 +266,13 @@ public class ExternalCompactionTestUtils { private static TExternalCompactionList getCompletedCompactions(ClientContext context) throws Exception { - HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - if (null == coordinatorHost) { + Optional<HostAndPort> coordinatorHost = + ExternalCompactionUtil.findCompactionCoordinator(context); + if (coordinatorHost.isEmpty()) { throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); } - CompactionCoordinatorService.Client client = ThriftUtil - .getClient(new CompactionCoordinatorService.Client.Factory(), coordinatorHost, context); + CompactionCoordinatorService.Client client = ThriftUtil.getClient( + new CompactionCoordinatorService.Client.Factory(), coordinatorHost.get(), context); try { TExternalCompactionList completed = client.getCompletedCompactions(TraceUtil.traceInfo(), context.rpcCreds());