Repository: accumulo Updated Branches: refs/heads/1.6 67dd532d9 -> 40b41f26e
ACCUMULO-1292 update the classloader in a background thread Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/40b41f26 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/40b41f26 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/40b41f26 Branch: refs/heads/1.6 Commit: 40b41f26ed65ccc9ea9028d664505a4a875e9bd4 Parents: 67dd532 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Fri Jan 23 10:29:54 2015 -0500 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Fri Jan 23 10:29:54 2015 -0500 ---------------------------------------------------------------------- .../vfs/AccumuloReloadingVFSClassLoader.java | 98 ++++++++++++++------ .../vfs/AccumuloVFSClassLoaderTest.java | 8 +- 2 files changed, 72 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/40b41f26/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java ---------------------------------------------------------------------- diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java index 88dfd1e..05eaae1 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java @@ -16,8 +16,15 @@ */ package org.apache.accumulo.start.classloader.vfs; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.vfs2.FileChangeEvent; import org.apache.commons.vfs2.FileListener; @@ -28,6 +35,8 @@ import org.apache.commons.vfs2.impl.DefaultFileMonitor; import org.apache.commons.vfs2.impl.VFSClassLoader; import org.apache.log4j.Logger; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Classloader that delegates operations to a VFSClassLoader object. This class also listens for changes in any of the files/directories that are in the * classpath and will recreate the delegate object if there is any change in the classpath. @@ -38,15 +47,49 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC private static final Logger log = Logger.getLogger(AccumuloReloadingVFSClassLoader.class); // set to 5 mins. The rational behind this large time is to avoid a gazillion tservers all asking the name node for info too frequently. - private static final int DEFAULT_TIMEOUT = 300000; + private static final int DEFAULT_TIMEOUT = 5 * 60 * 1000; - private String uris; private FileObject[] files; - private FileSystemManager vfs = null; - private ReloadingClassLoader parent = null; - private DefaultFileMonitor monitor = null; - private VFSClassLoader cl = null; - private boolean preDelegate; + private VFSClassLoader cl; + private final ReloadingClassLoader parent; + private final String uris; + private final DefaultFileMonitor monitor; + private final boolean preDelegate; + private final ThreadPoolExecutor executor; + { + BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2); + ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).build(); + executor = new ThreadPoolExecutor(1, 1, 1, SECONDS, queue, factory); + } + + private final Runnable refresher = new Runnable() { + @Override + public void run() { + while (!executor.isTerminating()) { + try { + FileSystemManager vfs = AccumuloVFSClassLoader.generateVfs(); + FileObject[] files = AccumuloVFSClassLoader.resolve(vfs, uris); + + log.debug("Rebuilding dynamic classloader using files- " + stringify(files)); + + VFSClassLoader cl; + if (preDelegate) + cl = new VFSClassLoader(files, vfs, parent.getClassLoader()); + else + cl = new PostDelegatingVFSClassLoader(files, vfs, parent.getClassLoader()); + updateClassloader(files, cl); + return; + } catch (Exception e) { + log.error(e.getMessage(), e); + try { + Thread.sleep(DEFAULT_TIMEOUT); + } catch (InterruptedException ie) { + log.error(e.getMessage(), ie); + } + } + } + } + }; public String stringify(FileObject[] files) { StringBuilder sb = new StringBuilder(); @@ -63,36 +106,29 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC @Override public synchronized ClassLoader getClassLoader() { - if (cl == null || cl.getParent() != parent.getClassLoader()) { - try { - vfs = AccumuloVFSClassLoader.generateVfs(); - files = AccumuloVFSClassLoader.resolve(vfs, uris); - - log.debug("Rebuilding dynamic classloader using files- " + stringify(files)); - - if (preDelegate) - cl = new VFSClassLoader(files, vfs, parent.getClassLoader()); - else - cl = new PostDelegatingVFSClassLoader(files, vfs, parent.getClassLoader()); - - } catch (FileSystemException fse) { - throw new RuntimeException(fse); - } + if (cl.getParent() != parent.getClassLoader()) { + scheduleRefresh(); } - return cl; } - private synchronized void setClassloader(VFSClassLoader cl) { - this.cl = cl; + private void scheduleRefresh() { + try { + executor.execute(refresher); + } catch (RejectedExecutionException e) { + log.trace("Ignoring refresh request (already refreshing)"); + } + } + private synchronized void updateClassloader(FileObject[] files, VFSClassLoader cl) { + this.files = files; + this.cl = cl; } public AccumuloReloadingVFSClassLoader(String uris, FileSystemManager vfs, ReloadingClassLoader parent, long monitorDelay, boolean preDelegate) throws FileSystemException { this.uris = uris; - this.vfs = vfs; this.parent = parent; this.preDelegate = preDelegate; @@ -126,25 +162,29 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC * Should be ok if this is not called because the thread started by DefaultFileMonitor is a daemon thread */ public void close() { + executor.shutdownNow(); monitor.stop(); } + @Override public void fileCreated(FileChangeEvent event) throws Exception { if (log.isDebugEnabled()) log.debug(event.getFile().getURL().toString() + " created, recreating classloader"); - setClassloader(null); + scheduleRefresh(); } + @Override public void fileDeleted(FileChangeEvent event) throws Exception { if (log.isDebugEnabled()) log.debug(event.getFile().getURL().toString() + " deleted, recreating classloader"); - setClassloader(null); + scheduleRefresh(); } + @Override public void fileChanged(FileChangeEvent event) throws Exception { if (log.isDebugEnabled()) log.debug(event.getFile().getURL().toString() + " changed, recreating classloader"); - setClassloader(null); + scheduleRefresh(); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/40b41f26/start/src/test/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/start/src/test/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoaderTest.java b/start/src/test/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoaderTest.java index 522c870..c820762 100644 --- a/start/src/test/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoaderTest.java +++ b/start/src/test/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoaderTest.java @@ -23,7 +23,7 @@ import java.net.URLClassLoader; import org.apache.accumulo.start.classloader.AccumuloClassLoader; import org.apache.commons.io.FileUtils; -import org.apache.commons.vfs2.impl.DefaultFileSystemManager; +import org.apache.commons.vfs2.FileSystemManager; import org.apache.commons.vfs2.impl.VFSClassLoader; import org.junit.After; import org.junit.Assert; @@ -150,8 +150,7 @@ public class AccumuloVFSClassLoaderTest { Whitebox.setInternalState(AccumuloClassLoader.class, "SITE_CONF", conf.toURI().toURL().toString()); Whitebox.setInternalState(AccumuloVFSClassLoader.class, "lock", new Object()); AccumuloVFSClassLoader.getClassLoader(); - AccumuloReloadingVFSClassLoader loader = Whitebox.getInternalState(AccumuloVFSClassLoader.class, "loader"); - DefaultFileSystemManager manager = Whitebox.getInternalState(loader, "vfs"); + FileSystemManager manager = AccumuloVFSClassLoader.generateVfs(); UniqueFileReplicator replicator = Whitebox.getInternalState(manager, "fileReplicator"); File tempDir = Whitebox.getInternalState(replicator, "tempDir"); String tempDirParent = tempDir.getParent(); @@ -194,8 +193,7 @@ public class AccumuloVFSClassLoaderTest { Whitebox.setInternalState(AccumuloClassLoader.class, "SITE_CONF", conf.toURI().toURL().toString()); Whitebox.setInternalState(AccumuloVFSClassLoader.class, "lock", new Object()); AccumuloVFSClassLoader.getClassLoader(); - AccumuloReloadingVFSClassLoader loader = Whitebox.getInternalState(AccumuloVFSClassLoader.class, "loader"); - DefaultFileSystemManager manager = Whitebox.getInternalState(loader, "vfs"); + FileSystemManager manager = AccumuloVFSClassLoader.generateVfs(); UniqueFileReplicator replicator = Whitebox.getInternalState(manager, "fileReplicator"); File tempDir = Whitebox.getInternalState(replicator, "tempDir"); String tempDirParent = tempDir.getParent();