Repository: accumulo Updated Branches: refs/heads/1.6.1-SNAPSHOT 45b7e2957 -> d50bb3a21 refs/heads/master 8f60d494e -> 9fcaace81
ACCUMULO-2887 Add ObservableConfiguration The logic for maintaining and notifying observers in NamespaceConfiguration and TableConfiguration is refactored into a new ObservableConfiguration class. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d50bb3a2 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d50bb3a2 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d50bb3a2 Branch: refs/heads/1.6.1-SNAPSHOT Commit: d50bb3a219859d276acc08a126b074e42369b867 Parents: 45b7e29 Author: Bill Havanki <bhava...@cloudera.com> Authored: Fri Apr 4 12:19:28 2014 -0400 Committer: Bill Havanki <bhava...@cloudera.com> Committed: Wed Jun 11 10:21:32 2014 -0400 ---------------------------------------------------------------------- .../core/conf/ObservableConfiguration.java | 109 +++++++++++++++++++ .../core/conf/ObservableConfigurationTest.java | 96 ++++++++++++++++ .../server/conf/NamespaceConfWatcher.java | 2 +- .../server/conf/NamespaceConfiguration.java | 35 ++---- .../accumulo/server/conf/TableConfWatcher.java | 2 +- .../server/conf/TableConfiguration.java | 36 ++---- 6 files changed, 221 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java new file mode 100644 index 0000000..7e8cc3a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.conf; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.log4j.Logger; + +/** + * A configuration that can be observed. Handling of observers is thread-safe. + */ +public abstract class ObservableConfiguration extends AccumuloConfiguration { + private static final Logger log = Logger.getLogger(ObservableConfiguration.class); + + private Set<ConfigurationObserver> observers; + + /** + * Creates a new observable configuration. + */ + public ObservableConfiguration() { + observers = Collections.synchronizedSet(new java.util.HashSet<ConfigurationObserver>()); + } + + /** + * Adds an observer. + * + * @param co + * observer + * @throws NullPointerException + * if co is null + */ + public void addObserver(ConfigurationObserver co) { + Preconditions.checkNotNull(co); + observers.add(co); + } + + /** + * Removes an observer. + * + * @param co + * observer + */ + public void removeObserver(ConfigurationObserver co) { + observers.remove(co); + } + + /** + * Gets the current set of observers. The returned collection is a snapshot, and changes to it do not reflect back to the configuration. + * + * @return observers + */ + public Collection<ConfigurationObserver> getObservers() { + return snapshot(observers); + } + + private static Collection<ConfigurationObserver> snapshot(Collection<ConfigurationObserver> observers) { + Collection<ConfigurationObserver> c = new java.util.ArrayList<ConfigurationObserver>(); + synchronized (observers) { + c.addAll(observers); + } + return c; + } + + /** + * Expires all observers. + */ + public void expireAllObservers() { + Collection<ConfigurationObserver> copy = snapshot(observers); + for (ConfigurationObserver co : copy) + co.sessionExpired(); + } + + /** + * Notifies all observers that a property changed. + * + * @param key + * configuration property key + */ + public void propertyChanged(String key) { + Collection<ConfigurationObserver> copy = snapshot(observers); + for (ConfigurationObserver co : copy) + co.propertyChanged(key); + } + + /** + * Notifies all observers that properties changed. + */ + public void propertiesChanged() { + Collection<ConfigurationObserver> copy = snapshot(observers); + for (ConfigurationObserver co : copy) + co.propertiesChanged(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java new file mode 100644 index 0000000..3534b6c --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.conf; + +import java.util.Collection; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class ObservableConfigurationTest { + private static class TestObservableConfig extends ObservableConfiguration { + @Override + public String get(Property property) { + return null; + } + + @Override + public void getProperties(Map<String,String> props, PropertyFilter filter) {} + } + + private ObservableConfiguration c; + private ConfigurationObserver co1; + + @Before + public void setUp() { + c = new TestObservableConfig(); + co1 = createMock(ConfigurationObserver.class); + } + + @Test + public void testAddAndRemove() { + ConfigurationObserver co2 = createMock(ConfigurationObserver.class); + c.addObserver(co1); + c.addObserver(co2); + Collection<ConfigurationObserver> cos = c.getObservers(); + assertEquals(2, cos.size()); + assertTrue(cos.contains(co1)); + assertTrue(cos.contains(co2)); + c.removeObserver(co1); + cos = c.getObservers(); + assertEquals(1, cos.size()); + assertTrue(cos.contains(co2)); + } + + @Test(expected = NullPointerException.class) + public void testNoNullAdd() { + c.addObserver(null); + } + + @Test + public void testSessionExpired() { + c.addObserver(co1); + co1.sessionExpired(); + replay(co1); + c.expireAllObservers(); + verify(co1); + } + + @Test + public void testPropertyChanged() { + String key = "key"; + c.addObserver(co1); + co1.propertyChanged(key); + replay(co1); + c.propertyChanged(key); + verify(co1); + } + + @Test + public void testPropertiesChanged() { + c.addObserver(co1); + co1.propertiesChanged(); + replay(co1); + c.propertiesChanged(); + verify(co1); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java index 7f3d73e..50ab27e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java @@ -72,7 +72,7 @@ class NamespaceConfWatcher implements Watcher { ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertyChanged(key); break; case NodeChildrenChanged: - ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertiesChanged(key); + ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertiesChanged(); break; case NodeDeleted: if (key == null) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java index eab198e..ebb098b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java @@ -16,18 +16,15 @@ */ package org.apache.accumulo.server.conf; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.Namespaces; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.ObservableConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; @@ -35,7 +32,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.log4j.Logger; -public class NamespaceConfiguration extends AccumuloConfiguration { +public class NamespaceConfiguration extends ObservableConfiguration { private static final Logger log = Logger.getLogger(NamespaceConfiguration.class); private final AccumuloConfiguration parent; @@ -43,7 +40,6 @@ public class NamespaceConfiguration extends AccumuloConfiguration { private static final Object lock = new Object(); protected String namespaceId = null; protected Instance inst = null; - private Set<ConfigurationObserver> observers; public NamespaceConfiguration(String namespaceId, AccumuloConfiguration parent) { this(namespaceId, HdfsZooInstance.getInstance(), parent); @@ -53,7 +49,6 @@ public class NamespaceConfiguration extends AccumuloConfiguration { this.inst = inst; this.parent = parent; this.namespaceId = namespaceId; - this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>()); } @Override @@ -142,6 +137,7 @@ public class NamespaceConfiguration extends AccumuloConfiguration { return namespaceId; } + @Override public void addObserver(ConfigurationObserver co) { if (namespaceId == null) { String err = "Attempt to add observer for non-namespace configuration"; @@ -149,34 +145,17 @@ public class NamespaceConfiguration extends AccumuloConfiguration { throw new RuntimeException(err); } iterator(); - observers.add(co); + super.addObserver(co); } - public void removeObserver(ConfigurationObserver configObserver) { + @Override + public void removeObserver(ConfigurationObserver co) { if (namespaceId == null) { String err = "Attempt to remove observer for non-namespace configuration"; log.error(err); throw new RuntimeException(err); } - observers.remove(configObserver); - } - - public void expireAllObservers() { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.sessionExpired(); - } - - public void propertyChanged(String key) { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.propertyChanged(key); - } - - public void propertiesChanged(String key) { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.propertiesChanged(); + super.removeObserver(co); } protected boolean isIteratorOrConstraint(String key) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java index c407309..3c24ec4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java @@ -72,7 +72,7 @@ class TableConfWatcher implements Watcher { ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key); break; case NodeChildrenChanged: - ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key); + ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(); break; case NodeDeleted: if (key == null) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java index 909b450..a9c9411 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java @@ -16,17 +16,14 @@ */ package org.apache.accumulo.server.conf; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.ObservableConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; @@ -34,7 +31,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.log4j.Logger; -public class TableConfiguration extends AccumuloConfiguration { +public class TableConfiguration extends ObservableConfiguration { private static final Logger log = Logger.getLogger(TableConfiguration.class); // Need volatile keyword to ensure double-checked locking works as intended @@ -46,7 +43,6 @@ public class TableConfiguration extends AccumuloConfiguration { private final NamespaceConfiguration parent; private String table = null; - private Set<ConfigurationObserver> observers; public TableConfiguration(String instanceId, String table, NamespaceConfiguration parent) { this(instanceId, HdfsZooInstance.getInstance(), table, parent); @@ -57,8 +53,6 @@ public class TableConfiguration extends AccumuloConfiguration { this.instance = instance; this.table = table; this.parent = parent; - - this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>()); } private void initializeZooCache() { @@ -76,6 +70,7 @@ public class TableConfiguration extends AccumuloConfiguration { return tablePropCache; } + @Override public void addObserver(ConfigurationObserver co) { if (table == null) { String err = "Attempt to add observer for non-table configuration"; @@ -83,34 +78,17 @@ public class TableConfiguration extends AccumuloConfiguration { throw new RuntimeException(err); } iterator(); - observers.add(co); + super.addObserver(co); } - public void removeObserver(ConfigurationObserver configObserver) { + @Override + public void removeObserver(ConfigurationObserver co) { if (table == null) { String err = "Attempt to remove observer for non-table configuration"; log.error(err); throw new RuntimeException(err); } - observers.remove(configObserver); - } - - public void expireAllObservers() { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.sessionExpired(); - } - - public void propertyChanged(String key) { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.propertyChanged(key); - } - - public void propertiesChanged(String key) { - Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers); - for (ConfigurationObserver co : copy) - co.propertiesChanged(); + super.removeObserver(co); } @Override