This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 431e168ff28b99e8201b68158c381a71db7d6762 Merge: ad10737 828a321 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Jan 2 15:14:47 2020 -0500 Merge branch '1.9' .../java/org/apache/accumulo/master/Master.java | 4 ++- .../accumulo/master/recovery/RecoveryManager.java | 33 ++++++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 40faf68,7332998..1041387 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -159,36 -168,40 +159,38 @@@ import edu.umd.cs.findbugs.annotations. /** * The Master is responsible for assigning and balancing tablets to tablet servers. - * + * <p> * The master will also coordinate log recoveries and reports general status. */ -public class Master extends AccumuloServerContext - implements LiveTServerSet.Listener, TableObserver, CurrentState { +public class Master extends AbstractServer + implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService { - final static Logger log = LoggerFactory.getLogger(Master.class); + static final Logger log = LoggerFactory.getLogger(Master.class); - final static int ONE_SECOND = 1000; - final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND; + static final int ONE_SECOND = 1000; + static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND; + // made this less than TIME_TO_WAIT_BETWEEN_SCANS, so that the cache is cleared between cycles - final static long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4; - final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND; - final static long WAIT_BETWEEN_ERRORS = ONE_SECOND; - final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; - final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; - final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND; 
- final static int MAX_TSERVER_WORK_CHUNK = 5000; - final private static int MAX_BAD_STATUS_COUNT = 3; ++ static final long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4; + private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND; + static final long WAIT_BETWEEN_ERRORS = ONE_SECOND; + private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; + private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; + private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND; + static final int MAX_TSERVER_WORK_CHUNK = 5000; + private static final int MAX_BAD_STATUS_COUNT = 3; final VolumeManager fs; - final private String hostname; - final private Object balancedNotifier = new Object(); + private final Object balancedNotifier = new Object(); final LiveTServerSet tserverSet; - final private List<TabletGroupWatcher> watchers = new ArrayList<>(); + private final List<TabletGroupWatcher> watchers = new ArrayList<>(); final SecurityOperation security; - final Map<TServerInstance,AtomicInteger> badServers = Collections - .synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger())); - final Set<TServerInstance> serversToShutdown = - Collections.synchronizedSet(new HashSet<TServerInstance>()); + final Map<TServerInstance,AtomicInteger> badServers = + Collections.synchronizedMap(new HashMap<TServerInstance,AtomicInteger>()); + final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final SortedMap<KeyExtent,TServerInstance> migrations = - Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>()); + Collections.synchronizedSortedMap(new TreeMap<>()); final EventCoordinator nextEvent = new EventCoordinator(); - final private Object mergeLock = new Object(); + private final Object mergeLock = new Object(); private ReplicationDriver replicationWorkDriver; private WorkDriver replicationWorkAssigner; RecoveryManager recoveryManager = 
null; @@@ -981,49 -1239,14 +983,49 @@@ return info; } - public void run() throws IOException, InterruptedException, KeeperException { - final String zroot = ZooUtil.getRoot(getInstance()); + @Override + public void run() { + final ServerContext context = getContext(); + final String zroot = getZooKeeperRoot(); + + // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health + // when a hot-standby + // + // Start the Master's Client service + clientHandler = new MasterClientServiceHandler(this); + // Ensure that calls before the master gets the lock fail + Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this); + Iface rpcProxy = TraceUtil.wrapService(haProxy); + final Processor<Iface> processor; + if (context.getThriftServerType() == ThriftServerType.SASL) { + Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), + getConfiguration()); + processor = new Processor<>(tcredsProxy); + } else { + processor = new Processor<>(rpcProxy); + } + ServerAddress sa; + try { + sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(), + Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, + Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, + Property.GENERAL_MAX_MESSAGE_SIZE); + } catch (UnknownHostException e) { + throw new IllegalStateException("Unable to start server on host " + getHostname(), e); + } + clientService = sa.server; + log.info("Started Master client service at {}", sa.address); - getMasterLock(zroot + Constants.ZMASTER_LOCK); + // block until we can obtain the ZK lock for the master + try { + getMasterLock(zroot + Constants.ZMASTER_LOCK); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Exception getting master lock", e); + } - recoveryManager = new RecoveryManager(this); + recoveryManager = new RecoveryManager(this, TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE); - 
TableManager.getInstance().addObserver(this); + context.getTableManager().addObserver(this); StatusThread statusThread = new StatusThread(); statusThread.start(); diff --cc server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java index e5fef4e,bfc98aa..f380778 --- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java +++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java @@@ -63,14 -69,24 +70,23 @@@ public class RecoveryManager private Master master; private ZooCache zooCache; - public RecoveryManager(Master master) { + public RecoveryManager(Master master, long timeToCacheExistsInMillis) { this.master = master; + existenceCache = + CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) + .maximumWeight(10_000_000).weigher(new Weigher<Path,Boolean>() { + @Override + public int weigh(Path path, Boolean exist) { + return path.toString().length(); + } + }).build(); + executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter ")); - zooCache = new ZooCache(); + zooCache = new ZooCache(master.getContext().getZooReaderWriter(), null); try { - AccumuloConfiguration aconf = master.getConfiguration(); List<String> workIDs = - new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY, - aconf).getWorkQueued(); + new DistributedWorkQueue(master.getZooKeeperRoot() + Constants.ZRECOVERY, + master.getConfiguration()).getWorkQueued(); sortsQueued.addAll(workIDs); } catch (Exception e) { log.warn("{}", e.getMessage(), e); @@@ -128,10 -144,23 +144,23 @@@ sortsQueued.add(sortId); } - final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId; - log.info("Created zookeeper entry " + path + " with data " + work); + final String path = master.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId; + log.info("Created zookeeper entry {} with data {}", path, 
work); } + private boolean exists(final Path path) throws IOException { + try { + return existenceCache.get(path, new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return master.getFileSystem().exists(path); + } + }); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException { boolean recoveryNeeded = false;