keith-turner commented on code in PR #5256:
URL: https://github.com/apache/accumulo/pull/5256#discussion_r1927223793
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -119,6 +124,7 @@ private static void digestAuth(ZooKeeper zoo, String secret) {
private final String sessionName;
private final int timeout;
private final ZooReaderWriter zrw;
+ private final Map<String,Watcher> persistentWatcherPaths = new HashMap<>();
Review Comment:
It's possible that different ZooCaches or other future code add the same path w/ different Watcher instances. W/ this Map the last one to add wins, removing the previous additions. Maybe could track the added watches in the following way?
```suggestion
  private static class AddedWatches {
    private final Set<String> paths;
    private final Watcher watcher;
  }

  private final List<AddedWatches> persistentWatcherPaths = new ArrayList<>();
```
Could also do something like the following maybe, however this makes assumptions about hashCode and equals on the watcher that may not be valid.
```suggestion
  private final Map<Watcher,List<String>> persistentWatcherPaths = new HashMap<>();
```
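For illustration, here is a self-contained sketch of the first approach: each registration is tracked as its own entry, so two callers adding the same path with different Watcher instances both survive a reconnect. `WatchTracker`, `track`, and the local `Watcher` stand-in are hypothetical names for this sketch, not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of tracking persistent watches so that registering the same path
// with two different Watcher instances preserves both registrations.
class WatchTracker {

  // Placeholder for org.apache.zookeeper.Watcher in this self-contained sketch.
  public interface Watcher {}

  // One entry per addPersistentRecursiveWatchers call: the paths and the
  // watcher they were registered with.
  public record AddedWatches(Set<String> paths, Watcher watcher) {}

  private final List<AddedWatches> persistentWatches = new ArrayList<>();

  public synchronized void track(Set<String> paths, Watcher watcher) {
    persistentWatches.add(new AddedWatches(paths, watcher));
  }

  // On reconnect, every recorded (paths, watcher) pair can be replayed, so a
  // later registration of the same path does not clobber an earlier one.
  public synchronized List<AddedWatches> snapshot() {
    return List.copyOf(persistentWatches);
  }
}
```

Unlike the `Map<Watcher,List<String>>` alternative, this list-based form makes no assumptions about `hashCode`/`equals` on the watcher.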
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -290,9 +318,19 @@ public void sync(final String path, VoidCallback cb, Object ctx) {
verifyConnected().sync(path, cb, ctx);
}
+  public void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    for (String path : paths) {
+      verifyConnected().addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
+      persistentWatcherPaths.put(path, watcher);
+      log.debug("Added persistent recursive watcher at {}", path);
+    }
+  }
Review Comment:
During this loop the zookeepers could potentially change multiple times, and the function could run concurrently w/ reconnect. That makes it harder to reason about these changes; could sync the method to prevent it from running at the same time as reconnect.
```suggestion
  public synchronized void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
      throws KeeperException, InterruptedException {
    // this is not really needed if the method is synchronized, just changed it for code clarity.
    var zk = verifyConnected();
    for (String path : paths) {
      zk.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
      persistentWatcherPaths.put(path, watcher);
      log.debug("Added persistent recursive watcher at {}", path);
    }
  }
```
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -188,6 +194,28 @@ private synchronized ZooKeeper reconnect() {
           digestAuth(zk, instanceSecret);
         }
         tryAgain = false;
+        if (!persistentWatcherPaths.isEmpty()) {
+          // We need to wait until the connection is alive, else we run into
+          // a case where addPersistentRecursiveWatchers calls verifyConnected
+          // which calls reconnect.
+          do {
+            UtilWaitThread.sleep(100);
+          } while (!zk.getState().isAlive());
+          for (Entry<String,Watcher> entry : persistentWatcherPaths.entrySet()) {
+            try {
+              addPersistentRecursiveWatchers(Set.of(entry.getKey()), entry.getValue());
+            } catch (KeeperException e) {
+              log.error("Error setting persistent recursive watcher at " + entry.getKey(), e);
+              tryAgain = true;
+              break;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              log.error("Interrupted setting persistent recursive watcher at " + entry.getKey(), e);
+              tryAgain = true;
+            }
+          }
Review Comment:
Somehow, after successfully adding new watches, the cache must be cleared to avoid a situation like the following from causing problems.

1. ZooCache is created with ZK1.
2. Something bad happens to ZK1 and it becomes non-functional.
3. The process idles for X seconds, during which time many changes are made on the zookeeper servers that the process is completely unaware of.
4. Eventually ZK2 is created and watches are added.
5. The changes that happened during step 3 were not cleared from the cache and may never be cleared.

Always clearing the cache after successfully adding all watches to a single ZK should avoid the problem above. This ensures that anything that happened while no watches were set will eventually be seen. Clearing the caches after all watches are set avoids race conditions; if the caches were cleared prior to re-adding watches then there could be race conditions.

The current code has handling to clear caches on events from ZK1, but these may happen before the watches are set. However, ZooSession has synchronization that may avoid problems w/ the clearing happening before the watches are set. So the current code may be correct, but I am having a hard time proving that to myself.
It would be hacky, but I am wondering if we should go through all the watchers and send an event like Expired, Closed, etc. Or maybe we can create a new interface like the following
```java
interface PersistentWatcher extends Watcher {
  void zookeeperChanged();
}

public void addPersistentRecursiveWatchers(Set<String> paths, PersistentWatcher watcher)
```
then the reconnect method could call `zookeeperChanged()` on each watcher after it changes zookeepers.
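A minimal self-contained sketch of how that could fit together, assuming the hypothetical `PersistentWatcher` interface above; `ReconnectNotifier`, `register`, and `fireZookeeperChanged` are illustrative names, and the local `Watcher` is a stand-in for `org.apache.zookeeper.Watcher`:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: after reconnect() switches to a new ZooKeeper and re-adds all
// watches, it notifies every persistent watcher so each cache can clear
// itself and re-read anything that changed while no watches were set.
class ReconnectNotifier {

  // Placeholder for org.apache.zookeeper.Watcher in this self-contained sketch.
  public interface Watcher {}

  // Watchers that survive reconnects opt in to an extra callback.
  public interface PersistentWatcher extends Watcher {
    void zookeeperChanged();
  }

  private final List<PersistentWatcher> watchers = new ArrayList<>();

  public synchronized void register(PersistentWatcher w) {
    watchers.add(w);
  }

  // Called only after all watches were successfully re-added to the new
  // ZooKeeper, so clearing happens after watches are set and the race
  // described above is avoided.
  public synchronized void fireZookeeperChanged() {
    watchers.forEach(PersistentWatcher::zookeeperChanged);
  }
}
```

Firing the callback after (not before) re-adding watches matches the ordering argument in the comment above: any change made while no watches were set is covered by the post-reconnect clear.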
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java:
##########
@@ -59,68 +63,59 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}
private static final Logger log = LoggerFactory.getLogger(ZooCache.class);
-  private final ZCacheWatcher watcher = new ZCacheWatcher();
-  private final Optional<ZooCacheWatcher> externalWatcher;
+  protected volatile NavigableSet<String> watchedPaths =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+  // visible for tests
+  protected final ZCacheWatcher watcher = new ZCacheWatcher();
+  private final List<ZooCacheWatcher> externalWatchers =
+      Collections.synchronizedList(new ArrayList<>());
   private static final AtomicLong nextCacheId = new AtomicLong(0);
   private final String cacheId = "ZC" + nextCacheId.incrementAndGet();
-  // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given
-  // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for
-  // their compute functions.
+  public static final Duration CACHE_DURATION = Duration.ofMinutes(30);
+
+  private final Cache<String,ZcNode> cache;
+
   private final ConcurrentMap<String,ZcNode> nodeCache;
   private final ZooSession zk;
   private volatile boolean closed = false;
-  public static class ZcStat {
-    private long ephemeralOwner;
-    private long mzxid;
-
-    public ZcStat() {}
-
-    private ZcStat(Stat stat) {
-      this.ephemeralOwner = stat.getEphemeralOwner();
-      this.mzxid = stat.getMzxid();
-    }
-
-    public long getEphemeralOwner() {
-      return ephemeralOwner;
-    }
-
-    private void set(ZcStat cachedStat) {
-      this.ephemeralOwner = cachedStat.ephemeralOwner;
-      this.mzxid = cachedStat.mzxid;
-    }
-
-    @VisibleForTesting
-    public void setEphemeralOwner(long ephemeralOwner) {
-      this.ephemeralOwner = ephemeralOwner;
-    }
-
-    public long getMzxid() {
-      return mzxid;
-    }
-  }
-
   private final AtomicLong updateCount = new AtomicLong(0);
-  private class ZCacheWatcher implements Watcher {
+  class ZCacheWatcher implements Watcher {
     @Override
     public void process(WatchedEvent event) {
       if (log.isTraceEnabled()) {
         log.trace("{}: {}", cacheId, event);
       }
       switch (event.getType()) {
-        case NodeDataChanged:
         case NodeChildrenChanged:
-        case NodeCreated:
-        case NodeDeleted:
+          // According to documentation we should not receive this event.
+          // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
+          // may receive this event (Fixed in 3.9.0)
+          break;
         case ChildWatchRemoved:
         case DataWatchRemoved:
-          remove(event.getPath());
+          // We don't need to do anything with the cache on these events.
+          break;
+        case NodeDataChanged:
+          log.trace("{} node data changed; clearing {}", cacheId, event.getPath());
+          clear(path -> path.equals(event.getPath()));
+          break;
+        case NodeCreated:
+        case NodeDeleted:
+          // With the Watcher being set at a higher level we need to remove
+          // the parent of the affected node and all of its children from the cache
+          // so that the parent and children node can be re-cached. If we only remove the
+          // affected node, then the cached children in the parent could be incorrect.
+          int lastSlash = event.getPath().lastIndexOf('/');
+          String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash);
+          log.trace("{} node created or deleted {}; clearing {}", cacheId, event.getPath(), parent);
+          clear((path) -> path.startsWith(parent));
Review Comment:
A bit further down there is code that does nothing on a close event and has a comment about why. The reason given in the comment is no longer valid, so the comment should be removed. Also, the cache should probably be cleared on a closed event.
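To make the NodeCreated/NodeDeleted handling in the diff above concrete, here is a self-contained sketch of the parent-derivation and prefix-clear logic, using a plain map in place of the real cache; `PrefixClear` and its method names are illustrative, not the PR's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch: on NodeCreated/NodeDeleted, compute the parent path and evict the
// parent and everything under it, so a stale cached child listing in the
// parent cannot linger.
class PrefixClear {

  private final Map<String,String> cache = new ConcurrentHashMap<>();

  public void put(String path, String value) {
    cache.put(path, value);
  }

  public int size() {
    return cache.size();
  }

  // Same computation as the diff: the parent of "/a/b" is "/a", and the
  // parent of a top-level node like "/a" is "/".
  public static String parentOf(String path) {
    int lastSlash = path.lastIndexOf('/');
    return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
  }

  public void clear(Predicate<String> shouldClear) {
    cache.keySet().removeIf(shouldClear);
  }

  // Evict the parent and all of its children so both the node and the
  // parent's child listing get re-read from ZooKeeper.
  public void onNodeCreatedOrDeleted(String changedPath) {
    String parent = parentOf(changedPath);
    clear(path -> path.startsWith(parent));
  }
}
```

Note that a plain `startsWith(parent)` prefix test is broader than strict tree membership (e.g. clearing under `/a` would also match a sibling named `/ab`); that is a property of the prefix predicate shown in the diff, reproduced here as-is.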
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]