This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new c43ebb8 Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size c43ebb8 is described below commit c43ebb8ceaf78f76db7d9222b97672b713e123b2 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 9 14:45:44 2021 +0000 Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size --- .../server/compaction/ExternalCompactionUtil.java | 19 ++ .../coordinator/CompactionCoordinator.java | 187 +------------- .../accumulo/coordinator/CompactionUpdate.java} | 35 ++- .../coordinator/CoordinatorLockWatcher.java | 53 ++++ .../accumulo/coordinator/QueueAndPriority.java | 88 +++++++ .../accumulo/coordinator/RunningCompaction.java | 72 ++++++ .../accumulo/compactor/CompactionEnvironment.java | 95 +++++++ .../accumulo/compactor/CompactionJobHolder.java | 77 ++++++ .../org/apache/accumulo/compactor/Compactor.java | 283 ++++++--------------- 9 files changed, 517 insertions(+), 392 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index 51c743f..e346baa 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -24,6 +24,25 @@ import org.apache.accumulo.server.ServerContext; public class ExternalCompactionUtil { + /** + * Utility for returning the address of a service in the form host:port + * + * @param address + * HostAndPort of service + * @return host and port + */ + public static String getHostPortString(HostAndPort address) { + if (address == null) { + return null; + } + return address.getHost() + ":" + address.getPort(); + } + + /** + * + * @param context + * @return + */ public static HostAndPort findCompactionCoordinator(ServerContext context) { final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; byte[] address = context.getZooCache().get(lockPath); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 227714a..db6777f 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -29,7 +29,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -48,17 +47,15 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ZooLock; -import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.ExternalCompactionUtil; import org.apache.accumulo.server.compaction.RetryableThriftCall; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.compaction.RetryableThriftFunction; @@ -79,178 +76,6 @@ public class CompactionCoordinator extends AbstractServer implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface, LiveTServerSet.Listener { - private static class QueueAndPriority implements Comparable<QueueAndPriority> { - - private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>(); - - public static QueueAndPriority get(String queue, Long priority) { - return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority)); - } - - private final String queue; - private final Long priority; - - private QueueAndPriority(String queue, Long priority) { - super(); - this.queue = queue; - this.priority = priority; - } - - public String getQueue() { - return queue; - } - - public Long getPriority() { - return priority; - } - - @Override - public int hashCode() { - return queue.hashCode() + priority.hashCode(); - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append("queue: ").append(queue); - buf.append(", priority: ").append(priority); - return buf.toString(); - } - - @Override - public boolean equals(Object obj) { - if (null == obj) - return false; - if (obj == this) - return true; - if (!(obj instanceof QueueAndPriority)) { - return false; - } else { - QueueAndPriority other = (QueueAndPriority) obj; - return this.queue.equals(other.queue) && this.priority.equals(other.priority); - } - } - - @Override - public int compareTo(QueueAndPriority other) { - int result = this.queue.compareTo(other.queue); - if (result == 0) { - // reversing order such that if other priority is lower, then this has a higher priority - return Long.compare(other.priority, this.priority); - } else { - return result; - } - } - - } - - private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher { - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() - } - - } - - /** - * Utility for returning the address in the form host:port - * - * @return host and port for Compactor client connections - */ - private static String getHostPortString(HostAndPort address) { - if (address == null) { - return null; - } - return address.getHost() + ":" + address.getPort(); - } - - private static class CompactionUpdate { - private final Long timestamp; - private final String message; - private final CompactionState state; - - public CompactionUpdate(Long timestamp, String message, CompactionState state) { - super(); - this.timestamp = timestamp; - this.message = message; - this.state = state; - } - - public Long getTimestamp() { - return timestamp; - } - - public String getMessage() { - return message; - } - - public CompactionState getState() { - return state; - } - } - - private static class RunningCompaction { - private final TExternalCompactionJob job; - private final String compactorAddress; - private final TServerInstance tserver; - private Map<Long,CompactionUpdate> updates = new TreeMap<>(); - private CompactionStats stats = null; - - public RunningCompaction(TExternalCompactionJob job, String compactorAddress, - TServerInstance tserver) { - super(); - this.job = job; - this.compactorAddress = compactorAddress; - this.tserver = tserver; - } - - public Map<Long,CompactionUpdate> getUpdates() { - return updates; - } - - public void addUpdate(Long timestamp, String message, CompactionState state) { - this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state)); - } - - public CompactionStats getStats() { - return stats; - } - - public void setStats(CompactionStats stats) { - this.stats = stats; - } - - public TExternalCompactionJob getJob() { - return job; - } - - public String getCompactorAddress() { - return compactorAddress; - } - - public TServerInstance getTserver() { - return tserver; - } - } - private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); private static final long TIME_BETWEEN_CHECKS = 5000; @@ -290,11 +115,11 @@ public class CompactionCoordinator extends AbstractServer * @throws KeeperException * @throws InterruptedException */ - private boolean getCoordinatorLock(HostAndPort clientAddress) + protected boolean getCoordinatorLock(HostAndPort clientAddress) throws KeeperException, InterruptedException { LOG.info("trying to get coordinator lock"); - final String coordinatorClientAddress = getHostPortString(clientAddress); + final String coordinatorClientAddress = ExternalCompactionUtil.getHostPortString(clientAddress); final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; final UUID zooLockUUID = UUID.randomUUID(); @@ -309,7 +134,7 @@ public class CompactionCoordinator extends AbstractServer * @return address of this CompactionCoordinator client service * @throws UnknownHostException */ - private ServerAddress startCoordinatorClientService() throws UnknownHostException { + protected ServerAddress startCoordinatorClientService() throws UnknownHostException { CompactionCoordinator rpcProxy = TraceUtil.wrapService(this); final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor< CompactionCoordinator> processor; @@ -488,7 +313,7 @@ public class CompactionCoordinator extends AbstractServer } } - private TabletClientService.Client getTabletServerConnection(TServerInstance tserver) + protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver) throws TTransportException { TServerConnection connection = tserverSet.getConnection(tserver); TTransport transport = @@ -496,7 +321,7 @@ public class CompactionCoordinator extends AbstractServer return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); } - private Compactor.Client getCompactorConnection(HostAndPort compactorAddress) + protected Compactor.Client getCompactorConnection(HostAndPort compactorAddress) throws TTransportException { TTransport transport = ThriftTransportPool.getInstance().getTransport(compactorAddress, 0, getContext()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java similarity index 57% copy from server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java copy to server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java index 51c743f..becc428 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java @@ -16,18 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.server.compaction; +package org.apache.accumulo.coordinator; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.core.compaction.thrift.CompactionState; -public class ExternalCompactionUtil { +public class CompactionUpdate { - public static HostAndPort findCompactionCoordinator(ServerContext context) { - final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; - byte[] address = context.getZooCache().get(lockPath); - String coordinatorAddress = new String(address); - return HostAndPort.fromString(coordinatorAddress); + private final Long timestamp; + private final String message; + private final CompactionState state; + + CompactionUpdate(Long timestamp, String message, CompactionState state) { + super(); + this.timestamp = timestamp; + this.message = message; + this.state = state; + } + + public Long getTimestamp() { + return timestamp; + } + + public String getMessage() { + return message; + } + + public CompactionState getState() { + return state; } + } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java new file mode 100644 index 0000000..30ebb2e --- /dev/null +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.coordinator; + +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); + + } + + @Override + public synchronized void acquiredLock() { + // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java new file mode 100644 index 0000000..641fa97 --- /dev/null +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.coordinator; + +import java.util.WeakHashMap; + +import org.apache.accumulo.core.util.Pair; + +public class QueueAndPriority implements Comparable<QueueAndPriority> { + + private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>(); + + public static QueueAndPriority get(String queue, Long priority) { + return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority)); + } + + private final String queue; + private final Long priority; + + private QueueAndPriority(String queue, Long priority) { + super(); + this.queue = queue; + this.priority = priority; + } + + public String getQueue() { + return queue; + } + + public Long getPriority() { + return priority; + } + + @Override + public int hashCode() { + return queue.hashCode() + priority.hashCode(); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("queue: ").append(queue); + buf.append(", priority: ").append(priority); + return buf.toString(); + } + + @Override + public boolean equals(Object obj) { + if (null == obj) + return false; + if (obj == this) + return true; + if (!(obj instanceof QueueAndPriority)) { + return false; + } else { + QueueAndPriority other = (QueueAndPriority) obj; + return this.queue.equals(other.queue) && this.priority.equals(other.priority); + } + } + + @Override + public int compareTo(QueueAndPriority other) { + int result = this.queue.compareTo(other.queue); + if (result == 0) { + // reversing order such that if other priority is lower, then this has a higher priority + return Long.compare(other.priority, this.priority); + } else { + return result; + } + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java new file mode 100644 index 0000000..cc36f50 --- /dev/null +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.coordinator; + +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.compaction.thrift.CompactionState; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.tabletserver.thrift.CompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; + +public class RunningCompaction { + + private final TExternalCompactionJob job; + private final String compactorAddress; + private final TServerInstance tserver; + private Map<Long,CompactionUpdate> updates = new TreeMap<>(); + private CompactionStats stats = null; + + RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) { + super(); + this.job = job; + this.compactorAddress = compactorAddress; + this.tserver = tserver; + } + + public Map<Long,CompactionUpdate> getUpdates() { + return updates; + } + + public void addUpdate(Long timestamp, String message, CompactionState state) { + this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state)); + } + + public CompactionStats getStats() { + return stats; + } + + public void setStats(CompactionStats stats) { + this.stats = stats; + } + + public TExternalCompactionJob getJob() { + return job; + } + + public String getCompactorAddress() { + return compactorAddress; + } + + public TServerInstance getTserver() { + return tserver; + } + +} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java new file mode 100644 index 0000000..bfda224 --- /dev/null +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; +import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; +import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; + +public class CompactionEnvironment implements CompactionEnv { + + private final ServerContext context; + private final CompactionJobHolder jobHolder; + + CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) { + this.context = context; + this.jobHolder = jobHolder; + } + + @Override + public boolean isCompactionEnabled() { + return !jobHolder.isCancelled(); + } + + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.majc; + } + + @Override + public RateLimiter getReadLimiter() { + return SharedRateLimiterFactory.getInstance(context.getConfiguration()) + .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate()); + } + + @Override + public RateLimiter getWriteLimiter() { + return SharedRateLimiterFactory.getInstance(context.getConfiguration()) + .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate()); + } + + @Override + public SystemIteratorEnvironment createIteratorEnv(ServerContext context, + AccumuloConfiguration acuTableConf, TableId tableId) { + return new TabletIteratorEnvironment(context, IteratorScope.majc, + !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId, + CompactionKind.valueOf(jobHolder.getJob().getKind().name())); + } + + @Override + public SortedKeyValueIterator<Key,Value> getMinCIterator() { + throw new UnsupportedOperationException(); + } + + @Override + public CompactionReason getReason() { + switch (jobHolder.getJob().getKind()) { + case USER: + return CompactionReason.USER; + case CHOP: + return CompactionReason.CHOP; + case SELECTOR: + case SYSTEM: + default: + return CompactionReason.SYSTEM; + } + } + +} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java new file mode 100644 index 0000000..f6ab00c --- /dev/null +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import java.util.Objects; + +import org.apache.accumulo.core.tabletserver.thrift.CompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; + +public class CompactionJobHolder { + + private TExternalCompactionJob job; + private Thread compactionThread; + private volatile Boolean cancelled = Boolean.FALSE; + private CompactionStats stats = null; + + CompactionJobHolder() {} + + public void reset() { + job = null; + compactionThread = null; + cancelled = Boolean.FALSE; + stats = null; + } + + public TExternalCompactionJob getJob() { + return job; + } + + public Thread getThread() { + return compactionThread; + } + + public CompactionStats getStats() { + return stats; + } + + public void setStats(CompactionStats stats) { + this.stats = stats; + } + + public void cancel() { + cancelled = Boolean.TRUE; + } + + public boolean isCancelled() { + return cancelled; + } + + public boolean isSet() { + return (null != this.job); + } + + public void set(TExternalCompactionJob job, Thread compactionThread) { + Objects.requireNonNull(job, "CompactionJob is null"); + Objects.requireNonNull(compactionThread, "Compaction thread is null"); + this.job = job; + this.compactionThread = compactionThread; + } + +} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index d2e94e3..992c383 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -26,7 +26,6 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -41,26 +40,18 @@ import org.apache.accumulo.core.compaction.thrift.CompactionState; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.spi.compaction.CompactionKind; -import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; import org.apache.accumulo.core.tabletserver.thrift.CompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.UtilWaitThread; @@ -80,8 +71,6 @@ import org.apache.accumulo.server.compaction.RetryableThriftCall; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.compaction.RetryableThriftFunction; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; -import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; @@ -107,73 +96,6 @@ public class Compactor extends AbstractServer } } - /** - * Object used to hold information about the current compaction - */ - private static class CompactionJobHolder { - private TExternalCompactionJob job; - private Thread compactionThread; - private volatile Boolean cancelled = Boolean.FALSE; - private CompactionStats stats = null; - - public CompactionJobHolder() {} - - public void reset() { - job = null; - compactionThread = null; - cancelled = Boolean.FALSE; - stats = null; - } - - public TExternalCompactionJob getJob() { - return job; - } - - public Thread getThread() { - return compactionThread; - } - - public CompactionStats getStats() { - return stats; - } - - public void setStats(CompactionStats stats) { - this.stats = stats; - } - - public void cancel() { - cancelled = Boolean.TRUE; - } - - public boolean isCancelled() { - return cancelled; - } - - public boolean isSet() { - return (null != this.job); - } - - public void set(TExternalCompactionJob job, Thread compactionThread) { - Objects.requireNonNull(job, "CompactionJob is null"); - Objects.requireNonNull(compactionThread, "Compaction thread is null"); - this.job = job; - this.compactionThread = compactionThread; - } - - } - - /** - * Utility for returning the address in the form host:port - * - * @return host and port for Compactor client connections - */ - private static String getHostPortString(HostAndPort address) { - if (address == null) { - return null; - } - return address.getHost() + ":" + address.getPort(); - } - public static final String COMPACTOR_SERVICE = "COMPACTOR_SVC"; private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); @@ -186,6 +108,7 @@ public class Compactor extends AbstractServer private final String queueName; private final AtomicReference<CompactionCoordinator.Client> coordinatorClient = new AtomicReference<>(); + private ZooLock compactorLock; private ServerAddress compactorAddress = null; @@ -212,10 +135,10 @@ public class Compactor extends AbstractServer * @throws KeeperException * @throws InterruptedException */ - private void announceExistence(HostAndPort clientAddress) + protected void announceExistence(HostAndPort clientAddress) throws KeeperException, InterruptedException { - String hostPort = getHostPortString(clientAddress); + String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress); ZooReaderWriter zoo = getContext().getZooReaderWriter(); String compactorQueuePath = @@ -276,7 +199,7 @@ public class Compactor extends AbstractServer * @return address of this compactor client service * @throws UnknownHostException */ - private ServerAddress startCompactorClientService() throws UnknownHostException { + protected ServerAddress startCompactorClientService() throws UnknownHostException { Compactor rpcProxy = TraceUtil.wrapService(this); final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor; if (getContext().getThriftServerType() == ThriftServerType.SASL) { @@ -298,11 +221,12 @@ public class Compactor extends AbstractServer } /** - * Called by a thrift client to cancel the currently running compaction if it matches the supplied - * job + * Called by a CompactionCoordinator to cancel the currently running compaction * - * @param compactionJob - * job + * @param externalCompactionId + * compaction id + * @throws UnknownCompactionIdException + * if the externalCompactionId does not match the currently executing compaction */ @Override public void cancel(String externalCompactionId) throws TException { @@ -319,18 +243,17 @@ public class Compactor extends AbstractServer } /** - * Send an update to the coordinator for this job + * Send an update to the CompactionCoordinator for this job * - * @param coordinatorClient - * address of the CompactionCoordinator * @param job * compactionJob * @param state * updated state * @param message * updated message + * @throws RetriesExceededException */ - private void updateCompactionState(TExternalCompactionJob job, CompactionState state, + protected void updateCompactionState(TExternalCompactionJob job, CompactionState state, String message) throws RetriesExceededException { RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() { @@ -351,7 +274,7 @@ public class Compactor extends AbstractServer } /** - * Update the coordinator with the stats from the job + * Update the CompactionCoordinator with the stats from the completed job * * @param job * current compaction job @@ -359,7 +282,7 @@ public class Compactor extends AbstractServer * compaction stats * @throws RetriesExceededException */ - private void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats) + protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats) throws RetriesExceededException { RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() { @@ -388,7 +311,7 @@ public class Compactor extends AbstractServer * @return CompactionJob * @throws RetriesExceededException */ - private TExternalCompactionJob getNextJob() throws RetriesExceededException { + protected TExternalCompactionJob getNextJob() throws RetriesExceededException { RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall = new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<TExternalCompactionJob>() { @@ -397,7 +320,7 @@ public class Compactor extends AbstractServer try { coordinatorClient.compareAndSet(null, getCoordinatorClient()); return coordinatorClient.get().getCompactionJob(queueName, - getHostPortString(compactorAddress.getAddress())); + ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress())); } catch (TException e) { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); throw e; @@ -414,7 +337,7 @@ public class Compactor extends AbstractServer * @throws TTransportException * when unable to get client */ - private CompactionCoordinator.Client getCoordinatorClient() throws TTransportException { + protected CompactionCoordinator.Client getCoordinatorClient() throws TTransportException { HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); if (null == coordinatorHost) { throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); @@ -425,60 +348,72 @@ public class Compactor extends AbstractServer } /** - * Create and return a new CompactionEnv for the current compaction job - * + * Create compaction runnable + * * @param job - * current compaction job - * @return new env + * compaction job + * @param totalInputEntries + * object to capture total entries + * @param started + * started latch + * @param stopped + * stopped latch + * @param err + * reference to error + * @return Runnable compaction job */ - private CompactionEnv getCompactionEnvironment(TExternalCompactionJob job) { - return new CompactionEnv() { - @Override - public boolean isCompactionEnabled() { - return !jobHolder.isCancelled(); - } + protected Runnable createCompactionJob(final TExternalCompactionJob job, + final LongAdder totalInputEntries, final CountDownLatch started, final CountDownLatch stopped, + final AtomicReference<Throwable> err) { + return new Runnable() { @Override - public IteratorScope getIteratorScope() { - return IteratorScope.majc; - } - - @Override - public RateLimiter getReadLimiter() { - return SharedRateLimiterFactory.getInstance(getContext().getConfiguration()) - .create("read_rate_limiter", () -> job.getReadRate()); - } - - @Override - public RateLimiter getWriteLimiter() { - return SharedRateLimiterFactory.getInstance(getContext().getConfiguration()) - .create("write_rate_limiter", () -> job.getWriteRate()); - } - - @Override - public SystemIteratorEnvironment createIteratorEnv(ServerContext context, - AccumuloConfiguration acuTableConf, TableId tableId) { - return new TabletIteratorEnvironment(getContext(), IteratorScope.majc, - !job.isPropagateDeletes(), acuTableConf, tableId, - CompactionKind.valueOf(job.getKind().name())); - } - - @Override - public SortedKeyValueIterator<Key,Value> getMinCIterator() { - throw new UnsupportedOperationException(); - } - - @Override - public CompactionReason getReason() { - switch (job.getKind()) { - case USER: - return CompactionReason.USER; - case CHOP: - return CompactionReason.CHOP; - case SELECTOR: - case SYSTEM: - default: - return CompactionReason.SYSTEM; + public void run() { + try { + LOG.info("Starting up compaction runnable for job: {}", job); + updateCompactionState(job, CompactionState.STARTED, "Compaction started"); + + final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8)); + final TableConfiguration tConfig = getContext().getTableConfiguration(tableId); + final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile())); + final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder); + + final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>(); + job.getFiles().forEach(f -> { + files.put(new StoredTabletFile(f.getMetadataFileEntry()), + new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp())); + totalInputEntries.add(f.getEntries()); + }); + + final List<IteratorSetting> iters = new ArrayList<>(); + job.getIteratorSettings().getIterators() + .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); + + org.apache.accumulo.server.compaction.Compactor compactor = + new org.apache.accumulo.server.compaction.Compactor(getContext(), + KeyExtent.fromThrift(job.getExtent()), files, outputFile, + job.isPropagateDeletes(), cenv, iters, tConfig); + + LOG.info("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); + CompactionStats cs = new CompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + jobHolder.setStats(cs); + LOG.info("Compaction completed successfully"); + // Update state when completed + updateCompactionState(job, CompactionState.SUCCEEDED, + "Compaction completed successfully"); + } catch (Exception e) { + LOG.error("Compaction failed", e); + err.set(e); + throw new RuntimeException("Compaction failed", e); + } finally { + stopped.countDown(); + // TODO: Any cleanup } } }; @@ -518,67 +453,13 @@ public class Compactor extends AbstractServer } LOG.info("Received next compaction job: {}", job); - final LongAdder totalInputSize = new LongAdder(); final LongAdder totalInputEntries = new LongAdder(); final CountDownLatch started = new CountDownLatch(1); final CountDownLatch stopped = new CountDownLatch(1); - Thread compactionThread = Threads.createThread( - "Compaction job for tablet " + job.getExtent().toString(), new Runnable() { - - @Override - public void run() { - try { - LOG.info("Setting up to run compactor"); - updateCompactionState(job, CompactionState.STARTED, "Compaction started"); - - final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8)); - final TableConfiguration tConfig = getContext().getTableConfiguration(tableId); - - final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>(); - job.getFiles().forEach(f -> { - files.put(new StoredTabletFile(f.getMetadataFileEntry()), - new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp())); - totalInputSize.add(f.getSize()); - totalInputEntries.add(f.getEntries()); - }); - - final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile())); - - final CompactionEnv cenv = getCompactionEnvironment(job); - - final List<IteratorSetting> iters = new ArrayList<>(); - job.getIteratorSettings().getIterators() - .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - - org.apache.accumulo.server.compaction.Compactor compactor = - new org.apache.accumulo.server.compaction.Compactor(getContext(), - KeyExtent.fromThrift(job.getExtent()), files, outputFile, - job.isPropagateDeletes(), cenv, iters, tConfig); - - LOG.info("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - CompactionStats cs = new CompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - jobHolder.setStats(cs); - LOG.info("Compaction completed successfully"); - // Update state when completed - updateCompactionState(job, CompactionState.SUCCEEDED, - "Compaction completed successfully"); - } catch (Exception e) { - LOG.error("Compaction failed", e); - err.set(e); - throw new RuntimeException("Compaction failed", e); - } finally { - stopped.countDown(); - // TODO: Any cleanup - } - } - }); + Thread compactionThread = + Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), + this.createCompactionJob(job, totalInputEntries, started, stopped, err)); synchronized (jobHolder) { jobHolder.set(job, compactionThread);