Repository: accumulo
Updated Branches:
  refs/heads/1.5.2-SNAPSHOT 63d5e55a0 -> 1d608a81f
  refs/heads/1.6.0-SNAPSHOT 44b13c12e -> ef5dc4a1f
  refs/heads/master 394fe061f -> 866422d27
ACCUMULO-2489 Fixes race condition in TableConfiguration where NPE may occur.

Make invalidateCache much more efficient by calling clear on ZooCache instead of creating a new one.

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1d608a81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1d608a81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1d608a81

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 1d608a81f488c2bf371fc81f83e0022bd2943a36
Parents: 63d5e55
Author: Josh Elser <els...@apache.org>
Authored: Thu Mar 20 18:08:55 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Thu Mar 20 18:08:55 2014 -0400

----------------------------------------------------------------------
 .../server/conf/ServerConfiguration.java | 1 +
 .../server/conf/TableConfiguration.java | 80 ++++++----
 .../test/TableConfigurationUpdateTest.java | 152 +++++++++++++++++++
 3 files changed, 200 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
index b2acd1a..8653274 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.ConfigSanityCheck;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.client.HdfsZooInstance;

 public class ServerConfiguration {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java index 59ff1f7..7a3d6e4 100644 --- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java +++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java @@ -37,37 +37,46 @@ import org.apache.log4j.Logger; public class TableConfiguration extends AccumuloConfiguration { private static final Logger log = Logger.getLogger(TableConfiguration.class); - + // Need volatile keyword to ensure double-checked locking works as intended private static volatile ZooCache tablePropCache = null; + private static final Object initLock = new Object(); + private final String instanceId; + private final Instance instance; private final AccumuloConfiguration parent; - + private String table = null; private Set<ConfigurationObserver> observers; - + public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) { + this(instanceId, HdfsZooInstance.getInstance(), table, parent); + } + + public TableConfiguration(String instanceId, Instance instance, String table, AccumuloConfiguration parent) { this.instanceId = instanceId; + this.instance = instance; this.table = table; this.parent = parent; - + this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>()); } - - /** - * @deprecated not for client use - */ - @Deprecated - private static ZooCache getTablePropCache() { - Instance inst = HdfsZooInstance.getInstance(); - if (tablePropCache == null) - synchronized (TableConfiguration.class) { - if (tablePropCache == null) - tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new 
TableConfWatcher(inst)); + + private void initializeZooCache() { + synchronized (initLock) { + if (null == tablePropCache) { + tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance)); } + } + } + + private ZooCache getTablePropCache() { + if (null == tablePropCache) { + initializeZooCache(); + } return tablePropCache; } - + public void addObserver(ConfigurationObserver co) { if (table == null) { String err = "Attempt to add observer for non-table configuration"; @@ -77,7 +86,7 @@ public class TableConfiguration extends AccumuloConfiguration { iterator(); observers.add(co); } - + public void removeObserver(ConfigurationObserver configObserver) { if (table == null) { String err = "Attempt to remove observer for non-table configuration"; @@ -86,29 +95,29 @@ public class TableConfiguration extends AccumuloConfiguration { } observers.remove(configObserver); } - + public void expireAllObservers() { Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); for (ConfigurationObserver co : copy) co.sessionExpired(); } - + public void propertyChanged(String key) { Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); for (ConfigurationObserver co : copy) co.propertyChanged(key); } - + public void propertiesChanged(String key) { Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); for (ConfigurationObserver co : copy) co.propertiesChanged(); } - + public String get(Property property) { String key = property.getKey(); String value = get(key); - + if (value == null || !property.getType().isValidFormat(value)) { if (value != null) log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value); @@ -116,23 +125,24 @@ public class TableConfiguration extends AccumuloConfiguration { } return value; } - + private String get(String key) { String zPath = 
ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key; + byte[] v = getTablePropCache().get(zPath); String value = null; if (v != null) value = new String(v, Constants.UTF8); return value; } - + @Override public Iterator<Entry<String,String>> iterator() { TreeMap<String,String> entries = new TreeMap<String,String>(); - + for (Entry<String,String> parentEntry : parent) entries.put(parentEntry.getKey(), parentEntry.getValue()); - + List<String> children = getTablePropCache().getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF); if (children != null) { for (String child : children) { @@ -141,10 +151,10 @@ public class TableConfiguration extends AccumuloConfiguration { entries.put(child, value); } } - + return entries.entrySet().iterator(); } - + public String getTableId() { return table; } @@ -152,11 +162,15 @@ public class TableConfiguration extends AccumuloConfiguration { @Override public void invalidateCache() { if (null != tablePropCache) { - synchronized (TableConfiguration.class) { - if (null != tablePropCache) { - tablePropCache = null; - } - } + tablePropCache.clear(); } + // Else, if the cache is null, we could lock and double-check + // to see if it happened to be created so we could invalidate it + // but I don't see much benefit coming from that extra check. 
+ } + + @Override + public String toString() { + return this.getClass().getSimpleName(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java new file mode 100644 index 0000000..30da268 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TableConfigurationUpdateTest { + private static final Logger log = Logger.getLogger(TableConfigurationUpdateTest.class); + + public static TemporaryFolder folder = new TemporaryFolder(); + private MiniAccumuloCluster accumulo; + private String secret = "secret"; + + @Before + public void setUp() throws Exception { + folder.create(); + accumulo = new MiniAccumuloCluster(folder.getRoot(), secret); + accumulo.start(); + } + + @After + public void tearDown() throws Exception { + accumulo.stop(); + folder.delete(); + } + + @Test + public void test() throws Exception { + ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers(), 60 * 1000); + Connector conn = inst.getConnector("root", new PasswordToken(secret)); + + String table = "foo"; + conn.tableOperations().create(table); + + final DefaultConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration(); + + // Cache invalidates 25% of the time + int randomMax = 4; + // Number 
of threads + int numThreads = 2; + // Number of iterations per thread + int iterations = 100000; + AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf); + + long start = System.currentTimeMillis(); + ExecutorService svc = Executors.newFixedThreadPool(numThreads); + CountDownLatch countDown = new CountDownLatch(numThreads); + ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads); + + for (int i = 0; i < numThreads; i++) { + futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown))); + } + + svc.shutdown(); + Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES)); + + for (Future<Exception> fut : futures) { + Exception e = fut.get(); + if (null != e) { + Assert.fail("Thread failed with exception " + e); + } + } + + long end = System.currentTimeMillis(); + log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates " + + ((1. / randomMax) * 100.) 
+ "% took " + (end - start) / 1000 + " second(s)"); + } + + public static class TableConfRunner implements Callable<Exception> { + private static final Property prop = Property.TABLE_SPLIT_THRESHOLD; + private AccumuloConfiguration tableConf; + private CountDownLatch countDown; + private int iterations, randMax; + + public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) { + this.randMax = randMax; + this.iterations = iterations; + this.tableConf = tableConf; + this.countDown = countDown; + } + + @Override + public Exception call() { + Random r = new Random(); + countDown.countDown(); + try { + countDown.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return e; + } + + String t = Thread.currentThread().getName() + " "; + try { + for (int i = 0; i < iterations; i++) { + // if (i % 10000 == 0) { + // log.info(t + " " + i); + // } + int choice = r.nextInt(randMax); + if (choice < 1) { + tableConf.invalidateCache(); + } else { + tableConf.get(prop); + } + } + } catch (Exception e) { + log.error(t, e); + return e; + } + + return null; + } + + } + +}