This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 0b220c1d86 Move Scan Server File refs to their own table (#4690) 0b220c1d86 is described below commit 0b220c1d86e32123697d94e704309fef48c42dbf Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Jun 21 16:48:12 2024 -0400 Move Scan Server File refs to their own table (#4690) This change moves scan server refs to a separate table from metadata in order to improve performance. The prefix was dropped as nothing else is stored in the table anymore. This is a backport of the change in elasticity that was done in #4650 into 3.1 --- .../accumulo/core/metadata/AccumuloTable.java | 2 +- .../accumulo/core/metadata/ScanServerRefStore.java | 64 ++++++++++++ .../core/metadata/ScanServerRefTabletFile.java | 4 +- .../accumulo/core/metadata/schema/Ample.java | 44 +------- .../core/metadata/schema/MetadataSchema.java | 13 --- .../MiniAccumuloClusterImplTest.java | 5 +- .../server/init/FileSystemInitializer.java | 27 ++++- .../accumulo/server/init/InitialConfiguration.java | 51 ++++++---- .../accumulo/server/init/ZooKeeperInitializer.java | 11 +- .../server/metadata/ScanServerRefStoreImpl.java | 113 +++++++++++++++++++++ .../accumulo/server/metadata/ServerAmpleImpl.java | 77 ++------------ .../server/util/ScanServerMetadataEntries.java | 8 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 2 +- .../org/apache/accumulo/tserver/ScanServer.java | 8 +- .../java/org/apache/accumulo/test/MetaSplitIT.java | 4 +- .../org/apache/accumulo/test/NamespacesIT.java | 2 +- .../test/ScanServerMetadataEntriesCleanIT.java | 8 +- .../accumulo/test/ScanServerMetadataEntriesIT.java | 38 ++++--- .../apache/accumulo/test/ScanServerShutdownIT.java | 6 +- .../accumulo/test/functional/WALSunnyDayIT.java | 6 +- 20 files changed, 294 insertions(+), 199 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java index 14b8b0cf30..929444fb3c 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java @@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.TableId; */ public enum AccumuloTable { - ROOT("root", "+r"), METADATA("metadata", "!0"); + ROOT("root", "+r"), METADATA("metadata", "!0"), SCAN_REF("scanref", "+scanref"); private final String name; private final TableId tableId; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java new file mode 100644 index 0000000000..461427fcd2 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata; + +import java.util.Collection; +import java.util.UUID; +import java.util.stream.Stream; + +public interface ScanServerRefStore { + + /** + * Insert ScanServer references to Tablet files + * + * @param scanRefs set of scan server ref table file objects + */ + default void put(Collection<ScanServerRefTabletFile> scanRefs) { + throw new UnsupportedOperationException(); + } + + /** + * Get ScanServer references to Tablet files + * + * @return stream of scan server references + */ + default Stream<ScanServerRefTabletFile> list() { + throw new UnsupportedOperationException(); + } + + /** + * Delete the set of scan server references + * + * @param refsToDelete set of scan server references to delete + */ + default void delete(Collection<ScanServerRefTabletFile> refsToDelete) { + throw new UnsupportedOperationException(); + } + + /** + * Delete scan server references for this server + * + * @param serverAddress address of server, cannot be null + * @param serverSessionId server session id, cannot be null + */ + default void delete(String serverAddress, UUID serverSessionId) { + throw new UnsupportedOperationException(); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java index 291343e2e7..50076eaa6f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java @@ -44,7 +44,7 @@ public class ScanServerRefTabletFile extends ReferencedTabletFile { this.colq = colq; } - public String getRowSuffix() { + public String getRow() { return this.getNormalizedPathStr(); } @@ -86,7 +86,7 @@ public class ScanServerRefTabletFile extends ReferencedTabletFile { @Override public String toString() { - return "ScanServerRefTabletFile [file=" + this.getRowSuffix() + ", server address=" + colf + return "ScanServerRefTabletFile [file=" + this.getRow() + ", server address=" + colf + ", server lock uuid=" + colq + "]"; } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 931e415774..b212356a4a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.metadata.schema; import java.util.Collection; import java.util.Iterator; -import java.util.UUID; import java.util.stream.Stream; import org.apache.accumulo.core.data.Mutation; @@ -31,7 +30,7 @@ import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; +import org.apache.accumulo.core.metadata.ScanServerRefStore; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -324,43 +323,6 @@ public interface Ample { void mutate(); } - /** - * Insert ScanServer references to Tablet files - * - * @param scanRefs set of scan server ref table file objects - */ - default void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) { - throw new UnsupportedOperationException(); - } - - /** - * Get ScanServer references to Tablet files - * - * @return stream of scan server references - */ - default Stream<ScanServerRefTabletFile> getScanServerFileReferences() { - throw new UnsupportedOperationException(); - } - - /** - * Delete the set of scan server references - * - * @param refsToDelete set of scan server references to delete - */ - default void deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> refsToDelete) { - throw new UnsupportedOperationException(); - } - - /** - * Delete scan server references for this server - * - * @param serverAddress address of server, cannot be null - * @param serverSessionId server session id, cannot be null - */ - default void deleteScanServerFileReferences(String serverAddress, UUID serverSessionId) { - throw new UnsupportedOperationException(); - } - /** * Create a Bulk Load In Progress flag in the metadata table * @@ -380,6 +342,10 @@ public interface Ample { throw new UnsupportedOperationException(); } + default ScanServerRefStore scanServerRefs() { + throw new UnsupportedOperationException(); + } + /** * Remove all the Bulk Load transaction ids from a given table's metadata * diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 0fd991296c..e4356c5247 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -443,17 +443,4 @@ public class MetadataSchema { return section.getRowPrefix(); } } - - public static class ScanServerFileReferenceSection { - private static final Section section = - new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX + "sserx", false); - - public static Range getRange() { - return section.getRange(); - } - - public static String getRowPrefix() { - return section.getRowPrefix(); - } - } } diff --git a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java index c8cf747fe9..457d816b98 100644 --- a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java @@ -106,7 +106,8 @@ public class MiniAccumuloClusterImplTest { @Timeout(60) public void saneMonitorInfo() throws Exception { ManagerMonitorInfo stats; - int expectedNumTables = 3; + // Expecting default AccumuloTables + TEST_TABLE + int expectedNumTables = AccumuloTable.values().length + 1; while (true) { stats = accumulo.getManagerMonitorInfo(); if (stats.tableMap.size() < expectedNumTables) { @@ -127,6 +128,8 @@ public class MiniAccumuloClusterImplTest { "root table should exist in " + stats.tableMap.keySet()); assertTrue(stats.tableMap.containsKey(AccumuloTable.METADATA.tableId().canonical()), "meta table should exist in " + stats.tableMap.keySet()); + assertTrue(stats.tableMap.containsKey(AccumuloTable.SCAN_REF.tableId().canonical()), + "scan ref table should exist in " + stats.tableMap.keySet()); assertTrue(stats.tableMap.containsKey(testTableID), "our test table should exist in " + stats.tableMap.keySet()); assertNotNull(stats.tServerInfo, "there should be tservers."); diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java index 8fdfa9219e..9db54c8f12 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java @@ -31,6 +31,7 @@ import java.util.TreeMap; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.data.InstanceId; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -100,6 +102,15 @@ class FileSystemInitializer { String tableMetadataTabletDirUri = fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + AccumuloTable.METADATA.tableId() + Path.SEPARATOR + tableMetadataTabletDirName; + + chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT, + AccumuloTable.SCAN_REF.tableId(), null, context); + String scanRefTableDefaultTabletDirName = + MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; + String scanRefTableDefaultTabletDirUri = + fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + AccumuloTable.SCAN_REF.tableId() + Path.SEPARATOR + scanRefTableDefaultTabletDirName; + chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT, AccumuloTable.METADATA.tableId(), null, context); String defaultMetadataTabletDirName = @@ -109,11 +120,20 @@ class FileSystemInitializer { + AccumuloTable.METADATA.tableId() + Path.SEPARATOR + defaultMetadataTabletDirName; // create table and default tablets directories - createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri, defaultMetadataTabletDirUri); + createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri, defaultMetadataTabletDirUri, + scanRefTableDefaultTabletDirUri); + + String ext = FileOperations.getNewFileExtension(DefaultConfiguration.getInstance()); + + // populate the metadata tablet with info about scan ref tablets + String metadataFileName = tableMetadataTabletDirUri + Path.SEPARATOR + "0_1." + ext; + Tablet scanRefTablet = + new Tablet(AccumuloTable.SCAN_REF.tableId(), scanRefTableDefaultTabletDirName, null, null); + createMetadataFile(fs, metadataFileName, siteConfig, scanRefTablet); // populate the root tablet with info about the metadata table's two initial tablets - Tablet tablesTablet = - new Tablet(AccumuloTable.METADATA.tableId(), tableMetadataTabletDirName, null, splitPoint); + Tablet tablesTablet = new Tablet(AccumuloTable.METADATA.tableId(), tableMetadataTabletDirName, + null, splitPoint, StoredTabletFile.of(new Path(metadataFileName)).getMetadata()); Tablet defaultTablet = new Tablet(AccumuloTable.METADATA.tableId(), defaultMetadataTabletDirName, splitPoint, null); createMetadataFile(fs, rootTabletFileUri, siteConfig, tablesTablet, defaultTablet); @@ -144,6 +164,7 @@ class FileSystemInitializer { setTableProperties(context, AccumuloTable.ROOT.tableId(), initConfig.getRootMetaConf()); setTableProperties(context, AccumuloTable.METADATA.tableId(), initConfig.getRootMetaConf()); setTableProperties(context, AccumuloTable.METADATA.tableId(), initConfig.getMetaTableConf()); + setTableProperties(context, AccumuloTable.SCAN_REF.tableId(), initConfig.getScanRefTableConf()); } private void setTableProperties(final ServerContext context, TableId tableId, diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java index 77773b1859..52ba4a2eda 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java @@ -42,38 +42,44 @@ class InitialConfiguration { private final HashMap<String,String> initialRootMetaConf = new HashMap<>(); // config for only metadata table private final HashMap<String,String> initialMetaConf = new HashMap<>(); + // config for only scan ref table + private final HashMap<String,String> initialScanRefTableConf = new HashMap<>(); private final Configuration hadoopConf; private final SiteConfiguration siteConf; InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf) { this.hadoopConf = hadoopConf; this.siteConf = siteConf; + + // config common to all Accumulo tables + Map<String,String> commonConfig = new HashMap<>(); + commonConfig.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K"); + commonConfig.put(Property.TABLE_FILE_REPLICATION.getKey(), "5"); + commonConfig.put(Property.TABLE_DURABILITY.getKey(), "sync"); + commonConfig.put(Property.TABLE_MAJC_RATIO.getKey(), "1"); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", + "10," + VersioningIterator.class.getName()); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1"); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", + "10," + VersioningIterator.class.getName()); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1"); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", + "10," + VersioningIterator.class.getName()); + commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1"); + commonConfig.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); + commonConfig.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), ""); + commonConfig.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true"); + commonConfig.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); + + initialRootMetaConf.putAll(commonConfig); initialRootConf.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), SimpleCompactionDispatcher.class.getName()); initialRootConf.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "root"); - - initialRootMetaConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K"); - initialRootMetaConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5"); - initialRootMetaConf.put(Property.TABLE_DURABILITY.getKey(), "sync"); - initialRootMetaConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1"); initialRootMetaConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M"); initialRootMetaConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName()); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", - "10," + VersioningIterator.class.getName()); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", - "1"); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", - "10," + VersioningIterator.class.getName()); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", - "1"); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", - "10," + VersioningIterator.class.getName()); - initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", - "1"); initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName()); - initialRootMetaConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); initialRootMetaConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet", String.format("%s,%s", MetadataSchema.TabletsSection.TabletColumnFamily.NAME, MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)); @@ -83,14 +89,13 @@ class InitialConfiguration { MetadataSchema.TabletsSection.ServerColumnFamily.NAME, MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)); initialRootMetaConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server"); - initialRootMetaConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), ""); - initialRootMetaConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true"); - initialRootMetaConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); initialMetaConf.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), SimpleCompactionDispatcher.class.getName()); initialMetaConf.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "meta"); + initialScanRefTableConf.putAll(commonConfig); + int max = hadoopConf.getInt("dfs.replication.max", 512); // Hadoop 0.23 switched the min value configuration name int min = Math.max(hadoopConf.getInt("dfs.replication.min", 1), @@ -130,6 +135,10 @@ class InitialConfiguration { return initialMetaConf; } + HashMap<String,String> getScanRefTableConf() { + return initialScanRefTableConf; + } + Configuration getHadoopConf() { return hadoopConf; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index e861f31452..97b14c26e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -118,12 +118,11 @@ public class ZooKeeperInitializer { TableManager.prepareNewNamespaceState(context, Namespace.ACCUMULO.id(), Namespace.ACCUMULO.name(), ZooUtil.NodeExistsPolicy.FAIL); - TableManager.prepareNewTableState(context, AccumuloTable.ROOT.tableId(), - Namespace.ACCUMULO.id(), AccumuloTable.ROOT.tableName(), TableState.ONLINE, - ZooUtil.NodeExistsPolicy.FAIL); - TableManager.prepareNewTableState(context, AccumuloTable.METADATA.tableId(), - Namespace.ACCUMULO.id(), AccumuloTable.METADATA.tableName(), TableState.ONLINE, - ZooUtil.NodeExistsPolicy.FAIL); + for (AccumuloTable table : AccumuloTable.values()) { + TableManager.prepareNewTableState(context, table.tableId(), Namespace.ACCUMULO.id(), + table.tableName(), TableState.ONLINE, ZooUtil.NodeExistsPolicy.FAIL); + } + zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java new file mode 100644 index 0000000000..1763b9d099 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata; + +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.ScanServerRefStore; +import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScanServerRefStoreImpl implements ScanServerRefStore { + + private static Logger log = LoggerFactory.getLogger(ScanServerRefStoreImpl.class); + + private final ClientContext context; + private final String tableName; + + public ScanServerRefStoreImpl(ClientContext context, String tableName) { + this.context = context; + this.tableName = tableName; + } + + @Override + public void put(Collection<ScanServerRefTabletFile> scanRefs) { + try (BatchWriter writer = context.createBatchWriter(tableName)) { + for (ScanServerRefTabletFile ref : scanRefs) { + Mutation m = new Mutation(ref.getRow()); + m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue()); + writer.addMutation(m); + } + } catch (MutationsRejectedException | TableNotFoundException e) { + throw new IllegalStateException( + "Error inserting scan server file references into " + tableName, e); + } + } + + @Override + public Stream<ScanServerRefTabletFile> list() { + try { + Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); + return scanner.stream().onClose(scanner::close) + .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString(), + e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public void delete(String serverAddress, UUID scanServerLockUUID) { + Objects.requireNonNull(serverAddress, "Server address must be supplied"); + Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied"); + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.fetchColumn(new Text(serverAddress), new Text(scanServerLockUUID.toString())); + + Set<ScanServerRefTabletFile> refsToDelete = StreamSupport.stream(scanner.spliterator(), false) + .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString(), + e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())) + .collect(Collectors.toSet()); + + if (!refsToDelete.isEmpty()) { + this.delete(refsToDelete); + } + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public void delete(Collection<ScanServerRefTabletFile> refsToDelete) { + try (BatchWriter writer = context.createBatchWriter(tableName)) { + for (ScanServerRefTabletFile ref : refsToDelete) { + Mutation m = new Mutation(ref.getRow()); + m.putDelete(ref.getServerAddress(), ref.getServerLockUUID()); + writer.addMutation(m); + } + log.debug("Deleted scan server file reference entries for files: {}", refsToDelete); + } catch (MutationsRejectedException | TableNotFoundException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 195540276c..c11585aff8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -26,13 +26,8 @@ import java.net.URI; import java.util.Collection; import java.util.Iterator; import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IsolatedScanner; @@ -50,7 +45,7 @@ import org.apache.accumulo.core.gc.GcCandidate; import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; +import org.apache.accumulo.core.metadata.ScanServerRefStore; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.metadata.schema.Ample; @@ -61,7 +56,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.server.ServerContext; @@ -77,10 +71,13 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { private static Logger log = LoggerFactory.getLogger(ServerAmpleImpl.class); private ServerContext context; + private final ScanServerRefStore scanServerRefStore; public ServerAmpleImpl(ServerContext context) { super(context); this.context = context; + this.scanServerRefStore = + new ScanServerRefStoreImpl(context, AccumuloTable.SCAN_REF.tableName()); } @Override @@ -344,70 +341,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } @Override - public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ScanServerFileReferenceSection.getRowPrefix(); - for (ScanServerRefTabletFile ref : scanRefs) { - Mutation m = new Mutation(prefix + ref.getRowSuffix()); - m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue()); - writer.addMutation(m); - } - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new IllegalStateException( - "Error inserting scan server file references into " + DataLevel.USER.metaTable(), e); - } - } - - @Override - public Stream<ScanServerRefTabletFile> getScanServerFileReferences() { - try { - Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); - scanner.setRange(ScanServerFileReferenceSection.getRange()); - int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); - return scanner.stream().onClose(scanner::close) - .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), - e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())); - } catch (TableNotFoundException e) { - throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e); - } - } - - @Override - public void deleteScanServerFileReferences(String serverAddress, UUID scanServerLockUUID) { - Objects.requireNonNull(serverAddress, "Server address must be supplied"); - Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied"); - try ( - Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY)) { - scanner.setRange(ScanServerFileReferenceSection.getRange()); - scanner.fetchColumn(new Text(serverAddress), new Text(scanServerLockUUID.toString())); - - int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); - Set<ScanServerRefTabletFile> refsToDelete = StreamSupport.stream(scanner.spliterator(), false) - .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), - e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())) - .collect(Collectors.toSet()); - - if (!refsToDelete.isEmpty()) { - this.deleteScanServerFileReferences(refsToDelete); - } - } catch (TableNotFoundException e) { - throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e); - } - } - - @Override - public void deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> refsToDelete) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ScanServerFileReferenceSection.getRowPrefix(); - for (ScanServerRefTabletFile ref : refsToDelete) { - Mutation m = new Mutation(prefix + ref.getRowSuffix()); - m.putDelete(ref.getServerAddress(), ref.getServerLockUUID()); - writer.addMutation(m); - } - log.debug("Deleted scan server file reference entries for files: {}", refsToDelete); - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new IllegalStateException(e); - } + public ScanServerRefStore scanServerRefs() { + return scanServerRefStore; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java index 61e3244c9a..71c2b0dd40 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java @@ -39,7 +39,7 @@ public class ScanServerMetadataEntries { Set<UUID> uuidsToDelete = new HashSet<>(); // collect all uuids that are currently in the metadata table - context.getAmple().getScanServerFileReferences().forEach(ssrtf -> { + context.getAmple().scanServerRefs().list().forEach(ssrtf -> { uuidsToDelete.add(UUID.fromString(ssrtf.getServerLockUUID().toString())); }); @@ -55,19 +55,19 @@ public class ScanServerMetadataEntries { if (!uuidsToDelete.isEmpty()) { final Set<ScanServerRefTabletFile> refsToDelete = new HashSet<>(); - context.getAmple().getScanServerFileReferences().forEach(ssrtf -> { + context.getAmple().scanServerRefs().list().forEach(ssrtf -> { var uuid = UUID.fromString(ssrtf.getServerLockUUID().toString()); if (uuidsToDelete.contains(uuid)) { refsToDelete.add(ssrtf); if (refsToDelete.size() > 5000) { - context.getAmple().deleteScanServerFileReferences(refsToDelete); + context.getAmple().scanServerRefs().delete(refsToDelete); refsToDelete.clear(); } } }); - context.getAmple().deleteScanServerFileReferences(refsToDelete); + context.getAmple().scanServerRefs().delete(refsToDelete); } } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 4cb6f3b94c..aff958b010 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -212,7 +212,7 @@ public class GCRun implements GarbageCollectionEnvironment { return fileStream; }); - var scanServerRefs = context.getAmple().getScanServerFileReferences() + var scanServerRefs = context.getAmple().scanServerRefs().list() .map(sfr -> ReferenceFile.forScan(sfr.getTableId(), sfr)); return Stream.concat(tabletReferences, scanServerRefs); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index cd02951c63..3ba3a1c306 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -429,7 +429,7 @@ public class ScanServer extends AbstractServer address.server.stop(); LOG.info("Removing server scan references"); - this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), + this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(), serverLockUUID); try { @@ -638,7 +638,7 @@ public class ScanServer extends AbstractServer if (!filesToReserve.isEmpty()) { scanServerMetrics.recordWriteOutReservationTime( - () -> getContext().getAmple().putScanServerFileReferences(refs)); + () -> getContext().getAmple().scanServerRefs().put(refs)); // After we insert the scan server refs we need to check and see if the tablet is still // using the file. As long as the tablet is still using the files then the Accumulo GC @@ -671,7 +671,7 @@ public class ScanServer extends AbstractServer if (!filesToReserve.isEmpty()) { LOG.info("RFFS {} tablet files changed while attempting to reference files {}", myReservationId, filesToReserve); - getContext().getAmple().deleteScanServerFileReferences(refs); + getContext().getAmple().scanServerRefs().delete(refs); scanServerMetrics.incrementReservationConflictCount(); return null; } @@ -848,7 +848,7 @@ public class ScanServer extends AbstractServer if (!confirmed.isEmpty()) { try { // Do this metadata operation is done w/o holding the lock - getContext().getAmple().deleteScanServerFileReferences(refsToDelete); + getContext().getAmple().scanServerRefs().delete(refsToDelete); if (LOG.isTraceEnabled()) { confirmed.forEach(refToDelete -> LOG.trace( "RFFS referenced files has not been used recently, removing reference {}", diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java index 2133e4df5e..c9aed19795 100644 --- a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java @@ -191,8 +191,8 @@ public class MetaSplitIT extends AccumuloClusterHarness { var expectedExtents = tables.entrySet().stream() .filter(e -> !e.getKey().startsWith("accumulo.")).map(Map.Entry::getValue).map(TableId::of) .map(tid -> new KeyExtent(tid, null, null)).collect(Collectors.toSet()); - // Verify we have 10 tablets for metadata - assertEquals(10, expectedExtents.size()); + // Verify we have 11 tablets for metadata + assertEquals(11, expectedExtents.size()); // Scan each tablet to verify data exists var ample = ((ClientContext) client).getAmple(); diff --git a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java index 3665c48ddc..80d47f23d3 100644 --- a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java @@ -134,7 +134,7 @@ public class NamespacesIT extends SharedMiniClusterBase { c.tableOperations().delete(t); } } - assertEquals(2, c.tableOperations().list().size()); + assertEquals(3, c.tableOperations().list().size()); for (String n : c.namespaceOperations().list()) { if (!n.equals(Namespace.ACCUMULO.name()) && !n.equals(Namespace.DEFAULT.name())) { c.namespaceOperations().delete(n); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java index a11385d955..7e06a43872 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java @@ -63,14 +63,14 @@ public class ScanServerMetadataEntriesCleanIT extends SharedMiniClusterBase { ServerContext ctx = getCluster().getServerContext(); - ctx.getAmple().putScanServerFileReferences(scanRefs); - assertEquals(scanRefs.size(), ctx.getAmple().getScanServerFileReferences().count()); + ctx.getAmple().scanServerRefs().put(scanRefs); + assertEquals(scanRefs.size(), ctx.getAmple().scanServerRefs().list().count()); Set<ScanServerRefTabletFile> scanRefs2 = - ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet()); + ctx.getAmple().scanServerRefs().list().collect(Collectors.toSet()); assertEquals(scanRefs, scanRefs2); ScanServerMetadataEntries.clean(ctx); - assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()); + assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java index ab92e5700f..99d1ce80a8 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java @@ -51,7 +51,6 @@ import org.apache.accumulo.core.gc.Reference; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.gc.GCRun; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; @@ -118,24 +117,24 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { ServerContext ctx = getCluster().getServerContext(); - ctx.getAmple().putScanServerFileReferences(scanRefs); - assertEquals(scanRefs.size(), ctx.getAmple().getScanServerFileReferences().count()); + ctx.getAmple().scanServerRefs().put(scanRefs); + assertEquals(scanRefs.size(), ctx.getAmple().scanServerRefs().list().count()); Set<ScanServerRefTabletFile> scanRefs2 = - ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet()); + ctx.getAmple().scanServerRefs().list().collect(Collectors.toSet()); assertEquals(scanRefs, scanRefs2); // attempt to delete file references then make sure they were deleted - ctx.getAmple().deleteScanServerFileReferences(server.toString(), serverLockUUID); - assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()); + ctx.getAmple().scanServerRefs().delete(server.toString(), serverLockUUID); + assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent()); - ctx.getAmple().putScanServerFileReferences(scanRefs); - assertEquals(scanRefs.size(), ctx.getAmple().getScanServerFileReferences().count()); + ctx.getAmple().scanServerRefs().put(scanRefs); + assertEquals(scanRefs.size(), ctx.getAmple().scanServerRefs().list().count()); // attempt to delete file references then make sure they were deleted - ctx.getAmple().deleteScanServerFileReferences(scanRefs); - assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()); + ctx.getAmple().scanServerRefs().delete(scanRefs); + assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent()); } @Test @@ -162,12 +161,12 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { assertTrue(iter.hasNext()); assertNotNull(iter.next()); - assertEquals(fileCount, ctx.getAmple().getScanServerFileReferences().count()); + assertEquals(fileCount, ctx.getAmple().scanServerRefs().list().count()); } // close happens asynchronously. Let the test fail by timeout - while (ctx.getAmple().getScanServerFileReferences().findAny().isPresent()) { + while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) { Thread.sleep(1000); } } @@ -196,12 +195,12 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { assertTrue(iter.hasNext()); assertNotNull(iter.next()); - assertEquals(fileCount, ctx.getAmple().getScanServerFileReferences().count()); + assertEquals(fileCount, ctx.getAmple().scanServerRefs().list().count()); } // close happens asynchronously. Let the test fail by timeout - while (ctx.getAmple().getScanServerFileReferences().findAny().isPresent()) { + while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) { Thread.sleep(1000); } } @@ -235,8 +234,7 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { List<Entry<Key,Value>> metadataEntries = null; try (Scanner scanner2 = - client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) { - scanner2.setRange(ScanServerFileReferenceSection.getRange()); + client.createScanner(AccumuloTable.SCAN_REF.tableName(), Authorizations.EMPTY)) { metadataEntries = scanner2.stream().distinct().collect(Collectors.toList()); } assertEquals(fileCount, metadataEntries.size()); @@ -244,14 +242,12 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { Set<String> metadataScanFileRefs = new HashSet<>(); metadataEntries.forEach(m -> { - String row = m.getKey().getRow().toString(); - assertTrue(row.startsWith("~sserv")); - String file = row.substring(ScanServerFileReferenceSection.getRowPrefix().length()); + String file = m.getKey().getRow().toString(); metadataScanFileRefs.add(file); }); assertEquals(fileCount, metadataScanFileRefs.size()); - assertEquals(fileCount, ctx.getAmple().getScanServerFileReferences().count()); + assertEquals(fileCount, ctx.getAmple().scanServerRefs().list().count()); List<Reference> refs; try (Stream<Reference> references = gc.getReferences()) { refs = references.collect(Collectors.toList()); @@ -275,7 +271,7 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { client.tableOperations().delete(tableName); } // close happens asynchronously. Let the test fail by timeout - while (ctx.getAmple().getScanServerFileReferences().findAny().isPresent()) { + while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) { Thread.sleep(1000); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java index 6c2d7aac6b..5633474aae 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java @@ -107,7 +107,7 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { for (int i = 0; i < fileCount; i++) { ScanServerIT.ingest(client, tableName, 10, 10, 0, "colf", true); } - assertEquals(0, ctx.getAmple().getScanServerFileReferences().count()); + assertEquals(0, ctx.getAmple().scanServerRefs().list().count()); for (int i = 0; i < 3; i++) { try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) { @@ -118,7 +118,7 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { assertTrue(iter.hasNext()); assertNotNull(iter.next()); - assertEquals(fileCount, ctx.getAmple().getScanServerFileReferences().count()); + assertEquals(fileCount, ctx.getAmple().scanServerRefs().list().count()); } } @@ -127,7 +127,7 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { Wait.waitFor(() -> ((ClientContext) client).getScanServers().size() == 0); // The ScanServer should clean up the references on normal shutdown - Wait.waitFor(() -> ctx.getAmple().getScanServerFileReferences().count() == 0); + Wait.waitFor(() -> ctx.getAmple().scanServerRefs().list().count() == 0); } finally { getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java index 1f3eea1415..2d168f392b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java @@ -148,8 +148,10 @@ public class WALSunnyDayIT extends ConfigurableMacBase { Thread.sleep(SECONDS.toMillis(5)); Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c); // log.debug("markers " + markers); - assertEquals(1, markers.size(), "one tablet should have markers"); - assertEquals("1", markers.keySet().iterator().next().tableId().canonical(), + // There should be markers for the created table and also the ScanRef table + assertEquals(2, markers.size(), "two tablets should have markers"); + assertTrue( + markers.keySet().stream().anyMatch(extent -> extent.tableId().canonical().equals("1")), "tableId of the keyExtent should be 1"); // put some data in the WAL