keith-turner commented on code in PR #5256:
URL: https://github.com/apache/accumulo/pull/5256#discussion_r1924089705
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -152,53 +184,78 @@ public void process(WatchedEvent event) {
break;
}
- externalWatcher.ifPresent(w -> w.accept(event));
+ externalWatchers.forEach(ew -> ew.accept(event));
}
}
/**
- * Creates a new cache without an external watcher.
+ * Creates a ZooCache instance that uses the supplied ZooSession for
communicating with the
+ * instance's ZooKeeper servers. The ZooCache will create persistent
watchers at the given
+ * pathsToWatch, if any, to be updated when changes are made in ZooKeeper
for nodes at or below in
+ * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher},
then they will be
+ * notified when this object is notified of changes via the
PersistentWatcher callback.
*
- * @param zk the ZooKeeper instance
- * @throws NullPointerException if zk is {@code null}
+ * @param zk ZooSession for this instance
+ * @param pathsToWatch Paths in ZooKeeper to watch
*/
- public ZooCache(ZooSession zk) {
- this(zk, Optional.empty(), Duration.ofMinutes(3));
+ public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
+ this(zk, pathsToWatch, Ticker.systemTicker());
}
- /**
- * Creates a new cache. The given watcher is called whenever a watched node
changes.
- *
- * @param zk the ZooKeeper instance
- * @param watcher watcher object
- * @throws NullPointerException if zk or watcher is {@code null}
- */
- public ZooCache(ZooSession zk, ZooCacheWatcher watcher) {
- this(zk, Optional.of(watcher), Duration.ofMinutes(3));
+ // for tests that use a Ticker
+ public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
+ this.zk = requireNonNull(zk);
+ this.cache =
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
+
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
+ this.nodeCache = cache.asMap();
+ this.pathsToWatch = requireNonNull(pathsToWatch);
+ setupWatchers(pathsToWatch);
+ log.trace("{} created new cache", cacheId, new Exception());
}
- public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, Duration
timeout) {
- this.zk = requireNonNull(zk);
- this.externalWatcher = watcher;
- RemovalListener<String,ZcNode> removalListerner = (path, zcNode, reason)
-> {
- try {
- log.trace("{} removing watches for {} because {} accesses {}",
cacheId, path, reason,
- zcNode == null ? -1 : zcNode.getAccessCount());
- zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any,
false);
- } catch (InterruptedException | KeeperException | RuntimeException e) {
- log.warn("{} failed to remove watches on path {} in zookeeper",
cacheId, path, e);
+ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
+ externalWatchers.add(requireNonNull(watcher));
+ }
+
+ // Visible for testing
+ protected void setupWatchers(Set<String> pathsToWatch) {
+
+ for (String left : pathsToWatch) {
+ for (String right : pathsToWatch) {
+ if (!left.equals(right) && left.contains(right)) {
+ throw new IllegalArgumentException(
+ "Overlapping paths found in paths to watch. left: " + left + ",
right: " + right);
+ }
}
- };
- // Must register the removal listener using evictionListener inorder for
removal to be mutually
- // exclusive with any other operations on the same path. This is important
for watcher
- // consistency, concurrently adding and removing watches for the same path
would leave zoocache
- // in a really bad state. The cache builder has another way to register a
removal listener that
- // is not mutually exclusive.
- Cache<String,ZcNode> cache =
- Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE,
false)
-
.expireAfterAccess(timeout).evictionListener(removalListerner).build();
- nodeCache = cache.asMap();
- log.trace("{} created new cache", cacheId, new Exception());
+ }
+
+ try {
+ for (String path : pathsToWatch) {
+ watchedPaths.add(path);
+ zk.addPersistentRecursiveWatcher(path, this.watcher);
+ log.trace("Added persistent recursive watcher at {}", path);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new RuntimeException("Error setting up persistent recursive
watcher", e);
+ }
+ }
+
+ private boolean isWatchedPath(String path) {
+ // Check that the path is equal to, or a descendant of, a watched path
+ for (String watchedPath : watchedPaths) {
+ if (path.startsWith(watchedPath)) {
+ return true;
+ }
+ }
+ return false;
Review Comment:
Can use the floor function on treeset to quickly find the string with the
same prefix. I was experimenting with this and found that calling startsWith
is not sufficient to ensure something is watched. For example if the code
watches the path `/accumulo/iid/gc/lock`, then the path
`/accumulo/iid/gc/lock1` starts with `/accumulo/iid/gc/lock` but it is not
watched because `lock1` is a sibling to `lock` in ZK. Added
`floor.equals(path) || path.startsWith(floor+"/")` to deal with this, but I
don't like that it allocates a string.
```suggestion
var floor = ts.floor(path);
return floor != null && (floor.equals(path) ||
path.startsWith(floor+"/"));
```
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -152,53 +184,78 @@ public void process(WatchedEvent event) {
break;
}
- externalWatcher.ifPresent(w -> w.accept(event));
+ externalWatchers.forEach(ew -> ew.accept(event));
}
}
/**
- * Creates a new cache without an external watcher.
+ * Creates a ZooCache instance that uses the supplied ZooSession for
communicating with the
+ * instance's ZooKeeper servers. The ZooCache will create persistent
watchers at the given
+ * pathsToWatch, if any, to be updated when changes are made in ZooKeeper
for nodes at or below in
+ * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher},
then they will be
+ * notified when this object is notified of changes via the
PersistentWatcher callback.
*
- * @param zk the ZooKeeper instance
- * @throws NullPointerException if zk is {@code null}
+ * @param zk ZooSession for this instance
+ * @param pathsToWatch Paths in ZooKeeper to watch
*/
- public ZooCache(ZooSession zk) {
- this(zk, Optional.empty(), Duration.ofMinutes(3));
+ public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
+ this(zk, pathsToWatch, Ticker.systemTicker());
}
- /**
- * Creates a new cache. The given watcher is called whenever a watched node
changes.
- *
- * @param zk the ZooKeeper instance
- * @param watcher watcher object
- * @throws NullPointerException if zk or watcher is {@code null}
- */
- public ZooCache(ZooSession zk, ZooCacheWatcher watcher) {
- this(zk, Optional.of(watcher), Duration.ofMinutes(3));
+ // for tests that use a Ticker
+ public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
+ this.zk = requireNonNull(zk);
+ this.cache =
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
+
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
+ this.nodeCache = cache.asMap();
+ this.pathsToWatch = requireNonNull(pathsToWatch);
+ setupWatchers(pathsToWatch);
+ log.trace("{} created new cache", cacheId, new Exception());
}
- public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, Duration
timeout) {
- this.zk = requireNonNull(zk);
- this.externalWatcher = watcher;
- RemovalListener<String,ZcNode> removalListerner = (path, zcNode, reason)
-> {
- try {
- log.trace("{} removing watches for {} because {} accesses {}",
cacheId, path, reason,
- zcNode == null ? -1 : zcNode.getAccessCount());
- zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any,
false);
- } catch (InterruptedException | KeeperException | RuntimeException e) {
- log.warn("{} failed to remove watches on path {} in zookeeper",
cacheId, path, e);
+ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
+ externalWatchers.add(requireNonNull(watcher));
+ }
+
+ // Visible for testing
+ protected void setupWatchers(Set<String> pathsToWatch) {
+
+ for (String left : pathsToWatch) {
+ for (String right : pathsToWatch) {
+ if (!left.equals(right) && left.contains(right)) {
+ throw new IllegalArgumentException(
+ "Overlapping paths found in paths to watch. left: " + left + ",
right: " + right);
+ }
}
- };
- // Must register the removal listener using evictionListener inorder for
removal to be mutually
- // exclusive with any other operations on the same path. This is important
for watcher
- // consistency, concurrently adding and removing watches for the same path
would leave zoocache
- // in a really bad state. The cache builder has another way to register a
removal listener that
- // is not mutually exclusive.
- Cache<String,ZcNode> cache =
- Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE,
false)
-
.expireAfterAccess(timeout).evictionListener(removalListerner).build();
- nodeCache = cache.asMap();
- log.trace("{} created new cache", cacheId, new Exception());
+ }
+
+ try {
+ for (String path : pathsToWatch) {
+ watchedPaths.add(path);
+ zk.addPersistentRecursiveWatcher(path, this.watcher);
Review Comment:
If the path is added to watchedPaths after, then in the case of an exception
it will never be added; that would cause `ensureWatched` calls to fail for this
subtree.
```suggestion
zk.addPersistentRecursiveWatcher(path, this.watcher);
watchedPaths.add(path);
```
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -59,17 +63,29 @@ public interface ZooCacheWatcher extends
Consumer<WatchedEvent> {}
private static final Logger log = LoggerFactory.getLogger(ZooCache.class);
- private final ZCacheWatcher watcher = new ZCacheWatcher();
- private final Optional<ZooCacheWatcher> externalWatcher;
+ protected final TreeSet<String> watchedPaths = new TreeSet<>();
Review Comment:
This is modified outside of the constructor, so it needs to be made a
synchronized set or a volatile pointer to an immutable set that gets recreated
w/ each change.
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -136,7 +166,9 @@ public void process(WatchedEvent event) {
clear();
break;
case SyncConnected:
- log.trace("{} ZooKeeper connection established, ignoring; {}",
cacheId, event);
+ log.trace("{} ZooKeeper connection established, re-establishing
watchers; {}",
Review Comment:
When a reconnect happens, we probably want to do the following in zoocache:
1. Clear the cache
2. Add recursive watchers to zookeeper
3. For successfully added watches, reset watchedPaths to only contain them.
Do not want any paths that failed to add in this set after reconnect.
It's important to reset watchedPaths because there could be exceptions adding
them back, so resetting watchedPaths helps deal with that issue. Was not sure
about clearing the cache, but it seems like a good thing to do because it's not
really known why the reconnect is happening and if we will get an event from
the old zookeeper object.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]