This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 4b46991aa5 Add a new AccumuloStore for FATE (#4049) 4b46991aa5 is described below commit 4b46991aa5d16c529af59d50772e6269acfd415b Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Fri Dec 22 07:50:21 2023 -0500 Add a new AccumuloStore for FATE (#4049) This adds support for storing FATE data in an Accumulo table instead of storing in Zookeeper Co-authored-by: Keith Turner <ktur...@apache.org> --- .../accumulo/core/fate/AbstractFateStore.java | 328 +++++++++++++++++++ .../org/apache/accumulo/core/fate/ZooStore.java | 290 +---------------- .../accumulo/core/fate/accumulo/AccumuloStore.java | 287 +++++++++++++++++ .../accumulo/core/fate/accumulo/FateMutator.java | 47 +++ .../core/fate/accumulo/FateMutatorImpl.java | 143 +++++++++ .../core/fate/accumulo/schema/FateSchema.java | 59 ++++ .../java/org/apache/accumulo/test/fate/FateIT.java | 289 +++++++++++++++++ .../test/fate/accumulo/AccumuloFateIT.java | 91 ++++++ .../fate/accumulo/AccumuloStoreReadWriteIT.java | 131 ++++++++ .../accumulo/test/fate/zookeeper/FateIT.java | 352 --------------------- .../test/fate/zookeeper/ZookeeperFateIT.java | 119 +++++++ 11 files changed, 1507 insertions(+), 629 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java new file mode 100644 index 0000000000..5e840d3247 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class AbstractFateStore<T> implements FateStore<T> { + + private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); + + protected final Set<Long> reserved; + protected final Map<Long,Long> defered; + + // This is incremented each time a transaction was unreserved that was non new + protected final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a transaction is unreserved that was runnable + protected final SignalCount unreservedRunnableCount = new SignalCount(); + + public AbstractFateStore() { + this.reserved = new HashSet<>(); + this.defered = new HashMap<>(); + } + + public static byte[] serialize(Object o) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(o); + oos.close(); + + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION", + justification = "unsafe to store arbitrary serialized objects like this, but needed for now" + + " for backwards compatibility") + public static Object deserialize(byte[] ser) { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(ser); + ObjectInputStream ois = new ObjectInputStream(bais); + return ois.readObject(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + + /** + * Attempt to reserve transaction + * + * @param tid transaction id + * @return true if reserved by this call, false if already reserved + */ + @Override + public Optional<FateTxStore<T>> tryReserve(long tid) { + synchronized (this) { + if (!reserved.contains(tid)) { + return Optional.of(reserve(tid)); + } + return Optional.empty(); + } + } + + @Override + public FateTxStore<T> reserve(long tid) { + synchronized (AbstractFateStore.this) { + while (reserved.contains(tid)) { + try { + AbstractFateStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + reserved.add(tid); + return newFateTxStore(tid, true); + } + } + + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + + while (keepWaiting.get()) { + ArrayList<Long> runnableTids = new ArrayList<>(); + + final long beforeCount = unreservedRunnableCount.getCount(); + + List<String> transactions = getTransactions(); + for (String txidStr : transactions) { + long txid = parseTid(txidStr); + if (isRunnable(_getStatus(txid))) { + runnableTids.add(txid); + } + } + + synchronized (this) { + runnableTids.removeIf(txid -> { + var deferedTime = defered.get(txid); + if (deferedTime != null) { + if (deferedTime >= System.currentTimeMillis()) { + return true; + } else { + defered.remove(txid); + } + } + + if (reserved.contains(txid)) { + return true; + } + + return false; + }); + } + + if (runnableTids.isEmpty()) { + if (beforeCount == unreservedRunnableCount.getCount()) { + long waitTime = 5000; + if (!defered.isEmpty()) { + Long minTime = Collections.min(defered.values()); + waitTime = minTime - System.currentTimeMillis(); + } + + if (waitTime > 0) { + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, + keepWaiting::get); + } + } + } else { + return runnableTids.iterator(); + } + + } + + return List.<Long>of().iterator(); + } + + @Override + public List<Long> list() { + ArrayList<Long> l = new ArrayList<>(); + List<String> transactions = getTransactions(); + for (String txid : transactions) { + l.add(parseTid(txid)); + } + return l; + } + + @Override + public ReadOnlyFateTxStore<T> read(long tid) { + return newFateTxStore(tid, false); + } + + protected boolean isRunnable(TStatus status) { + return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS + || status == TStatus.SUBMITTED; + } + + protected long parseTid(String txdir) { + return Long.parseLong(txdir.split("_")[1], 16); + } + + protected abstract List<String> getTransactions(); + + protected abstract TStatus _getStatus(long tid); + + protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved); + + protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> { + protected final long tid; + protected final boolean isReserved; + + protected TStatus observedStatus = null; + + protected AbstractFateTxStoreImpl(long tid, boolean isReserved) { + this.tid = tid; + this.isReserved = isReserved; + } + + @Override + public TStatus waitForStatusChange(EnumSet<TStatus> expected) { + Preconditions.checkState(!isReserved, + "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); + while (true) { + + long countBefore = unreservedNonNewCount.getCount(); + + TStatus status = _getStatus(tid); + if (expected.contains(status)) { + return status; + } + + unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); + } + } + + @Override + public void unreserve(long deferTime) { + + if (deferTime < 0) { + throw new IllegalArgumentException("deferTime < 0 : " + deferTime); + } + + synchronized (AbstractFateStore.this) { + if (!reserved.remove(tid)) { + throw new IllegalStateException( + "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + } + + // notify any threads waiting to reserve + AbstractFateStore.this.notifyAll(); + + if (deferTime > 0) { + defered.put(tid, System.currentTimeMillis() + deferTime); + } + } + + if (observedStatus != null && isRunnable(observedStatus)) { + unreservedRunnableCount.increment(); + } + + if (observedStatus != TStatus.NEW) { + unreservedNonNewCount.increment(); + } + } + + protected void verifyReserved(boolean isWrite) { + if (!isReserved && isWrite) { + throw new IllegalStateException("Attempted write on unreserved FATE transaction."); + } + + if (isReserved) { + synchronized (AbstractFateStore.this) { + if (!reserved.contains(tid)) { + throw new IllegalStateException( + "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + } + } + } + } + + @Override + public TStatus getStatus() { + verifyReserved(false); + var status = _getStatus(tid); + observedStatus = status; + return status; + } + + @Override + public long getID() { + return tid; + } + + protected byte[] serializeTxInfo(Serializable so) { + if (so instanceof String) { + return ("S " + so).getBytes(UTF_8); + } else { + byte[] sera = serialize(so); + byte[] data = new byte[sera.length + 2]; + System.arraycopy(sera, 0, data, 2, sera.length); + data[0] = 'O'; + data[1] = ' '; + return data; + } + } + + protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { + if (data[0] == 'O') { + byte[] sera = new byte[data.length - 2]; + System.arraycopy(data, 2, sera, 0, sera.length); + return (Serializable) deserialize(sera); + } else if (data[0] == 'S') { + return new String(data, 2, data.length - 2, UTF_8); + } else { + throw new IllegalStateException("Bad node data " + txInfo); + } + } + + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 7b2e3e8f38..969aed0717 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -23,24 +23,10 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -53,70 +39,23 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - //TODO use zoocache? - ACCUMULO-1297 //TODO handle zookeeper being down gracefully - ACCUMULO-1297 -public class ZooStore<T> implements FateStore<T> { +public class ZooStore<T> extends AbstractFateStore<T> { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); private String path; private ZooReaderWriter zk; - private Set<Long> reserved; - private Map<Long,Long> defered; - - // This is incremented each time a transaction was unreserved that was non new - private final SignalCount unreservedNonNewCount = new SignalCount(); - - // This is incremented each time a transaction is unreserved that was runnable - private final SignalCount unreservedRunnableCount = new SignalCount(); - - private byte[] serialize(Object o) { - - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(o); - oos.close(); - - return baos.toByteArray(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION", - justification = "unsafe to store arbitrary serialized objects like this, but needed for now" - + " for backwards compatibility") - private Object deserialize(byte[] ser) { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(ser); - ObjectInputStream ois = new ObjectInputStream(bais); - return ois.readObject(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (ReflectiveOperationException e) { - throw new IllegalStateException(e); - } - } private String getTXPath(long tid) { return FastFormat.toHexString(path + "/tx_", tid, ""); } - private long parseTid(String txdir) { - return Long.parseLong(txdir.split("_")[1], 16); - } - public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, InterruptedException { - + super(); this.path = path; this.zk = zk; - this.reserved = new HashSet<>(); - this.defered = new HashMap<>(); zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -143,94 +82,10 @@ public class ZooStore<T> implements FateStore<T> { } } - @Override - public FateTxStore<T> reserve(long tid) { - synchronized (ZooStore.this) { - while (reserved.contains(tid)) { - try { - ZooStore.this.wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } - } - - reserved.add(tid); - return new FateTxStoreImpl(tid, true); - } - } - - /** - * Attempt to reserve transaction - * - * @param tid transaction id - * @return true if reserved by this call, false if already reserved - */ - @Override - public Optional<FateTxStore<T>> tryReserve(long tid) { - synchronized (this) { - if (!reserved.contains(tid)) { - return Optional.of(reserve(tid)); - } - return Optional.empty(); - } - } - - private class FateTxStoreImpl implements FateTxStore<T> { - - private final long tid; - private final boolean isReserved; - - private TStatus observedStatus = null; + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { private FateTxStoreImpl(long tid, boolean isReserved) { - this.tid = tid; - this.isReserved = isReserved; - } - - @Override - public void unreserve(long deferTime) { - - if (deferTime < 0) { - throw new IllegalArgumentException("deferTime < 0 : " + deferTime); - } - - synchronized (ZooStore.this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); - } - - // notify any threads waiting to reserve - ZooStore.this.notifyAll(); - - if (deferTime > 0) { - defered.put(tid, System.currentTimeMillis() + deferTime); - } - } - - if (observedStatus != null && isRunnable(observedStatus)) { - unreservedRunnableCount.increment(); - } - - if (observedStatus != TStatus.NEW) { - unreservedNonNewCount.increment(); - } - } - - private void verifyReserved(boolean isWrite) { - if (!isReserved && isWrite) { - throw new IllegalStateException("Attempted write on unreserved FATE transaction."); - } - - if (isReserved) { - synchronized (ZooStore.this) { - if (!reserved.contains(tid)) { - throw new IllegalStateException( - "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); - } - } - } + super(tid, isReserved); } private static final int RETRIES = 10; @@ -322,31 +177,6 @@ public class ZooStore<T> implements FateStore<T> { } } - @Override - public TStatus getStatus() { - verifyReserved(false); - var status = _getStatus(tid); - observedStatus = status; - return _getStatus(tid); - } - - @Override - public TStatus waitForStatusChange(EnumSet<TStatus> expected) { - Preconditions.checkState(!isReserved, - "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); - while (true) { - - long countBefore = unreservedNonNewCount.getCount(); - - TStatus status = _getStatus(tid); - if (expected.contains(status)) { - return status; - } - - unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); - } - } - @Override public void setStatus(TStatus status) { verifyReserved(true); @@ -377,17 +207,8 @@ public class ZooStore<T> implements FateStore<T> { verifyReserved(true); try { - if (so instanceof String) { - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, ("S " + so).getBytes(UTF_8), - NodeExistsPolicy.OVERWRITE); - } else { - byte[] sera = serialize(so); - byte[] data = new byte[sera.length + 2]; - System.arraycopy(sera, 0, data, 2, sera.length); - data[0] = 'O'; - data[1] = ' '; - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, data, NodeExistsPolicy.OVERWRITE); - } + zk.putPersistentData(getTXPath(tid) + "/" + txInfo, serializeTxInfo(so), + NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e2) { throw new IllegalStateException(e2); } @@ -398,17 +219,7 @@ public class ZooStore<T> implements FateStore<T> { verifyReserved(false); try { - byte[] data = zk.getData(getTXPath(tid) + "/" + txInfo); - - if (data[0] == 'O') { - byte[] sera = new byte[data.length - 2]; - System.arraycopy(data, 2, sera, 0, sera.length); - return (Serializable) deserialize(sera); - } else if (data[0] == 'S') { - return new String(data, 2, data.length - 2, UTF_8); - } else { - throw new IllegalStateException("Bad node data " + txInfo); - } + return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + txInfo)); } catch (NoNodeException nne) { return null; } catch (KeeperException | InterruptedException e) { @@ -428,11 +239,6 @@ public class ZooStore<T> implements FateStore<T> { } } - @Override - public long getID() { - return tid; - } - @Override public List<ReadOnlyRepo<T>> getStack() { verifyReserved(false); @@ -475,7 +281,8 @@ public class ZooStore<T> implements FateStore<T> { } } - private TStatus _getStatus(long tid) { + @Override + protected TStatus _getStatus(long tid) { try { return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); } catch (NoNodeException nne) { @@ -486,88 +293,17 @@ public class ZooStore<T> implements FateStore<T> { } @Override - public ReadOnlyFateTxStore<T> read(long tid) { - return new FateTxStoreImpl(tid, false); + protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) { + return new FateTxStoreImpl(tid, isReserved); } @Override - public List<Long> list() { + protected List<String> getTransactions() { try { - ArrayList<Long> l = new ArrayList<>(); - List<String> transactions = zk.getChildren(path); - for (String txid : transactions) { - l.add(parseTid(txid)); - } - return l; + return zk.getChildren(path); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } } - private boolean isRunnable(TStatus status) { - return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS - || status == TStatus.SUBMITTED; - } - - @Override - public Iterator<Long> runnable(AtomicBoolean keepWaiting) { - - while (keepWaiting.get()) { - ArrayList<Long> runnableTids = new ArrayList<>(); - - final long beforeCount = unreservedRunnableCount.getCount(); - - try { - - List<String> transactions = zk.getChildren(path); - for (String txidStr : transactions) { - long txid = parseTid(txidStr); - if (isRunnable(_getStatus(txid))) { - runnableTids.add(txid); - } - } - - synchronized (this) { - runnableTids.removeIf(txid -> { - var deferedTime = defered.get(txid); - if (deferedTime != null) { - if (deferedTime >= System.currentTimeMillis()) { - return true; - } else { - defered.remove(txid); - } - } - - if (reserved.contains(txid)) { - return true; - } - - return false; - }); - } - - if (runnableTids.isEmpty()) { - if (beforeCount == unreservedRunnableCount.getCount()) { - long waitTime = 5000; - if (!defered.isEmpty()) { - Long minTime = Collections.min(defered.values()); - waitTime = minTime - System.currentTimeMillis(); - } - - if (waitTime > 0) { - unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, - keepWaiting::get); - } - } - } else { - return runnableTids.iterator(); - } - - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - - return List.<Long>of().iterator(); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java new file mode 100644 index 0000000000..aa5883a6d8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +public class AccumuloStore<T> extends AbstractFateStore<T> { + + private final ClientContext context; + private final String tableName; + + private static final int maxRepos = 100; + private static final com.google.common.collect.Range<Integer> REPO_RANGE = + com.google.common.collect.Range.closed(1, maxRepos); + + public AccumuloStore(ClientContext context, String tableName) { + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + } + + @Override + public long create() { + long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + + return tid; + } + + @Override + protected List<String> getTransactions() { + return scanTx(scanner -> { + scanner.setRange(new Range()); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); + }); + } + + @Override + protected TStatus _getStatus(long tid) { + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); + }); + } + + @Override + protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) { + return new FateTxStoreImpl(tid, isReserved); + } + + static Range getRow(long tid) { + return new Range("tx_" + FastFormat.toHexString(tid)); + } + + private FateMutatorImpl<T> newMutator(long tid) { + return new FateMutatorImpl<>(context, tableName, tid); + } + + private <R> R scanTx(Function<Scanner,R> func) { + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + return func.apply(scanner); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { + + private FateTxStoreImpl(long tid, boolean isReserved) { + super(tid, isReserved); + } + + @Override + public Repo<T> top() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.setBatchSize(1); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).map(e -> { + @SuppressWarnings("unchecked") + var repo = (Repo<T>) deserialize(e.getValue().get()); + return repo; + }).findFirst().orElse(null); + }); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).map(e -> { + @SuppressWarnings("unchecked") + var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get()); + return repo; + }).collect(Collectors.toList()); + }); + } + + @Override + public Serializable getTransactionInfo(TxInfo txInfo) { + verifyReserved(false); + + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(tid)); + + final ColumnFQ cq; + switch (txInfo) { + case TX_NAME: + cq = TxInfoColumnFamily.TX_NAME_COLUMN; + break; + case AUTO_CLEAN: + cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN; + break; + case EXCEPTION: + cq = TxInfoColumnFamily.EXCEPTION_COLUMN; + break; + case RETURN_VALUE: + cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN; + break; + default: + throw new IllegalArgumentException("Unexpected TxInfo type " + txInfo); + } + scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier()); + + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst().orElse(null); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public long timeCreated() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> Long.parseLong(e.getValue().toString())).findFirst().orElse(0L); + }); + } + + @Override + public void push(Repo<T> repo) throws StackOverflowException { + verifyReserved(true); + + try { + Optional<Integer> top = findTop(); + + if (top.filter(t -> t >= maxRepos).isPresent()) { + throw new StackOverflowException("Repo stack size too large"); + } + + FateMutator<T> fateMutator = newMutator(tid); + fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); + } catch (StackOverflowException soe) { + throw soe; + } + } + + @Override + public void pop() { + verifyReserved(true); + + Optional<Integer> top = findTop(); + top.ifPresent(t -> newMutator(tid).deleteRepo(t).mutate()); + } + + @Override + public void setStatus(TStatus status) { + verifyReserved(true); + + newMutator(tid).putStatus(status).mutate(); + observedStatus = status; + } + + @Override + public void setTransactionInfo(TxInfo txInfo, Serializable so) { + verifyReserved(true); + + FateMutator<T> fateMutator = newMutator(tid); + final byte[] serialized = serializeTxInfo(so); + + switch (txInfo) { + case TX_NAME: + fateMutator.putName(serialized); + break; + case AUTO_CLEAN: + fateMutator.putAutoClean(serialized); + break; + case EXCEPTION: + fateMutator.putException(serialized); + break; + case RETURN_VALUE: + fateMutator.putReturnValue(serialized); + break; + default: + throw new IllegalArgumentException("Unexpected TxInfo type " + txInfo); + } + + fateMutator.mutate(); + } + + @Override + public void delete() { + verifyReserved(true); + + newMutator(tid).delete().mutate(); + } + + private Optional<Integer> findTop() { + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.setBatchSize(1); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); + }); + } + } + + static Text invertRepo(int position) { + Preconditions.checkArgument(REPO_RANGE.contains(position), + "Position %s is not in the valid range of [0,%s]", position, maxRepos); + return new Text(String.format("%02d", maxRepos - position)); + } + + static Integer restoreRepo(Text invertedPosition) { + int position = maxRepos - Integer.parseInt(invertedPosition.toString()); + Preconditions.checkArgument(REPO_RANGE.contains(position), + "Position %s is not in the valid range of [0,%s]", position, maxRepos); + return position; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java new file mode 100644 index 0000000000..306841612e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; + +public interface FateMutator<T> { + + FateMutator<T> putStatus(TStatus status); + + FateMutator<T> putCreateTime(long ctime); + + FateMutator<T> putName(byte[] data); + + FateMutator<T> putAutoClean(byte[] data); + + FateMutator<T> putException(byte[] data); + + FateMutator<T> putReturnValue(byte[] data); + + FateMutator<T> putTxInfo(Fate.TxInfo txInfo, byte[] data); + + FateMutator<T> putRepo(int position, Repo<T> repo); + + FateMutator<T> deleteRepo(int position); + + void mutate(); + +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java new file mode 100644 index 0000000000..b605b91097 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static org.apache.accumulo.core.fate.AbstractFateStore.serialize; +import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.getRow; +import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.invertRepo; + +import java.util.Objects; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.hadoop.io.Text; + +public class FateMutatorImpl<T> implements FateMutator<T> { + + private final ClientContext context; + private final String tableName; + private final long tid; + private final Mutation mutation; + + FateMutatorImpl(ClientContext context, String tableName, long tid) { + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + this.tid = tid; + this.mutation = new Mutation(new Text("tx_" + FastFormat.toHexString(tid))); + } + + @Override + public FateMutator<T> putStatus(TStatus status) { + TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); + return this; + } + + @Override + public FateMutator<T> putCreateTime(long ctime) { + TxColumnFamily.CREATE_TIME_COLUMN.put(mutation, new Value(Long.toString(ctime))); + return this; + } + + @Override + public FateMutator<T> putName(byte[] data) { + TxInfoColumnFamily.TX_NAME_COLUMN.put(mutation, new Value(data)); + return this; + } + + @Override + public FateMutator<T> putAutoClean(byte[] data) { + TxInfoColumnFamily.AUTO_CLEAN_COLUMN.put(mutation, new Value(data)); + return this; + } + + @Override + public FateMutator<T> putException(byte[] data) { + TxInfoColumnFamily.EXCEPTION_COLUMN.put(mutation, new Value(data)); + return this; + } + + @Override + public FateMutator<T> putReturnValue(byte[] data) { + TxInfoColumnFamily.RETURN_VALUE_COLUMN.put(mutation, new Value(data)); + return this; + } + + @Override + public FateMutator<T> putTxInfo(TxInfo txInfo, byte[] data) { + switch (txInfo) { + case TX_NAME: + putName(data); + break; + case AUTO_CLEAN: + putAutoClean(data); + break; + case EXCEPTION: + putException(data); + break; + case RETURN_VALUE: + putReturnValue(data); + break; + } + return this; + } + + @Override + public FateMutator<T> putRepo(int position, Repo<T> repo) { + mutation.put(RepoColumnFamily.NAME, invertRepo(position), new Value(serialize(repo))); + return this; + } + + @Override + public FateMutator<T> deleteRepo(int position) { + mutation.putDelete(RepoColumnFamily.NAME, invertRepo(position)); + return this; + } + + public FateMutator<T> delete() { + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(tid)); + scanner.forEach( + (key, value) -> mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier())); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + return this; + } + + @Override + public void mutate() { + try (BatchWriter writer = context.createBatchWriter(tableName)) { + writer.addMutation(mutation); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java new file mode 100644 index 0000000000..dbb84049a8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo.schema; + +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.hadoop.io.Text; + +public class FateSchema { + + public static class TxColumnFamily { + public static final String STR_NAME = "tx"; + public static final Text NAME = new Text(STR_NAME); + + public static final String STATUS = "status"; + public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); + + public static final String CREATE_TIME = "ctime"; + public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); + } + + public static class TxInfoColumnFamily { + public static final String STR_NAME = "txinfo"; + public static final Text NAME = new Text(STR_NAME); + + public static final String TX_NAME = "txname"; + public static final ColumnFQ TX_NAME_COLUMN = new ColumnFQ(NAME, new Text(TX_NAME)); + + public static final String AUTO_CLEAN = "autoclean"; + public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new Text(AUTO_CLEAN)); + + public static final String EXCEPTION = "exception"; + public static final ColumnFQ EXCEPTION_COLUMN = new ColumnFQ(NAME, new Text(EXCEPTION)); + + public static final String RETURN_VALUE = "retval"; + public static final ColumnFQ RETURN_VALUE_COLUMN = new ColumnFQ(NAME, new Text(RETURN_VALUE)); + } + + public static class RepoColumnFamily { + public static final String STR_NAME = "repos"; + public static final Text NAME = new Text(STR_NAME); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java new file mode 100644 index 0000000000..217f68e5c7 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.util.Wait; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FateIT extends SharedMiniClusterBase { + + private static final Logger LOG = LoggerFactory.getLogger(FateIT.class); + + private static CountDownLatch callStarted; + private static CountDownLatch finishCall; + + public static class TestEnv { + + } + + public static class TestRepo implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + + private final String data; + + public TestRepo(String data) { + this.data = data; + } + + @Override + public long isReady(long tid, TestEnv environment) throws Exception { + return 0; + } + + @Override + public String getName() { + return "TestRepo_" + data; + } + + @Override + public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception { + LOG.debug("Entering call {}", FateTxId.formatTid(tid)); + try { + FateIT.inCall(); + return null; + } finally { + LOG.debug("Leaving call {}", FateTxId.formatTid(tid)); + } + } + + @Override + public void undo(long tid, TestEnv environment) throws Exception { + + } + + @Override + public String getReturn() { + return data + "_ret"; + } + } + + @Test + @Timeout(30) + public void testTransactionStatus() throws Exception { + executeTest(this::testTransactionStatus); + } + + protected void testTransactionStatus(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + TestEnv testEnv = new TestEnv(); + Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(3000); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + long txid = fate.startTransaction(); + assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); + fate.seedTransaction("TestOperation", txid, new TestRepo("testTransactionStatus"), true, + "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); + // wait for call() to be called + callStarted.await(); + assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); + // tell the op to exit the method + finishCall.countDown(); + + // TODO: This check seems like a race condition that might + // need to be fixed as occasionally the test fails because it was + // already removed so that seems to indicate things are removed + // before can check it was SUCCESSFUL + TStatus s = getTxStatus(sctx, txid); + while (s != SUCCESSFUL) { + s = getTxStatus(sctx, txid); + Thread.sleep(10); + } + // Check that it gets removed + boolean removed = false; + while (!removed) { + removed = verifyRemoved(sctx, txid); + Thread.sleep(10); + } + + } finally { + fate.shutdown(); + } + } + + @Test + public void testCancelWhileNew() throws Exception { + executeTest(this::testCancelWhileNew); + } + + protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + TestEnv testEnv = new TestEnv(); + Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(3000); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + long txid = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileNew with {}", FateTxId.formatTid(txid)); + assertEquals(NEW, getTxStatus(sctx, txid)); + // cancel the transaction + assertTrue(fate.cancel(txid)); + assertTrue( + FAILED_IN_PROGRESS == getTxStatus(sctx, txid) || FAILED == getTxStatus(sctx, txid)); + fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileNew"), true, + "Test Op"); + Wait.waitFor(() -> FAILED == getTxStatus(sctx, txid)); + // nothing should have run + assertEquals(1, callStarted.getCount()); + fate.delete(txid); + assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid)); + } finally { + fate.shutdown(); + } + } + + @Test + public void testCancelWhileSubmittedAndRunning() throws Exception { + executeTest(this::testCancelWhileSubmittedAndRunning); + } + + protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + TestEnv testEnv = new TestEnv(); + Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(3000); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + long txid = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); + assertEquals(NEW, getTxStatus(sctx, txid)); + fate.seedTransaction("TestOperation", txid, + new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test Op"); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, txid)); + // This is false because the transaction runner has reserved the FaTe + // transaction. + assertFalse(fate.cancel(txid)); + callStarted.await(); + finishCall.countDown(); + Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, txid)); + fate.delete(txid); + assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid)); + } finally { + fate.shutdown(); + } + } + + @Test + public void testCancelWhileInCall() throws Exception { + executeTest(this::testCancelWhileInCall); + } + + protected void testCancelWhileInCall(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + TestEnv testEnv = new TestEnv(); + Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(3000); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + long txid = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileInCall with {}", FateTxId.formatTid(txid)); + assertEquals(NEW, getTxStatus(sctx, txid)); + fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileInCall"), true, + "Test Op"); + assertEquals(SUBMITTED, getTxStatus(sctx, txid)); + // wait for call() to be called + callStarted.await(); + // cancel the transaction + assertFalse(fate.cancel(txid)); + } finally { + fate.shutdown(); + } + + } + + protected abstract TStatus getTxStatus(ServerContext sctx, long txid) throws Exception; + + protected abstract boolean verifyRemoved(ServerContext sctx, long txid); + + protected abstract void executeTest(FateTestExecutor testMethod) throws Exception; + + protected interface FateTestExecutor { + void execute(FateStore<TestEnv> store, ServerContext sctx) throws Exception; + } + + private static void inCall() throws InterruptedException { + // signal that call started + callStarted.countDown(); + // wait for the signal to exit the method + finishCall.await(); + } + + protected Class<? extends Exception> getNoTxExistsException() { + return NoSuchElementException.class; + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java new file mode 100644 index 0000000000..fe33cb92c1 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import java.util.NoSuchElementException; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateIT; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class AccumuloFateIT extends FateIT { + + private String table; + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Override + protected void executeTest(FateTestExecutor testMethod) throws Exception { + table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + final AccumuloStore<TestEnv> accumuloStore = new AccumuloStore<>(client, table); + testMethod.execute(accumuloStore, getCluster().getServerContext()); + } + } + + @Override + protected TStatus getTxStatus(ServerContext context, long txid) { + try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(getRow(txid)); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElseThrow(); + } catch (TableNotFoundException e) { + throw new IllegalStateException(table + " not found!", e); + } + } + + @Override + protected boolean verifyRemoved(ServerContext sctx, long txid) { + try { + getTxStatus(sctx, txid); + } catch (NoSuchElementException e) { + return true; + } + return false; + } + + private static Range getRow(long tid) { + return new Range("tx_" + FastFormat.toHexString(tid)); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java new file mode 100644 index 0000000000..1629458588 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.fate.FateIT.TestEnv; +import org.apache.accumulo.test.fate.FateIT.TestRepo; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class AccumuloStoreReadWriteIT extends SharedMiniClusterBase { + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + @Test + public void testReadWrite() throws Exception { + final String table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table); + // Verify no transactions + assertEquals(0, store.list().size()); + + // Create a new transaction and get the store for it + long tid = store.create(); + FateTxStore<TestEnv> txStore = store.reserve(tid); + assertTrue(txStore.timeCreated() > 0); + assertEquals(1, store.list().size()); + + // Push a test FATE op and verify we can read it back + txStore.push(new TestRepo("testOp")); + TestRepo op = (TestRepo) txStore.top(); + assertNotNull(op); + + // Test status + txStore.setStatus(TStatus.SUBMITTED); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); + + // Set a name to test setTransactionInfo() + txStore.setTransactionInfo(TxInfo.TX_NAME, "name"); + assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME)); + + // Try setting a second test op to test getStack() + // when listing or popping TestOperation2 should be first + assertEquals(1, txStore.getStack().size()); + txStore.push(new TestOperation2()); + // test top returns TestOperation2 + ReadOnlyRepo<TestEnv> top = txStore.top(); + assertInstanceOf(TestOperation2.class, top); + + // test get stack + List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack(); + assertEquals(2, ops.size()); + assertInstanceOf(TestOperation2.class, ops.get(0)); + assertEquals(TestRepo.class, ops.get(1).getClass()); + + // test pop, TestOperation should be left + txStore.pop(); + ops = txStore.getStack(); + assertEquals(1, ops.size()); + assertEquals(TestRepo.class, ops.get(0).getClass()); + + // create second + FateTxStore<TestEnv> txStore2 = store.reserve(store.create()); + assertEquals(2, store.list().size()); + + // test delete + txStore.delete(); + assertEquals(1, store.list().size()); + txStore2.delete(); + assertEquals(0, store.list().size()); + } + } + + private static class TestOperation2 extends TestRepo { + + private static final long serialVersionUID = 1L; + + public TestOperation2() { + super("testOperation2"); + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java deleted file mode 100644 index 8ba43996a7..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ /dev/null @@ -1,352 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.fate.zookeeper; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; -import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import java.io.File; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.clientImpl.thrift.TableOperation; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.NamespaceId; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.AgeOffStore; -import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.FateTxId; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.fate.ZooStore; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.ManagerRepo; -import org.apache.accumulo.manager.tableOps.TraceRepo; -import org.apache.accumulo.manager.tableOps.Utils; -import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.test.util.Wait; -import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; -import org.apache.zookeeper.KeeperException; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.io.TempDir; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Tag(ZOOKEEPER_TESTING_SERVER) -public class FateIT { - - public static class TestOperation extends ManagerRepo { - - private static final Logger LOG = LoggerFactory.getLogger(TestOperation.class); - - private static final long serialVersionUID = 1L; - - private final TableId tableId; - private final NamespaceId namespaceId; - - public TestOperation(NamespaceId namespaceId, TableId tableId) { - this.namespaceId = namespaceId; - this.tableId = tableId; - } - - @Override - public long isReady(long tid, Manager manager) throws Exception { - return Utils.reserveNamespace(manager, namespaceId, tid, false, true, TableOperation.RENAME) - + Utils.reserveTable(manager, tableId, tid, true, true, TableOperation.RENAME); - } - - @Override - public void undo(long tid, Manager manager) throws Exception { - Utils.unreserveNamespace(manager, namespaceId, tid, false); - Utils.unreserveTable(manager, tableId, tid, true); - } - - @Override - public Repo<Manager> call(long tid, Manager manager) throws Exception { - LOG.debug("Entering call {}", FateTxId.formatTid(tid)); - try { - FateIT.inCall(); - return null; - } finally { - Utils.unreserveNamespace(manager, namespaceId, tid, false); - Utils.unreserveTable(manager, tableId, tid, true); - LOG.debug("Leaving call {}", FateTxId.formatTid(tid)); - } - - } - - } - - private static final Logger LOG = LoggerFactory.getLogger(FateIT.class); - - @TempDir - private static File tempDir; - - private static ZooKeeperTestingServer szk = null; - private static ZooReaderWriter zk = null; - private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID().toString(); - private static final NamespaceId NS = NamespaceId.of("testNameSpace"); - private static final TableId TID = TableId.of("testTable"); - - private static CountDownLatch callStarted; - private static CountDownLatch finishCall; - - @BeforeAll - public static void setup() throws Exception { - szk = new ZooKeeperTestingServer(tempDir); - zk = szk.getZooReaderWriter(); - zk.mkdirs(ZK_ROOT + Constants.ZFATE); - zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); - zk.mkdirs(ZK_ROOT + Constants.ZNAMESPACES + "/" + NS.canonical()); - zk.mkdirs(ZK_ROOT + Constants.ZTABLE_STATE + "/" + TID.canonical()); - zk.mkdirs(ZK_ROOT + Constants.ZTABLES + "/" + TID.canonical()); - } - - @AfterAll - public static void teardown() throws Exception { - szk.close(); - } - - @Test - @Timeout(30) - public void testTransactionStatus() throws Exception { - - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); - - Manager manager = createMock(Manager.class); - ServerContext sctx = createMock(ServerContext.class); - expect(manager.getContext()).andReturn(sctx).anyTimes(); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); - replay(manager, sctx); - - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); - try { - - // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); - - callStarted = new CountDownLatch(1); - finishCall = new CountDownLatch(1); - - long txid = fate.startTransaction(); - assertEquals(TStatus.NEW, getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid)); - // wait for call() to be called - callStarted.await(); - assertEquals(IN_PROGRESS, getTxStatus(zk, txid)); - // tell the op to exit the method - finishCall.countDown(); - // Check that it transitions to SUCCESSFUL - TStatus s = getTxStatus(zk, txid); - while (s != SUCCESSFUL) { - s = getTxStatus(zk, txid); - Thread.sleep(10); - } - // Check that it gets removed - boolean errorSeen = false; - while (!errorSeen) { - try { - s = getTxStatus(zk, txid); - Thread.sleep(10); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.NONODE) { - errorSeen = true; - } else { - fail("Unexpected error thrown: " + e.getMessage()); - } - } - } - - } finally { - fate.shutdown(); - } - } - - @Test - public void testCancelWhileNew() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); - - Manager manager = createMock(Manager.class); - ServerContext sctx = createMock(ServerContext.class); - expect(manager.getContext()).andReturn(sctx).anyTimes(); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); - replay(manager, sctx); - - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); - try { - - // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); - - callStarted = new CountDownLatch(1); - finishCall = new CountDownLatch(1); - - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileNew with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(zk, txid)); - // cancel the transaction - assertTrue(fate.cancel(txid)); - assertTrue(FAILED_IN_PROGRESS == getTxStatus(zk, txid) || FAILED == getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - Wait.waitFor(() -> FAILED == getTxStatus(zk, txid)); - // nothing should have run - assertEquals(1, callStarted.getCount()); - fate.delete(txid); - assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid)); - } finally { - fate.shutdown(); - } - } - - @Test - public void testCancelWhileSubmittedAndRunning() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); - - Manager manager = createMock(Manager.class); - ServerContext sctx = createMock(ServerContext.class); - expect(manager.getContext()).andReturn(sctx).anyTimes(); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); - replay(manager, sctx); - - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); - try { - - // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); - - callStarted = new CountDownLatch(1); - finishCall = new CountDownLatch(1); - - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), false, "Test Op"); - Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid)); - // This is false because the transaction runner has reserved the FaTe - // transaction. - assertFalse(fate.cancel(txid)); - callStarted.await(); - finishCall.countDown(); - Wait.waitFor(() -> IN_PROGRESS != getTxStatus(zk, txid)); - fate.delete(txid); - assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid)); - } finally { - fate.shutdown(); - } - } - - @Test - public void testCancelWhileInCall() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); - - Manager manager = createMock(Manager.class); - ServerContext sctx = createMock(ServerContext.class); - expect(manager.getContext()).andReturn(sctx).anyTimes(); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); - replay(manager, sctx); - - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); - try { - - // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); - - callStarted = new CountDownLatch(1); - finishCall = new CountDownLatch(1); - - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileInCall with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - assertEquals(SUBMITTED, getTxStatus(zk, txid)); - // wait for call() to be called - callStarted.await(); - // cancel the transaction - assertFalse(fate.cancel(txid)); - } finally { - fate.shutdown(); - } - - } - - private static void inCall() throws InterruptedException { - // signal that call started - callStarted.countDown(); - // wait for the signal to exit the method - finishCall.await(); - } - - /* - * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test - * thread does not have the reservation (the FaTE thread does) - */ - private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) - throws KeeperException, InterruptedException { - zrw.sync(ZK_ROOT); - String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); - return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); - } - -} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java new file mode 100644 index 0000000000..b9c29e85bb --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.zookeeper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.File; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AgeOffStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.ZooStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateIT; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +@Tag(ZOOKEEPER_TESTING_SERVER) +public class ZookeeperFateIT extends FateIT { + + private static ZooKeeperTestingServer szk = null; + private static ZooReaderWriter zk = null; + private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); + + @TempDir + private static File tempDir; + + @BeforeAll + public static void setup() throws Exception { + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + zk.mkdirs(ZK_ROOT + Constants.ZFATE); + zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + } + + @AfterAll + public static void teardown() throws Exception { + szk.close(); + } + + @Override + protected void executeTest(FateTestExecutor testMethod) throws Exception { + final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); + final AgeOffStore<TestEnv> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); + + ServerContext sctx = createMock(ServerContext.class); + expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + replay(sctx); + + testMethod.execute(store, sctx); + } + + @Override + protected Class<? extends Exception> getNoTxExistsException() { + return KeeperException.NoNodeException.class; + } + + @Override + protected TStatus getTxStatus(ServerContext sctx, long txid) + throws InterruptedException, KeeperException { + return getTxStatus(sctx.getZooReaderWriter(), txid); + } + + @Override + protected boolean verifyRemoved(ServerContext sctx, long txid) { + try { + getTxStatus(sctx, txid); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NONODE) { + return true; + } else { + fail("Unexpected error thrown: " + e.getMessage()); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + return false; + } + + /* + * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test + * thread does not have the reservation (the FaTE thread does) + */ + private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) + throws KeeperException, InterruptedException { + zrw.sync(ZK_ROOT); + String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); + return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); + } + +}