This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 06af479f20664bf8dcfceb512836d6a05ba89705
Merge: f22e774ef6 d64bd908d4
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Aug 20 21:52:36 2024 +0000

    Merge branch 'main' into elasticity

 .../accumulo/core/fate/AbstractFateStore.java      | 18 ++--
 .../apache/accumulo/core/util/CountDownTimer.java  | 94 +++++++++++++++++++++
 .../accumulo/core/util/CountDownTimerTest.java     | 97 ++++++++++++++++++++++
 3 files changed, 200 insertions(+), 9 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 31b07ef819,0000000000..6b4377cef6
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@@ -1,505 -1,0 +1,505 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UncheckedIOException;
 +import java.time.Duration;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
++import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Consumer;
 +import java.util.stream.Stream;
 +
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
++import org.apache.accumulo.core.util.CountDownTimer;
 +import org.apache.accumulo.core.util.Pair;
- import org.apache.accumulo.core.util.time.NanoTime;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +
 +public abstract class AbstractFateStore<T> implements FateStore<T> {
 +
 +  private static final Logger log = 
LoggerFactory.getLogger(AbstractFateStore.class);
 +
 +  // Default maximum size of 100,000 transactions before deferral is stopped 
and
 +  // all existing transactions are processed immediately again
 +  public static final int DEFAULT_MAX_DEFERRED = 100_000;
 +
 +  public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = new 
FateIdGenerator() {
 +    @Override
 +    public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey 
fateKey) {
 +      UUID txUUID = UUID.nameUUIDFromBytes(fateKey.getSerialized());
 +      return FateId.from(instanceType, txUUID);
 +    }
 +  };
 +
 +  protected final Set<FateId> reserved;
-   protected final Map<FateId,NanoTime> deferred;
++  protected final Map<FateId,CountDownTimer> deferred;
 +  private final int maxDeferred;
 +  private final AtomicBoolean deferredOverflow = new AtomicBoolean();
 +  private final FateIdGenerator fateIdGenerator;
 +
 +  // This is incremented each time a transaction was unreserved that was non 
new
 +  protected final SignalCount unreservedNonNewCount = new SignalCount();
 +
 +  // This is incremented each time a transaction is unreserved that was 
runnable
 +  protected final SignalCount unreservedRunnableCount = new SignalCount();
 +
 +  public AbstractFateStore() {
 +    this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
 +    this.maxDeferred = maxDeferred;
 +    this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
 +    this.reserved = new HashSet<>();
 +    this.deferred = new HashMap<>();
 +  }
 +
 +  public static byte[] serialize(Object o) {
 +    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
 +      oos.writeObject(o);
 +      return baos.toByteArray();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
 +      justification = "unsafe to store arbitrary serialized objects like 
this, but needed for now"
 +          + " for backwards compatibility")
 +  public static Object deserialize(byte[] ser) {
 +    try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
 +        ObjectInputStream ois = new ObjectInputStream(bais)) {
 +      return ois.readObject();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  /**
 +   * Attempt to reserve the fate transaction.
 +   *
 +   * @param fateId The FateId
 +   * @return An Optional containing the FateTxStore if the transaction was 
successfully reserved, or
 +   *         an empty Optional if the transaction was already reserved.
 +   */
 +  @Override
 +  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
 +    synchronized (this) {
 +      if (!reserved.contains(fateId)) {
 +        return Optional.of(reserve(fateId));
 +      }
 +      return Optional.empty();
 +    }
 +  }
 +
 +  @Override
 +  public FateTxStore<T> reserve(FateId fateId) {
 +    synchronized (AbstractFateStore.this) {
 +      while (reserved.contains(fateId)) {
 +        try {
 +          AbstractFateStore.this.wait(100);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +
 +      reserved.add(fateId);
 +      return newFateTxStore(fateId, true);
 +    }
 +  }
 +
 +  private static final Set<TStatus> IN_PROGRESS_SET = 
Set.of(TStatus.IN_PROGRESS);
 +  private static final Set<TStatus> OTHER_RUNNABLE_SET =
 +      Set.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);
 +
 +  @Override
 +  public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> 
idConsumer) {
 +
 +    AtomicLong seen = new AtomicLong(0);
 +
 +    while (keepWaiting.get() && seen.get() == 0) {
 +      final long beforeCount = unreservedRunnableCount.getCount();
 +      final boolean beforeDeferredOverflow = deferredOverflow.get();
 +
 +      try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
 +          Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
 +        // read the in progress transaction first and then everything else in 
order to process those
 +        // first
 +        var transactions = Stream.concat(inProgress, other);
 +        transactions.filter(fateIdStatus -> 
isRunnable(fateIdStatus.getStatus()))
 +            .map(FateIdStatus::getFateId).filter(fateId -> {
 +              synchronized (AbstractFateStore.this) {
 +                var deferredTime = deferred.get(fateId);
 +                if (deferredTime != null) {
-                   if (deferredTime.elapsed().isNegative()) {
-                     // negative elapsed time indicates the deferral time is 
in the future
-                     return false;
-                   } else {
++                  if (deferredTime.isExpired()) {
 +                    deferred.remove(fateId);
++                  } else {
++                    return false;
 +                  }
 +                }
 +                return !reserved.contains(fateId);
 +              }
 +            }).forEach(fateId -> {
 +              seen.incrementAndGet();
 +              idConsumer.accept(fateId);
 +            });
 +      }
 +
 +      // If deferredOverflow was previously marked true then the deferred map
 +      // would have been cleared and seen.get() should be greater than 0 as 
there would
 +      // be a lot of transactions to process in the previous run, so we won't 
be sleeping here
 +      if (seen.get() == 0) {
 +        if (beforeCount == unreservedRunnableCount.getCount()) {
 +          long waitTime = 5000;
 +          synchronized (AbstractFateStore.this) {
 +            if (!deferred.isEmpty()) {
-               var now = NanoTime.now();
 +              waitTime = deferred.values().stream()
-                   .mapToLong(nanoTime -> 
nanoTime.subtract(now).toMillis()).min().getAsLong();
++                  .mapToLong(countDownTimer -> 
countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
++                  .getAsLong();
 +            }
 +          }
 +
 +          if (waitTime > 0) {
 +            unreservedRunnableCount.waitFor(count -> count != beforeCount, 
waitTime,
 +                keepWaiting::get);
 +          }
 +        }
 +      }
 +
 +      // Reset if the current state only if it matches the state before the 
execution.
 +      // This is to avoid a race condition where the flag was set during the 
run.
 +      // We should ensure at least one of the FATE executors will run through 
the
 +      // entire transaction list first before clearing the flag and allowing 
more
 +      // deferred entries into the map again. In other words, if the before 
state
 +      // was false and during the execution at some point it was marked true 
this would
 +      // not reset until after the next run
 +      deferredOverflow.compareAndSet(beforeDeferredOverflow, false);
 +    }
 +  }
 +
 +  @Override
 +  public Stream<FateIdStatus> list() {
 +    return getTransactions(TStatus.ALL_STATUSES);
 +  }
 +
 +  @Override
 +  public Stream<FateIdStatus> list(Set<TStatus> statuses) {
 +    return getTransactions(statuses);
 +  }
 +
 +  @Override
 +  public ReadOnlyFateTxStore<T> read(FateId fateId) {
 +    return newFateTxStore(fateId, false);
 +  }
 +
 +  protected boolean isRunnable(TStatus status) {
 +    return status == TStatus.IN_PROGRESS || status == 
TStatus.FAILED_IN_PROGRESS
 +        || status == TStatus.SUBMITTED;
 +  }
 +
 +  public static abstract class FateIdStatusBase implements FateIdStatus {
 +    private final FateId fateId;
 +
 +    public FateIdStatusBase(FateId fateId) {
 +      this.fateId = fateId;
 +    }
 +
 +    @Override
 +    public FateId getFateId() {
 +      return fateId;
 +    }
 +  }
 +
 +  @Override
 +  public boolean isDeferredOverflow() {
 +    return deferredOverflow.get();
 +  }
 +
 +  @Override
 +  public int getDeferredCount() {
 +    // This method is primarily used right now for unit testing but
 +    // if this synchronization becomes an issue we could add an atomic
 +    // counter instead to track it separately so we don't need to lock
 +    synchronized (AbstractFateStore.this) {
 +      return deferred.size();
 +    }
 +  }
 +
 +  private Optional<FateId> create(FateKey fateKey) {
 +    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), 
fateKey);
 +
 +    try {
 +      create(fateId, fateKey);
 +    } catch (IllegalStateException e) {
 +      Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId);
 +      TStatus status = statusAndKey.getFirst();
 +      Optional<FateKey> tFateKey = statusAndKey.getSecond();
 +
 +      // Case 1: Status is NEW so this is unseeded, we can return and allow 
the calling code
 +      // to reserve/seed as long as the existing key is the same and not 
different as that would
 +      // mean a collision
 +      if (status == TStatus.NEW) {
 +        Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing 
from tid %s",
 +            fateId.getTxUUIDStr());
 +        Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()),
 +            "Collision detected for tid %s", fateId.getTxUUIDStr());
 +        // Case 2: Status is some other state which means already in progress
 +        // so we can just log and return empty optional
 +      } else {
 +        log.trace("Existing transaction {} already exists for key {} with 
status {}", fateId,
 +            fateKey, status);
 +        return Optional.empty();
 +      }
 +    }
 +
 +    return Optional.of(fateId);
 +  }
 +
 +  @Override
 +  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
 +    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), 
fateKey);
 +    final Optional<FateTxStore<T>> txStore;
 +
 +    // First make sure we can reserve in memory the fateId, if not
 +    // we can return an empty Optional as it is reserved and in progress
 +    // This reverses the usual order of creation and then reservation but
 +    // this prevents a race condition by ensuring we can reserve first.
 +    // This will create the FateTxStore before creation but this object
 +    // is not exposed until after creation is finished so there should not
 +    // be any errors.
 +    final Optional<FateTxStore<T>> reservedTxStore;
 +    synchronized (this) {
 +      reservedTxStore = tryReserve(fateId);
 +    }
 +
 +    // If present we were able to reserve so try and create
 +    if (reservedTxStore.isPresent()) {
 +      try {
 +        var fateIdFromCreate = create(fateKey);
 +        if (fateIdFromCreate.isPresent()) {
 +          
Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()),
 +              "Transaction creation returned unexpected %s, expected %s", 
fateIdFromCreate, fateId);
 +          txStore = reservedTxStore;
 +        } else {
 +          // We already exist in a non-new state then un-reserve and an empty
 +          // Optional will be returned. This is expected to happen when the
 +          // system is busy and operations are not running, and we keep 
seeding them
 +          synchronized (this) {
 +            reserved.remove(fateId);
 +          }
 +          txStore = Optional.empty();
 +        }
 +      } catch (Exception e) {
 +        // Clean up the reservation if the creation failed
 +        // And then throw error
 +        synchronized (this) {
 +          reserved.remove(fateId);
 +        }
 +        if (e instanceof IllegalStateException) {
 +          throw e;
 +        } else {
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +    } else {
 +      // Could not reserve so return empty
 +      log.trace("Another thread currently has transaction {} key {} 
reserved", fateId, fateKey);
 +      txStore = Optional.empty();
 +    }
 +
 +    return txStore;
 +  }
 +
 +  protected abstract void create(FateId fateId, FateKey fateKey);
 +
 +  protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId 
fateId);
 +
 +  protected abstract Stream<FateIdStatus> getTransactions(Set<TStatus> 
statuses);
 +
 +  protected abstract TStatus _getStatus(FateId fateId);
 +
 +  protected abstract Optional<FateKey> getKey(FateId fateId);
 +
 +  protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean 
isReserved);
 +
 +  protected abstract FateInstanceType getInstanceType();
 +
 +  protected abstract class AbstractFateTxStoreImpl<T> implements 
FateTxStore<T> {
 +    protected final FateId fateId;
 +    protected final boolean isReserved;
 +
 +    protected TStatus observedStatus = null;
 +
 +    protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
 +      this.fateId = fateId;
 +      this.isReserved = isReserved;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      Preconditions.checkState(!isReserved,
 +          "Attempted to wait for status change while reserved " + fateId);
 +      while (true) {
 +
 +        long countBefore = unreservedNonNewCount.getCount();
 +
 +        TStatus status = _getStatus(fateId);
 +        if (expected.contains(status)) {
 +          return status;
 +        }
 +
 +        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () 
-> true);
 +      }
 +    }
 +
 +    @Override
 +    public void unreserve(Duration deferTime) {
 +
 +      if (deferTime.isNegative()) {
 +        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
 +      }
 +
 +      synchronized (AbstractFateStore.this) {
 +        if (!reserved.remove(fateId)) {
 +          throw new IllegalStateException("Tried to unreserve id that was not 
reserved " + fateId);
 +        }
 +
 +        // notify any threads waiting to reserve
 +        AbstractFateStore.this.notifyAll();
 +
 +        // If deferred map has overflowed then skip adding to the deferred map
 +        // and clear the map and set the flag. This will cause the next 
execution
 +        // of runnable to process all the transactions and to not defer as we
 +        // have a large backlog and want to make progress
 +        if (deferTime.compareTo(Duration.ZERO) > 0 && 
!deferredOverflow.get()) {
 +          if (deferred.size() >= maxDeferred) {
 +            log.info(
 +                "Deferred map overflowed with size {}, clearing and setting 
deferredOverflow to true",
 +                deferred.size());
 +            deferredOverflow.set(true);
 +            deferred.clear();
 +          } else {
-             deferred.put(fateId, NanoTime.nowPlus(deferTime));
++            deferred.put(fateId, CountDownTimer.startNew(deferTime));
 +          }
 +        }
 +      }
 +
 +      if (observedStatus != null && isRunnable(observedStatus)) {
 +        unreservedRunnableCount.increment();
 +      }
 +
 +      if (observedStatus != TStatus.NEW) {
 +        unreservedNonNewCount.increment();
 +      }
 +    }
 +
 +    protected void verifyReserved(boolean isWrite) {
 +      if (!isReserved && isWrite) {
 +        throw new IllegalStateException("Attempted write on unreserved FATE 
transaction.");
 +      }
 +
 +      if (isReserved) {
 +        synchronized (AbstractFateStore.this) {
 +          if (!reserved.contains(fateId)) {
 +            throw new IllegalStateException("Tried to operate on unreserved 
transaction " + fateId);
 +          }
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TStatus getStatus() {
 +      verifyReserved(false);
 +      var status = _getStatus(fateId);
 +      observedStatus = status;
 +      return status;
 +    }
 +
 +    @Override
 +    public Optional<FateKey> getKey() {
 +      verifyReserved(false);
 +      return AbstractFateStore.this.getKey(fateId);
 +    }
 +
 +    @Override
 +    public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
 +      verifyReserved(false);
 +      return AbstractFateStore.this.getStatusAndKey(fateId);
 +    }
 +
 +    @Override
 +    public FateId getID() {
 +      return fateId;
 +    }
 +  }
 +
 +  public interface FateIdGenerator {
 +    FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
 +  }
 +
 +  protected byte[] serializeTxInfo(Serializable so) {
 +    if (so instanceof String) {
 +      return ("S " + so).getBytes(UTF_8);
 +    } else {
 +      byte[] sera = serialize(so);
 +      byte[] data = new byte[sera.length + 2];
 +      System.arraycopy(sera, 0, data, 2, sera.length);
 +      data[0] = 'O';
 +      data[1] = ' ';
 +      return data;
 +    }
 +  }
 +
 +  protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
 +    if (data[0] == 'O') {
 +      byte[] sera = new byte[data.length - 2];
 +      System.arraycopy(data, 2, sera, 0, sera.length);
 +      return (Serializable) deserialize(sera);
 +    } else if (data[0] == 'S') {
 +      return new String(data, 2, data.length - 2, UTF_8);
 +    } else {
 +      throw new IllegalStateException("Bad node data " + txInfo);
 +    }
 +  }
 +}

Reply via email to