This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 64bc1c5 Fix issue so that Compactor will start with Coordinator down. Move synchronization in Coordinator due to deadlock 64bc1c5 is described below commit 64bc1c57f521a7f096a653e4b044cd0429f30ffd Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Mar 15 15:44:36 2021 +0000 Fix issue so that Compactor will start with Coordinator down. Move synchronization in Coordinator due to deadlock --- .../server/compaction/ExternalCompactionUtil.java | 5 +- .../server/compaction/RetryableThriftCall.java | 2 +- .../coordinator/CompactionCoordinator.java | 60 +++++++++++++--------- .../org/apache/accumulo/compactor/Compactor.java | 4 +- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index 07b2eef..25b6eb8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -43,12 +43,15 @@ public class ExternalCompactionUtil { /** * * @param context - * @return + * @return null if Coordinator node not found, else HostAndPort */ public static HostAndPort findCompactionCoordinator(ServerContext context) { final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; try { byte[] address = ZooLock.getLockData(context.getZooReaderWriter().getZooKeeper(), lockPath); + if (null == address) { + return null; + } String coordinatorAddress = new String(address); return HostAndPort.fromString(coordinatorAddress); } catch (KeeperException | InterruptedException e) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java index ae5af71..ecf5428 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java @@ -112,7 +112,7 @@ public class RetryableThriftCall<T> { } UtilWaitThread.sleep(waitTime); if (waitTime != maxWaitTime) { - waitTime = Math.max(waitTime * 2, maxWaitTime); + waitTime = Math.min(waitTime * 2, maxWaitTime); } } while (null == result); return result; diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index d72bfaa..9716258 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -85,7 +85,8 @@ public class CompactionCoordinator extends AbstractServer private static final Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES = new HashMap<>(); /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ - private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>(); + private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = + new ConcurrentHashMap<>(); /* Map of compactionId to RunningCompactions */ private static final Map<ExternalCompactionId,RunningCompaction> RUNNING = new ConcurrentHashMap<>(); @@ -193,23 +194,23 @@ public class CompactionCoordinator extends AbstractServer while (true) { tserverSet.getCurrentServers().forEach(tsi -> { try { - synchronized (QUEUES) { - TabletClientService.Client client = null; - try { - LOG.debug("contacting tserver " + tsi.getHostPort()); - client = getTabletServerConnection(tsi); - List<TCompactionQueueSummary> summaries = - client.getCompactionQueueInfo(TraceUtil.traceInfo(), getContext().rpcCreds()); - summaries.forEach(summary -> { - QueueAndPriority qp = - QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); + TabletClientService.Client client = null; + try { + LOG.debug("contacting tserver " + tsi.getHostPort()); + client = getTabletServerConnection(tsi); + List<TCompactionQueueSummary> summaries = + client.getCompactionQueueInfo(TraceUtil.traceInfo(), getContext().rpcCreds()); + summaries.forEach(summary -> { + QueueAndPriority qp = + QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); + synchronized (QUEUES) { QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>()) .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi); INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp); - }); - } finally { - ThriftUtil.returnClient(client); - } + } + }); + } finally { + ThriftUtil.returnClient(client); } } catch (TException e) { LOG.warn("Error getting compaction summaries from tablet server: {}", @@ -239,20 +240,20 @@ public class CompactionCoordinator extends AbstractServer // run() will iterate over the current and added tservers and add them to the internal // data structures. For tservers that are deleted, we need to remove them from the // internal data structures - synchronized (QUEUES) { - deleted.forEach(tsi -> { - INDEX.get(tsi).forEach(qp -> { - TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(qp.getQueue()); - if (null != m) { - LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority()); - if (null != tservers) { + deleted.forEach(tsi -> { + INDEX.get(tsi).forEach(qp -> { + TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(qp.getQueue()); + if (null != m) { + LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority()); + if (null != tservers) { + synchronized (QUEUES) { tservers.remove(tsi); + INDEX.remove(tsi); } } - }); - INDEX.remove(tsi); + } }); - } + }); } /** @@ -305,6 +306,11 @@ public class CompactionCoordinator extends AbstractServer } if (null == tserver) { + LOG.debug("No compactions found for queue {}, returning null to compactor {}", queue, + compactorAddress); + // CBUG Returning null here causes: + // [compaction.RetryableThriftCall] ERROR: Error in Thrift function, retrying in 60000ms. Error: org.apache.thrift.TApplicationException: getCompactionJob failed: unknown result + return null; } @@ -317,6 +323,10 @@ public class CompactionCoordinator extends AbstractServer new RunningCompaction(job, compactorAddress, tserver)); LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); return job; + } catch (TException e) { + LOG.error("Error reserving compaction from tserver {}", + ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e); + throw e; } finally { ThriftUtil.returnClient(client); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b86de52..2b8e383 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -548,6 +548,8 @@ public class Compactor extends AbstractServer } } + } catch (Exception e) { + LOG.error("Unhandled error occurred in Compactor", e); } finally { // close connection to coordinator if (null != coordinatorClient.get()) { @@ -555,7 +557,7 @@ public class Compactor extends AbstractServer } // Shutdown local thrift server - LOG.debug("Stopping Thrift Servers"); + LOG.info("Stopping Thrift Servers"); TServerUtils.stopTServer(compactorAddress.server); try {