This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 0edc48f910 Create a test Ample implementation (#4415) 0edc48f910 is described below commit 0edc48f910f25e5273d62b1d91a7993bf9529b8e Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat May 11 09:08:36 2024 -0400 Create a test Ample implementation (#4415) This commit adds a new test Ample implementation that can be used to interact with Ample against a different table than the table that the Accumulo instance uses. This will allow testing Ample without having to deal with metadata being written to the table by the system. The test Ample also will support injecting custom behavior such as changing the return status of a conditional mutation in order to test. This closes #4376 --- .../accumulo/core/metadata/schema/AmpleImpl.java | 29 ++- .../core/metadata/schema/TabletsMetadata.java | 15 +- pom.xml | 7 + server/base/pom.xml | 18 ++ .../AsyncConditionalTabletsMutatorImpl.java | 11 +- .../metadata/ConditionalTabletsMutatorImpl.java | 17 +- .../accumulo/server/metadata/ServerAmpleImpl.java | 48 ++-- .../server/metadata/TabletsMutatorImpl.java | 14 +- .../metadata/ConditionalWriterDelegator.java | 54 +++++ .../metadata/ConditionalWriterInterceptor.java | 86 +++++++ .../apache/accumulo/server/metadata/TestAmple.java | 264 +++++++++++++++++++++ test/pom.xml | 6 + .../apache/accumulo/test/ample/TestAmpleIT.java | 190 +++++++++++++++ .../apache/accumulo/test/ample/TestAmpleUtil.java | 38 +++ .../apache/accumulo/test/fate/ManagerRepoIT.java | 225 ++++++++++++++++++ 15 files changed, 992 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java index 89ce240cad..cdad8708e6 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java @@ -21,23 +21,34 @@ package org.apache.accumulo.core.metadata.schema; import static com.google.common.collect.MoreCollectors.onlyElement; import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Function; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata.Options; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions; + +import com.google.common.base.Preconditions; public class AmpleImpl implements Ample { private final AccumuloClient client; + private final Function<DataLevel,String> tableMapper; public AmpleImpl(AccumuloClient client) { + this(client, DataLevel::metaTable); + } + + public AmpleImpl(AccumuloClient client, Function<DataLevel,String> tableMapper) { this.client = client; + this.tableMapper = Objects.requireNonNull(tableMapper); } @Override public TabletMetadata readTablet(KeyExtent extent, ReadConsistency readConsistency, ColumnType... colsToFetch) { - Options builder = TabletsMetadata.builder(client).forTablet(extent); + Options builder = newBuilder().forTablet(extent); if (colsToFetch.length > 0) { builder.fetch(colsToFetch); } @@ -53,7 +64,21 @@ public class AmpleImpl implements Ample { @Override public TabletsMetadata.TableOptions readTablets() { - return TabletsMetadata.builder(this.client); + return newBuilder(); + } + + protected TableOptions newBuilder() { + return TabletsMetadata.builder(this.client, getTableMapper()); + } + + protected String getMetadataTableName(Ample.DataLevel dataLevel) { + final String metadataTable = getTableMapper().apply(dataLevel); + Preconditions.checkArgument(metadataTable != null, + "A metadata table for %s has not been registered", dataLevel); + return metadataTable; } + protected Function<DataLevel,String> getTableMapper() { + return tableMapper; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 48a1160513..c91e2d54de 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -117,9 +117,11 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE; private final AccumuloClient _client; private final List<TabletMetadataFilter> tabletMetadataFilters = new ArrayList<>(); + private final Function<DataLevel,String> tableMapper; - Builder(AccumuloClient client) { + Builder(AccumuloClient client, Function<DataLevel,String> tableMapper) { this._client = client; + this.tableMapper = Objects.requireNonNull(tableMapper); } @Override @@ -180,7 +182,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } else { try { BatchScanner scanner = - client.createBatchScanner(level.metaTable(), Authorizations.EMPTY); + client.createBatchScanner(tableMapper.apply(level), Authorizations.EMPTY); var ranges = groupedExtents.get(level).stream().map(KeyExtent::toMetaRange).collect(toList()); @@ -253,7 +255,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private TabletsMetadata buildNonRoot(AccumuloClient client) { try { - String resolvedTable = table == null ? level.metaTable() : table; + String resolvedTable = table == null ? tableMapper.apply(level) : table; Scanner scanner = new IsolatedScanner(client.createScanner(resolvedTable, Authorizations.EMPTY)); @@ -653,7 +655,12 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } public static TableOptions builder(AccumuloClient client) { - return new Builder(client); + return new Builder(client, DataLevel::metaTable); + } + + public static TableOptions builder(AccumuloClient client, + Function<DataLevel,String> tableMapper) { + return new Builder(client, tableMapper); } private static TabletMetadata getRootMetadata(ClientContext ctx, diff --git a/pom.xml b/pom.xml index aeacdf925e..ea8923d947 100644 --- a/pom.xml +++ b/pom.xml @@ -624,6 +624,13 @@ <artifactId>snakeyaml</artifactId> <version>2.2</version> </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-server-base</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> <dependencies> diff --git a/server/base/pom.xml b/server/base/pom.xml index c20b3e6edc..d7f23eb028 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -136,6 +136,24 @@ <directory>src/test/resources</directory> </testResource> </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/metadata/**</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> </build> <profiles> <profile> diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java index d7666361f7..46532681d9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -25,9 +25,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; @@ -39,16 +41,18 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona private final ServerContext context; private long mutatedTablets = 0; public static final int BATCH_SIZE = 1000; + private final Function<DataLevel,String> tableMapper; - AsyncConditionalTabletsMutatorImpl(ServerContext context, + AsyncConditionalTabletsMutatorImpl(ServerContext context, Function<DataLevel,String> tableMapper, Consumer<Ample.ConditionalResult> resultsConsumer) { this.resultsConsumer = Objects.requireNonNull(resultsConsumer); - this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); this.context = context; + this.bufferingMutator = new ConditionalTabletsMutatorImpl(context, tableMapper); var creatorId = Thread.currentThread().getId(); this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread( "Async conditional tablets mutator background thread, created by : #" + creatorId, runnable)); + this.tableMapper = Objects.requireNonNull(tableMapper); } @@ -73,7 +77,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona return result; }); - bufferingMutator = new ConditionalTabletsMutatorImpl(context); + bufferingMutator = new ConditionalTabletsMutatorImpl(context, tableMapper); mutatedTablets = 0; } mutatedTablets++; @@ -95,4 +99,5 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona bufferingMutator.close(); executor.shutdownNow(); } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java index 2e999033fa..2ddf62fbc8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java @@ -24,8 +24,10 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; @@ -35,6 +37,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.server.ServerContext; @@ -61,9 +64,16 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu private boolean active = true; Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>(); + private final Function<DataLevel,String> tableMapper; public ConditionalTabletsMutatorImpl(ServerContext context) { + this(context, DataLevel::metaTable); + } + + public ConditionalTabletsMutatorImpl(ServerContext context, + Function<DataLevel,String> tableMapper) { this.context = context; + this.tableMapper = Objects.requireNonNull(tableMapper); } @Override @@ -90,7 +100,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu if (dataLevel == Ample.DataLevel.ROOT) { return new RootConditionalWriter(context); } else { - return context.createConditionalWriter(dataLevel.metaTable()); + return context.createConditionalWriter(getTableMapper().apply(dataLevel)); } } @@ -293,4 +303,9 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu @Override public void close() {} + + protected Function<DataLevel,String> getTableMapper() { + return tableMapper; + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 12419be290..abb297ad23 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -68,13 +69,18 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { private static Logger log = LoggerFactory.getLogger(ServerAmpleImpl.class); - private ServerContext context; + private final ServerContext context; public ServerAmpleImpl(ServerContext context) { super(context); this.context = context; } + public ServerAmpleImpl(ServerContext context, Function<DataLevel,String> tableMapper) { + super(context, tableMapper); + this.context = context; + } + @Override public Ample.TabletMutator mutateTablet(KeyExtent extent) { TabletsMutator tmi = mutateTablets(); @@ -85,18 +91,18 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public TabletsMutator mutateTablets() { - return new TabletsMutatorImpl(context); + return new TabletsMutatorImpl(context, getTableMapper()); } @Override public ConditionalTabletsMutator conditionallyMutateTablets() { - return new ConditionalTabletsMutatorImpl(context); + return new ConditionalTabletsMutatorImpl(context, getTableMapper()); } @Override public AsyncConditionalTabletsMutator conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { - return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer); + return new AsyncConditionalTabletsMutatorImpl(context, getTableMapper(), resultsConsumer); } private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) { @@ -131,7 +137,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { return; } - try (BatchWriter writer = context.createBatchWriter(DataLevel.of(tableId).metaTable())) { + try (BatchWriter writer = context.createBatchWriter(getMetaTable(DataLevel.of(tableId)))) { for (StoredTabletFile file : candidates) { writer.addMutation(createDeleteMutation(file)); } @@ -150,7 +156,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { return; } - try (BatchWriter writer = context.createBatchWriter(DataLevel.of(tableId).metaTable())) { + try (BatchWriter writer = context.createBatchWriter(getMetaTable(DataLevel.of(tableId)))) { for (var fileOrDir : candidates) { writer.addMutation(createDeleteMutation(fileOrDir)); } @@ -204,7 +210,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { return; } - try (BatchWriter writer = context.createBatchWriter(level.metaTable())) { + try (BatchWriter writer = context.createBatchWriter(getMetaTable(level))) { if (type == GcCandidateType.VALID) { for (GcCandidate candidate : candidates) { Mutation m = new Mutation(DeletesSection.encodeRow(candidate.getPath())); @@ -242,7 +248,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { Scanner scanner; try { - scanner = context.createScanner(level.metaTable(), Authorizations.EMPTY); + scanner = context.createScanner(getMetaTable(level), Authorizations.EMPTY); } catch (TableNotFoundException e) { throw new IllegalStateException(e); } @@ -274,7 +280,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { + var metaTable = getMetaTable(DataLevel.USER); + try (BatchWriter writer = context.createBatchWriter(metaTable)) { String prefix = ScanServerFileReferenceSection.getRowPrefix(); for (ScanServerRefTabletFile ref : scanRefs) { Mutation m = new Mutation(prefix + ref.getRowSuffix()); @@ -283,21 +290,22 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } } catch (MutationsRejectedException | TableNotFoundException e) { throw new IllegalStateException( - "Error inserting scan server file references into " + DataLevel.USER.metaTable(), e); + "Error inserting scan server file references into " + metaTable, e); } } @Override public Stream<ScanServerRefTabletFile> getScanServerFileReferences() { + var metaTable = getMetaTable(DataLevel.USER); try { - Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); + Scanner scanner = context.createScanner(metaTable, Authorizations.EMPTY); scanner.setRange(ScanServerFileReferenceSection.getRange()); int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); return scanner.stream().onClose(scanner::close) .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())); } catch (TableNotFoundException e) { - throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e); + throw new IllegalStateException(metaTable + " not found!", e); } } @@ -305,8 +313,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { public void deleteScanServerFileReferences(String serverAddress, UUID scanServerLockUUID) { Objects.requireNonNull(serverAddress, "Server address must be supplied"); Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied"); - try ( - Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + var metaTable = getMetaTable(DataLevel.USER); + try (Scanner scanner = context.createScanner(metaTable, Authorizations.EMPTY)) { scanner.setRange(ScanServerFileReferenceSection.getRange()); scanner.fetchColumn(new Text(serverAddress), new Text(scanServerLockUUID.toString())); @@ -320,13 +328,13 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { this.deleteScanServerFileReferences(refsToDelete); } } catch (TableNotFoundException e) { - throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e); + throw new IllegalStateException(metaTable + " not found!", e); } } @Override public void deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> refsToDelete) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { + try (BatchWriter writer = context.createBatchWriter(getMetaTable(DataLevel.USER))) { String prefix = ScanServerFileReferenceSection.getRowPrefix(); for (ScanServerRefTabletFile ref : refsToDelete) { Mutation m = new Mutation(prefix + ref.getRowSuffix()); @@ -338,4 +346,12 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { throw new IllegalStateException(e); } } + + ServerContext getContext() { + return context; + } + + private String getMetaTable(DataLevel dataLevel) { + return getTableMapper().apply(dataLevel); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java index 2ce5b66d9c..72e4703ca1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.server.metadata; +import java.util.Objects; +import java.util.function.Function; + import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -25,6 +28,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; import org.apache.accumulo.server.ServerContext; @@ -32,13 +36,15 @@ import com.google.common.base.Preconditions; public class TabletsMutatorImpl implements TabletsMutator { - private ServerContext context; + private final ServerContext context; private BatchWriter rootWriter; private BatchWriter metaWriter; + private final Function<DataLevel,String> tableMapper; - public TabletsMutatorImpl(ServerContext context) { + TabletsMutatorImpl(ServerContext context, Function<DataLevel,String> tableMapper) { this.context = context; + this.tableMapper = Objects.requireNonNull(tableMapper); } private BatchWriter getWriter(TableId tableId) { @@ -48,13 +54,13 @@ public class TabletsMutatorImpl implements TabletsMutator { try { if (AccumuloTable.METADATA.tableId().equals(tableId)) { if (rootWriter == null) { - rootWriter = context.createBatchWriter(AccumuloTable.ROOT.tableName()); + rootWriter = context.createBatchWriter(tableMapper.apply(DataLevel.METADATA)); } return rootWriter; } else { if (metaWriter == null) { - metaWriter = context.createBatchWriter(AccumuloTable.METADATA.tableName()); + metaWriter = context.createBatchWriter(tableMapper.apply(DataLevel.USER)); } return metaWriter; diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java new file mode 100644 index 0000000000..af4e3ad771 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata; + +import java.util.Iterator; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.data.ConditionalMutation; + +class ConditionalWriterDelegator implements ConditionalWriter { + private final ConditionalWriter delegate; + private final ConditionalWriterInterceptor interceptor; + + public ConditionalWriterDelegator(ConditionalWriter delegate, + ConditionalWriterInterceptor interceptor) { + this.delegate = delegate; + this.interceptor = interceptor; + } + + @Override + public Iterator<Result> write(Iterator<ConditionalMutation> mutations) { + mutations = interceptor.beforeWrite(mutations); + return interceptor.afterWrite(delegate.write(mutations)); + } + + @Override + public Result write(ConditionalMutation mutation) { + mutation = interceptor.beforeWrite(mutation); + return interceptor.afterWrite(delegate.write(mutation)); + } + + @Override + public void close() { + interceptor.beforeClose(); + delegate.close(); + interceptor.afterClose(); + } +} diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java new file mode 100644 index 0000000000..36ce45b6e6 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.ConditionalWriter.Result; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.data.ConditionalMutation; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Streams; + +public interface ConditionalWriterInterceptor { + + default Iterator<ConditionalMutation> beforeWrite(Iterator<ConditionalMutation> mutations) { + return mutations; + } + + default Iterator<Result> afterWrite(Iterator<Result> results) { + return results; + } + + default ConditionalMutation beforeWrite(ConditionalMutation mutation) { + return mutation; + } + + default Result afterWrite(Result result) { + return result; + } + + default void beforeClose() { + + } + + default void afterClose() { + + } + + static ConditionalWriterInterceptor withStatus(Status replaced, int times) { + return withStatus(null, replaced, times); + } + + static ConditionalWriterInterceptor withStatus(Status firstExpected, Status replaced, int times) { + final AtomicInteger count = new AtomicInteger(); + return new ConditionalWriterInterceptor() { + @Override + public Iterator<Result> afterWrite(Iterator<Result> results) { + if (count.getAndIncrement() < times) { + // For the first run only, make sure each state matches firstExpected if not null + // for other runs don't check since we are changing state so future runs may not match + return Streams.stream(results).map(r -> { + try { + Preconditions + .checkState(times > 1 || firstExpected == null || r.getStatus() == firstExpected); + } catch (IllegalStateException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + return new Result(replaced, r.getMutation(), r.getTabletServer()); + }).collect(Collectors.toList()).iterator(); + } + return results; + } + }; + } +} diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java new file mode 100644 index 0000000000..88b768f77a --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedMap; +import java.util.function.BiPredicate; +import java.util.function.Consumer; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletsMutator; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.MoreCollectors; + +public class TestAmple { + + private static final ConditionalWriterInterceptor EMPTY_INTERCEPTOR = + new ConditionalWriterInterceptor() {}; + + private static final BiPredicate<Key,Value> INCLUDE_ALL_COLUMNS = (k, v) -> true; + + public static BiPredicate<Key,Value> matches(ColumnFQ column) { + return (k, v) -> column.equals(new ColumnFQ(k)); + } + + public static BiPredicate<Key,Value> not(ColumnFQ column) { + return matches(column).negate(); + } + + public static Ample create(ServerContext context, Map<DataLevel,String> tables, + Supplier<ConditionalWriterInterceptor> cwInterceptor) { + return new TestServerAmpleImpl(context, tables, cwInterceptor); + } + + public static Ample create(ServerContext context, Map<DataLevel,String> tables) { + return create(context, tables, () -> EMPTY_INTERCEPTOR); + } + + public static class TestServerAmpleImpl extends ServerAmpleImpl { + + private final Map<DataLevel,String> tables; + private final Supplier<ConditionalWriterInterceptor> cwInterceptor; + private final Supplier<ServerContext> testContext = + Suppliers.memoize(() -> testAmpleServerContext(super.getContext(), this)); + + private TestServerAmpleImpl(ServerContext context, Map<DataLevel,String> tables, + Supplier<ConditionalWriterInterceptor> cwInterceptor) { + super(context, tables::get); + this.tables = Map.copyOf(tables); + this.cwInterceptor = Objects.requireNonNull(cwInterceptor); + Preconditions.checkArgument(!tables.containsKey(DataLevel.ROOT)); + Preconditions.checkArgument( + tables.containsKey(DataLevel.USER) || tables.containsKey(DataLevel.METADATA)); + } + + @Override + public TabletsMutator mutateTablets() { + return new TabletsMutatorImpl(getContext(), tables::get); + } + + @Override + public ConditionalTabletsMutator conditionallyMutateTablets() { + return conditionallyMutateTablets(cwInterceptor.get()); + } + + @Override + public AsyncConditionalTabletsMutator + conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { + return new AsyncConditionalTabletsMutatorImpl(getContext(), getTableMapper(), + resultsConsumer); + } + + @Override + protected TableOptions newBuilder() { + return TabletsMetadata.builder(getContext(), getTableMapper()); + } + + /** + * Create default metadata for a Table + * + * TODO: Add a way to pass in options for config + * + * @param tableId The id of the table to create metadata for + */ + public void createMetadata(TableId tableId) { + try (var tabletsMutator = mutateTablets()) { + var extent = new KeyExtent(tableId, null, null); + var tabletMutator = tabletsMutator.mutateTablet(extent); + String dirName = ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; + tabletMutator.putPrevEndRow(extent.prevEndRow()); + tabletMutator.putDirName(dirName); + tabletMutator.putTime(new MetadataTime(0, TimeType.MILLIS)); + tabletMutator.putTabletAvailability(TabletAvailability.HOSTED); + tabletMutator.mutate(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Create metadata for a table by copying existing metadata for the table from the metadata + * table in an existing Accumulo instance + * + * TODO: Add config parents (such as a way to include/exclude what is copied, etc) + * + * @param client The client to scan the existing accumulo metadata table + * @param tableId The id of the table to create metadata for + * @throws Exception thrown for any error on metadata creation + */ + public void createMetadataFromExisting(AccumuloClient client, TableId tableId) + throws Exception { + createMetadataFromExisting(client, tableId, INCLUDE_ALL_COLUMNS); + } + + public void createMetadataFromExisting(AccumuloClient client, TableId tableId, + BiPredicate<Key,Value> includeColumn) throws Exception { + try (Scanner scanner = + client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) { + scanner.setRange(TabletsSection.getRange(tableId)); + IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class); + scanner.addScanIterator(iterSetting); + + try (BatchWriter bw = + client.createBatchWriter(getMetadataTableName(DataLevel.of(tableId)))) { + for (Entry<Key,Value> entry : scanner) { + final SortedMap<Key,Value> decodedRow = + WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); + Text row = decodedRow.firstKey().getRow(); + Mutation m = new Mutation(row); + + decodedRow.entrySet().stream().filter(e -> includeColumn.test(e.getKey(), e.getValue())) + .forEach(e -> m.put(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), + e.getKey().getColumnVisibilityParsed(), e.getKey().getTimestamp(), + e.getValue())); + bw.addMutation(m); + } + } + } + } + + private ConditionalTabletsMutator + conditionallyMutateTablets(ConditionalWriterInterceptor interceptor) { + Objects.requireNonNull(interceptor); + + return new ConditionalTabletsMutatorImpl(getContext(), tables::get) { + + @Override + protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel) + throws TableNotFoundException { + if (dataLevel == Ample.DataLevel.ROOT) { + return super.createConditionalWriter(dataLevel); + } else { + return new ConditionalWriterDelegator( + getContext().createConditionalWriter(getTableMapper().apply(dataLevel)), + interceptor); + } + } + }; + } + + @Override + ServerContext getContext() { + return testContext.get(); + } + + } + + public static ServerContext testAmpleServerContext(ServerContext context, + TestAmple.TestServerAmpleImpl ample) { + SiteConfiguration siteConfig; + try { + Map<String,String> propsMap = new HashMap<>(); + context.getSiteConfiguration().getProperties(propsMap, x -> true); + siteConfig = SiteConfiguration.empty().withOverrides(propsMap).build(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new ServerContext(siteConfig) { + @Override + public Ample getAmple() { + return ample; + } + + @Override + public ServiceLock getServiceLock() { + return context.getServiceLock(); + } + + @Override + public void setServiceLock(ServiceLock lock) { + context.setServiceLock(lock); + } + }; + } + + public static void createMetadataTable(ClientContext client, String table) throws Exception { + final var metadataTableProps = + client.tableOperations().getTableProperties(AccumuloTable.METADATA.tableName()); + + TabletAvailability availability; + try (var tabletStream = client.tableOperations() + .getTabletInformation(AccumuloTable.METADATA.tableName(), new Range())) { + availability = tabletStream.map(TabletInformation::getTabletAvailability).distinct() + .collect(MoreCollectors.onlyElement()); + } + + var newTableConf = new NewTableConfiguration().withInitialTabletAvailability(availability) + .withoutDefaultIterators().setProperties(metadataTableProps); + client.tableOperations().create(table, newTableConf); + } +} diff --git a/test/pom.xml b/test/pom.xml index 709dbe2975..8e5249cc17 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -111,6 +111,12 @@ <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-server-base</artifactId> </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-server-base</artifactId> + <type>test-jar</type> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-shell</artifactId> diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java new file mode 100644 index 0000000000..5f52ae1339 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.ample; + +import static org.apache.accumulo.core.client.ConditionalWriter.Status.ACCEPTED; +import static org.apache.accumulo.core.client.ConditionalWriter.Status.UNKNOWN; +import static org.apache.accumulo.server.metadata.ConditionalWriterInterceptor.withStatus; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.metadata.TestAmple; +import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.google.common.base.Preconditions; + +public class TestAmpleIT extends SharedMiniClusterBase { + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Test + public void testCreateMetadataFromExisting() throws Exception { + + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0]; + String userTable = tableNames[1]; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(userTable); + + TestAmple.createMetadataTable(client, metadataTable); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + TestServerAmpleImpl ample = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + + ample.createMetadataFromExisting(client, tableId); + + var count = new AtomicInteger(); + try (var tablets = ample.readTablets().forTable(tableId).build().stream()) { + tablets.forEach(tm -> { + assertNotNull(tm.getTableId()); + assertNotNull(tm.getExtent()); + assertNotNull(tm.getTabletAvailability()); + assertNotNull(tm.getTime()); + count.incrementAndGet(); + }); + } + assertEquals(1, count.get()); + } + } + + @Test + public void testCreateMetadata() throws Exception { + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0]; + String userTable = tableNames[1]; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(userTable); + TestAmple.createMetadataTable(client, metadataTable); + + TableId tableId = TableId.of("1"); + TestServerAmpleImpl ample = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + + ample.createMetadata(tableId); + + var count = new AtomicInteger(); + try (var tablets = ample.readTablets().forTable(tableId).build().stream()) { + tablets.forEach(tm -> { + assertNotNull(tm.getTableId()); + assertNotNull(tm.getExtent()); + assertNotNull(tm.getTabletAvailability()); + assertNotNull(tm.getTime()); + count.incrementAndGet(); + }); + } + assertEquals(1, count.get()); + } + + } + + // This is an example test showing how to test a conditional + // mutation rejection handler by using a ConditionalWriterInterceptor + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testUnknownStatus(boolean accepted) throws Exception { + + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0] + accepted; + String userTable = tableNames[1] + accepted; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(userTable); + + TestAmple.createMetadataTable(client, metadataTable); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + TestServerAmpleImpl ample = + (TestServerAmpleImpl) TestAmple.create(getCluster().getServerContext(), + Map.of(DataLevel.USER, metadataTable), () -> withStatus(ACCEPTED, UNKNOWN, 1)); + ample.createMetadataFromExisting(client, tableId); + + // Add a custom interceptor that will replace the result status with UNKNOWN + // for only the first time the method is called instead of the actual state + // (which should be accepted) + // + // When the result of UNKNOWN is returned, the mutator will trigger a retry + // and resubmit the mutation. On retry, the mutation should be rejected because + // the mutation requires an absent operation that will have already been set on + // the previous submission. + // + // This will cause the mutator to check the rejection handler to see if we should actually + // accept the mutation. This test uses a boolean to run twice to test both the + // case of when the rejection handle will return true and false so that we can test that + // the state is correctly set to either ACCEPTED or REJECTED after the rejection handler is + // executed. + try (var tabletsMutator = ample.conditionallyMutateTablets()) { + var mutator = tabletsMutator.mutateTablet(new KeyExtent(tableId, null, null)) + .requireAbsentOperation(); + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); + + mutator.putOperation(opid); + + // If accepted is true then we want to test the ACCEPTED case so return true if + // the opid matches. If false we want this to fail to test REJECTED, so we return + // false no matter what + mutator.submit(afterMeta -> accepted && opid.equals(afterMeta.getOperationId())); + + var results = tabletsMutator.process(); + results.values().forEach(result -> { + var status = result.getStatus(); + // check the state is correct + Preconditions.checkState(accepted ? status == Status.ACCEPTED : status == Status.REJECTED, + "Failed %s, %s", status, result.getExtent()); + }); + } + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java new file mode 100644 index 0000000000..07c9dcae3b --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.ample; + +import static org.apache.accumulo.server.metadata.TestAmple.testAmpleServerContext; + +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl; +import org.easymock.EasyMock; + +public class TestAmpleUtil { + + public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample) { + Manager manager = EasyMock.mock(Manager.class); + EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + .atLeastOnce(); + EasyMock.replay(manager); + return manager; + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java new file mode 100644 index 0000000000..82ae56d75f --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.server.metadata.TestAmple.not; +import static org.apache.accumulo.test.ample.TestAmpleUtil.mockWithAmple; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.merge.DeleteRows; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; +import org.apache.accumulo.manager.tableOps.merge.MergeTablets; +import org.apache.accumulo.manager.tableOps.merge.ReserveTablets; +import org.apache.accumulo.manager.tableOps.split.FindSplits; +import org.apache.accumulo.manager.tableOps.split.PreSplit; +import org.apache.accumulo.server.metadata.TestAmple; +import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class ManagerRepoIT extends SharedMiniClusterBase { + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @ParameterizedTest + @EnumSource(MergeInfo.Operation.class) + public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception { + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0] + operation; + String userTable = tableNames[1] + operation; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(userTable); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + // Set up Test ample and manager + TestAmple.createMetadataTable(client, metadataTable); + TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + testAmple.createMetadataFromExisting(client, tableId); + Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + + // Create a test operation and fate id for testing merge and delete rows + // and add operation to test metadata for the tablet + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); + var extent = new KeyExtent(tableId, null, null); + + try (TabletsMutator tm = testAmple.mutateTablets()) { + tm.mutateTablet(extent).putOperation(opid).mutate(); + } + + // Build either MergeTablets or DeleteRows repo for testing no WALs, both should check this + // condition + final MergeInfo mergeInfo = new MergeInfo(tableId, + manager.getContext().getNamespaceId(tableId), null, null, operation); + final ManagerRepo repo = + operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new DeleteRows(mergeInfo); + // Also test ReserveTablets isReady() + final ManagerRepo reserve = new ReserveTablets(mergeInfo); + + // First, check no errors with the default case + assertEquals(0, reserve.isReady(fateId, manager)); + assertNotNull(repo.call(fateId, manager)); + + // Write a WAL to the test metadata and then re-run the repo to check for an error + try (TabletsMutator tm = testAmple.mutateTablets()) { + var walFilePath = Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); + tm.mutateTablet(extent).putWal(LogEntry.fromPath(walFilePath)).mutate(); + } + + // Should not be ready due to the presence of a WAL + assertTrue(reserve.isReady(fateId, manager) > 0); + + // Repo should throw an exception due to the WAL existence + var thrown = assertThrows(IllegalStateException.class, () -> repo.call(fateId, manager)); + assertTrue(thrown.getMessage().contains("has unexpected walogs")); + } + } + + @Test + public void testFindSplitsUnsplittable() throws Exception { + + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0]; + String userTable = tableNames[1]; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + TestAmple.createMetadataTable(client, metadataTable); + + // Create table with a smaller max end row size + createUnsplittableTable(client, userTable); + populateUnsplittableTable(client, userTable); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + // Prevent UNSPLITTABLE_COLUMN just in case a system split tried to run on the table + // before we copied it and inserted the column + testAmple.createMetadataFromExisting(client, tableId, + not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); + + KeyExtent extent = new KeyExtent(tableId, null, null); + Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + + FindSplits findSplits = new FindSplits(extent); + PreSplit preSplit = (PreSplit) findSplits + .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager); + + // The table should not need splitting + assertNull(preSplit); + + // Verify metadata has unsplittable column + var metadata = testAmple.readTablet(new KeyExtent(tableId, null, null)).getUnSplittable(); + assertNotNull(metadata); + + // Bump max end row size and verify split occurs and unsplittable column is cleaned up + client.tableOperations().setProperty(userTable, Property.TABLE_MAX_END_ROW_SIZE.getKey(), + "500"); + + findSplits = new FindSplits(extent); + preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), + manager); + + // The table SHOULD now need splitting + assertNotNull(preSplit); + + // Verify unsplittable metadata is still the same and exists + // This will not be cleared until the UpdateTablets repo runs + // so if the test is updated to test UpdateTablets this can be checked + assertEquals(metadata, + testAmple.readTablet(new KeyExtent(tableId, null, null)).getUnSplittable()); + } + } + + private void createUnsplittableTable(ClientContext client, String table) throws Exception { + // make a table and lower the configuration properties + // @formatter:off + Map<String,String> props = Map.of( + Property.TABLE_SPLIT_THRESHOLD.getKey(), "1K", + Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64", + Property.TABLE_MAX_END_ROW_SIZE.getKey(), "" + 100, + Property.TABLE_MAJC_RATIO.getKey(), "9999" + ); + // @formatter:on + client.tableOperations().create(table, new NewTableConfiguration().setProperties(props)); + + } + + private void populateUnsplittableTable(ClientContext client, String table) throws Exception { + byte[] data = new byte[101]; + Arrays.fill(data, 0, data.length - 2, (byte) 'm'); + + final int numOfMutations = 20; + try (BatchWriter batchWriter = client.createBatchWriter(table)) { + // Make the last place in the key different for every entry added to the table + for (int i = 0; i < numOfMutations; i++) { + data[data.length - 1] = (byte) i; + Mutation m = new Mutation(data); + m.put("cf", "cq", "value"); + batchWriter.addMutation(m); + } + } + client.tableOperations().flush(table, null, null, true); + } +}