This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new fb57b5356b Create shared ConditionalWriter in Ample (#5763)
fb57b5356b is described below
commit fb57b5356ba0016a9bb23cba14453929efd736d3
Author: Dom G. <[email protected]>
AuthorDate: Thu Jul 31 16:29:07 2025 -0400
Create shared ConditionalWriter in Ample (#5763)
* Create shared ConditionalWriter in Ample
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../org/apache/accumulo/core/conf/Property.java | 5 +-
.../org/apache/accumulo/server/ServerContext.java | 52 +++++
.../metadata/ConditionalTabletsMutatorImpl.java | 32 ++-
.../accumulo/server/metadata/ServerAmpleImpl.java | 7 +-
.../ConditionalTabletsMutatorImplTest.java | 65 +++---
.../test/functional/AmpleConditionalWriterIT.java | 221 ++++++++++-----------
6 files changed, 232 insertions(+), 150 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f6cc6b12e8..35a98bafa2 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1304,7 +1304,10 @@ public enum Property {
"4.0.0"),
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m",
PropertyType.TIMEDURATION,
- "The interval at which to check for dead compactors.", "2.1.0");
+ "The interval at which to check for dead compactors.", "2.1.0"),
+
GENERAL_AMPLE_CONDITIONAL_WRITER_THREADS_MAX("general.ample.conditional.writer.threads.max",
"8",
+ PropertyType.COUNT,
+ "The maximum number of threads for the shared ConditionalWriter used by
Ample.", "4.0.0");
private final String key;
private final String defaultValue;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index b0b9c02212..3bf5450391 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -42,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
@@ -55,6 +58,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
@@ -107,9 +111,12 @@ public class ServerContext extends ClientContext {
private final Supplier<LowMemoryDetector> lowMemoryDetector;
private final AtomicReference<ServiceLock> serverLock = new
AtomicReference<>();
private final Supplier<MetricsInfo> metricsInfoSupplier;
+ private final Supplier<ConditionalWriter> sharedMetadataWriter;
+ private final Supplier<ConditionalWriter> sharedUserWriter;
private final AtomicBoolean metricsInfoCreated = new AtomicBoolean(false);
private final AtomicBoolean sharedSchedExecutorCreated = new
AtomicBoolean(false);
+ private final AtomicBoolean sharedWritersCreated = new AtomicBoolean(false);
public ServerContext(SiteConfiguration siteConfig) {
this(ServerInfo.fromServerConfig(siteConfig));
@@ -138,6 +145,9 @@ public class ServerContext extends ClientContext {
SecurityOperation.getAuthenticator(this),
SecurityOperation.getPermHandler(this)));
lowMemoryDetector = memoize(() -> new LowMemoryDetector());
metricsInfoSupplier = memoize(() -> new MetricsInfoImpl(this));
+
+ sharedMetadataWriter = memoize(() ->
createSharedConditionalWriter(DataLevel.METADATA));
+ sharedUserWriter = memoize(() ->
createSharedConditionalWriter(DataLevel.USER));
}
/**
@@ -477,6 +487,29 @@ public class ServerContext extends ClientContext {
return metricsInfoSupplier.get();
}
+ private ConditionalWriter createSharedConditionalWriter(DataLevel level) {
+ try {
+ int maxThreads =
+
getConfiguration().getCount(Property.GENERAL_AMPLE_CONDITIONAL_WRITER_THREADS_MAX);
+ var config = new
ConditionalWriterConfig().setMaxWriteThreads(maxThreads);
+ String tableName = level.metaTable();
+ log.info("Creating shared ConditionalWriter for DataLevel {} with max
threads: {}", level,
+ maxThreads);
+ sharedWritersCreated.set(true);
+ return createConditionalWriter(tableName, config);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException("Failed to create shared ConditionalWriter
for level " + level, e);
+ }
+ }
+
+ public Supplier<ConditionalWriter> getSharedMetadataWriter() {
+ return sharedMetadataWriter;
+ }
+
+ public Supplier<ConditionalWriter> getSharedUserWriter() {
+ return sharedUserWriter;
+ }
+
@Override
public void close() {
Preconditions.checkState(!isClosed(), "ServerContext.close was already
called.");
@@ -486,6 +519,25 @@ public class ServerContext extends ClientContext {
if (sharedSchedExecutorCreated.get()) {
getScheduledExecutor().shutdownNow();
}
+ if (sharedWritersCreated.get()) {
+ try {
+ ConditionalWriter writer = sharedMetadataWriter.get();
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (Exception e) {
+ log.warn("Error closing shared metadata ConditionalWriter", e);
+ }
+
+ try {
+ ConditionalWriter writer = sharedUserWriter.get();
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (Exception e) {
+ log.warn("Error closing shared user ConditionalWriter", e);
+ }
+ }
super.close();
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 777364e0c4..902a8904d6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -69,6 +69,8 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
final Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new
HashMap<>();
private final Map<KeyExtent,Supplier<String>> operationDescriptions = new
HashMap<>();
private final Function<DataLevel,String> tableMapper;
+ private final Supplier<ConditionalWriter> sharedMetadataWriter;
+ private final Supplier<ConditionalWriter> sharedUserWriter;
public ConditionalTabletsMutatorImpl(ServerContext context) {
this(context, DataLevel::metaTable);
@@ -76,8 +78,16 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
public ConditionalTabletsMutatorImpl(ServerContext context,
Function<DataLevel,String> tableMapper) {
+ this(context, tableMapper, context.getSharedMetadataWriter(),
context.getSharedUserWriter());
+ }
+
+ public ConditionalTabletsMutatorImpl(ServerContext context,
+ Function<DataLevel,String> tableMapper, Supplier<ConditionalWriter>
sharedMetadataWriter,
+ Supplier<ConditionalWriter> sharedUserWriter) {
this.context = context;
this.tableMapper = Objects.requireNonNull(tableMapper);
+ this.sharedMetadataWriter = sharedMetadataWriter;
+ this.sharedUserWriter = sharedUserWriter;
}
@Override
@@ -227,7 +237,24 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
public Map<KeyExtent,Ample.ConditionalResult> process() {
Preconditions.checkState(active);
if (dataLevel != null) {
- try (ConditionalWriter conditionalWriter =
createConditionalWriter(dataLevel)) {
+ ConditionalWriter conditionalWriter = null;
+ boolean shouldClose = false;
+
+ try {
+ if (dataLevel == Ample.DataLevel.ROOT) {
+ conditionalWriter = createConditionalWriter(dataLevel);
+ shouldClose = true;
+ } else {
+ if (dataLevel == Ample.DataLevel.METADATA) {
+ conditionalWriter = sharedMetadataWriter.get();
+ } else if (dataLevel == Ample.DataLevel.USER) {
+ conditionalWriter = sharedUserWriter.get();
+ } else {
+ throw new IllegalArgumentException("Unsupported DataLevel: " +
dataLevel);
+ }
+ shouldClose = false; // don't close shared writers
+ }
+
var results = writeMutations(conditionalWriter);
var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
@@ -300,6 +327,9 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
} finally {
+ if (shouldClose && conditionalWriter != null) {
+ conditionalWriter.close();
+ }
// render inoperable because reuse is not tested
extents.clear();
mutations.clear();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 8d58aa2d96..e93a06b012 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.ValidationUtil;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
@@ -92,14 +93,16 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
@Override
public ConditionalTabletsMutator conditionallyMutateTablets() {
- return new ConditionalTabletsMutatorImpl(context, getTableMapper());
+ return new ConditionalTabletsMutatorImpl(context, getTableMapper(),
+ context.getSharedMetadataWriter(), context.getSharedUserWriter());
}
@Override
public AsyncConditionalTabletsMutator
conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
- () -> new ConditionalTabletsMutatorImpl(context, getTableMapper()));
+ () -> new ConditionalTabletsMutatorImpl(context, getTableMapper(),
+ context.getSharedMetadataWriter(), context.getSharedUserWriter()));
}
private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
index 8eae5a4b2f..a94daaa45c 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
@@ -24,68 +24,44 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
+import com.google.common.collect.Iterators;
+
public class ConditionalTabletsMutatorImplTest {
// a conditional tablet mutator that always returns a supplied status
static class TestConditionalTabletsMutator extends
ConditionalTabletsMutatorImpl {
private final Map<KeyExtent,TabletMetadata> failedExtents;
- private final List<Function<Text,ConditionalWriter.Status>> statuses;
-
- private int attempt = 0;
+ private final ConditionalWriter testWriter;
public TestConditionalTabletsMutator(ServerContext context,
- List<Function<Text,ConditionalWriter.Status>> statuses,
- Map<KeyExtent,TabletMetadata> failedExtents) {
- super(context);
- this.statuses = statuses;
+ Map<KeyExtent,TabletMetadata> failedExtents, ConditionalWriter
testWriter) {
+ super(context, DataLevel::metaTable, () -> testWriter, () -> testWriter);
this.failedExtents = failedExtents;
+ this.testWriter = testWriter;
}
protected Map<KeyExtent,TabletMetadata> readTablets(List<KeyExtent>
extents) {
return failedExtents;
}
- protected ConditionalWriter createConditionalWriter(Ample.DataLevel
dataLevel)
- throws TableNotFoundException {
- return new ConditionalWriter() {
- @Override
- public Iterator<Result> write(Iterator<ConditionalMutation> mutations)
{
- Iterable<ConditionalMutation> iterable = () -> mutations;
- var localAttempt = attempt++;
- return StreamSupport.stream(iterable.spliterator(), false)
- .map(cm -> new Result(statuses.get(localAttempt).apply(new
Text(cm.getRow())), cm,
- "server"))
- .iterator();
- }
-
- @Override
- public Result write(ConditionalMutation mutation) {
- return write(List.of(mutation).iterator()).next();
- }
-
- @Override
- public void close() {
-
- }
- };
+ protected ConditionalWriter createConditionalWriter(Ample.DataLevel
dataLevel) {
+ return testWriter;
}
}
@@ -95,6 +71,7 @@ public class ConditionalTabletsMutatorImplTest {
ServerContext context = EasyMock.createMock(ServerContext.class);
ServiceLock lock = EasyMock.createMock(ServiceLock.class);
LockID lid = EasyMock.createMock(LockID.class);
+
EasyMock.expect(lock.getLockID()).andReturn(lid).anyTimes();
EasyMock.expect(lid.serialize()).andReturn("/1234").anyTimes();
EasyMock.expect(context.getServiceLock()).andReturn(lock).anyTimes();
@@ -138,8 +115,26 @@ public class ConditionalTabletsMutatorImplTest {
var statuses2 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.REJECTED,
ke2.toMetaRow(),
ConditionalWriter.Status.REJECTED);
- try (var mutator = new TestConditionalTabletsMutator(context,
- List.of(statuses1::get, statuses2::get), failedExtents)) {
+ ConditionalWriter testWriter = new ConditionalWriter() {
+ private int attemptCount = 0;
+
+ @Override
+ public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+ var statuses = (attemptCount++ == 0) ? statuses1 : statuses2;
+ return Iterators.transform(mutations,
+ cm -> new Result(statuses.get(new Text(cm.getRow())), cm,
"server"));
+ }
+
+ @Override
+ public Result write(ConditionalMutation mutation) {
+ return write(List.of(mutation).iterator()).next();
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ try (var mutator = new TestConditionalTabletsMutator(context,
failedExtents, testWriter)) {
mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
.submit(tmeta -> tmeta.getDirName().equals("dir1"));
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ade40a2cf8..c051fea6f5 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -107,7 +107,6 @@ import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
-import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -144,7 +143,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
- SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new
Text("f"), new Text("j")));
+ SortedSet<Text> splits = new TreeSet<>(Set.of(new Text("c"), new
Text("f"), new Text("j")));
c.tableOperations().create(tableName,
new NewTableConfiguration().withSplits(splits).createOffline());
@@ -170,7 +169,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertNull(context.getAmple().readTablet(e1).getLocation());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts1)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -178,7 +177,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(Location.future(ts1),
context.getAmple().readTablet(e1).getLocation());
// test require absent with a future location set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts2)).submit(tm -> false,
() -> "Testing that requireAbsentLocation() fails when a future
location is set");
@@ -192,7 +191,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(List.of(), actual);
}
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
.submit(tm -> false);
@@ -201,7 +200,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(Location.current(ts1),
context.getAmple().readTablet(e1).getLocation());
// test require absent with a current location set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts2)).submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus(),
@@ -216,7 +215,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(List.of(e1), actual);
}
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
.submit(tm -> false);
@@ -224,7 +223,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertEquals(Location.current(ts1),
context.getAmple().readTablet(e1).getLocation());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2))
.putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2))
.submit(tm -> false);
@@ -232,7 +231,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertEquals(Location.current(ts1),
context.getAmple().readTablet(e1).getLocation());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
.deleteLocation(Location.current(ts1)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -241,7 +240,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Set two current locations, this puts the tablet in a bad state as its
only expected that
// single location should be set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putLocation(Location.current(ts1))
.putLocation(Location.current(ts2)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -252,7 +251,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Try to update the tablet requiring one of the locations that is set on
the tablet. Even
// though the required location exists, the presence of the other location
in the tablet
// metadata should cause the update to fail.
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
.submit(tm -> false);
@@ -265,7 +264,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertThrows(IllegalStateException.class, () ->
context.getAmple().readTablet(e1));
// Requiring an absent location should fail when two locations are set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
.submit(tm -> false);
@@ -275,7 +274,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertThrows(IllegalStateException.class, () ->
context.getAmple().readTablet(e1));
// Change the tablet to have a future and current location set.
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().deleteLocation(Location.current(ts1))
.putLocation(Location.future(ts1)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -288,7 +287,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// required location does not exist.
for (var loc : List.of(Location.current(ts1), Location.current(ts2),
Location.future(ts1),
Location.future(ts2))) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(loc)
.deleteLocation(Location.future(ts1)).deleteLocation(Location.current(ts2))
.submit(tm -> false);
@@ -298,7 +297,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
// Requiring an absent location should fail when a future and current
location are set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
.submit(tm -> false);
@@ -308,7 +307,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertThrows(IllegalStateException.class, () ->
context.getAmple().readTablet(e1));
// Delete one of the locations w/o any location requirements, this should
succeed.
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().deleteLocation(Location.future(ts1))
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -340,7 +339,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
System.out.println(context.getAmple().readTablet(e1).getLocation());
// simulate a compaction where the tablet location is not set
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
var tm1 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv)
.build();
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES)
@@ -352,7 +351,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tm2 =
TabletMetadata.builder(e1).putLocation(Location.current(ts1)).build();
// simulate minor compacts where the tablet location is not set
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2,
LOCATION)
.putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -363,7 +362,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// set the location
var tm3 = TabletMetadata.builder(e1).build(LOCATION);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, LOCATION)
.putLocation(Location.current(ts1)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -372,7 +371,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tm4 =
TabletMetadata.builder(e1).putLocation(Location.current(ts2)).build();
// simulate minor compacts where the tablet location is wrong
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm4,
LOCATION)
.putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -383,7 +382,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// simulate minor compacts where the tablet location is set
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2,
LOCATION)
.putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -397,7 +396,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).build(),
TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv)
.putFile(stf4, dfv).build())) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta,
FILES)
.putFile(stf4, new DataFileValue(0,
0)).deleteFile(stf1).deleteFile(stf2)
.deleteFile(stf3).submit(tm -> false);
@@ -409,7 +408,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// simulate a compaction
var tm5 =
TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
.putFile(stf4, new DataFileValue(0,
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
.submit(tm -> false);
@@ -422,7 +421,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var compactedDv = new DataFileValue(0, 0);
tm5 = TabletMetadata.builder(e1).putFile(stf1, compactedDv).putFile(stf2,
compactedDv)
.putFile(stf3, compactedDv).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
.putFile(stf4, new DataFileValue(0,
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
.submit(tm -> false);
@@ -436,7 +435,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tm6 = TabletMetadata.builder(e1).build(LOADED);
FateInstanceType type = FateInstanceType.fromTableId(tid);
FateId fateId = FateId.from(type, UUID.randomUUID());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
.putFile(stf5, new DataFileValue(0,
0)).putBulkFile(stf5.getTabletFile(), fateId)
.submit(tm -> false);
@@ -449,7 +448,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
.of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf"));
var tm7 =
TabletMetadata.builder(e1).putFile(stf4, compactedDv).putFile(stf5,
compactedDv).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES)
.putFile(stf6, new DataFileValue(0,
0)).deleteFile(stf4).deleteFile(stf5)
.submit(tm -> false);
@@ -458,7 +457,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles());
// simulate trying to re bulk import file after a compaction
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
.putFile(stf5, new DataFileValue(0,
0)).putBulkFile(stf5.getTabletFile(), fateId)
.submit(tm -> false);
@@ -480,7 +479,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// tablet should not have any logs to start with so requireSame with the
empty logs should pass
assertTrue(context.getAmple().readTablet(e1).getLogs().isEmpty());
Set<LogEntry> expectedLogs = new HashSet<>();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet,
LOGS)
.putWal(originalLogEntry).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -493,7 +492,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
String walFilePath2 =
java.nio.file.Path.of("tserver+8080",
UUID.randomUUID().toString()).toString();
LogEntry newLogEntry = LogEntry.fromPath(walFilePath2);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm ->
false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
@@ -522,7 +521,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
subset.forEach(builder::putWal);
TabletMetadata tmSubset = builder.build(LOGS);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset,
LOGS)
.deleteWal(originalLogEntry).submit(t -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -537,7 +536,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// this example)
TabletMetadata tm2 =
TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS)
.deleteWal(originalLogEntry).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus(),
@@ -549,7 +548,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// test the requireAbsentLogs() function when logs are not present in the
tablet metadata
assertTrue(context.getAmple().readTablet(e2).getLogs().isEmpty());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLogs().putWal(originalLogEntry)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -557,7 +556,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// test the requireAbsentLogs() function when logs are present in the
tablet metadata
assertFalse(context.getAmple().readTablet(e2).getLogs().isEmpty());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLogs().deleteWal(originalLogEntry)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e2).getStatus());
@@ -583,7 +582,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// simulate a compaction where the tablet location is not set
var tm1 = TabletMetadata.builder(e1).build(FILES, SELECTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1,
FILES).putFile(stf1, dfv)
.putFile(stf2, dfv).putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -594,7 +593,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
FateId fateId1 = FateId.from(type, UUID.randomUUID());
FateId fateId2 = FateId.from(type, UUID.randomUUID());
var time = SteadyTime.from(100_100, TimeUnit.NANOSECONDS);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES,
SELECTED)
.putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true,
fateId1, time))
.submit(tm -> false);
@@ -605,7 +604,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tm2 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv)
.build(SELECTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES,
SELECTED)
.putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true,
fateId1, time))
.submit(tm -> false);
@@ -627,7 +626,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
for (var selectedFiles : expectedToFail) {
var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv)
.putSelectedFiles(selectedFiles).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, FILES,
SELECTED)
.deleteSelectedFiles().submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -639,7 +638,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2,
dfv).putFile(stf3, dfv)
.putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true,
fateId1, time)).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES,
SELECTED)
.deleteSelectedFiles().submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -670,7 +669,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
final SelectedFiles selectedFiles =
new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId,
time);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
// write the SelectedFiles to the keyextent
ctmi.mutateTablet(e1).requireAbsentOperation().putSelectedFiles(selectedFiles)
.submit(tm -> false);
@@ -736,7 +735,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// submit a mutation with the condition that the selected files match
what was originally
// written
DataFileValue dfv = new DataFileValue(100, 100);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1,
SELECTED).putFile(stf4, dfv)
.submit(tm -> false);
// with a SortedFilesIterator attached to the Condition for SELECTED,
the metadata should be
@@ -775,7 +774,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var context = getCluster().getServerContext();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts1)).submit(tm -> false);
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
@@ -795,7 +794,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// in addition to testing multiple tablets also testing
requireAbsentTablet() which is
// currently not called elsewhere in this IT
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts2)).submit(tm -> false);
ctmi.mutateTablet(e2).requireAbsentTablet().putLocation(Location.future(ts1))
@@ -832,7 +831,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1);
var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm ->
false);
ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm ->
false);
ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm ->
false);
@@ -849,7 +848,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId());
assertNull(context.getAmple().readTablet(e3).getOperationId());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm ->
false);
ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm ->
false);
var results = ctmi.process();
@@ -861,7 +860,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(TabletOperationType.MERGING,
context.getAmple().readTablet(e2).getOperationId().getType());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm ->
false);
ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm ->
false);
var results = ctmi.process();
@@ -897,7 +896,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test different compaction requirement scenarios for tablets w/o any
compactions in the
// metadata table
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMetaNoCompactions,
ECOMP)
.putExternalCompaction(ecid1, ecMeta).submit(tm -> false);
ctmi.mutateTablet(e2).requireAbsentOperation().requireCompaction(ecid2)
@@ -922,7 +921,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test compaction requirements that do not match the compaction ids that
exists in the
// metadata table.
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireCompaction(ecid2)
.deleteExternalCompaction(ecid1).submit(tm -> false);
ctmi.mutateTablet(e3).requireAbsentOperation().requireSame(tabletMetaCompactions,
ECOMP)
@@ -938,7 +937,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test compaction requirements that match the compaction ids that exists
in the metadata
// table.
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMetaCompactions,
ECOMP)
.deleteExternalCompaction(ecid1).submit(tm -> false);
ctmi.mutateTablet(e3).requireAbsentOperation().requireCompaction(ecid2)
@@ -965,7 +964,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
COMPACTED)
.putCompacted(fateId2)
.submit(tabletMetadata ->
tabletMetadata.getCompacted().contains(fateId2));
@@ -981,7 +980,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(Set.of(fateId2), tabletMeta1.getCompacted());
assertEquals(Set.of(), context.getAmple().readTablet(e2).getCompacted());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
COMPACTED)
.putCompacted(fateId4).putCompacted(fateId5).submit(tabletMetadata
-> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -992,7 +991,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// test require same with a superset
tabletMeta1 =
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4)
.putCompacted(fateId5).putCompacted(fateId1).build(COMPACTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
COMPACTED)
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
.submit(tabletMetadata -> false);
@@ -1004,7 +1003,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// test require same with a subset
tabletMeta1 =
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4).build(COMPACTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
COMPACTED)
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
.submit(tabletMetadata -> false);
@@ -1016,7 +1015,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// now use the exact set the tablet has
tabletMeta1 =
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4)
.putCompacted(fateId5).build(COMPACTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
COMPACTED)
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
.submit(tabletMetadata -> false);
@@ -1039,14 +1038,14 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
TabletOperationId opid =
TabletOperationId.from(TabletOperationType.MERGING, FateId.from(type,
UUID.randomUUID()));
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation()
.putOperation(opid).submit(tm -> false);
assertEquals(Status.REJECTED,
ctmi.process().get(RootTable.EXTENT).getStatus());
}
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
.requireLocation(Location.future(loc.getServerInstance())).putOperation(opid)
.submit(tm -> false);
@@ -1054,7 +1053,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
.requireLocation(Location.current(loc.getServerInstance())).putOperation(opid)
.submit(tm -> false);
@@ -1065,7 +1064,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
Wait.waitFor(() ->
context.getAmple().readTablet(RootTable.EXTENT).getLocation() == null);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(RootTable.EXTENT).requireOperation(opid).requireAbsentLocation()
.putLocation(Location.future(loc.getServerInstance())).deleteOperation()
.submit(tm -> false);
@@ -1083,7 +1082,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL),
new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0,
TimeType.LOGICAL))) {
var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
.putTime(new MetadataTime(101,
TimeType.LOGICAL)).submit(tabletMetadata -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1095,7 +1094,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
for (int i = 0; i < 10; i++) {
var tabletMeta1 =
TabletMetadata.builder(e1).putTime(new MetadataTime(i,
TimeType.MILLIS)).build();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
.putTime(new MetadataTime(i + 1,
TimeType.MILLIS)).submit(tabletMetadata -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1117,7 +1116,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
FateId fateId5 = FateId.from(type, UUID.randomUUID());
var tabletMeta1 =
TabletMetadata.builder(e1).build(USER_COMPACTION_REQUESTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireSame(tabletMeta1,
USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId2)
.submit(tabletMetadata ->
tabletMetadata.getUserCompactionsRequested().contains(fateId2));
@@ -1134,7 +1133,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(Set.of(fateId2), tabletMeta1.getUserCompactionsRequested());
assertEquals(Set.of(),
context.getAmple().readTablet(e2).getUserCompactionsRequested());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireSame(tabletMeta1,
USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId4)
.putUserCompactionRequested(fateId5).submit(tabletMetadata -> false);
@@ -1147,7 +1146,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
tabletMeta1 =
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
.putUserCompactionRequested(fateId4).putUserCompactionRequested(fateId5)
.putUserCompactionRequested(fateId1).build(USER_COMPACTION_REQUESTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1160,7 +1159,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// test require same with a subset
tabletMeta1 =
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
.putUserCompactionRequested(fateId4).build(USER_COMPACTION_REQUESTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1174,7 +1173,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
tabletMeta1 =
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
.putUserCompactionRequested(fateId4).putUserCompactionRequested(fateId5)
.build(USER_COMPACTION_REQUESTED);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1244,7 +1243,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
for (KeyExtent ke : tabletsWithWalCompactFlush) {
FateInstanceType type = FateInstanceType.fromTableId(ke.tableId());
FateId fateId = FateId.from(type, UUID.randomUUID());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(fateId)
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal)
.submit(tabletMetadata -> false);
@@ -1260,7 +1259,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// on a subset of the tablets, put a location
final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4);
for (KeyExtent ke : tabletsWithLocation) {
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(ke).getStatus());
@@ -1291,7 +1290,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
"Initially, all tablets should be present");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
// Set compacted on e2 but with no flush ID
ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(fateId1)
.submit(tabletMetadata -> false);
@@ -1301,7 +1300,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
"Compacted but no flush ID should return no tablets");
// Set incorrect flush ID on e2
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L)
.submit(tabletMetadata -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -1310,7 +1309,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
"Compacted with incorrect flush ID should return no tablets");
// Set correct flush ID on e2
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e2).requireAbsentOperation()
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata ->
false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -1319,7 +1318,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
"Compacted with correct flush ID should return e2");
// Set compacted and correct flush ID on e3
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(fateId2)
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata ->
false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e3).getStatus());
@@ -1342,7 +1341,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var walFilePath =
java.nio.file.Path.of("tserver+8080",
UUID.randomUUID().toString()).toString();
var wal = LogEntry.fromPath(walFilePath);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata
-> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
}
@@ -1353,7 +1352,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// add wal to tablet e4
walFilePath = java.nio.file.Path.of("tserver+8080",
UUID.randomUUID().toString()).toString();
wal = LogEntry.fromPath(walFilePath);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata
-> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e4).getStatus());
}
@@ -1363,7 +1362,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
"Only tablets with wals should be returned");
// remove the wal from e4
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal)
.submit(tabletMetadata -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e4).getStatus());
@@ -1375,7 +1374,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var ts1 = new TServerInstance("localhost:9997", 5000L);
var ts2 = new TServerInstance("localhost:9997", 6000L);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts1)).submit(tabletMetadata -> false);
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
@@ -1432,7 +1431,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build();
assertTrue(tabletMeta1.getFlushId().isPresent());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
FLUSH_ID)
.putFlushId(43L).submit(tabletMetadata ->
tabletMetadata.getFlushId().orElse(-1) == 43L);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1441,7 +1440,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID);
assertFalse(tabletMeta2.getFlushId().isPresent());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2,
FLUSH_ID)
.putFlushId(43L).submit(tabletMetadata ->
tabletMetadata.getFlushId().orElse(-1) == 43L);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1450,14 +1449,14 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build();
assertTrue(tabletMeta3.getFlushId().isPresent());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3,
FLUSH_ID)
.putFlushId(44L).submit(tabletMetadata ->
tabletMetadata.getFlushId().orElse(-1) == 44L);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(44L,
context.getAmple().readTablet(e1).getFlushId().getAsLong());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3,
FLUSH_ID)
.putFlushId(45L).submit(tabletMetadata ->
tabletMetadata.getFlushId().orElse(-1) == 45L);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1546,7 +1545,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(TabletAvailability.ONDEMAND,
context.getAmple().readTablet(e2).getTabletAvailability());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireTabletAvailability(TabletAvailability.HOSTED)
.putTabletAvailability(TabletAvailability.UNHOSTED).submit(tm ->
false);
@@ -1571,7 +1570,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// require the UNSPLITTABLE column to be absent when it is absent
var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32,
Set.of());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
UNSPLITTABLE)
.setUnSplittable(usm1).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1580,7 +1579,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(usm1.toBase64(), tabletMeta2.getUnSplittable().toBase64());
// require the UNSPLITTABLE column to be absent when it is not absent
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1,
UNSPLITTABLE)
.deleteUnSplittable().submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1590,7 +1589,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33,
Set.of());
var tabletMeta3 =
TabletMetadata.builder(e1).setUnSplittable(usm2).build(UNSPLITTABLE);
// require the UNSPLITTABLE column to be usm2 when it is actually usm1
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3,
UNSPLITTABLE)
.deleteUnSplittable().submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1598,7 +1597,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(usm1.toBase64(),
context.getAmple().readTablet(e1).getUnSplittable().toBase64());
// require the UNSPLITTABLE column to be usm1 when it is actually usm1
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2,
UNSPLITTABLE)
.deleteUnSplittable().submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1606,7 +1605,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertNull(context.getAmple().readTablet(e1).getUnSplittable());
// require the UNSPLITTABLE column to be usm1 when it is actually absent
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2,
UNSPLITTABLE)
.setUnSplittable(usm2).submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1618,7 +1617,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
public void testErrors() {
var context = getCluster().getServerContext();
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentTablet().putDirName("d1").submit(tm
-> false);
// making multiple updates for the same tablet is not supported
assertThrows(IllegalStateException.class,
@@ -1630,7 +1629,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
.requireSame(tabletMetadata,
HOSTING_REQUESTED).putDirName("d2").submit(tm -> false));
}
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
// the following end prev end row update should cause a constraint
violation because a
// tablet's prev end row must be less than its end row
ctmi.mutateTablet(e1).requireAbsentOperation().putPrevEndRow(e1.endRow()).submit(tm
-> false);
@@ -1638,7 +1637,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertThrows(IllegalStateException.class, () ->
ctmi.process().get(e1).getStatus());
}
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFlushNonce(1234).submit(tm ->
false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1663,7 +1662,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var dfv = new DataFileValue(100, 100);
// Add 3 of the files, skip the 4th file
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1,
dfv).putFile(stf2, dfv)
.putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1672,7 +1671,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is accepted when given all files
var time1 = MetadataTime.parse("L50");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf2, stf3))
.putTime(time1).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1681,7 +1680,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is accepted when a subset of files is given
var time2 = MetadataTime.parse("L60");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf3)).putTime(time2)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1690,7 +1689,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is rejected when a file is given that the tablet does not
have
var time3 = MetadataTime.parse("L70");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf4)).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1714,7 +1713,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var dfv = new DataFileValue(100, 100);
// Add 3 of the files, skip the 4th file
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1,
dfv).putFile(stf2, dfv)
.putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1723,7 +1722,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is accepted when # files in tablet equals limit
var time1 = MetadataTime.parse("L50");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time1)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1732,7 +1731,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is accepted when # files in tablet is less than limit
var time2 = MetadataTime.parse("L60");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time2)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1741,7 +1740,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is rejected when # files in tablet is greater than limit
var time3 = MetadataTime.parse("L70");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(2).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1750,14 +1749,14 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
// add fourth file
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf4,
dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2, stf3, stf4),
context.getAmple().readTablet(e1).getFiles());
// Test mutation is rejected when # files in tablet is greater than limit
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1767,7 +1766,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
// Test mutation is accepted when # files in tablet equals limit
var time4 = MetadataTime.parse("L80");
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time4)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1790,7 +1789,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf1.getTabletFile(),
stf2.getTabletFile()))
.putBulkFile(stf1.getTabletFile(),
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
@@ -1803,7 +1802,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3,
dfv).submit(tm -> false);
@@ -1814,7 +1813,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
context.getAmple().readTablet(e1).getLoaded());
// should fail because the loaded markers are present
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf1.getTabletFile(),
stf2.getTabletFile()))
.putBulkFile(stf1.getTabletFile(),
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
@@ -1823,7 +1822,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
// should fail because the loaded markers are present
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3,
dfv).putFlushId(99)
@@ -1867,14 +1866,14 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
.of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
var dfv = new DataFileValue(100, 100);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1,
dfv).putFile(stf2, dfv)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2),
context.getAmple().readTablet(e1).getFiles());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireCheckSuccess(new TestCheck(Set.of(stf1,
stf3))).putFile(stf3, dfv)
.submit(tm -> false);
@@ -1882,7 +1881,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertEquals(Set.of(stf1, stf2),
context.getAmple().readTablet(e1).getFiles());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireCheckSuccess(new TestCheck(Set.of(stf1,
stf2))).putFile(stf3, dfv)
.submit(tm -> false);
@@ -1890,7 +1889,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertEquals(Set.of(stf1, stf2, stf3),
context.getAmple().readTablet(e1).getFiles());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireCheckSuccess(new TestCheck(Set.of(stf1,
stf3))).deleteFile(stf2)
.submit(tm -> false);
@@ -1898,7 +1897,7 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
}
assertEquals(Set.of(stf1, stf3),
context.getAmple().readTablet(e1).getFiles());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireCheckSuccess(new TestCheck(Set.of(stf1,
stf2))).deleteFile(stf1)
.submit(tm -> false);
@@ -1913,27 +1912,27 @@ public class AmpleConditionalWriterIT extends
SharedMiniClusterBase {
var tsi = new TServerInstance("localhost:1234", 56L);
var otherTsi = new TServerInstance("localhost:9876", 54L);
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(tsi).deleteMigration()
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
assertNull(context.getAmple().readTablet(e1).getMigration());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().putMigration(tsi).submit(tm ->
false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(tsi, context.getAmple().readTablet(e1).getMigration());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(otherTsi).deleteMigration()
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
assertEquals(tsi, context.getAmple().readTablet(e1).getMigration());
- try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(tsi).deleteMigration()
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());