This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 64bc1c5  Fix issue so that the Compactor will start while the Coordinator is down. Move synchronization in the Coordinator to avoid a deadlock
64bc1c5 is described below

commit 64bc1c57f521a7f096a653e4b044cd0429f30ffd
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Mar 15 15:44:36 2021 +0000

    Fix issue so that the Compactor will start while the Coordinator is down.
    Move synchronization in the Coordinator to avoid a deadlock.
---
 .../server/compaction/ExternalCompactionUtil.java  |  5 +-
 .../server/compaction/RetryableThriftCall.java     |  2 +-
 .../coordinator/CompactionCoordinator.java         | 60 +++++++++++++---------
 .../org/apache/accumulo/compactor/Compactor.java   |  4 +-
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 07b2eef..25b6eb8 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -43,12 +43,15 @@ public class ExternalCompactionUtil {
   /**
    *
    * @param context
-   * @return
+   * @return null if Coordinator node not found, else HostAndPort
    */
   public static HostAndPort findCompactionCoordinator(ServerContext context) {
     final String lockPath = context.getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
     try {
       byte[] address = 
ZooLock.getLockData(context.getZooReaderWriter().getZooKeeper(), lockPath);
+      if (null == address) {
+        return null;
+      }
       String coordinatorAddress = new String(address);
       return HostAndPort.fromString(coordinatorAddress);
     } catch (KeeperException | InterruptedException e) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
index ae5af71..ecf5428 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
@@ -112,7 +112,7 @@ public class RetryableThriftCall<T> {
       }
       UtilWaitThread.sleep(waitTime);
       if (waitTime != maxWaitTime) {
-        waitTime = Math.max(waitTime * 2, maxWaitTime);
+        waitTime = Math.min(waitTime * 2, maxWaitTime);
       }
     } while (null == result);
     return result;
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index d72bfaa..9716258 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -85,7 +85,8 @@ public class CompactionCoordinator extends AbstractServer
   private static final 
Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES =
       new HashMap<>();
   /* index of tserver to queue and priority, exists to provide O(1) lookup 
into QUEUES */
-  private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = 
new HashMap<>();
+  private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX =
+      new ConcurrentHashMap<>();
   /* Map of compactionId to RunningCompactions */
   private static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
       new ConcurrentHashMap<>();
@@ -193,23 +194,23 @@ public class CompactionCoordinator extends AbstractServer
     while (true) {
       tserverSet.getCurrentServers().forEach(tsi -> {
         try {
-          synchronized (QUEUES) {
-            TabletClientService.Client client = null;
-            try {
-              LOG.debug("contacting tserver " + tsi.getHostPort());
-              client = getTabletServerConnection(tsi);
-              List<TCompactionQueueSummary> summaries =
-                  client.getCompactionQueueInfo(TraceUtil.traceInfo(), 
getContext().rpcCreds());
-              summaries.forEach(summary -> {
-                QueueAndPriority qp =
-                    QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
+          TabletClientService.Client client = null;
+          try {
+            LOG.debug("contacting tserver " + tsi.getHostPort());
+            client = getTabletServerConnection(tsi);
+            List<TCompactionQueueSummary> summaries =
+                client.getCompactionQueueInfo(TraceUtil.traceInfo(), 
getContext().rpcCreds());
+            summaries.forEach(summary -> {
+              QueueAndPriority qp =
+                  QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
+              synchronized (QUEUES) {
                 QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
                     .computeIfAbsent(qp.getPriority(), k -> new 
LinkedHashSet<>()).add(tsi);
                 INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
-              });
-            } finally {
-              ThriftUtil.returnClient(client);
-            }
+              }
+            });
+          } finally {
+            ThriftUtil.returnClient(client);
           }
         } catch (TException e) {
           LOG.warn("Error getting compaction summaries from tablet server: {}",
@@ -239,20 +240,20 @@ public class CompactionCoordinator extends AbstractServer
     // run() will iterate over the current and added tservers and add them to 
the internal
     // data structures. For tservers that are deleted, we need to remove them 
from the
     // internal data structures
-    synchronized (QUEUES) {
-      deleted.forEach(tsi -> {
-        INDEX.get(tsi).forEach(qp -> {
-          TreeMap<Long,LinkedHashSet<TServerInstance>> m = 
QUEUES.get(qp.getQueue());
-          if (null != m) {
-            LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
-            if (null != tservers) {
+    deleted.forEach(tsi -> {
+      INDEX.get(tsi).forEach(qp -> {
+        TreeMap<Long,LinkedHashSet<TServerInstance>> m = 
QUEUES.get(qp.getQueue());
+        if (null != m) {
+          LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
+          if (null != tservers) {
+            synchronized (QUEUES) {
               tservers.remove(tsi);
+              INDEX.remove(tsi);
             }
           }
-        });
-        INDEX.remove(tsi);
+        }
       });
-    }
+    });
   }
 
   /**
@@ -305,6 +306,11 @@ public class CompactionCoordinator extends AbstractServer
     }
 
     if (null == tserver) {
+      LOG.debug("No compactions found for queue {}, returning null to 
compactor {}", queue,
+          compactorAddress);
+      // CBUG Returning null here causes:
+      // [compaction.RetryableThriftCall] ERROR: Error in Thrift function, 
retrying in 60000ms. Error: org.apache.thrift.TApplicationException: 
getCompactionJob failed: unknown result
+
       return null;
     }
 
@@ -317,6 +323,10 @@ public class CompactionCoordinator extends AbstractServer
           new RunningCompaction(job, compactorAddress, tserver));
       LOG.debug("Returning external job {} to {}", job.externalCompactionId, 
compactorAddress);
       return job;
+    } catch (TException e) {
+      LOG.error("Error reserving compaction from tserver {}",
+          ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), 
e);
+      throw e;
     } finally {
       ThriftUtil.returnClient(client);
     }
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index b86de52..2b8e383 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -548,6 +548,8 @@ public class Compactor extends AbstractServer
         }
       }
 
+    } catch (Exception e) {
+      LOG.error("Unhandled error occurred in Compactor", e);
     } finally {
       // close connection to coordinator
       if (null != coordinatorClient.get()) {
@@ -555,7 +557,7 @@ public class Compactor extends AbstractServer
       }
 
       // Shutdown local thrift server
-      LOG.debug("Stopping Thrift Servers");
+      LOG.info("Stopping Thrift Servers");
       TServerUtils.stopTServer(compactorAddress.server);
 
       try {

Reply via email to