http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java b/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java index f32fb70..3c88f48 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java +++ b/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java @@ -28,49 +28,49 @@ import org.apache.log4j.Logger; /** * This store removes Repos, in the store it wraps, that are in a finished or new state for more than a configurable time period. - * + * * No external time source is used. It starts tracking idle time when its created. - * + * */ public class AgeOffStore<T> implements TStore<T> { - + public interface TimeSource { long currentTimeMillis(); } - + final private static Logger log = Logger.getLogger(AgeOffStore.class); - + private TStore<T> store; private Map<Long,Long> candidates; private long ageOffTime; private long minTime; private TimeSource timeSource; - + private synchronized void updateMinTime() { minTime = Long.MAX_VALUE; - + for (Long time : candidates.values()) { if (time < minTime) minTime = time; } } - + private synchronized void addCandidate(long txid) { long time = timeSource.currentTimeMillis(); candidates.put(txid, time); if (time < minTime) minTime = time; } - + private synchronized void removeCandidate(long txid) { Long time = candidates.remove(txid); if (time != null && time <= minTime) updateMinTime(); } - + public void ageOff() { HashSet<Long> oldTxs = new HashSet<Long>(); - + synchronized (this) { long time = timeSource.currentTimeMillis(); if (minTime < time && time - minTime >= ageOffTime) { @@ -79,12 +79,12 @@ public class AgeOffStore<T> implements TStore<T> { oldTxs.add(entry.getKey()); } } - + candidates.keySet().removeAll(oldTxs); updateMinTime(); } } - + for (Long txid : oldTxs) { try { store.reserve(txid); @@ -99,7 +99,7 @@ public class AgeOffStore<T> implements TStore<T> { default: break; } - + } finally { store.unreserve(txid, 0); } @@ -108,15 +108,15 @@ public class AgeOffStore<T> implements TStore<T> { } } } - + public AgeOffStore(TStore<T> store, long ageOffTime, TimeSource timeSource) { this.store = store; this.ageOffTime = ageOffTime; this.timeSource = timeSource; candidates = new HashMap<Long,Long>(); - + minTime = Long.MAX_VALUE; - + List<Long> txids = store.list(); for (Long txid : txids) { store.reserve(txid); @@ -135,7 +135,7 @@ public class AgeOffStore<T> implements TStore<T> { } } } - + public AgeOffStore(TStore<T> store, long ageOffTime) { this(store, ageOffTime, new TimeSource() { @Override @@ -144,53 +144,53 @@ public class AgeOffStore<T> implements TStore<T> { } }); } - + @Override public long create() { long txid = store.create(); addCandidate(txid); return txid; } - + @Override public long reserve() { return store.reserve(); } - + @Override public void reserve(long tid) { store.reserve(tid); } - + @Override public void unreserve(long tid, long deferTime) { store.unreserve(tid, deferTime); } - + @Override public Repo<T> top(long tid) { return store.top(tid); } - + @Override public void push(long tid, Repo<T> repo) throws StackOverflowException { store.push(tid, repo); } - + @Override public void pop(long tid) { store.pop(tid); } - + @Override public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) { return store.getStatus(tid); } - + @Override public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) { store.setStatus(tid, status); - + switch (status) { case IN_PROGRESS: case FAILED_IN_PROGRESS: @@ -204,28 +204,28 @@ public class AgeOffStore<T> implements TStore<T> { break; } } - + @Override public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) { return store.waitForStatusChange(tid, expected); } - + @Override public void setProperty(long tid, String prop, Serializable val) { store.setProperty(tid, prop, val); } - + @Override public Serializable getProperty(long tid, String prop) { return store.getProperty(tid, prop); } - + @Override public void delete(long tid) { store.delete(tid); removeCandidate(tid); } - + @Override public List<Long> list() { return store.list();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java index 24d00d9..9ae7acb 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java +++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyRepo.java @@ -19,8 +19,8 @@ package org.apache.accumulo.fate; /** * Read only access to a repeatable persisted operation. * - * By definition, these methods are safe to call without impacting the state of FATE. They should also be - * safe to call without impacting the state of system components. + * By definition, these methods are safe to call without impacting the state of FATE. They should also be safe to call without impacting the state of system + * components. * */ public interface ReadOnlyRepo<T> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java index ad5e7e1..1f01090 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java +++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyStore.java @@ -24,9 +24,9 @@ import com.google.common.base.Preconditions; /** * This store decorates a TStore to make sure it can not be modified. - * + * * Unlike relying directly on the ReadOnlyTStore interface, this class will not allow subsequent users to cast back to a mutable TStore successfully. - * + * */ public class ReadOnlyStore<T> implements ReadOnlyTStore<T> { @@ -58,7 +58,7 @@ public class ReadOnlyStore<T> implements ReadOnlyTStore<T> { /** * Decorates a Repo to make sure it is treated as a ReadOnlyRepo. - * + * * Similar to ReadOnlyStore, won't allow subsequent user to cast a ReadOnlyRepo back to a mutable Repo. */ protected static class ReadOnlyRepoWrapper<X> implements ReadOnlyRepo<X> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java index d390139..5c1344a 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java +++ b/fate/src/main/java/org/apache/accumulo/fate/ReadOnlyTStore.java @@ -23,8 +23,8 @@ import java.util.List; /** * Read only access to a Transaction Store. * - * A transaction consists of a number of operations. Instances of this class may check on the queue of outstanding - * transactions but may neither modify them nor create new ones. + * A transaction consists of a number of operations. Instances of this class may check on the queue of outstanding transactions but may neither modify them nor + * create new ones. */ public interface ReadOnlyTStore<T> { @@ -49,8 +49,7 @@ public interface ReadOnlyTStore<T> { /** * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS. * - * Reserving a transaction id ensures that nothing else in-process interacting via the same instance - * will be operating on that transaction id. + * Reserving a transaction id ensures that nothing else in-process interacting via the same instance will be operating on that transaction id. * * @return a transaction id that is safe to interact with, chosen by the store. */ @@ -59,8 +58,7 @@ public interface ReadOnlyTStore<T> { /** * Reserve the specific tid. * - * Reserving a transaction id ensures that nothing else in-process interacting via the same instance - * will be operating on that transaction id. + * Reserving a transaction id ensures that nothing else in-process interacting via the same instance will be operating on that transaction id. * */ void reserve(long tid); @@ -70,18 +68,20 @@ public interface ReadOnlyTStore<T> { * * upon successful return the store now controls the referenced transaction id. caller should no longer interact with it. * - * @param tid transaction id, previously reserved. - * @param deferTime time in millis to keep this transaction out of the pool used in the {@link #reserve() reserve} method. must be non-negative. + * @param tid + * transaction id, previously reserved. + * @param deferTime + * time in millis to keep this transaction out of the pool used in the {@link #reserve() reserve} method. must be non-negative. */ void unreserve(long tid, long deferTime); - /** * Get the current operation for the given transaction id. * * Caller must have already reserved tid. * - * @param tid transaction id, previously reserved. + * @param tid + * transaction id, previously reserved. * @return a read-only view of the operation */ ReadOnlyRepo<T> top(long tid); @@ -91,7 +91,8 @@ public interface ReadOnlyTStore<T> { * * Caller must have already reserved tid. * - * @param tid transaction id, previously reserved. + * @param tid + * transaction id, previously reserved. * @return execution status */ TStatus getStatus(long tid); @@ -99,8 +100,10 @@ public interface ReadOnlyTStore<T> { /** * Wait for the satus of a transaction to change * - * @param tid transaction id, need not have been reserved. - * @param expected a set of possible statuses we are interested in being notified about. may not be null. + * @param tid + * transaction id, need not have been reserved. + * @param expected + * a set of possible statuses we are interested in being notified about. may not be null. * @return execution status. */ TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected); @@ -110,8 +113,10 @@ public interface ReadOnlyTStore<T> { * * Caller must have already reserved tid. * - * @param tid transaction id, previously reserved. - * @param prop name of property to retrieve. + * @param tid + * transaction id, previously reserved. + * @param prop + * name of property to retrieve. */ Serializable getProperty(long tid, String prop); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/Repo.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/Repo.java b/fate/src/main/java/org/apache/accumulo/fate/Repo.java index b0ebd1a..0dcfd7f 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/Repo.java +++ b/fate/src/main/java/org/apache/accumulo/fate/Repo.java @@ -20,14 +20,14 @@ import java.io.Serializable; /** * Repeatable persisted operation - * + * */ public interface Repo<T> extends ReadOnlyRepo<T>, Serializable { - + Repo<T> call(long tid, T environment) throws Exception; - + void undo(long tid, T environment) throws Exception; - + // this allows the last fate op to return something to the user String getReturn(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java b/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java index 6e38f1b..0f385d4 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java +++ b/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java @@ -17,11 +17,11 @@ package org.apache.accumulo.fate; public class StackOverflowException extends Exception { - + public StackOverflowException(String msg) { super(msg); } - + private static final long serialVersionUID = 1L; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/TStore.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/TStore.java b/fate/src/main/java/org/apache/accumulo/fate/TStore.java index 3adb493..bdcfba3 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/TStore.java +++ b/fate/src/main/java/org/apache/accumulo/fate/TStore.java @@ -20,7 +20,7 @@ import java.io.Serializable; /** * Transaction Store: a place to save transactions - * + * * A transaction consists of a number of operations. To use, first create a transaction id, and then seed the transaction with an initial operation. An executor * service can then execute the transaction's operation, possibly pushing more operations onto the transaction as each step successfully completes. If a step * fails, the stack can be unwound, undoing each operation. @@ -29,14 +29,14 @@ public interface TStore<T> extends ReadOnlyTStore<T> { /** * Create a new transaction id - * + * * @return a transaction id */ long create(); /** * Get the current operation for the given transaction id. - * + * * @param tid * transaction id * @return the operation @@ -46,7 +46,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> { /** * Update the given transaction with the next operation - * + * * @param tid * the transaction id * @param repo @@ -61,7 +61,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> { /** * Update the state of a given transaction - * + * * @param tid * transaction id * @param status @@ -73,7 +73,7 @@ public interface TStore<T> extends ReadOnlyTStore<T> { /** * Remove the transaction from the store. - * + * * @param tid * the transaction id */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java index 0dc156e..6c8a7d6 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java +++ b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java @@ -46,7 +46,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; //TODO document zookeeper layout - ACCUMULO-1298 public class ZooStore<T> implements TStore<T> { - + private String path; private IZooReaderWriter zk; private String lastReserved = ""; @@ -55,21 +55,21 @@ public class ZooStore<T> implements TStore<T> { private SecureRandom idgenerator; private long statusChangeEvents = 0; private int reservationsWaiting = 0; - + private byte[] serialize(Object o) { - + try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(o); oos.close(); - + return baos.toByteArray(); } catch (IOException e) { throw new RuntimeException(e); } } - + private Object deserialize(byte ser[]) { try { ByteArrayInputStream bais = new ByteArrayInputStream(ser); @@ -79,26 +79,26 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + private String getTXPath(long tid) { return String.format("%s/tx_%016x", path, tid); } - + private long parseTid(String txdir) { return Long.parseLong(txdir.split("_")[1], 16); } - + public ZooStore(String path, IZooReaderWriter zk) throws KeeperException, InterruptedException { - + this.path = path; this.zk = zk; this.reserved = new HashSet<Long>(); this.defered = new HashMap<Long,Long>(); this.idgenerator = new SecureRandom(); - + zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } - + @Override public long create() { while (true) { @@ -114,28 +114,28 @@ public class ZooStore<T> implements TStore<T> { } } } - + @Override public long reserve() { try { while (true) { - + long events; synchronized (this) { events = statusChangeEvents; } - + List<String> txdirs = new ArrayList<String>(zk.getChildren(path)); Collections.sort(txdirs); - + synchronized (this) { if (txdirs.size() > 0 && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0) lastReserved = ""; } - + for (String txdir : txdirs) { long tid = parseTid(txdir); - + synchronized (this) { // this check makes reserve pick up where it left off, so that it cycles through all as it is repeatedly called.... failing to do so can lead to // starvation where fate ops that sort higher and hold a lock are never reserved. @@ -151,13 +151,12 @@ public class ZooStore<T> implements TStore<T> { if (!reserved.contains(tid)) { reserved.add(tid); lastReserved = txdir; - } - else + } else continue; } - + // have reserved id, status should not change - + try { TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir, null), UTF_8)); if (status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS) { @@ -173,7 +172,7 @@ public class ZooStore<T> implements TStore<T> { throw e; } } - + synchronized (this) { if (events == statusChangeEvents) { if (defered.size() > 0) { @@ -190,7 +189,7 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + public void reserve(long tid) { synchronized (this) { reservationsWaiting++; @@ -201,63 +200,63 @@ public class ZooStore<T> implements TStore<T> { } catch (InterruptedException e) { throw new RuntimeException(e); } - + reserved.add(tid); } finally { reservationsWaiting--; } } } - + private void unreserve(long tid) { synchronized (this) { if (!reserved.remove(tid)) throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid)); - + // do not want this unreserve to unesc wake up threads in reserve()... this leads to infinite loop when tx is stuck in NEW... // only do this when something external has called reserve(tid)... if (reservationsWaiting > 0) this.notifyAll(); } } - + @Override public void unreserve(long tid, long deferTime) { - + if (deferTime < 0) throw new IllegalArgumentException("deferTime < 0 : " + deferTime); - + synchronized (this) { if (!reserved.remove(tid)) throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid)); - + if (deferTime > 0) defered.put(tid, System.currentTimeMillis() + deferTime); - + this.notifyAll(); } - + } - + private void verifyReserved(long tid) { synchronized (this) { if (!reserved.contains(tid)) throw new IllegalStateException("Tried to operate on unreserved transaction " + String.format("%016x", tid)); } } - + @SuppressWarnings("unchecked") @Override public Repo<T> top(long tid) { verifyReserved(tid); - + while (true) { try { String txpath = getTXPath(tid); String top = findTop(txpath); if (top == null) return null; - + byte[] ser = zk.getData(txpath + "/" + top, null); return (Repo<T>) deserialize(ser); } catch (KeeperException.NoNodeException ex) { @@ -267,35 +266,35 @@ public class ZooStore<T> implements TStore<T> { } } } - + private String findTop(String txpath) throws KeeperException, InterruptedException { List<String> ops = zk.getChildren(txpath); - + ops = new ArrayList<String>(ops); - + String max = ""; - + for (String child : ops) if (child.startsWith("repo_") && child.compareTo(max) > 0) max = child; - + if (max.equals("")) return null; - + return max; } - + @Override public void push(long tid, Repo<T> repo) throws StackOverflowException { verifyReserved(tid); - + String txpath = getTXPath(tid); try { String top = findTop(txpath); if (top != null && Long.parseLong(top.split("_")[1]) > 100) { throw new StackOverflowException("Repo stack size too large"); } - + zk.putPersistentSequential(txpath + "/repo_", serialize(repo)); } catch (StackOverflowException soe) { throw soe; @@ -303,11 +302,11 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + @Override public void pop(long tid) { verifyReserved(tid); - + try { String txpath = getTXPath(tid); String top = findTop(txpath); @@ -318,7 +317,7 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + private TStatus _getStatus(long tid) { try { return TStatus.valueOf(new String(zk.getData(getTXPath(tid), null), UTF_8)); @@ -328,13 +327,13 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + @Override public TStatus getStatus(long tid) { verifyReserved(tid); return _getStatus(tid); } - + @Override public TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected) { while (true) { @@ -342,11 +341,11 @@ public class ZooStore<T> implements TStore<T> { synchronized (this) { events = statusChangeEvents; } - + TStatus status = _getStatus(tid); if (expected.contains(status)) return status; - + synchronized (this) { if (events == statusChangeEvents) { try { @@ -358,38 +357,38 @@ public class ZooStore<T> implements TStore<T> { } } } - + @Override public void setStatus(long tid, TStatus status) { verifyReserved(tid); - + try { zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); } catch (Exception e) { throw new RuntimeException(e); } - + synchronized (this) { statusChangeEvents++; } - + } - + @Override public void delete(long tid) { verifyReserved(tid); - + try { zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); } catch (Exception e) { throw new RuntimeException(e); } } - + @Override public void setProperty(long tid, String prop, Serializable so) { verifyReserved(tid); - + try { if (so instanceof String) { zk.putPersistentData(getTXPath(tid) + "/prop_" + prop, ("S " + so).getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); @@ -405,14 +404,14 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e2); } } - + @Override public Serializable getProperty(long tid, String prop) { verifyReserved(tid); - + try { byte[] data = zk.getData(getTXPath(tid) + "/prop_" + prop, null); - + if (data[0] == 'O') { byte[] sera = new byte[data.length - 2]; System.arraycopy(data, 2, sera, 0, sera.length); @@ -428,7 +427,7 @@ public class ZooStore<T> implements TStore<T> { throw new RuntimeException(e); } } - + @Override public List<Long> list() { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java index 401bc1a..33e84cf 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java +++ b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java @@ -29,7 +29,7 @@ public class AddressUtil { /** * Fetch the security value that determines how long DNS failures are cached. Looks up the security property 'networkaddress.cache.negative.ttl'. Should that * fail returns the default value used in the Oracle JVM 1.4+, which is 10 seconds. - * + * * @param originalException * the host lookup that is the source of needing this lookup. maybe be null. * @return positive integer number of seconds http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java b/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java index 95fc6d3..da7c41c 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java +++ b/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java @@ -17,44 +17,44 @@ package org.apache.accumulo.fate.util; public class Daemon extends Thread { - + public Daemon() { setDaemon(true); } - + public Daemon(Runnable target) { super(target); setDaemon(true); } - + public Daemon(String name) { super(name); setDaemon(true); } - + public Daemon(ThreadGroup group, Runnable target) { super(group, target); setDaemon(true); } - + public Daemon(ThreadGroup group, String name) { super(group, name); setDaemon(true); } - + public Daemon(Runnable target, String name) { super(target, name); setDaemon(true); } - + public Daemon(ThreadGroup group, Runnable target, String name) { super(group, target, name); setDaemon(true); } - + public Daemon(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setDaemon(true); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java b/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java index 32938e0..ba45488 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java +++ b/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java @@ -23,12 +23,12 @@ import org.apache.log4j.Logger; public class LoggingRunnable implements Runnable { private Runnable runnable; private Logger log; - + public LoggingRunnable(Logger log, Runnable r) { this.runnable = r; this.log = log; } - + public void run() { try { runnable.run(); @@ -39,7 +39,7 @@ public class LoggingRunnable implements Runnable { // maybe the logging system is screwed up OR there is a bug in the exception, like t.getMessage() throws a NPE System.err.println("ERROR " + new Date() + " Failed to log message about thread death " + t2.getMessage()); t2.printStackTrace(); - + // try to print original exception System.err.println("ERROR " + new Date() + " Exception that failed to log : " + t.getMessage()); t.printStackTrace(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java b/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java index 32cdb5b..fb5701a 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java +++ b/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java @@ -20,7 +20,7 @@ import org.apache.log4j.Logger; public class UtilWaitThread { private static final Logger log = Logger.getLogger(UtilWaitThread.class); - + public static void sleep(long millis) { try { Thread.sleep(millis); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java index d10fcd2..fc94ff4 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java @@ -33,22 +33,22 @@ import org.apache.log4j.Logger; // A ReadWriteLock that can be implemented in ZooKeeper. Features the ability to store data // with the lock, and recover the lock using that data to find the lock. public class DistributedReadWriteLock implements java.util.concurrent.locks.ReadWriteLock { - + static enum LockType { READ, WRITE, }; - + // serializer for lock type and user data static class ParsedLock { public ParsedLock(LockType type, byte[] userData) { this.type = type; this.userData = Arrays.copyOf(userData, userData.length); } - + public ParsedLock(byte[] lockData) { if (lockData == null || lockData.length < 1) throw new IllegalArgumentException(); - + int split = -1; for (int i = 0; i < lockData.length; i++) { if (lockData[i] == ':') { @@ -56,22 +56,22 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read break; } } - + if (split == -1) throw new IllegalArgumentException(); - + this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8)); this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length); } - + public LockType getType() { return type; } - + public byte[] getUserData() { return userData; } - + public byte[] getLockData() { byte typeBytes[] = type.name().getBytes(UTF_8); byte[] result = new byte[userData.length + 1 + typeBytes.length]; @@ -80,46 +80,46 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length); return result; } - + private LockType type; private byte[] userData; } - + // This kind of lock can be easily implemented by ZooKeeper // You make an entry at the bottom of the queue, readers run when there are no writers ahead of them, // a writer only runs when they are at the top of the queue. public interface QueueLock { SortedMap<Long,byte[]> getEarlierEntries(long entry); - + void removeEntry(long entry); - + long addEntry(byte[] data); } - + private static final Logger log = Logger.getLogger(DistributedReadWriteLock.class); - + static class ReadLock implements Lock { - + QueueLock qlock; byte[] userData; long entry = -1; - + ReadLock(QueueLock qlock, byte[] userData) { this.qlock = qlock; this.userData = userData; } - + // for recovery ReadLock(QueueLock qlock, byte[] userData, long entry) { this.qlock = qlock; this.userData = userData; this.entry = entry; } - + protected LockType lockType() { return LockType.READ; } - + @Override public void lock() { while (true) { @@ -131,7 +131,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } } } - + @Override public void lockInterruptibly() throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { @@ -139,7 +139,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read return; } } - + @Override public boolean tryLock() { if (entry == -1) { @@ -157,7 +157,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } - + @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long now = System.currentTimeMillis(); @@ -171,7 +171,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } return false; } - + @Override public void unlock() { if (entry == -1) @@ -180,28 +180,28 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read qlock.removeEntry(entry); entry = -1; } - + @Override public Condition newCondition() { throw new NotImplementedException(); } } - + static class WriteLock extends ReadLock { - + WriteLock(QueueLock qlock, byte[] userData) { super(qlock, userData); } - + WriteLock(QueueLock qlock, byte[] userData, long entry) { super(qlock, userData, entry); } - + @Override protected LockType lockType() { return LockType.WRITE; } - + @Override public boolean tryLock() { if (entry == -1) { @@ -211,22 +211,22 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry); Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) - throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8) + " lockType " - + lockType()); + throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData, UTF_8) + + " lockType " + lockType()); if (iterator.next().getKey().equals(entry)) return true; return false; } } - + private QueueLock qlock; private byte[] data; - + public DistributedReadWriteLock(QueueLock qlock, byte[] data) { this.qlock = qlock; this.data = Arrays.copyOf(data, data.length); } - + static public Lock recoverLock(QueueLock qlock, byte[] data) { SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE); for (Entry<Long,byte[]> entry : entries.entrySet()) { @@ -242,12 +242,12 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } return null; } - + @Override public Lock readLock() { return new ReadLock(qlock, data); } - + @Override public Lock writeLock() { return new WriteLock(qlock, data); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java index ad2a191..14d104b 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java @@ -40,7 +40,7 @@ public interface IZooReaderWriter extends IZooReader { boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException; void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException; - + boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls) throws KeeperException, InterruptedException; String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java index cc4bebf..bd27fb9 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java @@ -24,20 +24,21 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; public class TransactionWatcher { - + public interface Arbitrator { boolean transactionAlive(String type, long tid) throws Exception; + boolean transactionComplete(String type, long tid) throws Exception; } - + private static final Logger log = Logger.getLogger(TransactionWatcher.class); final private Map<Long,AtomicInteger> counts = new HashMap<Long,AtomicInteger>(); final private Arbitrator arbitrator; - + public TransactionWatcher(Arbitrator arbitrator) { this.arbitrator = arbitrator; } - + public <T> T run(String ztxBulk, long tid, Callable<T> callable) throws Exception { synchronized (counts) { if (!arbitrator.transactionAlive(ztxBulk, tid)) { @@ -62,7 +63,7 @@ public class TransactionWatcher { } } } - + public boolean isActive(long tid) { synchronized (counts) { log.debug("Transactions in progress " + counts); @@ -70,5 +71,5 @@ public class TransactionWatcher { return count != null && count.get() > 0; } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index ee53ddd..5cfdbb8 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@ -40,8 +40,7 @@ import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; /** - * A cache for values stored in ZooKeeper. Values are kept up to date as they - * change. + * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ public class ZooCache { private static final Logger log = Logger.getLogger(ZooCache.class); @@ -104,31 +103,36 @@ public class ZooCache { /** * Creates a new cache. * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout ZooKeeper session timeout + * @param zooKeepers + * comma-separated list of ZooKeeper host[:port]s + * @param sessionTimeout + * ZooKeeper session timeout */ public ZooCache(String zooKeepers, int sessionTimeout) { this(zooKeepers, sessionTimeout, null); } /** - * Creates a new cache. The given watcher is called whenever a watched node - * changes. + * Creates a new cache. The given watcher is called whenever a watched node changes. * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout ZooKeeper session timeout - * @param watcher watcher object + * @param zooKeepers + * comma-separated list of ZooKeeper host[:port]s + * @param sessionTimeout + * ZooKeeper session timeout + * @param watcher + * watcher object */ public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) { this(new ZooReader(zooKeepers, sessionTimeout), watcher); } /** - * Creates a new cache. The given watcher is called whenever a watched node - * changes. + * Creates a new cache. The given watcher is called whenever a watched node changes. * - * @param reader ZooKeeper reader - * @param watcher watcher object + * @param reader + * ZooKeeper reader + * @param watcher + * watcher object */ public ZooCache(ZooReader reader, Watcher watcher) { this.zReader = reader; @@ -187,7 +191,8 @@ public class ZooCache { /** * Gets the children of the given node. A watch is established by this call. * - * @param zPath path of node + * @param zPath + * path of node * @return children list, or null if node has no children or does not exist */ public synchronized List<String> getChildren(final String zPath) { @@ -222,10 +227,10 @@ public class ZooCache { } /** - * Gets data at the given path. Status information is not returned. A watch is - * established by this call. + * Gets data at the given path. Status information is not returned. A watch is established by this call. * - * @param zPath path to get + * @param zPath + * path to get * @return path data, or null if non-existent */ public synchronized byte[] get(final String zPath) { @@ -233,11 +238,12 @@ public class ZooCache { } /** - * Gets data at the given path, filling status information into the given - * <code>Stat</code> object. A watch is established by this call. + * Gets data at the given path, filling status information into the given <code>Stat</code> object. A watch is established by this call. * - * @param zPath path to get - * @param stat status object to populate + * @param zPath + * path to get + * @param stat + * status object to populate * @return path data, or null if non-existent */ public synchronized byte[] get(final String zPath, Stat stat) { @@ -332,17 +338,20 @@ public class ZooCache { /** * Checks if a data value (or lack of one) is cached. * - * @param zPath path of node + * @param zPath + * path of node * @return true if data value is cached */ @VisibleForTesting synchronized boolean dataCached(String zPath) { return cache.containsKey(zPath); } + /** * Checks if children of a node (or lack of them) are cached. * - * @param zPath path of node + * @param zPath + * path of node * @return true if children are cached */ @VisibleForTesting @@ -353,7 +362,8 @@ public class ZooCache { /** * Clears this cache of all information about nodes rooted at the given path. * - * @param zPath path of top node + * @param zPath + * path of top node */ public synchronized void clear(String zPath) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java index 3c59a00..1475928 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactory.java @@ -18,6 +18,7 @@ package org.apache.accumulo.fate.zookeeper; import java.util.HashMap; import java.util.Map; + import org.apache.zookeeper.Watcher; /** @@ -47,15 +48,17 @@ public class ZooCacheFactory { return zc; } } + /** - * Gets a watched {@link ZooCache}. If the watcher is null, then the same (unwatched) - * object may be returned for multiple calls with the same remaining arguments. + * Gets a watched {@link ZooCache}. If the watcher is null, then the same (unwatched) object may be returned for multiple calls with the same remaining + * arguments. * * @param zooKeepers * comma-seprated list of ZooKeeper host[:port]s * @param sessionTimeout * session timeout - * @param watcher watcher (optional) + * @param watcher + * watcher (optional) * @return cache object */ public ZooCache getZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java index 1391d98..a0100b2 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java @@ -35,28 +35,28 @@ import org.apache.zookeeper.data.Stat; public class ZooLock implements Watcher { private static final Logger log = Logger.getLogger(ZooLock.class); - + public static final String LOCK_PREFIX = "zlock-"; - + public enum LockLossReason { LOCK_DELETED, SESSION_EXPIRED } - + public interface LockWatcher { void lostLock(LockLossReason reason); - + /** * lost the ability to monitor the lock node, and its status is unknown */ void unableToMonitorLockNode(Throwable e); } - + public interface AsyncLockWatcher extends LockWatcher { void acquiredLock(); - + void failedToAcquireLock(Exception e); } - + private boolean lockWasAcquired; final private String path; protected final IZooReaderWriter zooKeeper; @@ -64,11 +64,11 @@ public class ZooLock implements Watcher { private LockWatcher lockWatcher; private boolean watchingParent = false; private String asyncLock; - + public ZooLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path) { this(new ZooCacheFactory().getZooCache(zookeepers, timeInMillis), ZooReaderWriter.getInstance(zookeepers, timeInMillis, scheme, auth), path); } - + protected ZooLock(ZooCache zc, IZooReaderWriter zrw, String path) { getLockDataZooCache = zc; this.path = path; @@ -81,66 +81,66 @@ public class ZooLock implements Watcher { throw new RuntimeException(ex); } } - + private static class TryLockAsyncLockWatcher implements AsyncLockWatcher { - + boolean acquiredLock = false; LockWatcher lw; - + public TryLockAsyncLockWatcher(LockWatcher lw2) { this.lw = lw2; } - + @Override public void acquiredLock() { acquiredLock = true; } - + @Override public void failedToAcquireLock(Exception e) {} - + @Override public void lostLock(LockLossReason reason) { lw.lostLock(reason); } - + @Override public void unableToMonitorLockNode(Throwable e) { lw.unableToMonitorLockNode(e); } - + } - + public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException, InterruptedException { - + TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw); - + lockAsync(tlalw, data); - + if (tlalw.acquiredLock) { return true; } - + if (asyncLock != null) { zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP); asyncLock = null; } - + return false; } - + private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw) throws KeeperException, InterruptedException { - + if (asyncLock == null) { throw new IllegalStateException("Called lockAsync() when asyncLock == null"); } - + List<String> children = zooKeeper.getChildren(path); - + if (!children.contains(myLock)) { throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock); } - + Collections.sort(children); if (log.isTraceEnabled()) { log.trace("Candidate lock nodes"); @@ -148,7 +148,7 @@ public class ZooLock implements Watcher { log.trace("- " + child); } } - + if (children.get(0).equals(myLock)) { log.trace("First candidate is my lock, acquiring"); if (!watchingParent) { @@ -166,14 +166,14 @@ public class ZooLock implements Watcher { if (child.equals(myLock)) { break; } - + prev = child; } - + final String lockToWatch = path + "/" + prev; log.trace("Establishing watch on " + lockToWatch); Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() { - + @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { @@ -224,39 +224,39 @@ public class ZooLock implements Watcher { } } } - + }); - + if (stat == null) lockAsync(myLock, lw); } - + private void lostLock(LockLossReason reason) { LockWatcher localLw = lockWatcher; lock = null; lockWatcher = null; - + localLw.lostLock(reason); } public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) { - + if (lockWatcher != null || lock != null || asyncLock != null) { throw new IllegalStateException(); } - + lockWasAcquired = false; - + try { final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data); log.trace("Ephemeral node " + asyncLockPath + " created"); Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() { - - private void failedToAcquireLock(){ + + private void failedToAcquireLock() { lw.failedToAcquireLock(new Exception("Lock deleted before acquired")); asyncLock = null; } - + public void process(WatchedEvent event) { synchronized (ZooLock.this) { if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) { @@ -264,13 +264,13 @@ public class ZooLock implements Watcher { } else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) { failedToAcquireLock(); } else if (event.getState() != KeeperState.Disconnected && event.getState() != KeeperState.Expired && (lock != null || asyncLock != null)) { - log.debug("Unexpected event watching lock node "+event+" "+asyncLockPath); + log.debug("Unexpected event watching lock node " + event + " " + asyncLockPath); try { Stat stat2 = zooKeeper.getStatus(asyncLockPath, this); - if(stat2 == null){ - if(lock != null) + if (stat2 == null) { + if (lock != null) lostLock(LockLossReason.LOCK_DELETED); - else if(asyncLock != null) + else if (asyncLock != null) failedToAcquireLock(); } } catch (Throwable e) { @@ -278,105 +278,105 @@ public class ZooLock implements Watcher { log.error("Failed to stat lock node " + asyncLockPath, e); } } - + } } }); - + if (stat == null) { lw.failedToAcquireLock(new Exception("Lock does not exist after create")); return; } - + asyncLock = asyncLockPath.substring(path.length() + 1); - + lockAsync(asyncLock, lw); - + } catch (KeeperException e) { lw.failedToAcquireLock(e); } catch (InterruptedException e) { lw.failedToAcquireLock(e); } } - + public synchronized boolean tryToCancelAsyncLockOrUnlock() throws InterruptedException, KeeperException { boolean del = false; - + if (asyncLock != null) { zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP); del = true; } - + if (lock != null) { unlock(); del = true; } - + return del; } - + public synchronized void unlock() throws InterruptedException, KeeperException { if (lock == null) { throw new IllegalStateException(); } - + LockWatcher localLw = lockWatcher; String localLock = lock; - + lock = null; lockWatcher = null; - + zooKeeper.recursiveDelete(path + "/" + localLock, NodeMissingPolicy.SKIP); - + localLw.lostLock(LockLossReason.LOCK_DELETED); } - + public synchronized String getLockPath() { if (lock == null) { return null; } return path + "/" + lock; } - + public synchronized String getLockName() { return lock; } - + public synchronized LockID getLockID() { if (lock == null) { throw new IllegalStateException("Lock not held"); } return new LockID(path, lock, zooKeeper.getZooKeeper().getSessionId()); } - + /** * indicates if the lock was acquired in the past.... helps discriminate between the case where the lock was never held, or held and lost.... - * + * * @return true if the lock was aquired, otherwise false. */ public synchronized boolean wasLockAcquired() { return lockWasAcquired; } - + public synchronized boolean isLocked() { return lock != null; } - + public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException { - if (getLockPath()!=null) + if (getLockPath() != null) zooKeeper.getZooKeeper().setData(getLockPath(), b, -1); } - + @Override public synchronized void process(WatchedEvent event) { log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState()); - + watchingParent = false; if (event.getState() == KeeperState.Expired && lock != null) { lostLock(LockLossReason.SESSION_EXPIRED); } else { - + try { // set the watch on the parent node again zooKeeper.getStatus(path, this); watchingParent = true; @@ -389,151 +389,151 @@ public class ZooLock implements Watcher { log.error("Error resetting watch on ZooLock " + lock == null ? asyncLock : lock + " " + event, ex); } } - + } } - + public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException { - + List<String> children = zk.getChildren(lid.path, false); - + if (children == null || children.size() == 0) { return false; } - + Collections.sort(children); - + String lockNode = children.get(0); if (!lid.node.equals(lockNode)) return false; - + Stat stat = zk.exists(lid.path + "/" + lid.node, false); return stat != null && stat.getEphemeralOwner() == lid.eid; } - + public static boolean isLockHeld(ZooCache zc, LockID lid) { - + List<String> children = zc.getChildren(lid.path); - + if (children == null || children.size() == 0) { return false; } - + children = new ArrayList<String>(children); Collections.sort(children); - + String lockNode = children.get(0); if (!lid.node.equals(lockNode)) return false; - + Stat stat = new Stat(); return zc.get(lid.path + "/" + lid.node, stat) != null && stat.getEphemeralOwner() == lid.eid; } - + public static byte[] getLockData(ZooKeeper zk, String path) throws KeeperException, InterruptedException { List<String> children = zk.getChildren(path, false); - + if (children == null || children.size() == 0) { return null; } - + Collections.sort(children); - + String lockNode = children.get(0); - + return zk.getData(path + "/" + lockNode, false, null); } - + public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, Stat stat) { - + List<String> children = zc.getChildren(path); - + if (children == null || children.size() == 0) { return null; } - + children = new ArrayList<String>(children); Collections.sort(children); - + String lockNode = children.get(0); - + if (!lockNode.startsWith(LOCK_PREFIX)) { throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node"); } - + return zc.get(path + "/" + lockNode, stat); } - + public static long getSessionId(ZooCache zc, String path) throws KeeperException, InterruptedException { List<String> children = zc.getChildren(path); - + if (children == null || children.size() == 0) { return 0; } - + children = new ArrayList<String>(children); Collections.sort(children); - + String lockNode = children.get(0); - + Stat stat = new Stat(); if (zc.get(path + "/" + lockNode, stat) != null) return stat.getEphemeralOwner(); return 0; } - + private static ZooCache getLockDataZooCache; - + public long getSessionId() throws KeeperException, InterruptedException { return getSessionId(getLockDataZooCache, path); } - + public static void deleteLock(IZooReaderWriter zk, String path) throws InterruptedException, KeeperException { List<String> children; - + children = zk.getChildren(path); - + if (children == null || children.size() == 0) { throw new IllegalStateException("No lock is held at " + path); } - + Collections.sort(children); - + String lockNode = children.get(0); - + if (!lockNode.startsWith(LOCK_PREFIX)) { throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node"); } - + zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.SKIP); - + } - + public static boolean deleteLock(IZooReaderWriter zk, String path, String lockData) throws InterruptedException, KeeperException { List<String> children; - + children = zk.getChildren(path); - + if (children == null || children.size() == 0) { throw new IllegalStateException("No lock is held at " + path); } - + Collections.sort(children); - + String lockNode = children.get(0); - + if (!lockNode.startsWith(LOCK_PREFIX)) { throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node"); } - + byte[] data = zk.getData(path + "/" + lockNode, null); - + if (lockData.equals(new String(data, UTF_8))) { zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.FAIL); return true; } - + return false; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java index 38c7b64..2786c4f 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java @@ -209,5 +209,4 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter { putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP); } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java index f1ba428..c5a0ce3 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java @@ -27,11 +27,11 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.data.Stat; public class ZooReservation { - + public static boolean attempt(IZooReaderWriter zk, String path, String reservationID, String debugInfo) throws KeeperException, InterruptedException { if (reservationID.contains(":")) throw new IllegalArgumentException(); - + while (true) { try { zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8), NodeExistsPolicy.FAIL); @@ -44,18 +44,18 @@ public class ZooReservation { } catch (NoNodeException nne) { continue; } - + String idInZoo = new String(zooData, UTF_8).split(":")[0]; - + return idInZoo.equals(reservationID); } } - + } - + public static void release(IZooReaderWriter zk, String path, String reservationID) throws KeeperException, InterruptedException { byte[] zooData; - + try { zooData = zk.getData(path, null); } catch (NoNodeException e) { @@ -63,15 +63,15 @@ public class ZooReservation { Logger.getLogger(ZooReservation.class).debug("Node does not exist " + path); return; } - + String zooDataStr = new String(zooData, UTF_8); String idInZoo = zooDataStr.split(":")[0]; - + if (!idInZoo.equals(reservationID)) { throw new IllegalStateException("Tried to release reservation " + path + " with data mismatch " + reservationID + " " + zooDataStr); } - + zk.recursiveDelete(path, NodeMissingPolicy.SKIP); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java index 0059af7..6b5ec43 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java @@ -68,11 +68,16 @@ public class ZooSession { } /** - * @param host comma separated list of zk servers - * @param timeout in milliseconds - * @param scheme authentication type, e.g. 'digest', may be null - * @param auth authentication-scheme-specific token, may be null - * @param watcher ZK notifications, may be null + * @param host + * comma separated list of zk servers + * @param timeout + * in milliseconds + * @param scheme + * authentication type, e.g. 'digest', may be null + * @param auth + * authentication-scheme-specific token, may be null + * @param watcher + * ZK notifications, may be null */ public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) { final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100; @@ -99,7 +104,7 @@ public class ZooSession { } catch (IOException e) { if (e instanceof UnknownHostException) { /* - Make sure we wait atleast as long as the JVM TTL for negative DNS responses + * Make sure we wait atleast as long as the JVM TTL for negative DNS responses */ sleepTime = Math.max(sleepTime, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e) + 1) * 1000); } @@ -121,14 +126,13 @@ public class ZooSession { if (tryAgain) { if (startTime + 2 * timeout < System.currentTimeMillis() + sleepTime + connectTimeWait) sleepTime = startTime + 2 * timeout - System.currentTimeMillis() - connectTimeWait; - if (sleepTime < 0) - { + if (sleepTime < 0) { connectTimeWait -= sleepTime; sleepTime = 0; } UtilWaitThread.sleep(sleepTime); if (sleepTime < 10000) - sleepTime = sleepTime + (long)(sleepTime * Math.random()); + sleepTime = sleepTime + (long) (sleepTime * Math.random()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java b/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java index 4f5b112..518ab81 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java @@ -25,17 +25,17 @@ import org.junit.Assert; import org.junit.Test; /** - * + * */ public class AgeOffStoreTest { - + private static class TestTimeSource implements TimeSource { long time = 0; - + public long currentTimeMillis() { return time; } - + } @Test @@ -44,14 +44,14 @@ public class AgeOffStoreTest { TestTimeSource tts = new TestTimeSource(); SimpleStore<String> sstore = new SimpleStore<String>(); AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts); - + aoStore.ageOff(); Long txid1 = aoStore.create(); aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.IN_PROGRESS); aoStore.unreserve(txid1, 0); - + aoStore.ageOff(); Long txid2 = aoStore.create(); @@ -59,7 +59,7 @@ public class AgeOffStoreTest { aoStore.setStatus(txid2, TStatus.IN_PROGRESS); aoStore.setStatus(txid2, TStatus.FAILED); aoStore.unreserve(txid2, 0); - + tts.time = 6; Long txid3 = aoStore.create(); @@ -67,21 +67,21 @@ public class AgeOffStoreTest { aoStore.setStatus(txid3, TStatus.IN_PROGRESS); aoStore.setStatus(txid3, TStatus.SUCCESSFUL); aoStore.unreserve(txid3, 0); - + Long txid4 = aoStore.create(); - + aoStore.ageOff(); Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size()); - + tts.time = 15; - + aoStore.ageOff(); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid3, txid4)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(3, new HashSet<Long>(aoStore.list()).size()); - + tts.time = 30; aoStore.ageOff(); @@ -89,71 +89,71 @@ public class AgeOffStoreTest { Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size()); } - + @Test public void testNonEmpty() { // test age off when source store starts off non empty - + TestTimeSource tts = new TestTimeSource(); SimpleStore<String> sstore = new SimpleStore<String>(); Long txid1 = sstore.create(); sstore.reserve(txid1); sstore.setStatus(txid1, TStatus.IN_PROGRESS); sstore.unreserve(txid1, 0); - + Long txid2 = sstore.create(); sstore.reserve(txid2); sstore.setStatus(txid2, TStatus.IN_PROGRESS); sstore.setStatus(txid2, TStatus.FAILED); sstore.unreserve(txid2, 0); - + Long txid3 = sstore.create(); sstore.reserve(txid3); sstore.setStatus(txid3, TStatus.IN_PROGRESS); sstore.setStatus(txid3, TStatus.SUCCESSFUL); sstore.unreserve(txid3, 0); - + Long txid4 = sstore.create(); - + AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size()); - + aoStore.ageOff(); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size()); - + tts.time = 15; aoStore.ageOff(); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size()); - + aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS); aoStore.unreserve(txid1, 0); - + tts.time = 30; - + aoStore.ageOff(); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size()); - + aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.FAILED); aoStore.unreserve(txid1, 0); - + aoStore.ageOff(); - + Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list())); Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size()); - + tts.time = 42; - + aoStore.ageOff(); Assert.assertEquals(0, new HashSet<Long>(aoStore.list()).size()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java b/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java index c2d5f92..eea5f1b 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/ReadOnlyStoreTest.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.EnumSet; import org.apache.accumulo.fate.ReadOnlyTStore.TStatus; - import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -47,7 +46,7 @@ public class ReadOnlyStoreTest { EasyMock.expect(mock.waitForStatusChange(0xdeadbeefl, EnumSet.allOf(TStatus.class))).andReturn(TStatus.UNKNOWN); EasyMock.expect(mock.getProperty(0xdeadbeefl, "com.example.anyproperty")).andReturn("property"); - EasyMock.expect(mock.list()).andReturn(Collections.<Long>emptyList()); + EasyMock.expect(mock.list()).andReturn(Collections.<Long> emptyList()); EasyMock.replay(repo); EasyMock.replay(mock); @@ -64,7 +63,7 @@ public class ReadOnlyStoreTest { Assert.assertEquals(TStatus.UNKNOWN, store.waitForStatusChange(0xdeadbeefl, EnumSet.allOf(TStatus.class))); Assert.assertEquals("property", store.getProperty(0xdeadbeefl, "com.example.anyproperty")); - Assert.assertEquals(Collections.<Long>emptyList(), store.list()); + Assert.assertEquals(Collections.<Long> emptyList(), store.list()); EasyMock.verify(repo); EasyMock.verify(mock); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java b/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java index 60eabfb..f0bac88 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java +++ b/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java @@ -31,62 +31,62 @@ import org.apache.commons.lang.NotImplementedException; * Transient in memory store for transactions. */ public class SimpleStore<T> implements TStore<T> { - + private long nextId = 1; private Map<Long,TStatus> statuses = new HashMap<Long,TStore.TStatus>(); private Set<Long> reserved = new HashSet<Long>(); - + @Override public long create() { statuses.put(nextId, TStatus.NEW); return nextId++; } - + @Override public long reserve() { throw new NotImplementedException(); } - + @Override public void reserve(long tid) { if (reserved.contains(tid)) throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve twice... if test change, then change this reserved.add(tid); } - + @Override public void unreserve(long tid, long deferTime) { if (!reserved.remove(tid)) { throw new IllegalStateException(); } } - + @Override public Repo<T> top(long tid) { throw new NotImplementedException(); } - + @Override public void push(long tid, Repo<T> repo) throws StackOverflowException { throw new NotImplementedException(); } - + @Override public void pop(long tid) { throw new NotImplementedException(); } - + @Override public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) { if (!reserved.contains(tid)) throw new IllegalStateException(); - + TStatus status = statuses.get(tid); if (status == null) return TStatus.UNKNOWN; return status; } - + @Override public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) { if (!reserved.contains(tid)) @@ -95,32 +95,32 @@ public class SimpleStore<T> implements TStore<T> { throw new IllegalStateException(); statuses.put(tid, status); } - + @Override public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) { throw new NotImplementedException(); } - + @Override public void setProperty(long tid, String prop, Serializable val) { throw new NotImplementedException(); } - + @Override public Serializable getProperty(long tid, String prop) { throw new NotImplementedException(); } - + @Override public void delete(long tid) { if (!reserved.contains(tid)) throw new IllegalStateException(); statuses.remove(tid); } - + @Override public List<Long> list() { return new ArrayList<Long>(statuses.keySet()); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java index 6e6a3c3..ab9032d 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; /** * Test the AddressUtil class. - * + * */ public class AddressUtilTest extends TestCase { @@ -88,7 +88,7 @@ public class AddressUtilTest extends TestCase { log.info("AddressUtil is (hopefully) going to spit out an error about DNS lookups. you can ignore it."); AddressUtil.getAddressCacheNegativeTtl(null); fail("The JVM Security settings cache DNS failures forever, this should cause an exception."); - } catch(IllegalArgumentException exception) { + } catch (IllegalArgumentException exception) { assertTrue(true); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java index a9a8e3c..9e540e4 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java @@ -16,31 +16,31 @@ */ package org.apache.accumulo.fate.zookeeper; +import static org.junit.Assert.assertEquals; + import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import static org.junit.Assert.*; - import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock; import org.junit.Test; public class DistributedReadWriteLockTest { - + // Non-zookeeper version of QueueLock public static class MockQueueLock implements QueueLock { - + long next = 0L; final SortedMap<Long,byte[]> locks = new TreeMap<Long,byte[]>(); - + @Override synchronized public SortedMap<Long,byte[]> getEarlierEntries(long entry) { SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>(); result.putAll(locks.headMap(entry + 1)); return result; } - + @Override synchronized public void removeEntry(long entry) { synchronized (locks) { @@ -48,7 +48,7 @@ public class DistributedReadWriteLockTest { locks.notifyAll(); } } - + @Override synchronized public long addEntry(byte[] data) { long result; @@ -59,31 +59,31 @@ public class DistributedReadWriteLockTest { return result; } } - + // some data that is probably not going to update atomically static class SomeData { volatile int[] data = new int[100]; volatile int counter; - + void read() { for (int i = 0; i < data.length; i++) assertEquals(counter, data[i]); } - + void write() { ++counter; for (int i = data.length - 1; i >= 0; i--) data[i] = counter; } } - + @Test public void testLock() throws Exception { final SomeData data = new SomeData(); data.write(); data.read(); QueueLock qlock = new MockQueueLock(); - + final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes()); final Lock readLock = locker.readLock(); final Lock writeLock = locker.writeLock(); @@ -93,7 +93,7 @@ public class DistributedReadWriteLockTest { writeLock.unlock(); readLock.lock(); readLock.unlock(); - + // do a bunch of reads/writes in separate threads, look for inconsistent updates Thread[] threads = new Thread[2]; for (int i = 0; i < threads.length; i++) { @@ -128,5 +128,5 @@ public class DistributedReadWriteLockTest { t.join(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java index 840d33b..0e4e329 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java @@ -23,15 +23,14 @@ import java.util.Map; import java.util.concurrent.Callable; import org.junit.Assert; - import org.junit.Test; public class TransactionWatcherTest { - + static class SimpleArbitrator implements TransactionWatcher.Arbitrator { Map<String,List<Long>> started = new HashMap<String,List<Long>>(); Map<String,List<Long>> cleanedUp = new HashMap<String,List<Long>>(); - + public synchronized void start(String txType, Long txid) throws Exception { List<Long> txids = started.get(txType); if (txids == null) @@ -40,7 +39,7 @@ public class TransactionWatcherTest { throw new Exception("transaction already started"); txids.add(txid); started.put(txType, txids); - + txids = cleanedUp.get(txType); if (txids == null) txids = new ArrayList<Long>(); @@ -49,7 +48,7 @@ public class TransactionWatcherTest { txids.add(txid); cleanedUp.put(txType, txids); } - + public synchronized void stop(String txType, Long txid) throws Exception { List<Long> txids = started.get(txType); if (txids != null && txids.contains(txid)) { @@ -58,7 +57,7 @@ public class TransactionWatcherTest { } throw new Exception("transaction does not exist"); } - + public synchronized void cleanup(String txType, Long txid) throws Exception { List<Long> txids = cleanedUp.get(txType); if (txids != null && txids.contains(txid)) { @@ -67,7 +66,7 @@ public class TransactionWatcherTest { } throw new Exception("transaction does not exist"); } - + @Override synchronized public boolean transactionAlive(String txType, long tid) throws Exception { List<Long> txids = started.get(txType); @@ -83,9 +82,9 @@ public class TransactionWatcherTest { return true; return !txids.contains(tid); } - + } - + @Test public void testTransactionWatcher() throws Exception { final String txType = "someName"; @@ -109,12 +108,12 @@ public class TransactionWatcherTest { } }); Assert.assertFalse(txw.isActive(txid)); - Assert.assertFalse(sa.transactionComplete(txType, txid)); + Assert.assertFalse(sa.transactionComplete(txType, txid)); sa.stop(txType, txid); Assert.assertFalse(sa.transactionAlive(txType, txid)); - Assert.assertFalse(sa.transactionComplete(txType, txid)); + Assert.assertFalse(sa.transactionComplete(txType, txid)); sa.cleanup(txType, txid); - Assert.assertTrue(sa.transactionComplete(txType, txid)); + Assert.assertTrue(sa.transactionComplete(txType, txid)); try { txw.run(txType, txid, new Callable<Object>() { @Override @@ -150,7 +149,7 @@ public class TransactionWatcherTest { return null; } }); - + } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java index e7dffc1..19d8f67 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheFactoryTest.java @@ -16,15 +16,16 @@ */ package org.apache.accumulo.fate.zookeeper; -import org.apache.zookeeper.Watcher; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; import static org.easymock.EasyMock.createMock; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import org.apache.zookeeper.Watcher; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + public class ZooCacheFactoryTest { private ZooCacheFactory zcf; @@ -65,6 +66,7 @@ public class ZooCacheFactoryTest { ZooCache zc1 = zcf.getZooCache(zks1, timeout1, watcher); assertNotNull(zc1); } + @Test public void testGetZooCacheWatcher_Null() { String zks1 = "zk1"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java index e3db785..5dd6f61 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java @@ -16,22 +16,6 @@ */ package org.apache.accumulo.fate.zookeeper; -import java.util.List; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import org.easymock.Capture; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; @@ -41,6 +25,23 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; public class ZooCacheTest { private static final String ZPATH = "/some/path/in/zk"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java index 59fb498..9203b39 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriterTest.java @@ -96,7 +96,7 @@ public class ZooReaderWriterTest { @Test(expected = SessionExpiredException.class) public void testMutateNodeCreationFails() throws Exception { final String path = "/foo"; - final byte[] value = new byte[]{0}; + final byte[] value = new byte[] {0}; final List<ACL> acls = Collections.<ACL> emptyList(); Mutator mutator = new Mutator() { @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java ---------------------------------------------------------------------- diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java index fae7e6e..82d3d1d 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooSessionTest.java @@ -21,10 +21,10 @@ import org.junit.Test; public class ZooSessionTest { - private static final int MINIMUM_TIMEOUT=10000; + private static final int MINIMUM_TIMEOUT = 10000; private static final String UNKNOWN_HOST = "hostname.that.should.not.exist.example.com:2181"; - @Test(expected=RuntimeException.class, timeout=MINIMUM_TIMEOUT*4) + @Test(expected = RuntimeException.class, timeout = MINIMUM_TIMEOUT * 4) public void testUnknownHost() throws Exception { ZooKeeper session = ZooSession.connect(UNKNOWN_HOST, MINIMUM_TIMEOUT, null, null, null); session.close();